You are viewing a plain-text version of this content. (The canonical link to the original posting was not preserved in this plain-text copy.)
Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/20 22:05:22 UTC
[05/30] hive git commit: Revert "HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline)"
Revert "HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline)"
This reverts commit 0dd4621f34f6043071474220a082268cda124b9d.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d559b347
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d559b347
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d559b347
Branch: refs/heads/llap
Commit: d559b34755010b5ed3ecc31fa423d01788e5e875
Parents: 40e0c38
Author: Matt McCline <mm...@hortonworks.com>
Authored: Fri Apr 15 16:00:18 2016 -0700
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Fri Apr 15 16:00:18 2016 -0700
----------------------------------------------------------------------
.../llap/io/decode/OrcEncodedDataConsumer.java | 45 +-
orc/src/java/org/apache/orc/OrcUtils.java | 75 -
orc/src/java/org/apache/orc/Reader.java | 6 -
orc/src/java/org/apache/orc/RecordReader.java | 8 +-
.../java/org/apache/orc/TypeDescription.java | 62 +-
.../org/apache/orc/impl/BitFieldReader.java | 5 +-
.../java/org/apache/orc/impl/IntegerReader.java | 26 +-
.../apache/orc/impl/RunLengthByteReader.java | 36 +-
.../apache/orc/impl/RunLengthIntegerReader.java | 31 +-
.../orc/impl/RunLengthIntegerReaderV2.java | 33 +-
.../java/org/apache/orc/impl/WriterImpl.java | 47 +-
.../ql/exec/vector/VectorizedRowBatchCtx.java | 13 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 43 +-
.../hive/ql/io/orc/OrcRawRecordMerger.java | 3 +-
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 12 +-
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 50 +-
.../hadoop/hive/ql/io/orc/SchemaEvolution.java | 234 ++-
.../hive/ql/io/orc/TreeReaderFactory.java | 838 ++++-----
.../ql/io/orc/VectorizedOrcInputFormat.java | 32 +-
.../hadoop/hive/ql/io/orc/WriterImpl.java | 2 +
.../hive/ql/io/orc/TestTypeDescription.java | 4 +-
.../hive/ql/io/orc/TestVectorOrcFile.java | 1634 +++++++++---------
.../hive/ql/io/orc/TestVectorizedORCReader.java | 7 +-
.../hive/ql/exec/vector/BytesColumnVector.java | 11 -
.../ql/exec/vector/TimestampColumnVector.java | 2 +-
.../hive/ql/exec/vector/UnionColumnVector.java | 2 +
26 files changed, 1476 insertions(+), 1785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index baaa4d7..7ee263d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.llap.io.decode;
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -28,12 +27,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionCodec;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
@@ -77,35 +71,6 @@ public class OrcEncodedDataConsumer
stripes[m.getStripeIx()] = m;
}
- private static ColumnVector createColumn(OrcProto.Type type,
- int batchSize) {
- switch (type.getKind()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case DATE:
- return new LongColumnVector(batchSize);
- case FLOAT:
- case DOUBLE:
- return new DoubleColumnVector(batchSize);
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- return new BytesColumnVector(batchSize);
- case TIMESTAMP:
- return new TimestampColumnVector(batchSize);
- case DECIMAL:
- return new DecimalColumnVector(batchSize, type.getPrecision(),
- type.getScale());
- default:
- throw new IllegalArgumentException("LLAP does not support " +
- type.getKind());
- }
- }
-
@Override
protected void decodeBatch(OrcEncodedColumnBatch batch,
Consumer<ColumnVectorBatch> downstreamConsumer) {
@@ -147,15 +112,9 @@ public class OrcEncodedDataConsumer
ColumnVectorBatch cvb = cvbPool.take();
assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
cvb.size = batchSize;
- List<OrcProto.Type> types = fileMetadata.getTypes();
- int[] columnMapping = batch.getColumnIxs();
+
for (int idx = 0; idx < batch.getColumnIxs().length; idx++) {
- if (cvb.cols[idx] == null) {
- // skip over the top level struct, but otherwise assume no complex
- // types
- cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize);
- }
- columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize);
+ cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
}
// we are done reading a batch, send it to consumer for processing
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java
index 2ebe9a7..2e93254 100644
--- a/orc/src/java/org/apache/orc/OrcUtils.java
+++ b/orc/src/java/org/apache/orc/OrcUtils.java
@@ -449,79 +449,4 @@ public class OrcUtils {
return columnId;
}
- /**
- * Translate the given rootColumn from the list of types to a TypeDescription.
- * @param types all of the types
- * @param rootColumn translate this type
- * @return a new TypeDescription that matches the given rootColumn
- */
- public static
- TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types,
- int rootColumn) {
- OrcProto.Type type = types.get(rootColumn);
- switch (type.getKind()) {
- case BOOLEAN:
- return TypeDescription.createBoolean();
- case BYTE:
- return TypeDescription.createByte();
- case SHORT:
- return TypeDescription.createShort();
- case INT:
- return TypeDescription.createInt();
- case LONG:
- return TypeDescription.createLong();
- case FLOAT:
- return TypeDescription.createFloat();
- case DOUBLE:
- return TypeDescription.createDouble();
- case STRING:
- return TypeDescription.createString();
- case CHAR:
- return TypeDescription.createChar()
- .withMaxLength(type.getMaximumLength());
- case VARCHAR:
- return TypeDescription.createVarchar()
- .withMaxLength(type.getMaximumLength());
- case BINARY:
- return TypeDescription.createBinary();
- case TIMESTAMP:
- return TypeDescription.createTimestamp();
- case DATE:
- return TypeDescription.createDate();
- case DECIMAL: {
- TypeDescription result = TypeDescription.createDecimal();
- if (type.hasScale()) {
- result.withScale(type.getScale());
- }
- if (type.hasPrecision()) {
- result.withPrecision(type.getPrecision());
- }
- return result;
- }
- case LIST:
- return TypeDescription.createList(
- convertTypeFromProtobuf(types, type.getSubtypes(0)));
- case MAP:
- return TypeDescription.createMap(
- convertTypeFromProtobuf(types, type.getSubtypes(0)),
- convertTypeFromProtobuf(types, type.getSubtypes(1)));
- case STRUCT: {
- TypeDescription result = TypeDescription.createStruct();
- for(int f=0; f < type.getSubtypesCount(); ++f) {
- result.addField(type.getFieldNames(f),
- convertTypeFromProtobuf(types, type.getSubtypes(f)));
- }
- return result;
- }
- case UNION: {
- TypeDescription result = TypeDescription.createUnion();
- for(int f=0; f < type.getSubtypesCount(); ++f) {
- result.addUnionChild(
- convertTypeFromProtobuf(types, type.getSubtypes(f)));
- }
- return result;
- }
- }
- throw new IllegalArgumentException("Unknown ORC type " + type.getKind());
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/Reader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java
index 62a05e9..be722b5 100644
--- a/orc/src/java/org/apache/orc/Reader.java
+++ b/orc/src/java/org/apache/orc/Reader.java
@@ -116,15 +116,9 @@ public interface Reader {
ColumnStatistics[] getStatistics();
/**
- * Get the type of rows in this ORC file.
- */
- TypeDescription getSchema();
-
- /**
* Get the list of types contained in the file. The root type is the first
* type in the list.
* @return the list of flattened types
- * @deprecated use getSchema instead
*/
List<OrcProto.Type> getTypes();
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java
index 09ba0f0..7229dda 100644
--- a/orc/src/java/org/apache/orc/RecordReader.java
+++ b/orc/src/java/org/apache/orc/RecordReader.java
@@ -30,11 +30,13 @@ public interface RecordReader {
* controlled by the callers. Caller need to look at
* VectorizedRowBatch.size of the retunred object to know the batch
* size read.
- * @param batch a row batch object to read into
- * @return were more rows available to read?
+ * @param previousBatch a row batch object that can be reused by the reader
+ * @return the row batch that was read. The batch will have a non-zero row
+ * count if the pointer isn't at the end of the file
* @throws java.io.IOException
*/
- boolean nextBatch(VectorizedRowBatch batch) throws IOException;
+ VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch
+ ) throws IOException;
/**
* Get the row number of the row that will be returned by the following
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index b8e057e..bd900ac 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -61,7 +61,7 @@ public class TypeDescription {
LIST("array", false),
MAP("map", false),
STRUCT("struct", false),
- UNION("uniontype", false);
+ UNION("union", false);
Category(String name, boolean isPrimitive) {
this.name = name;
@@ -258,66 +258,6 @@ public class TypeDescription {
return id;
}
- public TypeDescription clone() {
- TypeDescription result = new TypeDescription(category);
- result.maxLength = maxLength;
- result.precision = precision;
- result.scale = scale;
- if (fieldNames != null) {
- result.fieldNames.addAll(fieldNames);
- }
- if (children != null) {
- for(TypeDescription child: children) {
- TypeDescription clone = child.clone();
- clone.parent = result;
- result.children.add(clone);
- }
- }
- return result;
- }
-
- @Override
- public int hashCode() {
- return getId();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || other.getClass() != TypeDescription.class) {
- return false;
- }
- if (other == this) {
- return true;
- }
- TypeDescription castOther = (TypeDescription) other;
- if (category != castOther.category ||
- getId() != castOther.getId() ||
- getMaximumId() != castOther.getMaximumId() ||
- maxLength != castOther.maxLength ||
- scale != castOther.scale ||
- precision != castOther.precision) {
- return false;
- }
- if (children != null) {
- if (children.size() != castOther.children.size()) {
- return false;
- }
- for (int i = 0; i < children.size(); ++i) {
- if (!children.get(i).equals(castOther.children.get(i))) {
- return false;
- }
- }
- }
- if (category == Category.STRUCT) {
- for(int i=0; i < fieldNames.size(); ++i) {
- if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) {
- return false;
- }
- }
- }
- return true;
- }
-
/**
* Get the maximum id assigned to this type or its children.
* The first call will cause all of the the ids in tree to be assigned, so
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/BitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
index dda7355..8d9d3cb 100644
--- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java
+++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java
@@ -137,7 +137,7 @@ public class BitFieldReader {
long previousLen) throws IOException {
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
- if (previous.noNulls || !previous.isNull[i]) {
+ if (!previous.isNull[i]) {
previous.vector[i] = next();
} else {
// The default value of null for int types in vectorized
@@ -150,8 +150,7 @@ public class BitFieldReader {
// when determining the isRepeating flag.
if (previous.isRepeating
&& i > 0
- && ((previous.vector[0] != previous.vector[i]) ||
- (previous.isNull[0] != previous.isNull[i]))) {
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
previous.isRepeating = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
index 8bef0f1..7dfd289 100644
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* Interface for reading integers.
@@ -57,25 +57,9 @@ public interface IntegerReader {
/**
* Return the next available vector for values.
- * @param column the column being read
- * @param data the vector to read into
- * @param length the number of numbers to read
- * @throws IOException
- */
- void nextVector(ColumnVector column,
- long[] data,
- int length
- ) throws IOException;
-
- /**
- * Return the next available vector for values. Does not change the
- * value of column.isRepeating.
- * @param column the column being read
- * @param data the vector to read into
- * @param length the number of numbers to read
+ * @return
* @throws IOException
*/
- void nextVector(ColumnVector column,
- int[] data,
- int length
- ) throws IOException;}
+ void nextVector(LongColumnVector previous, final int previousLen)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
index 24bd051..380f3391 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
import java.io.EOFException;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* A reader that reads a sequence of bytes. A control byte is read before
@@ -92,16 +92,16 @@ public class RunLengthByteReader {
return result;
}
- public void nextVector(ColumnVector previous, long[] data, long size)
+ public void nextVector(LongColumnVector previous, long previousLen)
throws IOException {
previous.isRepeating = true;
- for (int i = 0; i < size; i++) {
+ for (int i = 0; i < previousLen; i++) {
if (!previous.isNull[i]) {
- data[i] = next();
+ previous.vector[i] = next();
} else {
// The default value of null for int types in vectorized
// processing is 1, so set that if the value is null
- data[i] = 1;
+ previous.vector[i] = 1;
}
// The default value for nulls in Vectorization for int types is 1
@@ -109,36 +109,12 @@ public class RunLengthByteReader {
// when determining the isRepeating flag.
if (previous.isRepeating
&& i > 0
- && ((data[0] != data[i]) ||
- (previous.isNull[0] != previous.isNull[i]))) {
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
previous.isRepeating = false;
}
}
}
- /**
- * Read the next size bytes into the data array, skipping over any slots
- * where isNull is true.
- * @param isNull if non-null, skip any rows where isNull[r] is true
- * @param data the array to read into
- * @param size the number of elements to read
- * @throws IOException
- */
- public void nextVector(boolean[] isNull, int[] data,
- long size) throws IOException {
- if (isNull == null) {
- for(int i=0; i < size; ++i) {
- data[i] = next();
- }
- } else {
- for(int i=0; i < size; ++i) {
- if (!isNull[i]) {
- data[i] = next();
- }
- }
- }
- }
-
public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
index b91a263..0c90cde 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -20,7 +20,7 @@ package org.apache.orc.impl;
import java.io.EOFException;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* A reader that reads a sequence of integers.
@@ -99,17 +99,15 @@ public class RunLengthIntegerReader implements IntegerReader {
}
@Override
- public void nextVector(ColumnVector previous,
- long[] data,
- int previousLen) throws IOException {
+ public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
if (!previous.isNull[i]) {
- data[i] = next();
+ previous.vector[i] = next();
} else {
// The default value of null for int type in vectorized
// processing is 1, so set that if the value is null
- data[i] = 1;
+ previous.vector[i] = 1;
}
// The default value for nulls in Vectorization for int types is 1
@@ -117,32 +115,13 @@ public class RunLengthIntegerReader implements IntegerReader {
// when determining the isRepeating flag.
if (previous.isRepeating
&& i > 0
- && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
+ && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
previous.isRepeating = false;
}
}
}
@Override
- public void nextVector(ColumnVector vector,
- int[] data,
- int size) throws IOException {
- if (vector.noNulls) {
- for(int r=0; r < data.length && r < size; ++r) {
- data[r] = (int) next();
- }
- } else if (!(vector.isRepeating && vector.isNull[0])) {
- for(int r=0; r < data.length && r < size; ++r) {
- if (!vector.isNull[r]) {
- data[r] = (int) next();
- } else {
- data[r] = 1;
- }
- }
- }
- }
-
- @Override
public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index 610d9b5..c6d685a 100644
--- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -21,9 +21,9 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* A reader that reads a sequence of light weight compressed integers. Refer
@@ -360,17 +360,15 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
}
@Override
- public void nextVector(ColumnVector previous,
- long[] data,
- int previousLen) throws IOException {
+ public void nextVector(LongColumnVector previous, final int previousLen) throws IOException {
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
if (!previous.isNull[i]) {
- data[i] = next();
+ previous.vector[i] = next();
} else {
// The default value of null for int type in vectorized
// processing is 1, so set that if the value is null
- data[i] = 1;
+ previous.vector[i] = 1;
}
// The default value for nulls in Vectorization for int types is 1
@@ -378,29 +376,10 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
// when determining the isRepeating flag.
if (previous.isRepeating
&& i > 0
- && (data[0] != data[i] ||
- previous.isNull[0] != previous.isNull[i])) {
+ && (previous.vector[i - 1] != previous.vector[i] ||
+ previous.isNull[i - 1] != previous.isNull[i])) {
previous.isRepeating = false;
}
}
}
-
- @Override
- public void nextVector(ColumnVector vector,
- int[] data,
- int size) throws IOException {
- if (vector.noNulls) {
- for(int r=0; r < data.length && r < size; ++r) {
- data[r] = (int) next();
- }
- } else if (!(vector.isRepeating && vector.isNull[0])) {
- for(int r=0; r < data.length && r < size; ++r) {
- if (!vector.isNull[r]) {
- data[r] = (int) next();
- } else {
- data[r] = 1;
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b2966e0..f8afe06 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1693,10 +1693,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
- public static long NANOS_PER_MILLI = 1000000;
public static final int MILLIS_PER_SECOND = 1000;
static final int NANOS_PER_SECOND = 1000000000;
+ static final int MILLIS_PER_NANO = 1000000;
public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
private static class TimestampTreeWriter extends TreeWriter {
@@ -2262,36 +2261,32 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
} else {
// write the records in runs of the same tag
- int[] currentStart = new int[vec.fields.length];
- int[] currentLength = new int[vec.fields.length];
+ byte prevTag = 0;
+ int currentRun = 0;
+ boolean started = false;
for(int i=0; i < length; ++i) {
- // only need to deal with the non-nulls, since the nulls were dealt
- // with in the super method.
- if (vec.noNulls || !vec.isNull[i + offset]) {
+ if (!vec.isNull[i + offset]) {
byte tag = (byte) vec.tags[offset + i];
tags.write(tag);
- if (currentLength[tag] == 0) {
- // start a new sequence
- currentStart[tag] = i + offset;
- currentLength[tag] = 1;
- } else if (currentStart[tag] + currentLength[tag] == i + offset) {
- // ok, we are extending the current run for that tag.
- currentLength[tag] += 1;
- } else {
- // otherwise, we need to close off the old run and start a new one
- childrenWriters[tag].writeBatch(vec.fields[tag],
- currentStart[tag], currentLength[tag]);
- currentStart[tag] = i + offset;
- currentLength[tag] = 1;
+ if (!started) {
+ started = true;
+ currentRun = i;
+ prevTag = tag;
+ } else if (tag != prevTag) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
+ currentRun = i;
+ prevTag = tag;
}
+ } else if (started) {
+ started = false;
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
}
}
- // write out any left over sequences
- for(int tag=0; tag < currentStart.length; ++tag) {
- if (currentLength[tag] != 0) {
- childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
- currentLength[tag]);
- }
+ if (started) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, length - currentRun);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 82a97e0..0724191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -215,9 +215,12 @@ public class VectorizedRowBatchCtx {
LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
- for (int i = 0; i < dataColumnCount; i++) {
- TypeInfo typeInfo = rowColumnTypeInfos[i];
- result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+
+ for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+ if (columnsToIncludeTruncated[i]) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
}
for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
@@ -473,8 +476,8 @@ public class VectorizedRowBatchCtx {
bcv.isNull[0] = true;
bcv.isRepeating = true;
} else {
- bcv.setVal(0, sVal.getBytes());
- bcv.isRepeating = true;
+ bcv.fill(sVal.getBytes());
+ bcv.isNull[0] = false;
}
}
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index fcb8ca4..fe0be7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* Do we have schema on read in the configuration variables?
*/
- TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE);
+ TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
Reader.Options options = new Reader.Options().range(offset, length);
options.schema(schema);
@@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* Do we have schema on read in the configuration variables?
*/
- TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
final Reader reader;
final int bucket;
@@ -1994,13 +1994,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* Convert a Hive type property string that contains separated type names into a list of
* TypeDescription objects.
- * @param hiveTypeProperty the desired types from hive
- * @param maxColumns the maximum number of desired columns
* @return the list of TypeDescription objects.
*/
- public static ArrayList<TypeDescription>
- typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty,
- int maxColumns) {
+ public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+ String hiveTypeProperty) {
// CONSDIER: We need a type name parser for TypeDescription.
@@ -2008,9 +2005,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size());
for (TypeInfo typeInfo : typeInfoList) {
typeDescrList.add(convertTypeInfo(typeInfo));
- if (typeDescrList.size() >= maxColumns) {
- break;
- }
}
return typeDescrList;
}
@@ -2097,18 +2091,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
- /**
- * Generate the desired schema for reading the file.
- * @param conf the configuration
- * @param isAcidRead is this an acid format?
- * @param dataColumns the desired number of data columns for vectorized read
- * @return the desired schema or null if schema evolution isn't enabled
- * @throws IOException
- */
- public static TypeDescription getDesiredRowTypeDescr(Configuration conf,
- boolean isAcidRead,
- int dataColumns
- ) throws IOException {
+ public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead)
+ throws IOException {
String columnNameProperty = null;
String columnTypeProperty = null;
@@ -2131,10 +2115,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
haveSchemaEvolutionProperties = false;
} else {
schemaEvolutionTypeDescrs =
- typeDescriptionsFromHiveTypeProperty(columnTypeProperty,
- dataColumns);
- if (schemaEvolutionTypeDescrs.size() !=
- Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
+ typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
haveSchemaEvolutionProperties = false;
}
}
@@ -2165,9 +2147,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
return null;
}
schemaEvolutionTypeDescrs =
- typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns);
- if (schemaEvolutionTypeDescrs.size() !=
- Math.min(dataColumns, schemaEvolutionColumnNames.size())) {
+ typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
return null;
}
@@ -2181,7 +2162,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
columnNum++;
}
- if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) {
+ if (virtualColumnClipNum != -1) {
schemaEvolutionColumnNames =
Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum));
schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum));
@@ -2198,7 +2179,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// Desired schema does not include virtual columns or partition columns.
TypeDescription result = TypeDescription.createStruct();
- for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) {
+ for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 0dd58b7..1fce282 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -447,8 +447,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.length = options.getLength();
this.validTxnList = validTxnList;
- TypeDescription typeDescr =
- OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
+ TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true);
objectInspector = OrcRecordUpdater.createEventSchema
(OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 0bcf9e3..a031a92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -26,8 +26,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -73,7 +71,6 @@ public class ReaderImpl implements Reader {
private final List<OrcProto.StripeStatistics> stripeStats;
private final int metadataSize;
protected final List<OrcProto.Type> types;
- private final TypeDescription schema;
private final List<OrcProto.UserMetadataItem> userMetadata;
private final List<OrcProto.ColumnStatistics> fileStats;
private final List<StripeInformation> stripes;
@@ -246,11 +243,6 @@ public class ReaderImpl implements Reader {
return result;
}
- @Override
- public TypeDescription getSchema() {
- return schema;
- }
-
/**
* Ensure this is an ORC file to prevent users from trying to read text
* files or RC files as ORC files.
@@ -394,9 +386,7 @@ public class ReaderImpl implements Reader {
this.writerVersion = footerMetaData.writerVersion;
this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());
}
- this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
}
-
/**
* Get the WriterVersion based on the ORC file postscript.
* @param writerVersion the integer writer version
@@ -678,7 +668,7 @@ public class ReaderImpl implements Reader {
options.include(include);
}
return new RecordReaderImpl(this.getStripes(), fileSystem, path,
- options, schema, types, codec, bufferSize, rowIndexStride, conf);
+ options, types, codec, bufferSize, rowIndexStride, conf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index c214658..3975409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -27,8 +27,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.TypeDescription;
+import org.apache.orc.OrcUtils;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.impl.ColumnStatisticsImpl;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -96,6 +98,7 @@ public class RecordReaderImpl implements RecordReader {
private final SargApplier sargApp;
// an array about which row groups aren't skipped
private boolean[] includedRowGroups = null;
+ private final Configuration conf;
private final MetadataReader metadata;
private final DataReader dataReader;
@@ -142,33 +145,33 @@ public class RecordReaderImpl implements RecordReader {
FileSystem fileSystem,
Path path,
Reader.Options options,
- TypeDescription fileSchema,
List<OrcProto.Type> types,
CompressionCodec codec,
int bufferSize,
long strideRate,
Configuration conf
) throws IOException {
- SchemaEvolution treeReaderSchema;
- this.included = options.getInclude();
- included[0] = true;
+
+ TreeReaderFactory.TreeReaderSchema treeReaderSchema;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Schema on read not provided -- using file schema " + types.toString());
}
- treeReaderSchema = new SchemaEvolution(fileSchema, included);
+ treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types);
} else {
// Now that we are creating a record reader for a file, validate that the schema to read
// is compatible with the file schema.
//
- treeReaderSchema = new SchemaEvolution(fileSchema, options.getSchema(),
- included);
+ List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+ treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
}
this.path = path;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
+ this.included = options.getInclude();
+ this.conf = conf;
this.rowIndexStride = strideRate;
this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
SearchArgument sarg = options.getSearchArgument();
@@ -207,8 +210,7 @@ public class RecordReaderImpl implements RecordReader {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
}
- reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
- treeReaderSchema, included, skipCorrupt);
+ reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
@@ -237,7 +239,7 @@ public class RecordReaderImpl implements RecordReader {
return metadata.readStripeFooter(stripe);
}
- enum Location {
+ static enum Location {
BEFORE, MIN, MIDDLE, MAX, AFTER
}
@@ -1050,27 +1052,31 @@ public class RecordReaderImpl implements RecordReader {
}
@Override
- public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
try {
+ final VectorizedRowBatch result;
if (rowInStripe >= rowCountInStripe) {
currentStripe += 1;
- if (currentStripe >= stripes.size()) {
- batch.size = 0;
- return false;
- }
readStripe();
}
- int batchSize = computeBatchSize(batch.getMaxSize());
+ final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
rowInStripe += batchSize;
- reader.setVectorColumnCount(batch.getDataColumnCount());
- reader.nextBatch(batch, batchSize);
+ if (previous == null) {
+ ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+ result = new VectorizedRowBatch(cols.length);
+ result.cols = cols;
+ } else {
+ result = previous;
+ result.selectedInUse = false;
+ reader.setVectorColumnCount(result.getDataColumnCount());
+ reader.nextVector(result.cols, batchSize);
+ }
- batch.size = (int) batchSize;
- batch.selectedInUse = false;
+ result.size = batchSize;
advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
- return batch.size != 0;
+ return result;
} catch (IOException e) {
// Rethrow exception with file name in log message
throw new IOException("Error reading file: " + path, e);
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
index 6747691..f28ca13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -20,12 +20,13 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;
/**
@@ -33,134 +34,103 @@ import org.apache.orc.TypeDescription;
* has been schema evolution.
*/
public class SchemaEvolution {
- private final Map<TypeDescription, TypeDescription> readerToFile;
- private final boolean[] included;
- private final TypeDescription readerSchema;
+
private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
- public SchemaEvolution(TypeDescription readerSchema, boolean[] included) {
- this.included = included;
- readerToFile = null;
- this.readerSchema = readerSchema;
- }
+ public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+ List<OrcProto.Type> schemaTypes) throws IOException {
- public SchemaEvolution(TypeDescription fileSchema,
- TypeDescription readerSchema,
- boolean[] included) throws IOException {
- readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1);
- this.included = included;
- if (checkAcidSchema(fileSchema)) {
- this.readerSchema = createEventSchema(readerSchema);
+ // For ACID, the row is the ROW field in the outer STRUCT.
+ final boolean isAcid = checkAcidSchema(fileTypes);
+ final List<OrcProto.Type> rowSchema;
+ int rowSubtype;
+ if (isAcid) {
+ rowSubtype = OrcRecordUpdater.ROW + 1;
+ rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
} else {
- this.readerSchema = readerSchema;
+ rowSubtype = 0;
+ rowSchema = fileTypes;
}
- buildMapping(fileSchema, this.readerSchema);
- }
- public TypeDescription getReaderSchema() {
- return readerSchema;
- }
+ // Do checking on the overlap. Additional columns will be defaulted to NULL.
- public TypeDescription getFileType(TypeDescription readerType) {
- TypeDescription result;
- if (readerToFile == null) {
- if (included == null || included[readerType.getId()]) {
- result = readerType;
- } else {
- result = null;
- }
- } else {
- result = readerToFile.get(readerType);
- }
- return result;
- }
+ int numFileColumns = rowSchema.get(0).getSubtypesCount();
+ int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
- void buildMapping(TypeDescription fileType,
- TypeDescription readerType) throws IOException {
- // if the column isn't included, don't map it
- if (included != null && !included[readerType.getId()]) {
- return;
- }
- boolean isOk = true;
- // check the easy case first
- if (fileType.getCategory() == readerType.getCategory()) {
- switch (readerType.getCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case DOUBLE:
- case FLOAT:
- case STRING:
- case TIMESTAMP:
- case BINARY:
- case DATE:
- // these are always a match
- break;
- case CHAR:
- case VARCHAR:
- isOk = fileType.getMaxLength() == readerType.getMaxLength();
- break;
- case DECIMAL:
- // TODO we don't enforce scale and precision checks, but probably should
- break;
- case UNION:
- case MAP:
- case LIST: {
- // these must be an exact match
- List<TypeDescription> fileChildren = fileType.getChildren();
- List<TypeDescription> readerChildren = readerType.getChildren();
- if (fileChildren.size() == readerChildren.size()) {
- for(int i=0; i < fileChildren.size(); ++i) {
- buildMapping(fileChildren.get(i), readerChildren.get(i));
- }
- } else {
- isOk = false;
+ int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+ /**
+ * Check type promotion.
+ *
+ * Currently, we only support integer type promotions that can be done "implicitly".
+ * That is, we know that using a bigger integer tree reader on the original smaller integer
+ * column will "just work".
+ *
+ * In the future, other type promotions might require type conversion.
+ */
+ // short -> int -> bigint as same integer readers are used for the above types.
+
+ for (int i = 0; i < numReadColumns; i++) {
+ OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+ OrcProto.Type rColType = schemaTypes.get(i);
+ if (!fColType.getKind().equals(rColType.getKind())) {
+
+ boolean ok = false;
+ if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+ rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting SHORT to INT/LONG requested type
+ ok = true;
}
- break;
- }
- case STRUCT: {
- // allow either side to have fewer fields than the other
- List<TypeDescription> fileChildren = fileType.getChildren();
- List<TypeDescription> readerChildren = readerType.getChildren();
- int jointSize = Math.min(fileChildren.size(), readerChildren.size());
- for(int i=0; i < jointSize; ++i) {
- buildMapping(fileChildren.get(i), readerChildren.get(i));
+ } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting INT to LONG requested type
+ ok = true;
}
- break;
}
- default:
- throw new IllegalArgumentException("Unknown type " + readerType);
- }
- } else {
- switch (fileType.getCategory()) {
- case SHORT:
- if (readerType.getCategory() != TypeDescription.Category.INT &&
- readerType.getCategory() != TypeDescription.Category.LONG) {
- isOk = false;
- }
- break;
- case INT:
- if (readerType.getCategory() != TypeDescription.Category.LONG) {
- isOk = false;
- }
- break;
- default:
- isOk = false;
+
+ if (!ok) {
+ throw new IOException("ORC does not support type conversion from " +
+ fColType.getKind().name() + " to " + rColType.getKind().name());
+ }
}
}
- if (isOk) {
- readerToFile.put(readerType, fileType);
+
+ List<OrcProto.Type> fullSchemaTypes;
+
+ if (isAcid) {
+ fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+ // This copies the ACID struct type which is subtype = 0.
+ // It has field names "operation" through "row".
+ // And we copy the types for all fields EXCEPT ROW (which must be last!).
+
+ for (int i = 0; i < rowSubtype; i++) {
+ fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+ }
+
+ // Add the row struct type.
+ OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
} else {
- throw new IOException("ORC does not support type conversion from " +
- fileType + " to " + readerType);
+ fullSchemaTypes = schemaTypes;
}
+
+ int innerStructSubtype = rowSubtype;
+
+ // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
+ // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString());
+
+ return new TreeReaderSchema().
+ fileTypes(fileTypes).
+ schemaTypes(fullSchemaTypes).
+ innerStructSubtype(innerStructSubtype);
}
- private static boolean checkAcidSchema(TypeDescription type) {
- if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
- List<String> rootFields = type.getFieldNames();
+ private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+ if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+ List<String> rootFields = fileSchema.get(0).getFieldNamesList();
if (acidEventFieldNames.equals(rootFields)) {
return true;
}
@@ -172,14 +142,26 @@ public class SchemaEvolution {
* @param typeDescr
* @return ORC types for the ACID event based on the row's type description
*/
- public static TypeDescription createEventSchema(TypeDescription typeDescr) {
- TypeDescription result = TypeDescription.createStruct()
- .addField("operation", TypeDescription.createInt())
- .addField("originalTransaction", TypeDescription.createLong())
- .addField("bucket", TypeDescription.createInt())
- .addField("rowId", TypeDescription.createLong())
- .addField("currentTransaction", TypeDescription.createLong())
- .addField("row", typeDescr.clone());
+ public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) {
+
+ List<OrcProto.Type> result = new ArrayList<OrcProto.Type>();
+
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ type.addAllFieldNames(acidEventFieldNames);
+ for (int i = 0; i < acidEventFieldNames.size(); i++) {
+ type.addSubtypes(i + 1);
+ }
+ result.add(type.build());
+
+ // Automatically add all fields except the last (ROW).
+ for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
+ type.clear();
+ type.setKind(acidEventOrcTypeKinds.get(i));
+ result.add(type.build());
+ }
+
+ OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
return result;
}
@@ -192,4 +174,14 @@ public class SchemaEvolution {
acidEventFieldNames.add("currentTransaction");
acidEventFieldNames.add("row");
}
+ public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+ new ArrayList<OrcProto.Type.Kind>();
+ static {
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+ }
}