Posted to commits@hive.apache.org by om...@apache.org on 2016/04/26 17:18:00 UTC
[4/5] hive git commit: HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline and Prasanth)
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 9cfcc0e..2199b11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,12 +27,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closer;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.DataReaderFactory;
-import org.apache.orc.MetadataReaderFactory;
-import org.apache.orc.OrcUtils;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -42,12 +38,9 @@ import org.apache.orc.DateColumnStatistics;
import org.apache.orc.DecimalColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.DefaultMetadataReaderFactory;
import org.apache.orc.impl.InStream;
import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.impl.MetadataReader;
import org.apache.orc.OrcConf;
-import org.apache.orc.impl.MetadataReaderProperties;
import org.apache.orc.impl.OrcIndex;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.StreamName;
@@ -56,14 +49,11 @@ import org.apache.orc.StripeInformation;
import org.apache.orc.TimestampColumnStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -103,8 +93,6 @@ public class RecordReaderImpl implements RecordReader {
private final SargApplier sargApp;
// an array about which row groups aren't skipped
private boolean[] includedRowGroups = null;
- private final Configuration conf;
- private final MetadataReader metadata;
private final DataReader dataReader;
/**
@@ -146,130 +134,36 @@ public class RecordReaderImpl implements RecordReader {
return result;
}
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private Reader.Options options;
- private CompressionCodec codec;
- private List<OrcProto.Type> types;
- private List<StripeInformation> stripes;
- private int bufferSize;
- private FileSystem fileSystem;
- private Path path;
- private Configuration conf;
- private long strideRate;
- private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory();
- private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory();
-
- private Builder() {
-
- }
-
- public Builder withOptions(Reader.Options options) {
- this.options = options;
- return this;
- }
-
- public Builder withCodec(CompressionCodec codec) {
- this.codec = codec;
- return this;
- }
-
- public Builder withTypes(List<OrcProto.Type> types) {
- this.types = types;
- return this;
- }
-
- public Builder withStripes(List<StripeInformation> stripes) {
- this.stripes = stripes;
- return this;
- }
-
- public Builder withBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- return this;
- }
-
- public Builder withFileSystem(FileSystem fileSystem) {
- this.fileSystem = fileSystem;
- return this;
- }
-
- public Builder withPath(Path path) {
- this.path = path;
- return this;
- }
-
- public Builder withConf(Configuration conf) {
- this.conf = conf;
- return this;
- }
-
- public Builder withStrideRate(long strideRate) {
- this.strideRate = strideRate;
- return this;
- }
-
- public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) {
- this.metadataReaderFactory = metadataReaderFactory;
- return this;
- }
-
- public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) {
- this.dataReaderFactory = dataReaderFactory;
- return this;
- }
-
- public RecordReaderImpl build() throws IOException {
- Preconditions.checkNotNull(metadataReaderFactory);
- Preconditions.checkNotNull(dataReaderFactory);
- Preconditions.checkNotNull(options);
- Preconditions.checkNotNull(types);
- Preconditions.checkNotNull(stripes);
- Preconditions.checkNotNull(fileSystem);
- Preconditions.checkNotNull(path);
- Preconditions.checkNotNull(conf);
-
- return new RecordReaderImpl(this);
- }
- }
-
- private RecordReaderImpl(Builder builder) throws IOException {
- Reader.Options options = builder.options;
- this.types = builder.types;
- TreeReaderFactory.TreeReaderSchema treeReaderSchema;
+ protected RecordReaderImpl(ReaderImpl fileReader,
+ Reader.Options options) throws IOException {
+ SchemaEvolution treeReaderSchema;
+ this.included = options.getInclude();
+ included[0] = true;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("Schema on read not provided -- using file schema " + types.toString());
+ LOG.info("Schema on read not provided -- using file schema " +
+ fileReader.getSchema());
}
- treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types);
+ treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
} else {
// Now that we are creating a record reader for a file, validate that the schema to read
// is compatible with the file schema.
//
- List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
- treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
- }
- this.path = builder.path;
- this.codec = builder.codec;
- this.bufferSize = builder.bufferSize;
- this.included = options.getInclude();
- this.conf = builder.conf;
- this.rowIndexStride = builder.strideRate;
- this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder()
- .withFileSystem(builder.fileSystem)
- .withPath(path)
- .withCodec(codec)
- .withBufferSize(bufferSize)
- .withTypeCount(types.size())
- .build());
+ treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
+ options.getSchema(),
+ included);
+ }
+ this.path = fileReader.path;
+ this.codec = fileReader.codec;
+ this.types = fileReader.types;
+ this.bufferSize = fileReader.bufferSize;
+ this.rowIndexStride = fileReader.rowIndexStride;
+ FileSystem fileSystem = fileReader.fileSystem;
SearchArgument sarg = options.getSearchArgument();
- if (sarg != null && builder.strideRate != 0) {
+ if (sarg != null && rowIndexStride != 0) {
sargApp = new SargApplier(
- sarg, options.getColumnNames(), builder.strideRate, types, included.length);
+ sarg, options.getColumnNames(), rowIndexStride, types, included.length);
} else {
sargApp = null;
}
@@ -277,7 +171,7 @@ public class RecordReaderImpl implements RecordReader {
long skippedRows = 0;
long offset = options.getOffset();
long maxOffset = options.getMaxOffset();
- for(StripeInformation stripe: builder.stripes) {
+ for(StripeInformation stripe: fileReader.getStripes()) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
skippedRows += stripe.getNumberOfRows();
@@ -289,25 +183,30 @@ public class RecordReaderImpl implements RecordReader {
Boolean zeroCopy = options.getUseZeroCopy();
if (zeroCopy == null) {
- zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+ zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+ }
+ if (options.getDataReader() == null) {
+ dataReader = RecordReaderUtils.createDefaultDataReader(
+ DataReaderProperties.builder()
+ .withBufferSize(bufferSize)
+ .withCompression(fileReader.compressionKind)
+ .withFileSystem(fileSystem)
+ .withPath(path)
+ .withTypeCount(types.size())
+ .withZeroCopy(zeroCopy)
+ .build());
+ } else {
+ dataReader = options.getDataReader();
}
- // TODO: we could change the ctor to pass this externally
- this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder()
- .withFileSystem(builder.fileSystem)
- .withCodec(codec)
- .withPath(path)
- .withZeroCopy(zeroCopy)
- .build());
- this.dataReader.open();
-
firstRow = skippedRows;
totalRowCount = rows;
Boolean skipCorrupt = options.getSkipCorruptRecords();
if (skipCorrupt == null) {
- skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
+ skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
}
- reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
+ reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
+ treeReaderSchema, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
@@ -333,10 +232,10 @@ public class RecordReaderImpl implements RecordReader {
}
OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- return metadata.readStripeFooter(stripe);
+ return dataReader.readStripeFooter(stripe);
}
- static enum Location {
+ enum Location {
BEFORE, MIN, MIDDLE, MAX, AFTER
}
@@ -895,7 +794,7 @@ public class RecordReaderImpl implements RecordReader {
return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
}
- private void clearStreams() throws IOException {
+ private void clearStreams() {
// explicit close of all streams to de-ref ByteBuffers
for (InStream is : streams.values()) {
is.close();
@@ -1149,31 +1048,27 @@ public class RecordReaderImpl implements RecordReader {
}
@Override
- public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+ public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
try {
- final VectorizedRowBatch result;
if (rowInStripe >= rowCountInStripe) {
currentStripe += 1;
+ if (currentStripe >= stripes.size()) {
+ batch.size = 0;
+ return false;
+ }
readStripe();
}
- final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+ int batchSize = computeBatchSize(batch.getMaxSize());
rowInStripe += batchSize;
- if (previous == null) {
- ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
- result = new VectorizedRowBatch(cols.length);
- result.cols = cols;
- } else {
- result = previous;
- result.selectedInUse = false;
- reader.setVectorColumnCount(result.getDataColumnCount());
- reader.nextVector(result.cols, batchSize);
- }
+ reader.setVectorColumnCount(batch.getDataColumnCount());
+ reader.nextBatch(batch, batchSize);
- result.size = batchSize;
+ batch.size = (int) batchSize;
+ batch.selectedInUse = false;
advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
- return result;
+ return batch.size != 0;
} catch (IOException e) {
// Rethrow exception with file name in log message
throw new IOException("Error reading file: " + path, e);
@@ -1216,16 +1111,8 @@ public class RecordReaderImpl implements RecordReader {
@Override
public void close() throws IOException {
- Closer closer = Closer.create();
- try {
- closer.register(metadata);
- closer.register(dataReader);
- clearStreams();
- } catch (IOException e) {
- throw closer.rethrow(e);
- } finally {
- closer.close();
- }
+ clearStreams();
+ dataReader.close();
}
@Override
@@ -1244,10 +1131,6 @@ public class RecordReaderImpl implements RecordReader {
return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
}
- MetadataReader getMetadataReader() {
- return metadata;
- }
-
private int findStripe(long rowNumber) {
for (int i = 0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
@@ -1276,8 +1159,8 @@ public class RecordReaderImpl implements RecordReader {
sargColumns = sargColumns == null ?
(sargApp == null ? null : sargApp.sargColumns) : sargColumns;
}
- return metadata.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
- bloomFilterIndex);
+ return dataReader.readRowIndex(stripe, stripeFooter, included, indexes,
+ sargColumns, bloomFilterIndex);
}
private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
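For readers following the API change above: RecordReader.nextBatch() now fills a caller-supplied VectorizedRowBatch and returns a boolean, instead of allocating and returning a batch. Below is a minimal consumer sketch under that new contract; the class name, the file path, and the assumption that the reader exposes getSchema() and that TypeDescription.createRowBatch() can allocate the batch are illustrative and not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class NextBatchExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(conf));
    // The caller now allocates and owns the batch; nextBatch() only fills it.
    TypeDescription schema = reader.getSchema();
    VectorizedRowBatch batch = schema.createRowBatch();
    RecordReader rows = reader.rows();
    while (rows.nextBatch(batch)) {          // false once batch.size == 0
      for (int r = 0; r < batch.size; ++r) {
        // process row r via batch.cols[...]
      }
    }
    rows.close();
  }
}

The loop ends when nextBatch() returns false, which in the change above coincides with batch.size being set to 0.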
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 177721d..4192588 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
@@ -44,6 +46,8 @@ import org.apache.orc.impl.DirectDecompressionCodec;
import org.apache.orc.OrcProto;
import com.google.common.collect.ComparisonChain;
+import org.apache.orc.impl.InStream;
+import org.apache.orc.impl.OrcIndex;
import org.apache.orc.impl.OutStream;
/**
@@ -53,34 +57,130 @@ public class RecordReaderUtils {
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
private static class DefaultDataReader implements DataReader {
- private FSDataInputStream file;
- private ByteBufferAllocatorPool pool;
- private ZeroCopyReaderShim zcr;
- private FileSystem fs;
- private Path path;
- private boolean useZeroCopy;
- private CompressionCodec codec;
+ private FSDataInputStream file = null;
+ private final ByteBufferAllocatorPool pool;
+ private ZeroCopyReaderShim zcr = null;
+ private final FileSystem fs;
+ private final Path path;
+ private final boolean useZeroCopy;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ private final int typeCount;
+
+ private DefaultDataReader(DefaultDataReader other) {
+ this.pool = other.pool;
+ this.zcr = other.zcr;
+ this.bufferSize = other.bufferSize;
+ this.typeCount = other.typeCount;
+ this.fs = other.fs;
+ this.path = other.path;
+ this.useZeroCopy = other.useZeroCopy;
+ this.codec = other.codec;
+ }
private DefaultDataReader(DataReaderProperties properties) {
this.fs = properties.getFileSystem();
this.path = properties.getPath();
this.useZeroCopy = properties.getZeroCopy();
- this.codec = properties.getCodec();
+ this.codec = WriterImpl.createCodec(properties.getCompression());
+ this.bufferSize = properties.getBufferSize();
+ this.typeCount = properties.getTypeCount();
+ if (useZeroCopy) {
+ this.pool = new ByteBufferAllocatorPool();
+ } else {
+ this.pool = null;
+ }
}
@Override
public void open() throws IOException {
this.file = fs.open(path);
if (useZeroCopy) {
- pool = new ByteBufferAllocatorPool();
zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
} else {
- pool = null;
zcr = null;
}
}
@Override
+ public OrcIndex readRowIndex(StripeInformation stripe,
+ OrcProto.StripeFooter footer,
+ boolean[] included,
+ OrcProto.RowIndex[] indexes,
+ boolean[] sargColumns,
+ OrcProto.BloomFilterIndex[] bloomFilterIndices
+ ) throws IOException {
+ if (file == null) {
+ open();
+ }
+ if (footer == null) {
+ footer = readStripeFooter(stripe);
+ }
+ if (indexes == null) {
+ indexes = new OrcProto.RowIndex[typeCount];
+ }
+ if (bloomFilterIndices == null) {
+ bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+ }
+ long offset = stripe.getOffset();
+ List<OrcProto.Stream> streams = footer.getStreamsList();
+ for (int i = 0; i < streams.size(); i++) {
+ OrcProto.Stream stream = streams.get(i);
+ OrcProto.Stream nextStream = null;
+ if (i < streams.size() - 1) {
+ nextStream = streams.get(i+1);
+ }
+ int col = stream.getColumn();
+ int len = (int) stream.getLength();
+ // row index stream and bloom filter are interlaced, check if the sarg column contains bloom
+ // filter and combine the io to read row index and bloom filters for that column together
+ if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+ boolean readBloomFilter = false;
+ if (sargColumns != null && sargColumns[col] &&
+ nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+ len += nextStream.getLength();
+ i += 1;
+ readBloomFilter = true;
+ }
+ if ((included == null || included[col]) && indexes[col] == null) {
+ byte[] buffer = new byte[len];
+ file.readFully(offset, buffer, 0, buffer.length);
+ ByteBuffer bb = ByteBuffer.wrap(buffer);
+ indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+ codec, bufferSize));
+ if (readBloomFilter) {
+ bb.position((int) stream.getLength());
+ bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+ "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+ nextStream.getLength(), codec, bufferSize));
+ }
+ }
+ }
+ offset += len;
+ }
+
+ OrcIndex index = new OrcIndex(indexes, bloomFilterIndices);
+ return index;
+ }
+
+ @Override
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+ if (file == null) {
+ open();
+ }
+ long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+ return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+ tailLength, codec, bufferSize));
+ }
+
+ @Override
public DiskRangeList readFileData(
DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
@@ -106,9 +206,14 @@ public class RecordReaderUtils {
zcr.releaseBuffer(buffer);
}
+ @Override
+ public DataReader clone() {
+ return new DefaultDataReader(this);
+ }
+
}
- static DataReader createDefaultDataReader(DataReaderProperties properties) {
+ public static DataReader createDefaultDataReader(DataReaderProperties properties) {
return new DefaultDataReader(properties);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
index f28ca13..6747691 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;
/**
@@ -34,103 +33,134 @@ import org.apache.orc.TypeDescription;
* has been schema evolution.
*/
public class SchemaEvolution {
-
+ private final Map<TypeDescription, TypeDescription> readerToFile;
+ private final boolean[] included;
+ private final TypeDescription readerSchema;
private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
- public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
- List<OrcProto.Type> schemaTypes) throws IOException {
+ public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
+ this.included = included;
+ readerToFile = null;
+ this.readerSchema = readerSchema;
+ }
- // For ACID, the row is the ROW field in the outer STRUCT.
- final boolean isAcid = checkAcidSchema(fileTypes);
- final List<OrcProto.Type> rowSchema;
- int rowSubtype;
- if (isAcid) {
- rowSubtype = OrcRecordUpdater.ROW + 1;
- rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+ public SchemaEvolution(TypeDescription fileSchema,
+ TypeDescription readerSchema,
+ boolean[] included) throws IOException {
+ readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
+ this.included = included;
+ if (checkAcidSchema(fileSchema)) {
+ this.readerSchema = createEventSchema(readerSchema);
} else {
- rowSubtype = 0;
- rowSchema = fileTypes;
+ this.readerSchema = readerSchema;
}
+ buildMapping(fileSchema, this.readerSchema);
+ }
- // Do checking on the overlap. Additional columns will be defaulted to NULL.
-
- int numFileColumns = rowSchema.get(0).getSubtypesCount();
- int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
-
- int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
-
- /**
- * Check type promotion.
- *
- * Currently, we only support integer type promotions that can be done "implicitly".
- * That is, we know that using a bigger integer tree reader on the original smaller integer
- * column will "just work".
- *
- * In the future, other type promotions might require type conversion.
- */
- // short -> int -> bigint as same integer readers are used for the above types.
-
- for (int i = 0; i < numReadColumns; i++) {
- OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
- OrcProto.Type rColType = schemaTypes.get(i);
- if (!fColType.getKind().equals(rColType.getKind())) {
-
- boolean ok = false;
- if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+ public TypeDescription getReaderSchema() {
+ return readerSchema;
+ }
- if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
- rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting SHORT to INT/LONG requested type
- ok = true;
- }
- } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+ public TypeDescription getFileType(TypeDescription readerType) {
+ TypeDescription result;
+ if (readerToFile == null) {
+ if (included == null || included[readerType.getId()]) {
+ result = readerType;
+ } else {
+ result = null;
+ }
+ } else {
+ result = readerToFile.get(readerType);
+ }
+ return result;
+ }
- if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting INT to LONG requested type
- ok = true;
+ void buildMapping(TypeDescription fileType,
+ TypeDescription readerType) throws IOException {
+ // if the column isn't included, don't map it
+ if (included != null && !included[readerType.getId()]) {
+ return;
+ }
+ boolean isOk = true;
+ // check the easy case first
+ if (fileType.getCategory() == readerType.getCategory()) {
+ switch (readerType.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DOUBLE:
+ case FLOAT:
+ case STRING:
+ case TIMESTAMP:
+ case BINARY:
+ case DATE:
+ // these are always a match
+ break;
+ case CHAR:
+ case VARCHAR:
+ isOk = fileType.getMaxLength() == readerType.getMaxLength();
+ break;
+ case DECIMAL:
+ // TODO we don't enforce scale and precision checks, but probably should
+ break;
+ case UNION:
+ case MAP:
+ case LIST: {
+ // these must be an exact match
+ List<TypeDescription> fileChildren = fileType.getChildren();
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ if (fileChildren.size() == readerChildren.size()) {
+ for(int i=0; i < fileChildren.size(); ++i) {
+ buildMapping(fileChildren.get(i), readerChildren.get(i));
+ }
+ } else {
+ isOk = false;
}
+ break;
}
-
- if (!ok) {
- throw new IOException("ORC does not support type conversion from " +
- fColType.getKind().name() + " to " + rColType.getKind().name());
+ case STRUCT: {
+ // allow either side to have fewer fields than the other
+ List<TypeDescription> fileChildren = fileType.getChildren();
+ List<TypeDescription> readerChildren = readerType.getChildren();
+ int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+ for(int i=0; i < jointSize; ++i) {
+ buildMapping(fileChildren.get(i), readerChildren.get(i));
+ }
+ break;
}
+ default:
+ throw new IllegalArgumentException("Unknown type " + readerType);
}
- }
-
- List<OrcProto.Type> fullSchemaTypes;
-
- if (isAcid) {
- fullSchemaTypes = new ArrayList<OrcProto.Type>();
-
- // This copies the ACID struct type which is subtype = 0.
- // It has field names "operation" through "row".
- // And we copy the types for all fields EXCEPT ROW (which must be last!).
-
- for (int i = 0; i < rowSubtype; i++) {
- fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+ } else {
+ switch (fileType.getCategory()) {
+ case SHORT:
+ if (readerType.getCategory() != TypeDescription.Category.INT &&
+ readerType.getCategory() != TypeDescription.Category.LONG) {
+ isOk = false;
+ }
+ break;
+ case INT:
+ if (readerType.getCategory() != TypeDescription.Category.LONG) {
+ isOk = false;
+ }
+ break;
+ default:
+ isOk = false;
}
-
- // Add the row struct type.
- OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+ }
+ if (isOk) {
+ readerToFile.put(readerType, fileType);
} else {
- fullSchemaTypes = schemaTypes;
+ throw new IOException("ORC does not support type conversion from " +
+ fileType + " to " + readerType);
}
-
- int innerStructSubtype = rowSubtype;
-
- // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
- // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
-
- return new TreeReaderSchema().
- fileTypes(fileTypes).
- schemaTypes(fullSchemaTypes).
- innerStructSubtype(innerStructSubtype);
}
- private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
- if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
- List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+ private static boolean checkAcidSchema(TypeDescription type) {
+ if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
+ List<String> rootFields = type.getFieldNames();
if (acidEventFieldNames.equals(rootFields)) {
return true;
}
@@ -142,26 +172,14 @@ public class SchemaEvolution {
* @param typeDescr
* @return ORC types for the ACID event based on the row's type description
*/
- public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) {
-
- List<OrcProto.Type> result = new ArrayList<OrcProto.Type>();
-
- OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- type.setKind(OrcProto.Type.Kind.STRUCT);
- type.addAllFieldNames(acidEventFieldNames);
- for (int i = 0; i < acidEventFieldNames.size(); i++) {
- type.addSubtypes(i + 1);
- }
- result.add(type.build());
-
- // Automatically add all fields except the last (ROW).
- for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
- type.clear();
- type.setKind(acidEventOrcTypeKinds.get(i));
- result.add(type.build());
- }
-
- OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+ public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+ TypeDescription result = TypeDescription.createStruct()
+ .addField("operation", TypeDescription.createInt())
+ .addField("originalTransaction", TypeDescription.createLong())
+ .addField("bucket", TypeDescription.createInt())
+ .addField("rowId", TypeDescription.createLong())
+ .addField("currentTransaction", TypeDescription.createLong())
+ .addField("row", typeDescr.clone());
return result;
}
@@ -174,14 +192,4 @@ public class SchemaEvolution {
acidEventFieldNames.add("currentTransaction");
acidEventFieldNames.add("row");
}
- public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
- new ArrayList<OrcProto.Type.Kind>();
- static {
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
- acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
- }
}
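With this change SchemaEvolution maps reader TypeDescriptions directly to file TypeDescriptions instead of producing a TreeReaderSchema of OrcProto types. A small sketch of the mapping behavior, using made-up schemas and a made-up class name for illustration: an int file column can be read as bigint, while a reader column missing from the file has no mapping and is defaulted to NULL.

import org.apache.hadoop.hive.ql.io.orc.SchemaEvolution;
import org.apache.orc.TypeDescription;

public class SchemaEvolutionExample {
  public static void main(String[] args) throws Exception {
    // File was written with x:int; the reader asks for x:bigint plus a new column y.
    TypeDescription fileSchema = TypeDescription.createStruct()
        .addField("x", TypeDescription.createInt());
    TypeDescription readerSchema = TypeDescription.createStruct()
        .addField("x", TypeDescription.createLong())
        .addField("y", TypeDescription.createString());

    SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, null);

    // x maps to the file's int column (implicit int -> bigint promotion);
    // y has no file counterpart, so getFileType() returns null.
    TypeDescription readerX = evolution.getReaderSchema().getChildren().get(0);
    TypeDescription readerY = evolution.getReaderSchema().getChildren().get(1);
    System.out.println(evolution.getFileType(readerX));  // int
    System.out.println(evolution.getFileType(readerY));  // null
  }
}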
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 8bb32ea..8ee8cd7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -24,6 +24,7 @@ import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -35,9 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -56,8 +60,7 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.orc.TypeDescription;
import org.apache.orc.impl.BitFieldReader;
import org.apache.orc.impl.DynamicByteArray;
import org.apache.orc.impl.InStream;
@@ -75,60 +78,6 @@ import org.apache.orc.impl.StreamName;
*/
public class TreeReaderFactory {
- private static final Logger LOG =
- LoggerFactory.getLogger(TreeReaderFactory.class);
-
- public static class TreeReaderSchema {
-
- /**
- * The types in the ORC file.
- */
- List<OrcProto.Type> fileTypes;
-
- /**
- * The treeReaderSchema that the reader should read as.
- */
- List<OrcProto.Type> schemaTypes;
-
- /**
- * The subtype of the row STRUCT. Different than 0 for ACID.
- */
- int innerStructSubtype;
-
- public TreeReaderSchema() {
- fileTypes = null;
- schemaTypes = null;
- innerStructSubtype = -1;
- }
-
- public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
- this.fileTypes = fileTypes;
- return this;
- }
-
- public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
- this.schemaTypes = schemaTypes;
- return this;
- }
-
- public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
- this.innerStructSubtype = innerStructSubtype;
- return this;
- }
-
- public List<OrcProto.Type> getFileTypes() {
- return fileTypes;
- }
-
- public List<OrcProto.Type> getSchemaTypes() {
- return schemaTypes;
- }
-
- public int getInnerStructSubtype() {
- return innerStructSubtype;
- }
- }
-
public abstract static class TreeReader {
protected final int columnId;
protected BitFieldReader present = null;
@@ -230,36 +179,60 @@ public class TreeReaderFactory {
}
/**
+ * Called at the top level to read into the given batch.
+ * @param batch the batch to read into
+ * @param batchSize the number of rows to read
+ * @throws IOException
+ */
+ public void nextBatch(VectorizedRowBatch batch,
+ int batchSize) throws IOException {
+ batch.cols[0].reset();
+ batch.cols[0].ensureSize(batchSize, false);
+ nextVector(batch.cols[0], null, batchSize);
+ }
+
+ /**
* Populates the isNull vector array in the previousVector object based on
* the present stream values. This function is called from all the child
* readers, and they all set the values based on isNull field value.
*
- * @param previousVector The columnVector object whose isNull value is populated
+ * @param previous The columnVector object whose isNull value is populated
+ * @param isNull Whether each value was null at a higher level. If
+ * isNull is null, all values are non-null.
* @param batchSize Size of the column vector
- * @return next column vector
* @throws IOException
*/
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- ColumnVector result = (ColumnVector) previousVector;
- if (present != null) {
+ public void nextVector(ColumnVector previous,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ if (present != null || isNull != null) {
// Set noNulls and isNull vector of the ColumnVector based on
// present stream
- result.noNulls = true;
+ previous.noNulls = true;
+ boolean allNull = true;
for (int i = 0; i < batchSize; i++) {
- result.isNull[i] = (present.next() != 1);
- if (result.noNulls && result.isNull[i]) {
- result.noNulls = false;
+ if (isNull == null || !isNull[i]) {
+ if (present != null && present.next() != 1) {
+ previous.noNulls = false;
+ previous.isNull[i] = true;
+ } else {
+ previous.isNull[i] = false;
+ allNull = false;
+ }
+ } else {
+ previous.noNulls = false;
+ previous.isNull[i] = true;
}
}
+ previous.isRepeating = !previous.noNulls && allNull;
} else {
- // There is not present stream, this means that all the values are
+ // There is no present stream, this means that all the values are
// present.
- result.noNulls = true;
+ previous.noNulls = true;
for (int i = 0; i < batchSize; i++) {
- result.isNull[i] = false;
+ previous.isNull[i] = false;
}
}
- return previousVector;
}
public BitFieldReader getPresent() {
@@ -267,6 +240,46 @@ public class TreeReaderFactory {
}
}
+ public static class NullTreeReader extends TreeReader {
+
+ public NullTreeReader(int columnId) throws IOException {
+ super(columnId);
+ }
+
+ @Override
+ public void startStripe(Map<StreamName, InStream> streams,
+ OrcProto.StripeFooter footer) {
+ // PASS
+ }
+
+ @Override
+ void skipRows(long rows) {
+ // PASS
+ }
+
+ @Override
+ public void seek(PositionProvider position) {
+ // PASS
+ }
+
+ @Override
+ public void seek(PositionProvider[] position) {
+ // PASS
+ }
+
+ @Override
+ Object next(Object previous) {
+ return null;
+ }
+
+ @Override
+ public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
+ vector.noNulls = false;
+ vector.isNull[0] = true;
+ vector.isRepeating = true;
+ }
+ }
+
public static class BooleanTreeReader extends TreeReader {
protected BitFieldReader reader = null;
@@ -322,20 +335,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, batchSize);
- return result;
}
}
@@ -387,20 +396,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
- reader.nextVector(result, batchSize);
- return result;
+ reader.nextVector(result, result.vector, batchSize);
}
@Override
@@ -473,20 +478,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
- reader.nextVector(result, batchSize);
- return result;
+ reader.nextVector(result, result.vector, batchSize);
}
@Override
@@ -559,20 +560,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
- reader.nextVector(result, batchSize);
- return result;
+ reader.nextVector(result, result.vector, batchSize);
}
@Override
@@ -646,20 +643,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
- reader.nextVector(result, batchSize);
- return result;
+ reader.nextVector(result, result.vector, batchSize);
}
@Override
@@ -719,16 +712,13 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final DoubleColumnVector result;
- if (previousVector == null) {
- result = new DoubleColumnVector();
- } else {
- result = (DoubleColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final DoubleColumnVector result = (DoubleColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
final boolean hasNulls = !result.noNulls;
boolean allNulls = hasNulls;
@@ -768,7 +758,6 @@ public class TreeReaderFactory {
}
result.isRepeating = repeating;
}
- return result;
}
@Override
@@ -832,16 +821,13 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final DoubleColumnVector result;
- if (previousVector == null) {
- result = new DoubleColumnVector();
- } else {
- result = (DoubleColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final DoubleColumnVector result = (DoubleColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
final boolean hasNulls = !result.noNulls;
boolean allNulls = hasNulls;
@@ -881,8 +867,6 @@ public class TreeReaderFactory {
}
result.isRepeating = repeating;
}
-
- return result;
}
@Override
@@ -974,19 +958,15 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final BytesColumnVector result;
- if (previousVector == null) {
- result = new BytesColumnVector();
- } else {
- result = (BytesColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final BytesColumnVector result = (BytesColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
- return result;
}
@Override
@@ -1011,7 +991,6 @@ public class TreeReaderFactory {
private final TimeZone readerTimeZone;
private TimeZone writerTimeZone;
private boolean hasSameTZRules;
- private TimestampWritable scratchTimestampWritable;
TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
this(columnId, null, null, null, null, skipCorrupt);
@@ -1115,9 +1094,9 @@ public class TreeReaderFactory {
int newNanos = parseNanos(nanos.next());
// fix the rounding when we divided by 1000.
if (millis >= 0) {
- millis += newNanos / 1000000;
+ millis += newNanos / WriterImpl.NANOS_PER_MILLI;
} else {
- millis -= newNanos / 1000000;
+ millis -= newNanos / WriterImpl.NANOS_PER_MILLI;
}
long offset = 0;
// If reader and writer time zones have different rules, adjust the timezone difference
@@ -1144,31 +1123,45 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final TimestampColumnVector result;
- if (previousVector == null) {
- result = new TimestampColumnVector();
- } else {
- result = (TimestampColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ TimestampColumnVector result = (TimestampColumnVector) previousVector;
+ super.nextVector(previousVector, isNull, batchSize);
- result.reset();
- if (scratchTimestampWritable == null) {
- scratchTimestampWritable = new TimestampWritable();
- }
- Object obj;
for (int i = 0; i < batchSize; i++) {
- obj = next(scratchTimestampWritable);
- if (obj == null) {
- result.noNulls = false;
- result.isNull[i] = true;
- } else {
- TimestampWritable writable = (TimestampWritable) obj;
- result.set(i, writable.getTimestamp());
+ if (result.noNulls || !result.isNull[i]) {
+ long millis = data.next() + base_timestamp;
+ int newNanos = parseNanos(nanos.next());
+ if (millis < 0 && newNanos != 0) {
+ millis -= 1;
+ }
+ millis *= WriterImpl.MILLIS_PER_SECOND;
+ long offset = 0;
+ // If reader and writer time zones have different rules, adjust the timezone difference
+ // between reader and writer, taking daylight saving time into account.
+ if (!hasSameTZRules) {
+ offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis);
+ }
+ long adjustedMillis = millis + offset;
+ // Sometimes the reader timezone might have changed after adding the adjustedMillis.
+ // To account for that change, check for any difference in reader timezone after
+ // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time).
+ if (!hasSameTZRules &&
+ (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) {
+ long newOffset =
+ writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis);
+ adjustedMillis = millis + newOffset;
+ }
+ result.time[i] = adjustedMillis;
+ result.nanos[i] = newNanos;
+ if (result.isRepeating && i != 0 &&
+ (result.time[0] != result.time[i] ||
+ result.nanos[0] != result.nanos[i])) {
+ result.isRepeating = false;
+ }
}
}
-
- return result;
}
private static int parseNanos(long serialized) {
@@ -1253,20 +1246,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final LongColumnVector result;
- if (previousVector == null) {
- result = new LongColumnVector();
- } else {
- result = (LongColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
- reader.nextVector(result, batchSize);
- return result;
+ reader.nextVector(result, result.vector, batchSize);
}
@Override
@@ -1278,7 +1267,7 @@ public class TreeReaderFactory {
public static class DecimalTreeReader extends TreeReader {
protected InStream valueStream;
protected IntegerReader scaleReader = null;
- private LongColumnVector scratchScaleVector;
+ private int[] scratchScaleVector;
private final int precision;
private final int scale;
@@ -1293,7 +1282,7 @@ public class TreeReaderFactory {
super(columnId, present);
this.precision = precision;
this.scale = scale;
- this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
this.valueStream = valueStream;
if (scaleStream != null && encoding != null) {
checkEncoding(encoding);
@@ -1352,46 +1341,34 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final DecimalColumnVector result;
- if (previousVector == null) {
- result = new DecimalColumnVector(precision, scale);
- } else {
- result = (DecimalColumnVector) previousVector;
- }
-
- // Save the reference for isNull in the scratch vector
- boolean[] scratchIsNull = scratchScaleVector.isNull;
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final DecimalColumnVector result = (DecimalColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
+ if (batchSize > scratchScaleVector.length) {
+ scratchScaleVector = new int[(int) batchSize];
+ }
+ scaleReader.nextVector(result, scratchScaleVector, batchSize);
// Read value entries based on isNull entries
- if (result.isRepeating) {
- if (!result.isNull[0]) {
+ if (result.noNulls) {
+ for (int r=0; r < batchSize; ++r) {
BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
- short scaleInData = (short) scaleReader.next();
- HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
- dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
- result.set(0, dec);
+ HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+ result.set(r, dec);
}
- } else {
- // result vector has isNull values set, use the same to read scale vector.
- scratchScaleVector.isNull = result.isNull;
- scaleReader.nextVector(scratchScaleVector, batchSize);
- for (int i = 0; i < batchSize; i++) {
- if (!result.isNull[i]) {
+ } else if (!result.isRepeating || !result.isNull[0]) {
+ for (int r=0; r < batchSize; ++r) {
+ if (!result.isNull[r]) {
BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
- short scaleInData = (short) scratchScaleVector.vector[i];
- HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
- dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale);
- result.set(i, dec);
+ HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]);
+ result.set(r, dec);
}
}
}
- // Switch back the null vector.
- scratchScaleVector.isNull = scratchIsNull;
- return result;
}
@Override
@@ -1481,8 +1458,10 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- return reader.nextVector(previousVector, batchSize);
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ reader.nextVector(previousVector, isNull, batchSize);
}
@Override
@@ -1501,7 +1480,7 @@ public class TreeReaderFactory {
BytesColumnVector result, final int batchSize) throws IOException {
// Read lengths
scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here...
- lengths.nextVector(scratchlcv, batchSize);
+ lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
int totalLength = 0;
if (!scratchlcv.isRepeating) {
for (int i = 0; i < batchSize; i++) {
@@ -1532,31 +1511,35 @@ public class TreeReaderFactory {
}
// This method has the common code for reading in bytes into a BytesColumnVector.
- public static void readOrcByteArrays(InStream stream, IntegerReader lengths,
- LongColumnVector scratchlcv,
- BytesColumnVector result, final int batchSize) throws IOException {
-
- byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize);
-
- // Too expensive to figure out 'repeating' by comparisons.
- result.isRepeating = false;
- int offset = 0;
- if (!scratchlcv.isRepeating) {
- for (int i = 0; i < batchSize; i++) {
- if (!scratchlcv.isNull[i]) {
- result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
- offset += scratchlcv.vector[i];
- } else {
- result.setRef(i, allBytes, 0, 0);
+ public static void readOrcByteArrays(InStream stream,
+ IntegerReader lengths,
+ LongColumnVector scratchlcv,
+ BytesColumnVector result,
+ int batchSize) throws IOException {
+ if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+ byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
+ result, (int) batchSize);
+
+ // Too expensive to figure out 'repeating' by comparisons.
+ result.isRepeating = false;
+ int offset = 0;
+ if (!scratchlcv.isRepeating) {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
+ offset += scratchlcv.vector[i];
+ } else {
+ result.setRef(i, allBytes, 0, 0);
+ }
}
- }
- } else {
- for (int i = 0; i < batchSize; i++) {
- if (!scratchlcv.isNull[i]) {
- result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
- offset += scratchlcv.vector[0];
- } else {
- result.setRef(i, allBytes, 0, 0);
+ } else {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
+ offset += scratchlcv.vector[0];
+ } else {
+ result.setRef(i, allBytes, 0, 0);
+ }
}
}
}
@@ -1641,19 +1624,16 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final BytesColumnVector result;
- if (previousVector == null) {
- result = new BytesColumnVector();
- } else {
- result = (BytesColumnVector) previousVector;
- }
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final BytesColumnVector result = (BytesColumnVector) previousVector;
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
- BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
- return result;
+ BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
+ result, batchSize);
}
@Override
@@ -1816,18 +1796,15 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final BytesColumnVector result;
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ final BytesColumnVector result = (BytesColumnVector) previousVector;
int offset;
int length;
- if (previousVector == null) {
- result = new BytesColumnVector();
- } else {
- result = (BytesColumnVector) previousVector;
- }
// Read present/isNull stream
- super.nextVector(result, batchSize);
+ super.nextVector(result, isNull, batchSize);
if (dictionaryBuffer != null) {
@@ -1838,7 +1815,8 @@ public class TreeReaderFactory {
// Read string offsets
scratchlcv.isNull = result.isNull;
- reader.nextVector(scratchlcv, batchSize);
+ scratchlcv.ensureSize((int) batchSize, false);
+ reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
if (!scratchlcv.isRepeating) {
// The vector has non-repeating strings. Iterate thru the batch
@@ -1878,7 +1856,6 @@ public class TreeReaderFactory {
}
}
}
- return result;
}
int getDictionaryEntryLength(int entry, int offset) {
@@ -1936,11 +1913,13 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
// Get the vector of strings from StringTreeReader, then make a 2nd pass to
// adjust down the length (right trim and truncate) if necessary.
- BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
-
+ super.nextVector(previousVector, isNull, batchSize);
+ BytesColumnVector result = (BytesColumnVector) previousVector;
int adjustedDownLen;
if (result.isRepeating) {
if (result.noNulls || !result.isNull[0]) {
@@ -1973,7 +1952,6 @@ public class TreeReaderFactory {
}
}
}
- return result;
}
}
@@ -2010,10 +1988,13 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
// Get the vector of strings from StringTreeReader, then make a 2nd pass to
// adjust down the length (truncate) if necessary.
- BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
+ super.nextVector(previousVector, isNull, batchSize);
+ BytesColumnVector result = (BytesColumnVector) previousVector;
int adjustedDownLen;
if (result.isRepeating) {
@@ -2045,62 +2026,26 @@ public class TreeReaderFactory {
}
}
}
- return result;
}
}
protected static class StructTreeReader extends TreeReader {
- private final int readColumnCount;
- private final int resultColumnCount;
protected final TreeReader[] fields;
- private final String[] fieldNames;
- protected StructTreeReader(
- int columnId,
- TreeReaderSchema treeReaderSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
+ protected StructTreeReader(int columnId,
+ TypeDescription readerSchema,
+ SchemaEvolution treeReaderSchema,
+ boolean[] included,
+ boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
-
- OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+ TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema);
- readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount());
-
- if (columnId == treeReaderSchema.getInnerStructSubtype()) {
- // If there are more result columns than reader columns, we will default those additional
- // columns to NULL.
- resultColumnCount = schemaStructType.getFieldNamesCount();
- } else {
- resultColumnCount = readColumnCount;
- }
-
- this.fields = new TreeReader[readColumnCount];
- this.fieldNames = new String[readColumnCount];
-
- if (included == null) {
- for (int i = 0; i < readColumnCount; ++i) {
- int subtype = schemaStructType.getSubtypes(i);
- this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
- // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
- this.fieldNames[i] = schemaStructType.getFieldNames(i);
- }
- } else {
- for (int i = 0; i < readColumnCount; ++i) {
- int subtype = schemaStructType.getSubtypes(i);
- if (subtype >= included.length) {
- throw new IOException("subtype " + subtype + " exceeds the included array size " +
- included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
- " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
- " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
- }
- if (included[subtype]) {
- this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
- }
- // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
- this.fieldNames[i] = schemaStructType.getFieldNames(i);
- }
+ List<TypeDescription> childrenTypes = readerSchema.getChildren();
+ this.fields = new TreeReader[childrenTypes.size()];
+ for (int i = 0; i < fields.length; ++i) {
+ TypeDescription subtype = childrenTypes.get(i);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
}
}
@@ -2120,65 +2065,52 @@ public class TreeReaderFactory {
OrcStruct result = null;
if (valuePresent) {
if (previous == null) {
- result = new OrcStruct(resultColumnCount);
+ result = new OrcStruct(fields.length);
} else {
result = (OrcStruct) previous;
// If the input format was initialized with a file with a
// different number of fields, the number of fields needs to
// be updated to the correct number
- if (result.getNumFields() != resultColumnCount) {
- result.setNumFields(resultColumnCount);
- }
+ result.setNumFields(fields.length);
}
- for (int i = 0; i < readColumnCount; ++i) {
+ for (int i = 0; i < fields.length; ++i) {
if (fields[i] != null) {
result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
}
}
- if (resultColumnCount > readColumnCount) {
- for (int i = readColumnCount; i < resultColumnCount; ++i) {
- // Default new treeReaderSchema evolution fields to NULL.
- result.setFieldValue(i, null);
- }
- }
}
return result;
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- final ColumnVector[] result;
- if (previousVector == null) {
- result = new ColumnVector[readColumnCount];
- } else {
- result = (ColumnVector[]) previousVector;
+ public void nextBatch(VectorizedRowBatch batch,
+ int batchSize) throws IOException {
+ for(int i=0; i < fields.length &&
+ (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+ batch.cols[i].reset();
+ batch.cols[i].ensureSize((int) batchSize, false);
+ fields[i].nextVector(batch.cols[i], null, batchSize);
}
+ }
- // Read all the members of struct as column vectors
- for (int i = 0; i < readColumnCount; i++) {
- if (fields[i] != null) {
- if (result[i] == null) {
- result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
- } else {
- fields[i].nextVector(result[i], batchSize);
- }
- }
- }
+ @Override
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ super.nextVector(previousVector, isNull, batchSize);
+ StructColumnVector result = (StructColumnVector) previousVector;
+ if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+ result.isRepeating = false;
- // Default additional treeReaderSchema evolution fields to NULL.
- if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) {
- for (int i = readColumnCount; i < vectorColumnCount; ++i) {
- ColumnVector colVector = result[i];
- if (colVector != null) {
- colVector.isRepeating = true;
- colVector.noNulls = false;
- colVector.isNull[0] = true;
+ // Read all the members of struct as column vectors
+ boolean[] mask = result.noNulls ? null : result.isNull;
+ for (int f = 0; f < fields.length; f++) {
+ if (fields[f] != null) {
+ fields[f].nextVector(result.fields[f], mask, batchSize);
}
}
}
-
- return result;
}
@Override
@@ -2208,19 +2140,18 @@ public class TreeReaderFactory {
protected final TreeReader[] fields;
protected RunLengthByteReader tags;
- protected UnionTreeReader(int columnId,
- TreeReaderSchema treeReaderSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
- super(columnId);
- OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
- int fieldCount = type.getSubtypesCount();
+ protected UnionTreeReader(int fileColumn,
+ TypeDescription readerSchema,
+ SchemaEvolution treeReaderSchema,
+ boolean[] included,
+ boolean skipCorrupt) throws IOException {
+ super(fileColumn);
+ List<TypeDescription> childrenTypes = readerSchema.getChildren();
+ int fieldCount = childrenTypes.size();
this.fields = new TreeReader[fieldCount];
for (int i = 0; i < fieldCount; ++i) {
- int subtype = type.getSubtypes(i);
- if (included == null || included[subtype]) {
- this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
- }
+ TypeDescription subtype = childrenTypes.get(i);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
}
}
@@ -2252,9 +2183,25 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previousVector, final int batchSize) throws IOException {
- throw new UnsupportedOperationException(
- "NextVector is not supported operation for Union type");
+ public void nextVector(ColumnVector previousVector,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ UnionColumnVector result = (UnionColumnVector) previousVector;
+ super.nextVector(result, isNull, batchSize);
+ if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+ result.isRepeating = false;
+ tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
+ batchSize);
+ boolean[] ignore = new boolean[(int) batchSize];
+ for (int f = 0; f < result.fields.length; ++f) {
+ // build the ignore list for this tag
+ for (int r = 0; r < batchSize; ++r) {
+ ignore[r] = (!result.noNulls && result.isNull[r]) ||
+ result.tags[r] != f;
+ }
+ fields[f].nextVector(result.fields[f], ignore, batchSize);
+ }
+ }
}
@Override
@@ -2288,13 +2235,15 @@ public class TreeReaderFactory {
protected final TreeReader elementReader;
protected IntegerReader lengths = null;
- protected ListTreeReader(int columnId,
- TreeReaderSchema treeReaderSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
- super(columnId);
- OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
- elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
+ protected ListTreeReader(int fileColumn,
+ TypeDescription readerSchema,
+ SchemaEvolution treeReaderSchema,
+ boolean[] included,
+ boolean skipCorrupt) throws IOException {
+ super(fileColumn);
+ TypeDescription elementType = readerSchema.getChildren().get(0);
+ elementReader = createTreeReader(elementType, treeReaderSchema, included,
+ skipCorrupt);
}
@Override
@@ -2335,9 +2284,27 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previous, final int batchSize) throws IOException {
- throw new UnsupportedOperationException(
- "NextVector is not supported operation for List type");
+ public void nextVector(ColumnVector previous,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ ListColumnVector result = (ListColumnVector) previous;
+ super.nextVector(result, isNull, batchSize);
+ // if we have some non-null values, then read them
+ if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+ lengths.nextVector(result, result.lengths, batchSize);
+ // even with repeating lengths, the list doesn't repeat
+ result.isRepeating = false;
+ // build the offsets vector and figure out how many children to read
+ result.childCount = 0;
+ for (int r = 0; r < batchSize; ++r) {
+ if (result.noNulls || !result.isNull[r]) {
+ result.offsets[r] = result.childCount;
+ result.childCount += result.lengths[r];
+ }
+ }
+ result.child.ensureSize(result.childCount, false);
+ elementReader.nextVector(result.child, null, result.childCount);
+ }
}
@Override
@@ -2378,24 +2345,16 @@ public class TreeReaderFactory {
protected final TreeReader valueReader;
protected IntegerReader lengths = null;
- protected MapTreeReader(int columnId,
- TreeReaderSchema treeReaderSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
- super(columnId);
- OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
- int keyColumn = type.getSubtypes(0);
- int valueColumn = type.getSubtypes(1);
- if (included == null || included[keyColumn]) {
- keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
- } else {
- keyReader = null;
- }
- if (included == null || included[valueColumn]) {
- valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
- } else {
- valueReader = null;
- }
+ protected MapTreeReader(int fileColumn,
+ TypeDescription readerSchema,
+ SchemaEvolution treeReaderSchema,
+ boolean[] included,
+ boolean skipCorrupt) throws IOException {
+ super(fileColumn);
+ TypeDescription keyType = readerSchema.getChildren().get(0);
+ TypeDescription valueType = readerSchema.getChildren().get(1);
+ keyReader = createTreeReader(keyType, treeReaderSchema, included, skipCorrupt);
+ valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt);
}
@Override
@@ -2429,9 +2388,28 @@ public class TreeReaderFactory {
}
@Override
- public Object nextVector(Object previous, final int batchSize) throws IOException {
- throw new UnsupportedOperationException(
- "NextVector is not supported operation for Map type");
+ public void nextVector(ColumnVector previous,
+ boolean[] isNull,
+ int batchSize) throws IOException {
+ MapColumnVector result = (MapColumnVector) previous;
+ super.nextVector(result, isNull, batchSize);
+ if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+ lengths.nextVector(result, result.lengths, batchSize);
+ // even with repeating lengths, the map doesn't repeat
+ result.isRepeating = false;
+ // build the offsets vector and figure out how many children to read
+ result.childCount = 0;
+ for (int r = 0; r < batchSize; ++r) {
+ if (result.noNulls || !result.isNull[r]) {
+ result.offsets[r] = result.childCount;
+ result.childCount += result.lengths[r];
+ }
+ }
+ result.keys.ensureSize(result.childCount, false);
+ result.values.ensureSize(result.childCount, false);
+ keyReader.nextVector(result.keys, null, result.childCount);
+ valueReader.nextVector(result.values, null, result.childCount);
+ }
}
@Override
@@ -2471,61 +2449,61 @@ public class TreeReaderFactory {
}
}
- public static TreeReader createTreeReader(int columnId,
- TreeReaderSchema treeReaderSchema,
- boolean[] included,
- boolean skipCorrupt
- ) throws IOException {
- OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
- switch (type.getKind()) {
+ public static TreeReader createTreeReader(TypeDescription readerType,
+ SchemaEvolution evolution,
+ boolean[] included,
+ boolean skipCorrupt
+ ) throws IOException {
+ TypeDescription fileType = evolution.getFileType(readerType);
+ if (fileType == null ||
+ (included != null && !included[readerType.getId()])) {
+ return new NullTreeReader(0);
+ }
+ switch (readerType.getCategory()) {
case BOOLEAN:
- return new BooleanTreeReader(columnId);
+ return new BooleanTreeReader(fileType.getId());
case BYTE:
- return new ByteTreeReader(columnId);
+ return new ByteTreeReader(fileType.getId());
case DOUBLE:
- return new DoubleTreeReader(columnId);
+ return new DoubleTreeReader(fileType.getId());
case FLOAT:
- return new FloatTreeReader(columnId);
+ return new FloatTreeReader(fileType.getId());
case SHORT:
- return new ShortTreeReader(columnId);
+ return new ShortTreeReader(fileType.getId());
case INT:
- return new IntTreeReader(columnId);
+ return new IntTreeReader(fileType.getId());
case LONG:
- return new LongTreeReader(columnId, skipCorrupt);
+ return new LongTreeReader(fileType.getId(), skipCorrupt);
case STRING:
- return new StringTreeReader(columnId);
+ return new StringTreeReader(fileType.getId());
case CHAR:
- if (!type.hasMaximumLength()) {
- throw new IllegalArgumentException("ORC char type has no length specified");
- }
- return new CharTreeReader(columnId, type.getMaximumLength());
+ return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
case VARCHAR:
- if (!type.hasMaximumLength()) {
- throw new IllegalArgumentException("ORC varchar type has no length specified");
- }
- return new VarcharTreeReader(columnId, type.getMaximumLength());
+ return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
case BINARY:
- return new BinaryTreeReader(columnId);
+ return new BinaryTreeReader(fileType.getId());
case TIMESTAMP:
- return new TimestampTreeReader(columnId, skipCorrupt);
+ return new TimestampTreeReader(fileType.getId(), skipCorrupt);
case DATE:
- return new DateTreeReader(columnId);
+ return new DateTreeReader(fileType.getId());
case DECIMAL:
- int precision =
- type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
- int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
- return new DecimalTreeReader(columnId, precision, scale);
+ return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
+ readerType.getScale());
case STRUCT:
- return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+ return new StructTreeReader(fileType.getId(), readerType,
+ evolution, included, skipCorrupt);
case LIST:
- return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+ return new ListTreeReader(fileType.getId(), readerType,
+ evolution, included, skipCorrupt);
case MAP:
- return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+ return new MapTreeReader(fileType.getId(), readerType, evolution,
+ included, skipCorrupt);
case UNION:
- return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+ return new UnionTreeReader(fileType.getId(), readerType,
+ evolution, included, skipCorrupt);
default:
throw new IllegalArgumentException("Unsupported type " +
- type.getKind());
+ readerType.getCategory());
}
}
}
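A side note for readers of the List/Map hunks above: both new nextVector
implementations follow the same pattern. After the run-length decoded
lengths are read into the column vector, a single pass over the batch
builds the offsets array and the total child count, and only then is the
child reader asked to produce exactly that many values. A minimal
standalone sketch of that offsets/childCount computation (a hypothetical
helper using plain arrays instead of ListColumnVector; not part of this
patch) might look like:

    // Sketch only: mirrors how ListTreeReader/MapTreeReader.nextVector turn
    // per-row lengths into child offsets before invoking the element readers.
    public final class OffsetsSketch {
      static long buildOffsets(long[] lengths, boolean[] isNull, boolean noNulls,
                               long[] offsets, int batchSize) {
        long childCount = 0;
        for (int r = 0; r < batchSize; ++r) {
          if (noNulls || !isNull[r]) {
            offsets[r] = childCount;   // child values for row r start here
            childCount += lengths[r];  // and occupy lengths[r] slots
          }
        }
        return childCount;             // how many values the child reader must supply
      }

      public static void main(String[] args) {
        long[] lengths = {2, 0, 3};
        long[] offsets = new long[3];
        long children = buildOffsets(lengths, new boolean[3], true, offsets, 3);
        // prints: 5 child values, offsets=[0, 2, 2]
        System.out.println(children + " child values, offsets=" +
            java.util.Arrays.toString(offsets));
      }
    }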
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 816b52d..e4d2e6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -71,14 +71,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
}
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
/**
* Do we have schema on read in the configuration variables?
*/
- TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
-
List<OrcProto.Type> types = file.getTypes();
- Reader.Options options = new Reader.Options();
- options.schema(schema);
+ int dataColumns = rbCtx.getDataColumnCount();
+ TypeDescription schema =
+ OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns);
+ if (schema == null) {
+ schema = file.getSchema();
+ // Even if the user isn't doing schema evolution, cut the schema
+ // to the desired size.
+ if (schema.getCategory() == TypeDescription.Category.STRUCT &&
+ schema.getChildren().size() > dataColumns) {
+ schema = schema.clone();
+ List<TypeDescription> children = schema.getChildren();
+ for(int c = children.size() - 1; c >= dataColumns; --c) {
+ children.remove(c);
+ }
+ }
+ }
+ Reader.Options options = new Reader.Options().schema(schema);
+
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
options.range(offset, length);
@@ -87,8 +102,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
this.reader = file.rowsOptions(options);
- rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
-
columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -103,9 +116,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
@Override
public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
- if (!reader.hasNext()) {
- return false;
- }
try {
// Check and update partition cols if necessary. Ideally, this should be done
// in CreateValue as the partition is constant per split. But since Hive uses
@@ -118,7 +128,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
}
addPartitionCols = false;
}
- reader.nextBatch(value);
+ if (!reader.nextBatch(value)) {
+ return false;
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
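Also worth noting for this hunk: the old hasNext()/nextBatch(value) pair is
collapsed into a single call, so end-of-file is now signalled through
nextBatch's boolean return value. A minimal caller-side sketch of that loop
(a hypothetical driver, not part of this patch, assuming the Hive
RecordReader interface exposes boolean nextBatch(VectorizedRowBatch) after
this change):

    // Sketch only: drives the updated contract where nextBatch(batch)
    // fills the batch and returns false once no more data remains.
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    class BatchDriverSketch {
      static long countRows(RecordReader reader, VectorizedRowBatch batch)
          throws java.io.IOException {
        long rows = 0;
        while (reader.nextBatch(batch)) {   // false means no more batches
          rows += batch.size;               // size = rows filled into this batch
        }
        return rows;
      }
    }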
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 70fe803..8e52907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -101,8 +101,6 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
}
}
- private static final long NANOS_PER_MILLI = 1000000;
-
/**
* Set the value for a given column value within a batch.
* @param rowId the row to set