Posted to commits@hive.apache.org by om...@apache.org on 2017/02/03 16:38:14 UTC
[15/22] hive git commit: HIVE-14007. Replace hive-orc module with ORC 1.3.1
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
deleted file mode 100644
index 9433e54..0000000
--- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ /dev/null
@@ -1,1230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
-import org.apache.orc.DateColumnStatistics;
-import org.apache.orc.DecimalColumnStatistics;
-import org.apache.orc.DoubleColumnStatistics;
-import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.OrcConf;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TimestampColumnStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.BloomFilterIO;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.ql.util.TimestampUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
-
-public class RecordReaderImpl implements RecordReader {
- static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
- private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
- private static final Object UNKNOWN_VALUE = new Object();
- protected final Path path;
- private final long firstRow;
- private final List<StripeInformation> stripes =
- new ArrayList<StripeInformation>();
- private OrcProto.StripeFooter stripeFooter;
- private final long totalRowCount;
- private final CompressionCodec codec;
- protected final TypeDescription schema;
- private final List<OrcProto.Type> types;
- private final int bufferSize;
- private final SchemaEvolution evolution;
- // the columns to read from the file, indexed by the file's column ids.
- private final boolean[] included;
- private final long rowIndexStride;
- private long rowInStripe = 0;
- private int currentStripe = -1;
- private long rowBaseInStripe = 0;
- private long rowCountInStripe = 0;
- private final Map<StreamName, InStream> streams =
- new HashMap<StreamName, InStream>();
- DiskRangeList bufferChunks = null;
- private final TreeReaderFactory.TreeReader reader;
- private final OrcProto.RowIndex[] indexes;
- private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
- private final SargApplier sargApp;
- // a boolean per row group: true if the group is selected; null means read all
- private boolean[] includedRowGroups = null;
- private final DataReader dataReader;
-
- /**
- * Given a column name, look it up among the top-level fields of the reader
- * schema and return the corresponding file column id.
- *
- * @param evolution the mapping from reader to file schema
- * @param columnName the column name to look for
- * @return the file column id or -1 if the column wasn't found
- */
- static int findColumns(SchemaEvolution evolution,
- String columnName) {
- TypeDescription readerSchema = evolution.getReaderBaseSchema();
- List<String> fieldNames = readerSchema.getFieldNames();
- List<TypeDescription> children = readerSchema.getChildren();
- for (int i = 0; i < fieldNames.size(); ++i) {
- if (columnName.equals(fieldNames.get(i))) {
- TypeDescription result = evolution.getFileType(children.get(i).getId());
- return result == null ? -1 : result.getId();
- }
- }
- return -1;
- }
-
- /**
- * Find the mapping from predicate leaves to columns.
- * @param sargLeaves the search argument leaves that we need to map
- * @param evolution the mapping from reader to file schema
- * @return an array mapping the sarg leaves to file column ids
- */
- public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
- SchemaEvolution evolution) {
- int[] result = new int[sargLeaves.size()];
- Arrays.fill(result, -1);
- for(int i=0; i < result.length; ++i) {
- String colName = sargLeaves.get(i).getColumnName();
- result[i] = findColumns(evolution, colName);
- }
- return result;
- }
-
- protected RecordReaderImpl(ReaderImpl fileReader,
- Reader.Options options) throws IOException {
- boolean[] readerIncluded = options.getInclude();
- if (options.getSchema() == null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Reader schema not provided -- using file schema " +
- fileReader.getSchema());
- }
- evolution = new SchemaEvolution(fileReader.getSchema(), readerIncluded);
- } else {
- // Now that we are creating a record reader for a file, validate that the
- // schema to read is compatible with the file schema.
- evolution = new SchemaEvolution(fileReader.getSchema(),
- options.getSchema(), readerIncluded);
- if (LOG.isDebugEnabled() && evolution.hasConversion()) {
- LOG.debug("ORC file " + fileReader.path.toString() +
- " has data type conversion --\n" +
- "reader schema: " + options.getSchema().toString() + "\n" +
- "file schema: " + fileReader.getSchema());
- }
- }
- this.schema = evolution.getReaderSchema();
- this.path = fileReader.path;
- this.codec = fileReader.codec;
- this.types = fileReader.types;
- this.bufferSize = fileReader.bufferSize;
- this.rowIndexStride = fileReader.rowIndexStride;
- SearchArgument sarg = options.getSearchArgument();
- if (sarg != null && rowIndexStride != 0) {
- sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride,
- evolution);
- } else {
- sargApp = null;
- }
- long rows = 0;
- long skippedRows = 0;
- long offset = options.getOffset();
- long maxOffset = options.getMaxOffset();
- for(StripeInformation stripe: fileReader.getStripes()) {
- long stripeStart = stripe.getOffset();
- if (offset > stripeStart) {
- skippedRows += stripe.getNumberOfRows();
- } else if (stripeStart < maxOffset) {
- this.stripes.add(stripe);
- rows += stripe.getNumberOfRows();
- }
- }
-
- Boolean zeroCopy = options.getUseZeroCopy();
- if (zeroCopy == null) {
- zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
- }
- if (options.getDataReader() != null) {
- this.dataReader = options.getDataReader();
- } else {
- this.dataReader = RecordReaderUtils.createDefaultDataReader(
- DataReaderProperties.builder()
- .withBufferSize(bufferSize)
- .withCompression(fileReader.compressionKind)
- .withFileSystem(fileReader.fileSystem)
- .withPath(fileReader.path)
- .withTypeCount(types.size())
- .withZeroCopy(zeroCopy)
- .build());
- }
- this.dataReader.open();
-
- firstRow = skippedRows;
- totalRowCount = rows;
- Boolean skipCorrupt = options.getSkipCorruptRecords();
- if (skipCorrupt == null) {
- skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
- }
-
- reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
- evolution, readerIncluded, skipCorrupt);
- indexes = new OrcProto.RowIndex[types.size()];
- bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
- this.included = evolution.getFileIncluded();
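- // position the reader at the first row to read; with nothing loaded yet this
- // lazily reads the first selected stripe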
- advanceToNextRow(reader, 0L, true);
- }
-
- public static final class PositionProviderImpl implements PositionProvider {
- private final OrcProto.RowIndexEntry entry;
- private int index;
-
- public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
- this(entry, 0);
- }
-
- public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
- this.entry = entry;
- this.index = startPos;
- }
-
- @Override
- public long getNext() {
- return entry.getPositions(index++);
- }
-
- @Override
- public String toString() {
- return "{" + entry.getPositionsList() + "; " + index + "}";
- }
- }
-
- public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- return dataReader.readStripeFooter(stripe);
- }
-
- enum Location {
- BEFORE, MIN, MIDDLE, MAX, AFTER
- }
-
- /**
- * Given a point and min and max, determine if the point is before, at the
- * min, in the middle, at the max, or after the range.
- * @param point the point to test
- * @param min the minimum point
- * @param max the maximum point
- * @param <T> the type of the comparison
- * @return the location of the point
- */
- static <T> Location compareToRange(Comparable<T> point, T min, T max) {
- int minCompare = point.compareTo(min);
- if (minCompare < 0) {
- return Location.BEFORE;
- } else if (minCompare == 0) {
- return Location.MIN;
- }
- int maxCompare = point.compareTo(max);
- if (maxCompare > 0) {
- return Location.AFTER;
- } else if (maxCompare == 0) {
- return Location.MAX;
- }
- return Location.MIDDLE;
- }
-
- /**
- * Get the maximum value out of an index entry.
- * @param index
- * the index entry
- * @return the object for the maximum value or null if there isn't one
- */
- static Object getMax(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMaximum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMaximum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMaximum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMaximum();
- } else if (index instanceof DecimalColumnStatistics) {
- return ((DecimalColumnStatistics) index).getMaximum();
- } else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMaximum();
- } else if (index instanceof BooleanColumnStatistics) {
- if (((BooleanColumnStatistics) index).getTrueCount() != 0) {
- return Boolean.TRUE;
- } else {
- return Boolean.FALSE;
- }
- } else {
- return null;
- }
- }
-
- /**
- * Get the minimum value out of an index entry.
- * @param index
- * the index entry
- * @return the object for the minimum value or null if there isn't one
- */
- static Object getMin(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMinimum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMinimum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMinimum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMinimum();
- } else if (index instanceof DecimalColumnStatistics) {
- return ((DecimalColumnStatistics) index).getMinimum();
- } else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMinimum();
- } else if (index instanceof BooleanColumnStatistics) {
- if (((BooleanColumnStatistics) index).getFalseCount() != 0) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
- } else {
- return UNKNOWN_VALUE; // null would signal "all values were null", so use a sentinel
- }
- }
-
- /**
- * Evaluate a predicate with respect to the statistics from the column
- * that is referenced in the predicate.
- * @param statsProto the statistics for the column mentioned in the predicate
- * @param predicate the leaf predicate we need to evaluate
- * @param bloomFilter the bloom filter for the column, if any
- * @return the set of truth values that may be returned for the given
- * predicate.
- */
- static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
- PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
- ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
- Object minValue = getMin(cs);
- Object maxValue = getMax(cs);
- BloomFilterIO bf = null;
- if (bloomFilter != null) {
- bf = new BloomFilterIO(bloomFilter);
- }
- return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
- }
-
- /**
- * Evaluate a predicate with respect to the statistics from the column
- * that is referenced in the predicate.
- * @param stats the statistics for the column mentioned in the predicate
- * @param predicate the leaf predicate we need to evaluate
- * @return the set of truth values that may be returned for the given
- * predicate.
- */
- public static TruthValue evaluatePredicate(ColumnStatistics stats,
- PredicateLeaf predicate,
- BloomFilterIO bloomFilter) {
- Object minValue = getMin(stats);
- Object maxValue = getMax(stats);
- return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
- }
-
- static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
- Object max, boolean hasNull, BloomFilterIO bloomFilter) {
- // if we didn't have any values, everything must have been null
- if (min == null) {
- if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
- return TruthValue.YES;
- } else {
- return TruthValue.NULL;
- }
- } else if (min == UNKNOWN_VALUE) {
- return TruthValue.YES_NO_NULL;
- }
-
- TruthValue result;
- Object baseObj = predicate.getLiteral();
- try {
- // Predicate object and stats objects are converted to the type of the predicate object.
- Object minValue = getBaseObjectForComparison(predicate.getType(), min);
- Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
- Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
-
- result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
- if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
- result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
- }
- // if the conversion fails, fall back to the default YES_NO_NULL truth value
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- final String statsType = min == null ?
- (max == null ? "null" : max.getClass().getSimpleName()) :
- min.getClass().getSimpleName();
- final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
- final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
- " Skipping ORC PPD." +
- " Exception: " + e.getMessage() +
- " StatsType: " + statsType +
- " PredicateType: " + predicateType;
- LOG.warn(reason);
- LOG.debug(reason, e);
- }
- if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
- result = TruthValue.YES_NO;
- } else {
- result = TruthValue.YES_NO_NULL;
- }
- }
- return result;
- }
-
- private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
- TruthValue result, BloomFilterIO bloomFilter) {
- // evaluate the bloom filter only when
- // 1) a bloom filter is available
- // 2) the min/max evaluation yields YES or MAYBE
- // 3) the predicate is EQUALS or an IN list
- return bloomFilter != null
- && result != TruthValue.NO_NULL && result != TruthValue.NO
- && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
- || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- || predicate.getOperator().equals(PredicateLeaf.Operator.IN));
- }
-
- private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
- Object minValue,
- Object maxValue,
- boolean hasNull) {
- Location loc;
-
- switch (predicate.getOperator()) {
- case NULL_SAFE_EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.BEFORE || loc == Location.AFTER) {
- return TruthValue.NO;
- } else {
- return TruthValue.YES_NO;
- }
- case EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
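- // illustrative example: predicate c = 5 vs. stats [min=3, max=9] lands in
- // MIDDLE, so the group may match (YES_NO); vs. [min=7, max=9] it lands
- // BEFORE, so the group is skipped (NO, or NO_NULL when nulls are present)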
- if (minValue.equals(maxValue) && loc == Location.MIN) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE || loc == Location.AFTER) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case LESS_THAN:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.AFTER) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE || loc == Location.MIN) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case LESS_THAN_EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.AFTER || loc == Location.MAX) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case IN:
- if (minValue.equals(maxValue)) {
- // for a single value, look through to see if that value is in the
- // set
- for (Object arg : predicate.getLiteralList()) {
- predObj = getBaseObjectForComparison(predicate.getType(), arg);
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.MIN) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- // are all of the values outside of the range?
- for (Object arg : predicate.getLiteralList()) {
- predObj = getBaseObjectForComparison(predicate.getType(), arg);
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.MIN || loc == Location.MIDDLE ||
- loc == Location.MAX) {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- }
- case BETWEEN:
- List<Object> args = predicate.getLiteralList();
- Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
-
- loc = compareToRange((Comparable) predObj1, minValue, maxValue);
- if (loc == Location.BEFORE || loc == Location.MIN) {
- Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
-
- Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
- if (loc2 == Location.AFTER || loc2 == Location.MAX) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc2 == Location.BEFORE) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- } else if (loc == Location.AFTER) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case IS_NULL:
- // min = null condition above handles the all-nulls YES case
- return hasNull ? TruthValue.YES_NO : TruthValue.NO;
- default:
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- }
-
- private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
- final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
- switch (predicate.getOperator()) {
- case NULL_SAFE_EQUALS:
- // NULL_SAFE_EQUALS does not return a *_NULL variant, so pass hasNull = false
- return checkInBloomFilter(bloomFilter, predObj, false);
- case EQUALS:
- return checkInBloomFilter(bloomFilter, predObj, hasNull);
- case IN:
- for (Object arg : predicate.getLiteralList()) {
- // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
- Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
- TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
- if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
- return result;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- default:
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- }
-
- private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
- TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
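- // a bloom filter can only prove absence: a hit upgrades the result to a
- // "maybe" (YES_NO / YES_NO_NULL), never to a definite YES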
-
- if (predObj instanceof Long) {
- if (bf.testLong(((Long) predObj).longValue())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Double) {
- if (bf.testDouble(((Double) predObj).doubleValue())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof String || predObj instanceof Text ||
- predObj instanceof HiveDecimalWritable ||
- predObj instanceof BigDecimal) {
- if (bf.testString(predObj.toString())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Timestamp) {
- if (bf.testLong(((Timestamp) predObj).getTime())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Date) {
- if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
- result = TruthValue.YES_NO_NULL;
- }
- } else {
- // if the predicate object is null and if hasNull says there are no nulls then return NO
- if (predObj == null && !hasNull) {
- result = TruthValue.NO;
- } else {
- result = TruthValue.YES_NO_NULL;
- }
- }
-
- if (result == TruthValue.YES_NO_NULL && !hasNull) {
- result = TruthValue.YES_NO;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Bloom filter evaluation: " + result.toString());
- }
-
- return result;
- }
-
- private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
- if (obj == null) {
- return null;
- }
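- // Convert the value to one canonical Java type per PredicateLeaf.Type
- // (e.g. Long for LONG, Double for FLOAT), so that both the statistics and
- // the literal compare cleanly in evaluatePredicateMinMax.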
- switch (type) {
- case BOOLEAN:
- if (obj instanceof Boolean) {
- return obj;
- } else {
- // will only be true if the string conversion yields "true"; all other
- // values are considered false
- return Boolean.valueOf(obj.toString());
- }
- case DATE:
- if (obj instanceof Date) {
- return obj;
- } else if (obj instanceof String) {
- return Date.valueOf((String) obj);
- } else if (obj instanceof Timestamp) {
- return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
- }
- // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
- break;
- case DECIMAL:
- if (obj instanceof Boolean) {
- return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
- HiveDecimal.ONE : HiveDecimal.ZERO);
- } else if (obj instanceof Integer) {
- return new HiveDecimalWritable(((Integer) obj).intValue());
- } else if (obj instanceof Long) {
- return new HiveDecimalWritable(((Long) obj));
- } else if (obj instanceof Float || obj instanceof Double ||
- obj instanceof String) {
- return new HiveDecimalWritable(obj.toString());
- } else if (obj instanceof BigDecimal) {
- return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
- } else if (obj instanceof HiveDecimal) {
- return new HiveDecimalWritable((HiveDecimal) obj);
- } else if (obj instanceof HiveDecimalWritable) {
- return obj;
- } else if (obj instanceof Timestamp) {
- return new HiveDecimalWritable(Double.toString(
- TimestampUtils.getDouble((Timestamp) obj)));
- }
- break;
- case FLOAT:
- if (obj instanceof Number) {
- // widening conversion
- return ((Number) obj).doubleValue();
- } else if (obj instanceof HiveDecimal) {
- return ((HiveDecimal) obj).doubleValue();
- } else if (obj instanceof String) {
- return Double.valueOf(obj.toString());
- } else if (obj instanceof Timestamp) {
- return TimestampUtils.getDouble((Timestamp) obj);
- } else if (obj instanceof BigDecimal) {
- return ((BigDecimal) obj).doubleValue();
- }
- break;
- case LONG:
- if (obj instanceof Number) {
- // widening conversion
- return ((Number) obj).longValue();
- } else if (obj instanceof HiveDecimal) {
- return ((HiveDecimal) obj).longValue(); // TODO: lossy conversion!
- } else if (obj instanceof String) {
- return Long.valueOf(obj.toString());
- }
- break;
- case STRING:
- // obj is known to be non-null here (checked at method entry)
- return obj.toString();
- case TIMESTAMP:
- if (obj instanceof Timestamp) {
- return obj;
- } else if (obj instanceof Integer) {
- return new Timestamp(((Number) obj).longValue());
- } else if (obj instanceof Float) {
- return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
- } else if (obj instanceof Double) {
- return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
- } else if (obj instanceof HiveDecimal) {
- return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
- } else if (obj instanceof HiveDecimalWritable) {
- return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
- } else if (obj instanceof Date) {
- return new Timestamp(((Date) obj).getTime());
- }
- // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
- // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
- // is also config driven. The filter operator changes its promotion based on config:
- // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
- break;
- default:
- break;
- }
-
- throw new IllegalArgumentException(String.format(
- "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
- .getSimpleName(), type));
- }
-
- public static class SargApplier {
- public final static boolean[] READ_ALL_RGS = null;
- public final static boolean[] READ_NO_RGS = new boolean[0];
-
- private final SearchArgument sarg;
- private final List<PredicateLeaf> sargLeaves;
- private final int[] filterColumns;
- private final long rowIndexStride;
- // a boolean view of filterColumns: true at each file column id that has a predicate
- private final boolean[] sargColumns;
- private SchemaEvolution evolution;
-
- public SargApplier(SearchArgument sarg, String[] columnNames,
- long rowIndexStride,
- SchemaEvolution evolution) {
- this.sarg = sarg;
- sargLeaves = sarg.getLeaves();
- filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, evolution);
- this.rowIndexStride = rowIndexStride;
- // the include array will not be null; reader options fill it with true when unset
- sargColumns = new boolean[evolution.getFileIncluded().length];
- for (int i : filterColumns) {
- // filter columns may have index -1, e.g. for a partition column referenced in the SARG.
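- // column 0 is the root struct and never carries a predicate, hence i > 0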
- if (i > 0) {
- sargColumns[i] = true;
- }
- }
- this.evolution = evolution;
- }
-
- /**
- * Pick the row groups that we need to load from the current stripe.
- *
- * @return an array with a boolean for each row group or null if all of the
- * row groups must be read.
- * @throws IOException
- */
- public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
- OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
- long rowsInStripe = stripe.getNumberOfRows();
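- // ceiling division: the last row group may hold fewer than rowIndexStride rows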
- int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
- boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
- TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
- boolean hasSelected = false, hasSkipped = false;
- for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
- for (int pred = 0; pred < leafValues.length; ++pred) {
- int columnIx = filterColumns[pred];
- if (columnIx != -1) {
- if (indexes[columnIx] == null) {
- throw new AssertionError("Index is not populated for " + columnIx);
- }
- OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
- if (entry == null) {
- throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
- }
- OrcProto.ColumnStatistics stats = entry.getStatistics();
- OrcProto.BloomFilter bf = null;
- if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
- bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
- }
- if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
- leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
- } else {
- leafValues[pred] = TruthValue.YES_NO_NULL;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Stats = " + stats);
- LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
- }
- } else {
- // the column is a virtual column
- leafValues[pred] = TruthValue.YES_NO_NULL;
- }
- }
- result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
- hasSelected = hasSelected || result[rowGroup];
- hasSkipped = hasSkipped || (!result[rowGroup]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
- (rowIndexStride * (rowGroup + 1) - 1) + " is " +
- (result[rowGroup] ? "" : "not ") + "included.");
- }
- }
-
- return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
- }
- }
-
- /**
- * Pick the row groups that we need to load from the current stripe.
- *
- * @return an array with a boolean for each row group or null if all of the
- * row groups must be read.
- * @throws IOException
- */
- protected boolean[] pickRowGroups() throws IOException {
- // if we don't have a sarg or indexes, we read everything
- if (sargApp == null) {
- return null;
- }
- readRowIndex(currentStripe, included, sargApp.sargColumns);
- return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
- }
-
- private void clearStreams() {
- // explicit close of all streams to de-ref ByteBuffers
- for (InStream is : streams.values()) {
- is.close();
- }
- if (bufferChunks != null) {
- if (dataReader.isTrackingDiskRanges()) {
- for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
- if (!(range instanceof BufferChunk)) {
- continue;
- }
- dataReader.releaseBuffer(((BufferChunk) range).getChunk());
- }
- }
- }
- bufferChunks = null;
- streams.clear();
- }
-
- /**
- * Read the current stripe into memory.
- *
- * @throws IOException
- */
- private void readStripe() throws IOException {
- StripeInformation stripe = beginReadStripe();
- includedRowGroups = pickRowGroups();
-
- // move forward to the first unskipped row
- if (includedRowGroups != null) {
- while (rowInStripe < rowCountInStripe &&
- !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
- rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
- }
- }
-
- // if we haven't skipped the whole stripe, read the data
- if (rowInStripe < rowCountInStripe) {
- // if we aren't projecting columns or filtering rows, just read it all
- if (included == null && includedRowGroups == null) {
- readAllDataStreams(stripe);
- } else {
- readPartialDataStreams(stripe);
- }
- reader.startStripe(streams, stripeFooter);
- // if we skipped the first row group, move the pointers forward
- if (rowInStripe != 0) {
- seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
- }
- }
- }
-
- private StripeInformation beginReadStripe() throws IOException {
- StripeInformation stripe = stripes.get(currentStripe);
- stripeFooter = readStripeFooter(stripe);
- clearStreams();
- // setup the position in the stripe
- rowCountInStripe = stripe.getNumberOfRows();
- rowInStripe = 0;
- rowBaseInStripe = 0;
- for (int i = 0; i < currentStripe; ++i) {
- rowBaseInStripe += stripes.get(i).getNumberOfRows();
- }
- // reset all of the indexes
- for (int i = 0; i < indexes.length; ++i) {
- indexes[i] = null;
- }
- return stripe;
- }
-
- private void readAllDataStreams(StripeInformation stripe) throws IOException {
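- // offsets here are relative to the stripe start: skip past the index
- // streams and read the whole data section in one request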
- long start = stripe.getIndexLength();
- long end = start + stripe.getDataLength();
- // explicitly trigger 1 big read
- DiskRangeList toRead = new DiskRangeList(start, end);
- bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
- List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
- createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
- }
-
- /**
- * Plan the ranges of the file that we need to read given the list of
- * columns and row groups.
- *
- * @param streamList the list of streams available
- * @param indexes the indexes that have been loaded
- * @param includedColumns which columns are needed
- * @param includedRowGroups which row groups are needed
- * @param isCompressed does the file have generic compression
- * @param encodings the encodings for each column
- * @param types the types of the columns
- * @param compressionSize the compression block size
- * @param doMergeBuffers whether adjacent disk ranges should be merged
- * @return the list of disk ranges that will be loaded
- */
- static DiskRangeList planReadPartialDataStreams(
- List<OrcProto.Stream> streamList,
- OrcProto.RowIndex[] indexes,
- boolean[] includedColumns,
- boolean[] includedRowGroups,
- boolean isCompressed,
- List<OrcProto.ColumnEncoding> encodings,
- List<OrcProto.Type> types,
- int compressionSize,
- boolean doMergeBuffers) {
- long offset = 0;
- // figure out which columns have a present stream
- boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- CreateHelper list = new CreateHelper();
- for (OrcProto.Stream stream : streamList) {
- long length = stream.getLength();
- int column = stream.getColumn();
- OrcProto.Stream.Kind streamKind = stream.getKind();
- // since stream kind is optional, first check if it exists
- if (stream.hasKind() &&
- (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
- (column < includedColumns.length && includedColumns[column])) {
- // if we aren't filtering or it is a dictionary, load it.
- if (includedRowGroups == null
- || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
- RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
- } else {
- RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
- isCompressed, indexes[column], encodings.get(column), types.get(column),
- compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
- }
- }
- offset += length;
- }
- return list.extract();
- }
-
- void createStreams(List<OrcProto.Stream> streamDescriptions,
- DiskRangeList ranges,
- boolean[] includeColumn,
- CompressionCodec codec,
- int bufferSize,
- Map<StreamName, InStream> streams) throws IOException {
- long streamOffset = 0;
- for (OrcProto.Stream streamDesc : streamDescriptions) {
- int column = streamDesc.getColumn();
- if ((includeColumn != null &&
- (column < includeColumn.length && !includeColumn[column])) ||
- (streamDesc.hasKind() &&
- (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA))) {
- streamOffset += streamDesc.getLength();
- continue;
- }
- List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
- ranges, streamOffset, streamDesc.getLength());
- StreamName name = new StreamName(column, streamDesc.getKind());
- streams.put(name, InStream.create(name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize));
- streamOffset += streamDesc.getLength();
- }
- }
-
- private void readPartialDataStreams(StripeInformation stripe) throws IOException {
- List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- DiskRangeList toRead = planReadPartialDataStreams(streamList,
- indexes, included, includedRowGroups, codec != null,
- stripeFooter.getColumnsList(), types, bufferSize, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
- }
- bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
- }
-
- createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
- }
-
- /**
- * Move to the next stripe and keep advancing until we reach one that
- * contains a row we don't skip.
- *
- * @throws IOException
- */
- private void advanceStripe() throws IOException {
- rowInStripe = rowCountInStripe;
- while (rowInStripe >= rowCountInStripe &&
- currentStripe < stripes.size() - 1) {
- currentStripe += 1;
- readStripe();
- }
- }
-
- /**
- * Skip over rows that we aren't selecting, so that the next row is
- * one that we will read.
- *
- * @param nextRow the row we want to go to
- * @return true if the reader was positioned on a row to read; when the
- * target lies beyond the current stripe, returns canAdvanceStripe
- * @throws IOException
- */
- private boolean advanceToNextRow(
- TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
- throws IOException {
- long nextRowInStripe = nextRow - rowBaseInStripe;
- // check for row skipping
- if (rowIndexStride != 0 &&
- includedRowGroups != null &&
- nextRowInStripe < rowCountInStripe) {
- int rowGroup = (int) (nextRowInStripe / rowIndexStride);
- if (!includedRowGroups[rowGroup]) {
- while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
- rowGroup += 1;
- }
- if (rowGroup >= includedRowGroups.length) {
- if (canAdvanceStripe) {
- advanceStripe();
- }
- return canAdvanceStripe;
- }
- nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
- }
- }
- if (nextRowInStripe >= rowCountInStripe) {
- if (canAdvanceStripe) {
- advanceStripe();
- }
- return canAdvanceStripe;
- }
- if (nextRowInStripe != rowInStripe) {
- if (rowIndexStride != 0) {
- int rowGroup = (int) (nextRowInStripe / rowIndexStride);
- seekToRowEntry(reader, rowGroup);
- reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
- } else {
- reader.skipRows(nextRowInStripe - rowInStripe);
- }
- rowInStripe = nextRowInStripe;
- }
- return true;
- }
-
- @Override
- public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
- try {
- if (rowInStripe >= rowCountInStripe) {
- currentStripe += 1;
- if (currentStripe >= stripes.size()) {
- batch.size = 0;
- return false;
- }
- readStripe();
- }
-
- int batchSize = computeBatchSize(batch.getMaxSize());
-
- rowInStripe += batchSize;
- reader.setVectorColumnCount(batch.getDataColumnCount());
- reader.nextBatch(batch, batchSize);
- batch.selectedInUse = false;
- batch.size = batchSize;
- advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
- return batch.size != 0;
- } catch (IOException e) {
- // rethrow the exception with the file name included in the message
- throw new IOException("Error reading file: " + path, e);
- }
- }
-
- private int computeBatchSize(long targetBatchSize) {
- final int batchSize;
- // In case of PPD, the batch size should be aware of row group boundaries. If only a subset
- // of row groups is selected, the marker position is set to the end of that range (the
- // selected row groups within the stripe). Computing the batch size from the marker position
- // ensures the batch respects row group boundaries and does not overflow when reading rows.
- // An illustration of this case is in https://issues.apache.org/jira/browse/HIVE-6287
- if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
- int startRowGroup = (int) (rowInStripe / rowIndexStride);
- if (!includedRowGroups[startRowGroup]) {
- while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
- startRowGroup += 1;
- }
- }
-
- int endRowGroup = startRowGroup;
- while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
- endRowGroup += 1;
- }
-
- final long markerPosition =
- (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
- : rowCountInStripe;
- batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
-
- if (isLogDebugEnabled && batchSize < targetBatchSize) {
- LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
- }
- } else {
- batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
- }
- return batchSize;
- }
-
- @Override
- public void close() throws IOException {
- clearStreams();
- dataReader.close();
- }
-
- @Override
- public long getRowNumber() {
- return rowInStripe + rowBaseInStripe + firstRow;
- }
-
- /**
- * Return the fraction of rows that have been read from the selected
- * section of the file.
- *
- * @return fraction between 0.0 and 1.0 of rows consumed
- */
- @Override
- public float getProgress() {
- return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
- }
-
- private int findStripe(long rowNumber) {
- for (int i = 0; i < stripes.size(); i++) {
- StripeInformation stripe = stripes.get(i);
- if (stripe.getNumberOfRows() > rowNumber) {
- return i;
- }
- rowNumber -= stripe.getNumberOfRows();
- }
- throw new IllegalArgumentException("Seek after the end of reader range");
- }
-
- public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
- boolean[] sargColumns) throws IOException {
- return readRowIndex(stripeIndex, included, null, null, sargColumns);
- }
-
- public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
- OrcProto.RowIndex[] indexes,
- OrcProto.BloomFilterIndex[] bloomFilterIndex,
- boolean[] sargColumns) throws IOException {
- StripeInformation stripe = stripes.get(stripeIndex);
- OrcProto.StripeFooter stripeFooter = null;
- // if this is the current stripe, use the cached objects.
- if (stripeIndex == currentStripe) {
- stripeFooter = this.stripeFooter;
- indexes = indexes == null ? this.indexes : indexes;
- bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
- sargColumns = sargColumns == null ?
- (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
- }
- return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
- bloomFilterIndex);
- }
-
- private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
- throws IOException {
- PositionProvider[] index = new PositionProvider[indexes.length];
- for (int i = 0; i < indexes.length; ++i) {
- if (indexes[i] != null) {
- index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
- }
- }
- reader.seek(index);
- }
-
- @Override
- public void seekToRow(long rowNumber) throws IOException {
- if (rowNumber < 0) {
- throw new IllegalArgumentException("Seek to a negative row number " +
- rowNumber);
- } else if (rowNumber < firstRow) {
- throw new IllegalArgumentException("Seek before reader range " +
- rowNumber);
- }
- // convert to our internal form (rows from the beginning of slice)
- rowNumber -= firstRow;
-
- // move to the right stripe
- int rightStripe = findStripe(rowNumber);
- if (rightStripe != currentStripe) {
- currentStripe = rightStripe;
- readStripe();
- }
- readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
-
- // if we aren't to the right row yet, advance in the stripe.
- advanceToNextRow(reader, rowNumber, true);
- }
-
- private static final String TRANSLATED_SARG_SEPARATOR = "_";
- public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
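- // e.g. encodeTranslatedSargColumn(0, 2) yields "0_2"; a null index encodes as "0_-1"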
- return rootColumn + TRANSLATED_SARG_SEPARATOR
- + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
- }
-
- public static int[] mapTranslatedSargColumns(
- List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
- int[] result = new int[sargLeaves.size()];
- OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
- String lastRootStr = null;
- for (int i = 0; i < result.length; ++i) {
- String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
- assert rootAndIndex.length == 2;
- String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
- int index = Integer.parseInt(indexStr);
- // First, check if the column even maps to anything.
- if (index == -1) {
- result[i] = -1;
- continue;
- }
- assert index >= 0;
- // Then, find the root type if needed.
- if (!rootStr.equals(lastRootStr)) {
- lastRoot = types.get(Integer.parseInt(rootStr));
- lastRootStr = rootStr;
- }
- // Subtypes of the root types correspond, in order, to the columns in the table schema
- // (disregarding schema evolution that doesn't presently work). Get the index for the
- // corresponding subtype.
- result[i] = lastRoot.getSubtypes(index);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
deleted file mode 100644
index 6100d50..0000000
--- a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
-import org.apache.orc.OrcProto;
-
-import com.google.common.collect.ComparisonChain;
-import org.apache.orc.StripeInformation;
-
-/**
- * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
- */
-public class RecordReaderUtils {
- private static final HadoopShims SHIMS = HadoopShims.Factory.get();
-
- private static class DefaultDataReader implements DataReader {
- private FSDataInputStream file = null;
- private final ByteBufferAllocatorPool pool;
- private HadoopShims.ZeroCopyReaderShim zcr = null;
- private final FileSystem fs;
- private final Path path;
- private final boolean useZeroCopy;
- private final CompressionCodec codec;
- private final int bufferSize;
- private final int typeCount;
-
- private DefaultDataReader(DefaultDataReader other) {
- this.pool = other.pool;
- this.bufferSize = other.bufferSize;
- this.typeCount = other.typeCount;
- this.fs = other.fs;
- this.path = other.path;
- this.useZeroCopy = other.useZeroCopy;
- this.codec = other.codec;
- }
-
- private DefaultDataReader(DataReaderProperties properties) {
- this.fs = properties.getFileSystem();
- this.path = properties.getPath();
- this.useZeroCopy = properties.getZeroCopy();
- this.codec = PhysicalFsWriter.createCodec(properties.getCompression());
- this.bufferSize = properties.getBufferSize();
- this.typeCount = properties.getTypeCount();
- if (useZeroCopy) {
- this.pool = new ByteBufferAllocatorPool();
- } else {
- this.pool = null;
- }
- }
-
- @Override
- public void open() throws IOException {
- this.file = fs.open(path);
- if (useZeroCopy) {
- zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
- } else {
- zcr = null;
- }
- }
-
- @Override
- public OrcIndex readRowIndex(StripeInformation stripe,
- OrcProto.StripeFooter footer,
- boolean[] included,
- OrcProto.RowIndex[] indexes,
- boolean[] sargColumns,
- OrcProto.BloomFilterIndex[] bloomFilterIndices
- ) throws IOException {
- if (file == null) {
- open();
- }
- if (footer == null) {
- footer = readStripeFooter(stripe);
- }
- if (indexes == null) {
- indexes = new OrcProto.RowIndex[typeCount];
- }
- if (bloomFilterIndices == null) {
- bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
- }
- long offset = stripe.getOffset();
- List<OrcProto.Stream> streams = footer.getStreamsList();
- for (int i = 0; i < streams.size(); i++) {
- OrcProto.Stream stream = streams.get(i);
- OrcProto.Stream nextStream = null;
- if (i < streams.size() - 1) {
- nextStream = streams.get(i+1);
- }
- int col = stream.getColumn();
- int len = (int) stream.getLength();
- // Row index streams and bloom filters are interlaced: if the sarg column also has a bloom
- // filter, combine the IO and read the row index and bloom filter for that column together.
- if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
- boolean readBloomFilter = false;
- if (sargColumns != null && sargColumns[col] && nextStream != null &&
- nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
- len += nextStream.getLength();
- i += 1;
- readBloomFilter = true;
- }
- if ((included == null || included[col]) && indexes[col] == null) {
- byte[] buffer = new byte[len];
- file.readFully(offset, buffer, 0, buffer.length);
- ByteBuffer bb = ByteBuffer.wrap(buffer);
- indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
- codec, bufferSize));
- if (readBloomFilter) {
- bb.position((int) stream.getLength());
- bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
- "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
- nextStream.getLength(), codec, bufferSize));
- }
- }
- }
- offset += len;
- }
-
- return new OrcIndex(indexes, bloomFilterIndices);
- }
-
- @Override
- public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- if (file == null) {
- open();
- }
- long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
- int tailLength = (int) stripe.getFooterLength();
-
- // read the footer
- ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
- file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
- Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
- tailLength, codec, bufferSize));
- }
-
- @Override
- public DiskRangeList readFileData(
- DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
- return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
- }
-
- @Override
- public void close() throws IOException {
- if (pool != null) {
- pool.clear();
- }
- // close both zcr and file
- try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
- if (file != null) {
- file.close();
- }
- }
- }
-
- @Override
- public boolean isTrackingDiskRanges() {
- return zcr != null;
- }
-
- @Override
- public void releaseBuffer(ByteBuffer buffer) {
- zcr.releaseBuffer(buffer);
- }
-
- @Override
- public DataReader clone() {
- return new DefaultDataReader(this);
- }
-
- }
-
- public static DataReader createDefaultDataReader(DataReaderProperties properties) {
- return new DefaultDataReader(properties);
- }
-
- public static boolean[] findPresentStreamsByColumn(
- List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
- boolean[] hasNull = new boolean[types.size()];
- for(OrcProto.Stream stream: streamList) {
- if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
- hasNull[stream.getColumn()] = true;
- }
- }
- return hasNull;
- }
-
- /**
- * Does region A overlap region B? The end points are inclusive on both sides.
- * @param leftA A's left point
- * @param rightA A's right point
- * @param leftB B's left point
- * @param rightB B's right point
- * @return Does region A overlap region B?
- */
- static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
- if (leftA <= leftB) {
- return rightA >= leftB;
- }
- return rightB >= leftA;
- }
-
- public static void addEntireStreamToRanges(
- long offset, long length, CreateHelper list, boolean doMergeBuffers) {
- list.addOrMerge(offset, offset + length, doMergeBuffers, false);
- }
-
- public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
- boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
- OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
- long offset, long length, CreateHelper list, boolean doMergeBuffers) {
- for (int group = 0; group < includedRowGroups.length; ++group) {
- if (!includedRowGroups[group]) continue;
- int posn = getIndexPosition(
- encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
- long start = index.getEntry(group).getPositions(posn);
- final long nextGroupOffset;
- boolean isLast = group == (includedRowGroups.length - 1);
- nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
-
- start += offset;
- long end = offset + estimateRgEndOffset(
- isCompressed, isLast, nextGroupOffset, length, compressionSize);
- list.addOrMerge(start, end, doMergeBuffers, true);
- }
- }
-
- public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
- long nextGroupOffset, long streamLength, int bufferSize) {
- // figure out the worst case last location
- // if adjacent groups have the same compressed block offset then stretch the slop
- // by factor of 2 to safely accommodate the next compression block.
- // One for the current compression block and another for the next compression block.
- long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
- return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
- }
-
- private static final int BYTE_STREAM_POSITIONS = 1;
- private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
- private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
- private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
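- // Each constant counts the index positions a stream records per row-group
- // index entry; compressed files record one extra position per stream (the
- // offset within the current compression block), added via compressionValue.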
-
- /**
- * Get the offset in the index positions for the column that the given
- * stream starts.
- * @param columnEncoding the encoding of the column
- * @param columnType the type of the column
- * @param streamType the kind of the stream
- * @param isCompressed is the file compressed
- * @param hasNulls does the column have a PRESENT stream?
- * @return the offset within the index entry at which the given stream's positions start
- */
- public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
- OrcProto.Type.Kind columnType,
- OrcProto.Stream.Kind streamType,
- boolean isCompressed,
- boolean hasNulls) {
- if (streamType == OrcProto.Stream.Kind.PRESENT) {
- return 0;
- }
- int compressionValue = isCompressed ? 1 : 0;
- int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
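- // when a PRESENT stream exists, its bitfield positions come first, so every
- // other stream's positions are shifted by that amount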
- switch (columnType) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case DATE:
- case STRUCT:
- case MAP:
- case LIST:
- case UNION:
- return base;
- case CHAR:
- case VARCHAR:
- case STRING:
- if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
- columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
- return base;
- } else {
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- } else {
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- }
- }
- case BINARY:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- case DECIMAL:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + BYTE_STREAM_POSITIONS + compressionValue;
- case TIMESTAMP:
- if (streamType == OrcProto.Stream.Kind.DATA) {
- return base;
- }
- return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
- default:
- throw new IllegalArgumentException("Unknown type " + columnType);
- }
- }
-
- // for uncompressed streams, the worst-case overlap with the following set
- // of rows (a full literal group of long vints)
- static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
-
- /**
- * Is this stream part of a dictionary?
- * @param kind the kind of the stream
- * @param encoding the encoding of the column
- * @return true if the stream holds dictionary data
- */
- public static boolean isDictionary(OrcProto.Stream.Kind kind,
- OrcProto.ColumnEncoding encoding) {
- assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
- OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
- return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
- (kind == OrcProto.Stream.Kind.LENGTH &&
- (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
- encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
- }
-
- /**
- * Build a string representation of a list of disk ranges.
- * @param range ranges to stringify
- * @return the resulting string
- */
- public static String stringifyDiskRanges(DiskRangeList range) {
- StringBuilder buffer = new StringBuilder();
- buffer.append("[");
- boolean isFirst = true;
- while (range != null) {
- if (!isFirst) {
- buffer.append(", {");
- } else {
- buffer.append("{");
- }
- isFirst = false;
- buffer.append(range.toString());
- buffer.append("}");
- range = range.next;
- }
- buffer.append("]");
- return buffer.toString();
- }
-
- /**
- * Read the list of ranges from the file.
- * @param file the file to read
- * @param zcr the zero-copy shim, or null to fall back to regular reads
- * @param base the base offset of the stripe within the file
- * @param range the disk ranges within the stripe to read
- * @param doForceDirect whether the non-zero-copy path should allocate
- * direct byte buffers
- * @return the bytes read for each disk range, covering the same extent as
- * the input ranges
- * @throws IOException
- */
- static DiskRangeList readDiskRanges(FSDataInputStream file,
- HadoopShims.ZeroCopyReaderShim zcr,
- long base,
- DiskRangeList range,
- boolean doForceDirect) throws IOException {
- if (range == null) return null;
- DiskRangeList prev = range.prev;
- if (prev == null) {
- prev = new MutateHelper(range);
- }
- while (range != null) {
- if (range.hasData()) {
- range = range.next;
- continue;
- }
- int len = (int) (range.getEnd() - range.getOffset());
- long off = range.getOffset();
- if (zcr != null) {
- file.seek(base + off);
- boolean hasReplaced = false;
- while (len > 0) {
- ByteBuffer partial = zcr.readBuffer(len, false);
- BufferChunk bc = new BufferChunk(partial, off);
- if (!hasReplaced) {
- range.replaceSelfWith(bc);
- hasReplaced = true;
- } else {
- range.insertAfter(bc);
- }
- range = bc;
- int read = partial.remaining();
- len -= read;
- off += read;
- }
- } else {
- // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
- byte[] buffer = new byte[len];
- file.readFully((base + off), buffer, 0, buffer.length);
- ByteBuffer bb = null;
- if (doForceDirect) {
- bb = ByteBuffer.allocateDirect(len);
- bb.put(buffer);
- bb.position(0);
- bb.limit(len);
- } else {
- bb = ByteBuffer.wrap(buffer);
- }
- range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
- }
- range = range.next;
- }
- return prev.next;
- }
-
-
- static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
- // This assumes sorted ranges (as do many other parts of ORC code).
- ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
- if (length == 0) return buffers;
- long streamEnd = offset + length;
- boolean inRange = false;
- while (range != null) {
- if (!inRange) {
- if (range.getEnd() <= offset) {
- range = range.next;
- continue; // Skip until we are in range.
- }
- inRange = true;
- if (range.getOffset() < offset) {
- // Partial first buffer, add a slice of it.
- buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
- if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
- range = range.next;
- continue;
- }
- } else if (range.getOffset() >= streamEnd) {
- break;
- }
- if (range.getEnd() > streamEnd) {
- // Partial last buffer (may also be the first buffer), add a slice of it.
- buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
- break;
- }
- // Buffer that belongs entirely to one stream.
- // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
- // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
- buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
- if (range.getEnd() == streamEnd) break;
- range = range.next;
- }
- return buffers;
- }
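
To make the slicing concrete, an assumed scenario (the ranges and offsets
here are illustrative, not from the patch): a stream that starts 100 bytes
into the stripe data and is 50 bytes long, over cached ranges [90, 130) and
[130, 160):

    List<DiskRange> bufs = getStreamBuffers(ranges, 100, 50);
    // bufs.get(0): slice of [100, 130) shifted by -100 -> stream bytes [0, 30)
    // bufs.get(1): slice of [130, 150) shifted by -100 -> stream bytes [30, 50)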
-
- static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
- CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
- if ((codec == null || ((codec instanceof DirectDecompressionCodec)
- && ((DirectDecompressionCodec) codec).isAvailable()))) {
- /* codec is null or is available */
- return SHIMS.getZeroCopyReader(file, pool);
- }
- return null;
- }
-
- // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
- // which lacks a clear()/clean() operation
- public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim {
- private static final class Key implements Comparable<Key> {
- private final int capacity;
- private final long insertionGeneration;
-
- Key(int capacity, long insertionGeneration) {
- this.capacity = capacity;
- this.insertionGeneration = insertionGeneration;
- }
-
- @Override
- public int compareTo(Key other) {
- return ComparisonChain.start().compare(capacity, other.capacity)
- .compare(insertionGeneration, other.insertionGeneration).result();
- }
-
- @Override
- public boolean equals(Object rhs) {
- if (rhs == null) {
- return false;
- }
- try {
- Key o = (Key) rhs;
- return (compareTo(o) == 0);
- } catch (ClassCastException e) {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(capacity).append(insertionGeneration)
- .toHashCode();
- }
- }
-
- private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
-
- private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
-
- private long currentGeneration = 0;
-
- private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
- return direct ? directBuffers : buffers;
- }
-
- public void clear() {
- buffers.clear();
- directBuffers.clear();
- }
-
- @Override
- public ByteBuffer getBuffer(boolean direct, int length) {
- TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
- Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
- if (entry == null) {
- return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
- .allocate(length);
- }
- tree.remove(entry.getKey());
- return entry.getValue();
- }
-
- @Override
- public void putBuffer(ByteBuffer buffer) {
- TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
- while (true) {
- Key key = new Key(buffer.capacity(), currentGeneration++);
- if (!tree.containsKey(key)) {
- tree.put(key, buffer);
- return;
- }
- // Buffers are indexed by (capacity, generation).
- // If our key is not unique on the first try, we try again with the next
- // generation value, so the loop terminates.
- }
- }
- }
-}
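
For orientation, a minimal usage sketch of the pool above (self-contained
apart from a java.nio.ByteBuffer import; the sizes are assumed):

    ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
    ByteBuffer big = pool.getBuffer(true, 4096);  // pool empty: fresh direct buffer
    pool.putBuffer(big);                          // recycle into the direct tree
    ByteBuffer reused = pool.getBuffer(true, 1024);
    // ceilingEntry(new Key(1024, 0)) finds the cached 4096-byte buffer, so
    // the larger buffer is reused instead of allocating a new one
    pool.clear();                                 // drop all cached buffers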
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RedBlackTree.java b/orc/src/java/org/apache/orc/impl/RedBlackTree.java
deleted file mode 100644
index 41aa4b9..0000000
--- a/orc/src/java/org/apache/orc/impl/RedBlackTree.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.orc.impl.DynamicIntArray;
-
-/**
- * A memory efficient red-black tree that does not allocate any objects per
- * element. This class is abstract and assumes that the child class
- * handles the key and comparisons with the key.
- */
-abstract class RedBlackTree {
- public static final int NULL = -1;
-
- // Various values controlling the offset of the data within the array.
- private static final int LEFT_OFFSET = 0;
- private static final int RIGHT_OFFSET = 1;
- private static final int ELEMENT_SIZE = 2;
-
- protected int size = 0;
- private final DynamicIntArray data;
- protected int root = NULL;
- protected int lastAdd = 0;
- private boolean wasAdd = false;
-
- /**
- * Create a set with the given initial capacity.
- */
- public RedBlackTree(int initialCapacity) {
- data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
- }
-
- /**
- * Insert a new node into the data array, growing the array as necessary.
- *
- * @return Returns the position of the new node.
- */
- private int insert(int left, int right, boolean isRed) {
- int position = size;
- size += 1;
- setLeft(position, left, isRed);
- setRight(position, right);
- return position;
- }
-
- /**
- * Compare the value at the given position to the new value.
- * @return 0 if the values are the same, -1 if the new value is smaller and
- * 1 if the new value is larger.
- */
- protected abstract int compareValue(int position);
-
- /**
- * Is the given node red as opposed to black? To prevent having an extra word
- * in the data array, we just use the low bit of the left child index.
- */
- protected boolean isRed(int position) {
- return position != NULL &&
- (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
- }
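
Because the color shares an int with the left pointer, a quick standalone
illustration of the packing (plain arithmetic, nothing here depends on the
tree itself):

    int left = 7;
    int packedRed = (left << 1) | 1;  // 15: left child 7, red bit set
    int packedBlack = left << 1;      // 14: left child 7, black
    assert (packedRed >> 1) == left;  // getLeft() recovers the pointer
    assert (packedRed & 1) == 1;      // isRed() reads the low bit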
-
- /**
- * Set the red bit true or false.
- */
- private void setRed(int position, boolean isRed) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- if (isRed) {
- data.set(offset, data.get(offset) | 1);
- } else {
- data.set(offset, data.get(offset) & ~1);
- }
- }
-
- /**
- * Get the left field of the given position.
- */
- protected int getLeft(int position) {
- return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
- }
-
- /**
- * Get the right field of the given position.
- */
- protected int getRight(int position) {
- return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
- }
-
- /**
- * Set the left field of the given position.
- * Note that we are storing the node color in the low bit of the left pointer.
- */
- private void setLeft(int position, int left) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- data.set(offset, (left << 1) | (data.get(offset) & 1));
- }
-
- /**
- * Set the left field of the given position.
- * Note that we are storing the node color in the low bit of the left pointer.
- */
- private void setLeft(int position, int left, boolean isRed) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- data.set(offset, (left << 1) | (isRed ? 1 : 0));
- }
-
- /**
- * Set the right field of the given position.
- */
- private void setRight(int position, int right) {
- data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
- }
-
- /**
- * Insert or find a given key in the tree and rebalance the tree correctly.
- * Rebalancing restores the red-black aspect of the tree to maintain the
- * invariants:
- * 1. If a node is red, both of its children are black.
- * 2. Each child of a node has the same black height (the number of black
- * nodes between it and the leaves of the tree).
- *
- * Inserted nodes are at the leaves and are red, therefore there is at most a
- * violation of rule 1 at the node we just put in. Instead of always keeping
- * the parents, this routine passes the context down the recursion.
- *
- * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3}) that are
- * left-right mirror images of each other. See Algorithms by Cormen,
- * Leiserson, and Rivest for the explanation of the subcases.
- *
- * @param node The node that we are fixing right now.
- * @param fromLeft Did we come down from the left?
- * @param parent Node's parent
- * @param grandparent Parent's parent
- * @param greatGrandparent Grandparent's parent
- * @return Does parent also need to be checked and/or fixed?
- */
- private boolean add(int node, boolean fromLeft, int parent,
- int grandparent, int greatGrandparent) {
- if (node == NULL) {
- if (root == NULL) {
- lastAdd = insert(NULL, NULL, false);
- root = lastAdd;
- wasAdd = true;
- return false;
- } else {
- lastAdd = insert(NULL, NULL, true);
- node = lastAdd;
- wasAdd = true;
- // connect the new node into the tree
- if (fromLeft) {
- setLeft(parent, node);
- } else {
- setRight(parent, node);
- }
- }
- } else {
- int compare = compareValue(node);
- boolean keepGoing;
-
- // Recurse down to find where the node needs to be added
- if (compare < 0) {
- keepGoing = add(getLeft(node), true, node, parent, grandparent);
- } else if (compare > 0) {
- keepGoing = add(getRight(node), false, node, parent, grandparent);
- } else {
- lastAdd = node;
- wasAdd = false;
- return false;
- }
-
- // we don't need to fix the root (because it is always set to black)
- if (node == root || !keepGoing) {
- return false;
- }
- }
-
-
- // Do we need to fix this node? Only if there are two reds right under each
- // other.
- if (isRed(node) && isRed(parent)) {
- if (parent == getLeft(grandparent)) {
- int uncle = getRight(grandparent);
- if (isRed(uncle)) {
- // case 1.1
- setRed(parent, false);
- setRed(uncle, false);
- setRed(grandparent, true);
- return true;
- } else {
- if (node == getRight(parent)) {
- // case 1.2
- // swap node and parent
- int tmp = node;
- node = parent;
- parent = tmp;
- // left-rotate on node
- setLeft(grandparent, parent);
- setRight(node, getLeft(parent));
- setLeft(parent, node);
- }
-
- // case 1.2 and 1.3
- setRed(parent, false);
- setRed(grandparent, true);
-
- // right-rotate on grandparent
- if (greatGrandparent == NULL) {
- root = parent;
- } else if (getLeft(greatGrandparent) == grandparent) {
- setLeft(greatGrandparent, parent);
- } else {
- setRight(greatGrandparent, parent);
- }
- setLeft(grandparent, getRight(parent));
- setRight(parent, grandparent);
- return false;
- }
- } else {
- int uncle = getLeft(grandparent);
- if (isRed(uncle)) {
- // case 2.1
- setRed(parent, false);
- setRed(uncle, false);
- setRed(grandparent, true);
- return true;
- } else {
- if (node == getLeft(parent)) {
- // case 2.2
- // swap node and parent
- int tmp = node;
- node = parent;
- parent = tmp;
- // right-rotate on node
- setRight(grandparent, parent);
- setLeft(node, getRight(parent));
- setRight(parent, node);
- }
- // case 2.2 and 2.3
- setRed(parent, false);
- setRed(grandparent, true);
- // left-rotate on grandparent
- if (greatGrandparent == NULL) {
- root = parent;
- } else if (getRight(greatGrandparent) == grandparent) {
- setRight(greatGrandparent, parent);
- } else {
- setLeft(greatGrandparent, parent);
- }
- setRight(grandparent, getLeft(parent));
- setLeft(parent, grandparent);
- return false;
- }
- }
- } else {
- return true;
- }
- }
-
- /**
- * Add the new key to the tree.
- * @return true if the element is a new one.
- */
- protected boolean add() {
- add(root, false, NULL, NULL, NULL);
- if (wasAdd) {
- setRed(root, false);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Get the number of elements in the set.
- */
- public int size() {
- return size;
- }
-
- /**
- * Reset the table to empty.
- */
- public void clear() {
- root = NULL;
- size = 0;
- data.clear();
- }
-
- /**
- * Get the buffer size in bytes.
- */
- public long getSizeInBytes() {
- return data.getSizeInBytes();
- }
-}
-
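
To show how the abstract class is meant to be subclassed, a toy example in
the spirit of ORC's StringRedBlackTree (IntRedBlackTree is hypothetical,
written here for illustration only):

    class IntRedBlackTree extends RedBlackTree {
      private final java.util.List<Integer> keys = new java.util.ArrayList<>();
      private int newKey;  // the key currently being inserted

      IntRedBlackTree() {
        super(16);
      }

      @Override
      protected int compareValue(int position) {
        // per the contract: compare the NEW value to the value at position
        return Integer.compare(newKey, keys.get(position));
      }

      public boolean add(int value) {
        newKey = value;
        keys.add(value);  // a new node, if created, gets position keys.size() - 1
        boolean added = add();  // protected add() from RedBlackTree
        if (!added) {
          keys.remove(keys.size() - 1);  // duplicate key: undo the append
        }
        return added;
      }
    }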
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
deleted file mode 100644
index 24bd051..0000000
--- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-
-/**
- * A reader that reads a sequence of bytes. A control byte precedes each run:
- * positive values 0 to 127 mean 3 to 130 repetitions of the next byte; if the
- * control byte is -1 to -128, 1 to 128 literal byte values follow.
- */
-public class RunLengthByteReader {
- private InStream input;
- private final byte[] literals =
- new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
- private int numLiterals = 0;
- private int used = 0;
- private boolean repeat = false;
-
- public RunLengthByteReader(InStream input) throws IOException {
- this.input = input;
- }
-
- public void setInStream(InStream input) {
- this.input = input;
- }
-
- private void readValues(boolean ignoreEof) throws IOException {
- int control = input.read();
- used = 0;
- if (control == -1) {
- if (!ignoreEof) {
- throw new EOFException("Read past end of buffer RLE byte from " + input);
- }
- used = numLiterals = 0;
- return;
- } else if (control < 0x80) {
- repeat = true;
- numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
- int val = input.read();
- if (val == -1) {
- throw new EOFException("Reading RLE byte got EOF");
- }
- literals[0] = (byte) val;
- } else {
- repeat = false;
- numLiterals = 0x100 - control;
- int bytes = 0;
- while (bytes < numLiterals) {
- int result = input.read(literals, bytes, numLiterals - bytes);
- if (result == -1) {
- throw new EOFException("Reading RLE byte literal got EOF in " + this);
- }
- bytes += result;
- }
- }
- }
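
A self-contained sketch of the format readValues() parses, using an assumed
byte sequence: control 0x01 encodes a run of 1 + 3 = 4 copies of the next
byte, and control 0xFE (254 unsigned) encodes 0x100 - 0xFE = 2 literals:

    byte[] encoded = {0x01, 0x2A, (byte) 0xFE, 0x07, 0x09};
    java.io.ByteArrayInputStream in = new java.io.ByteArrayInputStream(encoded);
    java.util.List<Byte> out = new java.util.ArrayList<>();
    int control;
    while ((control = in.read()) != -1) {
      if (control < 0x80) {             // run: control + 3 repetitions
        byte val = (byte) in.read();
        for (int i = 0; i < control + 3; ++i) out.add(val);
      } else {                          // literals: 0x100 - control bytes
        for (int i = 0; i < 0x100 - control; ++i) out.add((byte) in.read());
      }
    }
    // out == [0x2A, 0x2A, 0x2A, 0x2A, 0x07, 0x09]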
-
- public boolean hasNext() throws IOException {
- return used != numLiterals || input.available() > 0;
- }
-
- public byte next() throws IOException {
- byte result;
- if (used == numLiterals) {
- readValues(false);
- }
- if (repeat) {
- result = literals[0];
- } else {
- result = literals[used];
- }
- ++used;
- return result;
- }
-
- public void nextVector(ColumnVector previous, long[] data, long size)
- throws IOException {
- previous.isRepeating = true;
- for (int i = 0; i < size; i++) {
- if (!previous.isNull[i]) {
- data[i] = next();
- } else {
- // The default value of null for int types in vectorized
- // processing is 1, so set that if the value is null
- data[i] = 1;
- }
-
- // The default value for nulls in Vectorization for int types is 1
- // and given that non null value can also be 1, we need to check for isNull also
- // when determining the isRepeating flag.
- if (previous.isRepeating
- && i > 0
- && ((data[0] != data[i]) ||
- (previous.isNull[0] != previous.isNull[i]))) {
- previous.isRepeating = false;
- }
- }
- }
-
- /**
- * Read the next size values into the data array, skipping over any slots
- * where isNull is true.
- * @param isNull if non-null, skip any rows where isNull[r] is true
- * @param data the array to read into
- * @param size the number of elements to read
- * @throws IOException
- */
- public void nextVector(boolean[] isNull, int[] data,
- long size) throws IOException {
- if (isNull == null) {
- for(int i=0; i < size; ++i) {
- data[i] = next();
- }
- } else {
- for(int i=0; i < size; ++i) {
- if (!isNull[i]) {
- data[i] = next();
- }
- }
- }
- }
-
- public void seek(PositionProvider index) throws IOException {
- input.seek(index);
- int consumed = (int) index.getNext();
- if (consumed != 0) {
- // a loop is required for cases where we break the run into two parts
- while (consumed > 0) {
- readValues(false);
- used = consumed;
- consumed -= numLiterals;
- }
- } else {
- used = 0;
- numLiterals = 0;
- }
- }
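
A worked trace of the loop above, with assumed run sizes: suppose the index
says to consume 5 values and the stream holds a 4-value run followed by a
7-value run.

    // consumed = 5
    // pass 1: readValues() -> numLiterals = 4; used = 5; consumed = 5 - 4 = 1
    // pass 2: readValues() -> numLiterals = 7; used = 1; consumed = 1 - 7 = -6
    // final state: positioned 1 value into the second run, 6 values remain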
-
- public void skip(long items) throws IOException {
- while (items > 0) {
- if (used == numLiterals) {
- readValues(false);
- }
- long consume = Math.min(items, numLiterals - used);
- used += consume;
- items -= consume;
- }
- }
-
- @Override
- public String toString() {
- return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
- used + "/" + numLiterals + " from " + input;
- }
-}