Posted to commits@hive.apache.org by se...@apache.org on 2016/11/22 02:46:27 UTC
[18/35] hive git commit: HIVE-14089 : complex type support in LLAP IO is broken (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index ebbdf8d..d5f5f9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -17,18 +17,26 @@
*/
package org.apache.hadoop.hive.ql.io.orc.encoded;
+import org.apache.orc.impl.RunLengthByteReader;
+import org.apache.orc.impl.StreamName;
+
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.TypeDescription.Category;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.SettableUncompressedStream;
import org.apache.orc.impl.TreeReaderFactory;
import org.apache.orc.OrcProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class EncodedTreeReaderFactory extends TreeReaderFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(EncodedTreeReaderFactory.class);
/**
* We choose to use a toy programming language, so we cannot use multiple inheritance.
* If we could, we could have this inherit TreeReader to contain the common impl, and then
@@ -36,7 +44,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
* Instead, we have a settable interface that the caller will cast to and call setBuffers.
*/
public interface SettableTreeReader {
- void setBuffers(ColumnStreamData[] streamBuffers, boolean sameStripe) throws IOException;
+ void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe) throws IOException;
}
public static class TimestampStreamReader extends TimestampTreeReader
@@ -84,8 +92,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -198,6 +207,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void seek(PositionProvider[] index) throws IOException {
+ // This string reader should simply redirect to its own seek (as other types already do).
+ this.seek(index[columnId]);
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
if (present != null) {
if (_isFileCompressed) {
@@ -211,7 +226,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
- if (_dataStream.available() > 0) {
+ if (_dataStream != null && _dataStream.available() > 0) {
if (_isFileCompressed) {
index.getNext();
}
@@ -222,14 +237,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
- if (_dataStream.available() > 0) {
+ if (_dataStream != null && _dataStream.available() > 0) {
if (_isFileCompressed) {
index.getNext();
}
((StringDirectTreeReader) reader).getStream().seek(index);
}
- if (_lengthStream.available() > 0) {
+ if (_lengthStream != null && _lengthStream.available() > 0) {
if (_isFileCompressed) {
index.getNext();
}
@@ -239,8 +254,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -371,8 +387,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -468,8 +485,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -571,8 +589,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -668,8 +687,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -758,8 +778,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -859,8 +880,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -979,8 +1001,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1065,6 +1088,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void seek(PositionProvider[] index) throws IOException {
+ // This string reader should simply redirect to its own seek (as other types already do).
+ this.seek(index[columnId]);
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
if (present != null) {
if (_isFileCompressed) {
@@ -1106,8 +1135,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1233,6 +1263,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void seek(PositionProvider[] index) throws IOException {
+ // This string reader should simply redirect to its own seek (as other types already do).
+ this.seek(index[columnId]);
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
if (present != null) {
if (_isFileCompressed) {
@@ -1274,8 +1310,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1411,8 +1448,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1511,8 +1549,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1617,8 +1656,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
}
@@ -1673,177 +1713,760 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
}
- public static TreeReader[] createEncodedTreeReader(int numCols,
- List<OrcProto.Type> types,
- List<OrcProto.ColumnEncoding> encodings,
- EncodedColumnBatch<OrcBatchKey> batch,
- CompressionCodec codec, boolean skipCorrupt,
- String writerTimezone) throws IOException {
- TreeReader[] treeReaders = new TreeReader[numCols];
- for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.getColumnIxs()[i];
- ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
- OrcProto.Type columnType = types.get(columnIndex);
-
- // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
- // But we need to know if the original data is compressed or not. This is used to skip
- // positions in row index properly. If the file is originally compressed,
- // then 1st position (compressed offset) in row index should be skipped to get
- // uncompressed offset, else 1st position should not be skipped.
- // TODO: there should be a better way to do this, code just needs to be modified
- OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
-
- // stream buffers are arranged in enum order of stream kind
- ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
- data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
- dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
- lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
- secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
-
- switch (columnType.getKind()) {
- case BINARY:
- treeReaders[i] = BinaryStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case BOOLEAN:
- treeReaders[i] = BooleanStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case BYTE:
- treeReaders[i] = ByteStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case SHORT:
- treeReaders[i] = ShortStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case INT:
- treeReaders[i] = IntStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case LONG:
- treeReaders[i] = LongStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .skipCorrupt(skipCorrupt)
- .build();
- break;
- case FLOAT:
- treeReaders[i] = FloatStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case DOUBLE:
- treeReaders[i] = DoubleStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setCompressionCodec(codec)
- .build();
- break;
- case CHAR:
- treeReaders[i] = CharStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setMaxLength(columnType.getMaximumLength())
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setDictionaryStream(dictionary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case VARCHAR:
- treeReaders[i] = VarcharStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setMaxLength(columnType.getMaximumLength())
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setDictionaryStream(dictionary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case STRING:
- treeReaders[i] = StringStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setDataStream(data)
- .setLengthStream(lengths)
- .setDictionaryStream(dictionary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case DECIMAL:
- treeReaders[i] = DecimalStreamReader.builder()
- .setColumnIndex(columnIndex)
- .setPrecision(columnType.getPrecision())
- .setScale(columnType.getScale())
- .setPresentStream(present)
- .setValueStream(data)
- .setScaleStream(secondary)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
- .build();
- break;
- case TIMESTAMP:
- treeReaders[i] = TimestampStreamReader.builder()
+ public static StructTreeReader createRootTreeReader(TypeDescription schema,
+ List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+ CompressionCodec codec, boolean skipCorrupt, String tz, int[] columnMapping)
+ throws IOException {
+ if (schema.getCategory() != Category.STRUCT) {
+ throw new AssertionError("Schema is not a struct: " + schema);
+ }
+ // Some child types may be excluded. Note that this can only happen at root level.
+ List<TypeDescription> children = schema.getChildren();
+ int childCount = children.size(), includedCount = 0;
+ for (int childIx = 0; childIx < childCount; ++childIx) {
+ if (!batch.hasData(children.get(childIx).getId())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column at " + childIx + " " + children.get(childIx).getId()
+ + ":" + children.get(childIx).toString() + " has no data");
+ }
+ continue;
+ }
+ ++includedCount;
+ }
+ TreeReader[] childReaders = new TreeReader[includedCount];
+ for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) {
+ if (!batch.hasData(children.get(schemaChildIx).getId())) continue;
+ childReaders[++inclChildIx] = createEncodedTreeReader(
+ schema.getChildren().get(schemaChildIx), encodings, batch, codec, skipCorrupt, tz);
+ columnMapping[inclChildIx] = schemaChildIx;
+ }
+ return StructStreamReader.builder()
+ .setColumnIndex(0)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(encodings.get(0))
+ .setChildReaders(childReaders)
+ .build();
+ }
+
+
+ private static TreeReader createEncodedTreeReader(TypeDescription schema,
+ List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+ CompressionCodec codec, boolean skipCorrupt, String tz) throws IOException {
+ int columnIndex = schema.getId();
+ ColumnStreamData[] streamBuffers = batch.getColumnData(columnIndex);
+
+ // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
+ // But we need to know if the original data is compressed or not. This is used to skip
+ // positions in row index properly. If the file is originally compressed,
+ // then 1st position (compressed offset) in row index should be skipped to get
+ // uncompressed offset, else 1st position should not be skipped.
+ // TODO: there should be a better way to do this, code just needs to be modified
+ OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
+
+ // stream buffers are arranged in enum order of stream kind
+ ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
+ data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
+ dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
+ lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
+ secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
+
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
+ " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
+ columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
+ data != null, dictionary != null, lengths != null, secondary != null, tz);
+ }
+ switch (schema.getCategory()) {
+ case BINARY:
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ case DECIMAL:
+ case TIMESTAMP:
+ case DATE:
+ return getPrimitiveTreeReaders(columnIndex, schema, codec, columnEncoding,
+ present, data, dictionary, lengths, secondary, skipCorrupt, tz);
+ case LIST:
+ TypeDescription elementType = schema.getChildren().get(0);
+ TreeReader elementReader = createEncodedTreeReader(
+ elementType, encodings, batch, codec, skipCorrupt, tz);
+ return ListStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setColumnEncoding(columnEncoding)
+ .setCompressionCodec(codec)
+ .setPresentStream(present)
+ .setLengthStream(lengths)
+ .setElementReader(elementReader)
+ .build();
+ case MAP:
+ TypeDescription keyType = schema.getChildren().get(0);
+ TypeDescription valueType = schema.getChildren().get(1);
+ TreeReader keyReader = createEncodedTreeReader(
+ keyType, encodings, batch, codec, skipCorrupt, tz);
+ TreeReader valueReader = createEncodedTreeReader(
+ valueType, encodings, batch, codec, skipCorrupt, tz);
+ return MapStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setColumnEncoding(columnEncoding)
+ .setCompressionCodec(codec)
+ .setPresentStream(present)
+ .setLengthStream(lengths)
+ .setKeyReader(keyReader)
+ .setValueReader(valueReader)
+ .build();
+ case STRUCT: {
+ int childCount = schema.getChildren().size();
+ TreeReader[] childReaders = new TreeReader[childCount];
+ for (int i = 0; i < childCount; i++) {
+ TypeDescription childType = schema.getChildren().get(i);
+ childReaders[i] = createEncodedTreeReader(
+ childType, encodings, batch, codec, skipCorrupt, tz);
+ }
+ return StructStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .setPresentStream(present)
+ .setChildReaders(childReaders)
+ .build();
+ }
+ case UNION: {
+ int childCount = schema.getChildren().size();
+ TreeReader[] childReaders = new TreeReader[childCount];
+ for (int i = 0; i < childCount; i++) {
+ TypeDescription childType = schema.getChildren().get(i);
+ childReaders[i] = createEncodedTreeReader(
+ childType, encodings, batch, codec, skipCorrupt, tz);
+ }
+ return UnionStreamReader.builder()
.setColumnIndex(columnIndex)
- .setPresentStream(present)
- .setSecondsStream(data)
- .setNanosStream(secondary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
- .setWriterTimezone(writerTimezone)
- .skipCorrupt(skipCorrupt)
- .build();
- break;
- case DATE:
- treeReaders[i] = DateStreamReader.builder()
- .setColumnIndex(columnIndex)
.setPresentStream(present)
.setDataStream(data)
- .setCompressionCodec(codec)
- .setColumnEncoding(columnEncoding)
+ .setChildReaders(childReaders)
.build();
- break;
- default:
- throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
+ }
+ default:
+ throw new UnsupportedOperationException("Data type not supported: " + schema);
+ }
+ }
+
+ private static TreeReader getPrimitiveTreeReaders(final int columnIndex,
+ TypeDescription columnType, CompressionCodec codec, OrcProto.ColumnEncoding columnEncoding,
+ ColumnStreamData present, ColumnStreamData data, ColumnStreamData dictionary,
+ ColumnStreamData lengths, ColumnStreamData secondary, boolean skipCorrupt, String tz)
+ throws IOException {
+ switch (columnType.getCategory()) {
+ case BINARY:
+ return BinaryStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case BOOLEAN:
+ return BooleanStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ case BYTE:
+ return ByteStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ case SHORT:
+ return ShortStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case INT:
+ return IntStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case LONG:
+ return LongStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .skipCorrupt(skipCorrupt)
+ .build();
+ case FLOAT:
+ return FloatStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ case DOUBLE:
+ return DoubleStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .build();
+ case CHAR:
+ return CharStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setMaxLength(columnType.getMaxLength())
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setDictionaryStream(dictionary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case VARCHAR:
+ return VarcharStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setMaxLength(columnType.getMaxLength())
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setDictionaryStream(dictionary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case STRING:
+ return StringStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setLengthStream(lengths)
+ .setDictionaryStream(dictionary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case DECIMAL:
+ return DecimalStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPrecision(columnType.getPrecision())
+ .setScale(columnType.getScale())
+ .setPresentStream(present)
+ .setValueStream(data)
+ .setScaleStream(secondary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ case TIMESTAMP:
+ return TimestampStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setSecondsStream(data)
+ .setNanosStream(secondary)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .setWriterTimezone(tz)
+ .skipCorrupt(skipCorrupt)
+ .build();
+ case DATE:
+ return DateStreamReader.builder()
+ .setColumnIndex(columnIndex)
+ .setPresentStream(present)
+ .setDataStream(data)
+ .setCompressionCodec(codec)
+ .setColumnEncoding(columnEncoding)
+ .build();
+ default:
+ throw new AssertionError("Not a primitive category: " + columnType.getCategory());
+ }
+ }
+
+ protected static class ListStreamReader extends ListTreeReader implements SettableTreeReader {
+ private boolean _isFileCompressed;
+ private SettableUncompressedStream _presentStream;
+ private SettableUncompressedStream _lengthStream;
+
+ public ListStreamReader(final int columnIndex,
+ final SettableUncompressedStream present, final SettableUncompressedStream lengthStream,
+ final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+ final TreeReader elementReader) throws IOException {
+ super(columnIndex, present, lengthStream, columnEncoding, elementReader);
+ this._isFileCompressed = isFileCompressed;
+ this._presentStream = present;
+ this._lengthStream = lengthStream;
+ }
+
+ @Override
+ public void seek(PositionProvider[] index) throws IOException {
+ PositionProvider ownIndex = index[columnId];
+ if (present != null) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ present.seek(ownIndex);
+ }
+
+ // lengths stream could be empty stream or already reached end of stream before present stream.
+ // This can happen if all values in stream are nulls or last row group values are all null.
+ if (_lengthStream.available() > 0) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ lengths.seek(ownIndex);
+ elementReader.seek(index);
}
}
- return treeReaders;
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ // Only our parent class can call this.
+ throw new IOException("Should never be called");
+ }
+
+ @Override
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+ throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+ if (_presentStream != null) {
+ _presentStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+ }
+ if (_lengthStream != null) {
+ _lengthStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE]));
+ }
+
+ if (elementReader != null) {
+ ((SettableTreeReader) elementReader).setBuffers(batch, sameStripe);
+ }
+ }
+
+ public static class StreamReaderBuilder {
+ private int columnIndex;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData lengthStream;
+ private CompressionCodec compressionCodec;
+ private OrcProto.ColumnEncoding columnEncoding;
+ private TreeReader elementReader;
+
+
+ public ListStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+ this.columnIndex = columnIndex;
+ return this;
+ }
+
+ public ListStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
+ this.lengthStream = lengthStream;
+ return this;
+ }
+
+ public ListStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+ this.presentStream = presentStream;
+ return this;
+ }
+
+ public ListStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+ this.columnEncoding = encoding;
+ return this;
+ }
+
+ public ListStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
+
+ public ListStreamReader.StreamReaderBuilder setElementReader(TreeReader elementReader) {
+ this.elementReader = elementReader;
+ return this;
+ }
+
+ public ListStreamReader build() throws IOException {
+ SettableUncompressedStream present = StreamUtils
+ .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+ presentStream);
+
+ SettableUncompressedStream length = StreamUtils
+ .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(),
+ lengthStream);
+
+ boolean isFileCompressed = compressionCodec != null;
+ return new ListStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed,
+ elementReader);
+ }
+ }
+
+ public static ListStreamReader.StreamReaderBuilder builder() {
+ return new ListStreamReader.StreamReaderBuilder();
+ }
+ }
+
+ protected static class MapStreamReader extends MapTreeReader implements SettableTreeReader {
+ private boolean _isFileCompressed;
+ private SettableUncompressedStream _presentStream;
+ private SettableUncompressedStream _lengthStream;
+
+ public MapStreamReader(final int columnIndex,
+ final SettableUncompressedStream present, final SettableUncompressedStream lengthStream,
+ final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+ final TreeReader keyReader, final TreeReader valueReader) throws IOException {
+ super(columnIndex, present, lengthStream, columnEncoding, keyReader, valueReader);
+ this._isFileCompressed = isFileCompressed;
+ this._presentStream = present;
+ this._lengthStream = lengthStream;
+ }
+
+ @Override
+ public void seek(PositionProvider[] index) throws IOException {
+ // We are not calling super.seek since we handle the present stream differently.
+ PositionProvider ownIndex = index[columnId];
+ if (present != null) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ present.seek(ownIndex);
+ }
+
+ // lengths stream could be empty stream or already reached end of stream before present stream.
+ // This can happen if all values in stream are nulls or last row group values are all null.
+ if (_lengthStream.available() > 0) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ lengths.seek(ownIndex);
+ keyReader.seek(index);
+ valueReader.seek(index);
+ }
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ // Only our parent class can call this.
+ throw new IOException("Should never be called");
+ }
+
+ @Override
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+ throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+ if (_presentStream != null) {
+ _presentStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+ }
+ if (_lengthStream != null) {
+ _lengthStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE]));
+ }
+
+ if (keyReader != null) {
+ ((SettableTreeReader) keyReader).setBuffers(batch, sameStripe);
+ }
+
+ if (valueReader != null) {
+ ((SettableTreeReader) valueReader).setBuffers(batch, sameStripe);
+ }
+ }
+
+ public static class StreamReaderBuilder {
+ private int columnIndex;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData lengthStream;
+ private CompressionCodec compressionCodec;
+ private OrcProto.ColumnEncoding columnEncoding;
+ private TreeReader keyReader;
+ private TreeReader valueReader;
+
+
+ public MapStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+ this.columnIndex = columnIndex;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
+ this.lengthStream = lengthStream;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+ this.presentStream = presentStream;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+ this.columnEncoding = encoding;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setKeyReader(TreeReader keyReader) {
+ this.keyReader = keyReader;
+ return this;
+ }
+
+ public MapStreamReader.StreamReaderBuilder setValueReader(TreeReader valueReader) {
+ this.valueReader = valueReader;
+ return this;
+ }
+
+ public MapStreamReader build() throws IOException {
+ SettableUncompressedStream present = StreamUtils
+ .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+ presentStream);
+
+ SettableUncompressedStream length = StreamUtils
+ .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(),
+ lengthStream);
+
+ boolean isFileCompressed = compressionCodec != null;
+ return new MapStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed,
+ keyReader, valueReader);
+ }
+ }
+
+ public static MapStreamReader.StreamReaderBuilder builder() {
+ return new MapStreamReader.StreamReaderBuilder();
+ }
+ }
+
+ protected static class StructStreamReader extends StructTreeReader
+ implements SettableTreeReader {
+ private boolean _isFileCompressed;
+ private SettableUncompressedStream _presentStream;
+
+ public StructStreamReader(final int columnIndex,
+ final SettableUncompressedStream present,
+ final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+ final TreeReader[] childReaders) throws IOException {
+ super(columnIndex, present, columnEncoding, childReaders);
+ this._isFileCompressed = isFileCompressed;
+ this._presentStream = present;
+ }
+
+ @Override
+ public void seek(PositionProvider[] index) throws IOException {
+ PositionProvider ownIndex = index[columnId];
+ if (present != null) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ present.seek(ownIndex);
+ }
+ if (fields != null) {
+ for (TreeReader child : fields) {
+ child.seek(index);
+ }
+ }
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ // Only our parent class can call this.
+ throw new IOException("Should never be called");
+ }
+
+
+ @Override
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+ throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+ if (_presentStream != null) {
+ _presentStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+ }
+ if (fields != null) {
+ for (TreeReader child : fields) {
+ ((SettableTreeReader) child).setBuffers(batch, sameStripe);
+ }
+ }
+ }
+
+ public static class StreamReaderBuilder {
+ private int columnIndex;
+ private ColumnStreamData presentStream;
+ private CompressionCodec compressionCodec;
+ private OrcProto.ColumnEncoding columnEncoding;
+ private TreeReader[] childReaders;
+
+
+ public StructStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+ this.columnIndex = columnIndex;
+ return this;
+ }
+
+ public StructStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+ this.presentStream = presentStream;
+ return this;
+ }
+
+ public StructStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+ this.columnEncoding = encoding;
+ return this;
+ }
+
+ public StructStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
+
+ public StructStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) {
+ this.childReaders = childReaders;
+ return this;
+ }
+
+ public StructStreamReader build() throws IOException {
+ SettableUncompressedStream present = StreamUtils
+ .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+ presentStream);
+
+ boolean isFileCompressed = compressionCodec != null;
+ return new StructStreamReader(columnIndex, present, columnEncoding, isFileCompressed,
+ childReaders);
+ }
+ }
+
+ public static StructStreamReader.StreamReaderBuilder builder() {
+ return new StructStreamReader.StreamReaderBuilder();
+ }
+ }
+
+ protected static class UnionStreamReader extends UnionTreeReader implements SettableTreeReader {
+ private boolean _isFileCompressed;
+ private SettableUncompressedStream _presentStream;
+ private SettableUncompressedStream _dataStream;
+
+ public UnionStreamReader(final int columnIndex,
+ final SettableUncompressedStream present, final SettableUncompressedStream dataStream,
+ final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+ final TreeReader[] childReaders) throws IOException {
+ super(columnIndex, present, columnEncoding, childReaders);
+ this._isFileCompressed = isFileCompressed;
+ this._presentStream = present;
+ this._dataStream = dataStream;
+ // Note: other parent readers init everything in ctor, but union does it in startStripe.
+ this.tags = new RunLengthByteReader(dataStream);
+ }
+
+ @Override
+ public void seek(PositionProvider[] index) throws IOException {
+ PositionProvider ownIndex = index[columnId];
+ if (present != null) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ present.seek(ownIndex);
+ }
+
+ // data stream could be empty stream or already reached end of stream before present stream.
+ // This can happen if all values in stream are nulls or last row group values are all null.
+ if (_dataStream.available() > 0) {
+ if (_isFileCompressed) {
+ ownIndex.getNext();
+ }
+ tags.seek(ownIndex);
+ if (fields != null) {
+ for (TreeReader child : fields) {
+ child.seek(index);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ // Only our parent class can call this.
+ throw new IOException("Should never be called");
+ }
+
+ @Override
+ public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+ throws IOException {
+ ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+ if (_presentStream != null) {
+ _presentStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+ }
+ if (_dataStream != null) {
+ _dataStream.setBuffers(
+ StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.DATA_VALUE]));
+ }
+ if (fields != null) {
+ for (TreeReader child : fields) {
+ ((SettableTreeReader) child).setBuffers(batch, sameStripe);
+ }
+ }
+ }
+
+ public static class StreamReaderBuilder {
+ private int columnIndex;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private CompressionCodec compressionCodec;
+ private OrcProto.ColumnEncoding columnEncoding;
+ private TreeReader[] childReaders;
+
+
+ public UnionStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+ this.columnIndex = columnIndex;
+ return this;
+ }
+
+ public UnionStreamReader.StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+ this.dataStream = dataStream;
+ return this;
+ }
+
+ public UnionStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+ this.presentStream = presentStream;
+ return this;
+ }
+
+ public UnionStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+ this.columnEncoding = encoding;
+ return this;
+ }
+
+ public UnionStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
+
+ public UnionStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) {
+ this.childReaders = childReaders;
+ return this;
+ }
+
+ public UnionStreamReader build() throws IOException {
+ SettableUncompressedStream present = StreamUtils.createSettableUncompressedStream(
+ OrcProto.Stream.Kind.PRESENT.name(), presentStream);
+
+ SettableUncompressedStream data = StreamUtils.createSettableUncompressedStream(
+ OrcProto.Stream.Kind.DATA.name(), dataStream);
+
+ boolean isFileCompressed = compressionCodec != null;
+ return new UnionStreamReader(columnIndex, present, data,
+ columnEncoding, isFileCompressed, childReaders);
+ }
+ }
+
+ public static UnionStreamReader.StreamReaderBuilder builder() {
+ return new UnionStreamReader.StreamReaderBuilder();
+ }
}
}
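
The change above reworks SettableTreeReader so that setBuffers receives the whole EncodedColumnBatch rather than one column's ColumnStreamData[]; composite readers (list, map, struct, union) can then look up their own column's streams by id and recurse into their children. A minimal sketch of that pattern, assuming hypothetical ToyBatch/ToyStructReader names that are not part of the Hive API:

    import java.io.IOException;

    interface Settable {
      // Each reader pulls its own column's buffers out of the shared batch.
      void setBuffers(ToyBatch batch) throws IOException;
    }

    final class ToyBatch {
      private final byte[][][] columnData; // [columnId][streamKind] -> buffer
      ToyBatch(byte[][][] columnData) { this.columnData = columnData; }
      byte[][] getColumnData(int columnId) { return columnData[columnId]; }
    }

    final class ToyStructReader implements Settable {
      private final int columnId;
      private final Settable[] children;
      ToyStructReader(int columnId, Settable[] children) {
        this.columnId = columnId;
        this.children = children;
      }
      @Override
      public void setBuffers(ToyBatch batch) throws IOException {
        byte[][] ownStreams = batch.getColumnData(columnId);
        if (ownStreams == null) throw new IOException("no data for column " + columnId);
        // ... reset this reader's own streams (PRESENT etc.) from ownStreams ...
        for (Settable child : children) {
          child.setBuffers(batch); // each child indexes the batch by its own column id
        }
      }
    }

Passing the batch is what makes nested types work: with the old ColumnStreamData[] signature, a struct reader had no way to hand each child that child's own streams.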
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 4405232..1c5f0e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
public static final int ALL_RGS = -1;
/**
* All the previous streams are data streams, this and the next ones are index streams.
- * We assume the sort will stay the same for backward compat.
+ * We assume the order will stay the same for backward compat.
*/
public static final int MAX_DATA_STREAMS = OrcProto.Stream.Kind.ROW_INDEX.getNumber();
public void init(Object fileKey, int stripeIx, int rgIx, int columnCount) {
@@ -57,6 +57,10 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
}
resetColumnArrays(columnCount);
}
+
+ public void initOrcColumn(int colIx) {
+ super.initColumn(colIx, MAX_DATA_STREAMS);
+ }
}
/**
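
The MAX_DATA_STREAMS constant leans on the OrcProto.Stream.Kind enum numbering: every kind numbered below ROW_INDEX is a data stream, ROW_INDEX and above are index streams, and the per-column arrays are indexed by those numbers (as the *_VALUE constants used throughout the diff above show). A small sketch of that convention, assuming only the protobuf-generated enum API (values(), getNumber()):

    import org.apache.orc.OrcProto;

    public class StreamKindDemo {
      public static void main(String[] args) {
        // Mirrors Reader.MAX_DATA_STREAMS in the diff above.
        int maxDataStreams = OrcProto.Stream.Kind.ROW_INDEX.getNumber();
        for (OrcProto.Stream.Kind kind : OrcProto.Stream.Kind.values()) {
          String role = kind.getNumber() < maxDataStreams ? "data" : "index";
          System.out.println(kind + " (" + kind.getNumber() + ") -> " + role + " stream");
        }
      }
    }

This is also why the comment warns about ordering staying the same: the enum numbers double as array offsets, so reordering the stream kinds would silently break the layout.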
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 5cc3663..601324a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -278,30 +278,35 @@ public class MapWork extends BaseWork {
}
// check if the column types that are read are supported by LLAP IO
- for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
- if (hasLlap) {
- final String alias = entry.getKey();
- Operator<? extends OperatorDesc> op = entry.getValue();
- PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
- if (op instanceof TableScanOperator && partitionDesc != null &&
- partitionDesc.getTableDesc() != null) {
- final TableScanOperator tsOp = (TableScanOperator) op;
- final List<String> readColumnNames = tsOp.getNeededColumns();
- final Properties props = partitionDesc.getTableDesc().getProperties();
- final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
- props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
- final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
- final List<String> allColumnNames = Utilities.getColumnNames(props);
- hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames,
- allColumnTypes);
- }
- }
+ if (hasLlap) {
+ // TODO: no need for now hasLlap = checkVectorizerSupportedTypes();
}
llapIoDesc = deriveLlapIoDescString(
isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
}
+ private boolean checkVectorizerSupportedTypes(boolean hasLlap) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+ final String alias = entry.getKey();
+ Operator<? extends OperatorDesc> op = entry.getValue();
+ PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
+ if (op instanceof TableScanOperator && partitionDesc != null &&
+ partitionDesc.getTableDesc() != null) {
+ final TableScanOperator tsOp = (TableScanOperator) op;
+ final List<String> readColumnNames = tsOp.getNeededColumns();
+ final Properties props = partitionDesc.getTableDesc().getProperties();
+ final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
+ props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+ final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+ final List<String> allColumnNames = Utilities.getColumnNames(props);
+ hasLlap = Utilities.checkVectorizerSupportedTypes(readColumnNames, allColumnNames,
+ allColumnTypes);
+ }
+ }
+ return hasLlap;
+ }
+
private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) {
if (!isLlapOn) return null; // LLAP IO is off, don't output.
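
The extracted helper keeps the same per-alias walk: for each TableScanOperator it parses the colon-separated serde type string and asks Utilities whether the vectorizer supports the referenced column types. A hedged, standalone sketch of just the type-parsing step, using the same TypeInfoUtils call as the diff (the literal type string and the printout are illustrative only):

    import java.util.List;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class TypeCheckDemo {
      public static void main(String[] args) {
        // Same parse the diff performs on the LIST_COLUMN_TYPES property.
        List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(
            "string:map<string,string>:array<string>:struct<a:string,b:string>");
        for (TypeInfo t : types) {
          boolean complex = t.getCategory() != ObjectInspector.Category.PRIMITIVE;
          System.out.println(t.getTypeName() + " complex=" + complex);
        }
      }
    }

Note that the actual supported-types call is left commented out in the diff ("no need for now"), since this commit makes LLAP IO handle complex types rather than bail on them.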
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/queries/clientpositive/vector_complex_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_complex_all.q b/ql/src/test/queries/clientpositive/vector_complex_all.q
index 91a7368..b71ac62 100644
--- a/ql/src/test/queries/clientpositive/vector_complex_all.q
+++ b/ql/src/test/queries/clientpositive/vector_complex_all.q
@@ -1,9 +1,11 @@
set hive.compute.query.using.stats=false;
-set hive.compute.query.using.stats=false;
+set hive.strict.checks.cartesian.product=false;
set hive.cli.print.header=true;
set hive.explain.user=false;
set hive.fetch.task.conversion=none;
SET hive.vectorized.execution.enabled=true;
+set hive.llap.io.enabled=false;
+set hive.mapred.mode=nonstrict;
CREATE TABLE orc_create_staging (
str STRING,
@@ -21,25 +23,45 @@ CREATE TABLE orc_create_complex (
str STRING,
mp MAP<STRING,STRING>,
lst ARRAY<STRING>,
- strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC;
+ strct STRUCT<A:STRING,B:STRING>,
+ val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000");
-INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging;
+INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging;
--- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex;
+set hive.llap.io.enabled=true;
SELECT * FROM orc_create_complex;
--- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex;
+SELECT str FROM orc_create_complex;
+
+SELECT strct, mp, lst FROM orc_create_complex;
+
+SELECT lst, str FROM orc_create_complex;
+
+SELECT mp, str FROM orc_create_complex;
+
+SELECT strct, str FROM orc_create_complex;
+
+SELECT strct.B, str FROM orc_create_complex;
+
+set hive.llap.io.enabled=false;
+
+INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2;
+
+select count(*) from orc_create_complex;
+
+set hive.llap.io.enabled=true;
+
+SELECT distinct lst, strct FROM orc_create_complex;
+
+SELECT str, count(val) FROM orc_create_complex GROUP BY str;
+
+SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B;
+
+SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str;
-SELECT COUNT(*) FROM orc_create_complex;
--- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str;
-SELECT str FROM orc_create_complex ORDER BY str;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
index 08d49bc..f16bb16 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
@@ -34,8 +34,9 @@ PREHOOK: query: CREATE TABLE orc_create_complex (
str STRING,
mp MAP<STRING,STRING>,
lst ARRAY<STRING>,
- strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC
+ strct STRUCT<A:STRING,B:STRING>,
+ val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000")
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@orc_create_complex
@@ -43,16 +44,19 @@ POSTHOOK: query: CREATE TABLE orc_create_complex (
str STRING,
mp MAP<STRING,STRING>,
lst ARRAY<STRING>,
- strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC
+ strct STRUCT<A:STRING,B:STRING>,
+ val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000")
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@orc_create_complex
-PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging
+PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging
PREHOOK: type: QUERY
PREHOOK: Input: default@orc_create_staging
PREHOOK: Output: default@orc_create_complex
-POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging
+POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging
POSTHOOK: type: QUERY
POSTHOOK: Input: default@orc_create_staging
POSTHOOK: Output: default@orc_create_complex
@@ -60,199 +64,166 @@ POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create
POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map<string,string>, comment:null), ]
POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ]
POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct<A:string,B:string>, comment:null), ]
-orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct
-PREHOOK: query: -- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex
+POSTHOOK: Lineage: orc_create_complex.val SIMPLE []
+orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct c1
+PREHOOK: query: SELECT * FROM orc_create_complex
PREHOOK: type: QUERY
-POSTHOOK: query: -- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM orc_create_complex
POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
- Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
- Stage: Stage-1
- Tez
+POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
- Vertices:
- Map 1
- Map Operator Tree:
- TableScan
- alias: orc_create_complex
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: str (type: string), mp (type: map<string,string>), lst (type: array<string>), strct (type: struct<a:string,b:string>)
- outputColumnNames: _col0, _col1, _col2, _col3
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Execution mode: llap
- LLAP IO: no inputs
-
- Stage: Stage-0
- Fetch Operator
- limit: -1
- Processor Tree:
- ListSink
-
-PREHOOK: query: SELECT * FROM orc_create_complex
+orc_create_complex.str orc_create_complex.mp orc_create_complex.lst orc_create_complex.strct orc_create_complex.val
+line1 {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"] {"a":"one","b":"two"} 0
+line2 {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] {"a":"three","b":"four"} 0
+line3 {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] {"a":"five","b":"six"} 0
+PREHOOK: query: SELECT str FROM orc_create_complex
PREHOOK: type: QUERY
PREHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
-POSTHOOK: query: SELECT * FROM orc_create_complex
+POSTHOOK: query: SELECT str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+str
+line1
+line2
+line3
+PREHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
POSTHOOK: type: QUERY
POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
-orc_create_complex.str orc_create_complex.mp orc_create_complex.lst orc_create_complex.strct
-line1 {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"] {"a":"one","b":"two"}
-line2 {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] {"a":"three","b":"four"}
-line3 {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] {"a":"five","b":"six"}
-PREHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+strct mp lst
+{"a":"one","b":"two"} {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"]
+{"a":"three","b":"four"} {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"]
+{"a":"five","b":"six"} {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"]
+PREHOOK: query: SELECT lst, str FROM orc_create_complex
PREHOOK: type: QUERY
-POSTHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT lst, str FROM orc_create_complex
POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
- Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
- Stage: Stage-1
- Tez
+POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
- Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE)
+lst str
+["a","b","c"] line1
+["d","e","f"] line2
+["g","h","i"] line3
+PREHOOK: query: SELECT mp, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
- Vertices:
- Map 1
- Map Operator Tree:
- TableScan
- alias: orc_create_complex
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
- Select Operator
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
- Group By Operator
- aggregations: count()
- mode: hash
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- Reduce Output Operator
- sort order:
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- value expressions: _col0 (type: bigint)
- Execution mode: vectorized, llap
- LLAP IO: all inputs
- Reducer 2
- Execution mode: vectorized, llap
- Reduce Operator Tree:
- Group By Operator
- aggregations: count(VALUE._col0)
- mode: mergepartial
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- File Output Operator
- compressed: false
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
- Stage: Stage-0
- Fetch Operator
- limit: -1
- Processor Tree:
- ListSink
-
-PREHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT mp, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+mp str
+{"key13":"value13","key11":"value11","key12":"value12"} line1
+{"key21":"value21","key22":"value22","key23":"value23"} line2
+{"key31":"value31","key32":"value32","key33":"value33"} line3
+PREHOOK: query: SELECT strct, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct str
+{"a":"one","b":"two"} line1
+{"a":"three","b":"four"} line2
+{"a":"five","b":"six"} line3
+PREHOOK: query: SELECT strct.B, str FROM orc_create_complex
PREHOOK: type: QUERY
PREHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
-POSTHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT strct.B, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+b str
+two line1
+four line2
+six line3
+Warning: Shuffle Join MERGEJOIN[15][tables = [$hdt$_1, $hdt$_2, $hdt$_3, $hdt$_0]] in Stage 'Reducer 2' is a cross product
+PREHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_staging
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_create_complex
+POSTHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_staging
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_create_complex
+POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:lst, type:array<string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map<string,string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct<A:string,B:string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.val SIMPLE [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct src1.key
+PREHOOK: query: select count(*) from orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_create_complex
POSTHOOK: type: QUERY
POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
c0
-3
-PREHOOK: query: -- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str
+13503
+PREHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
PREHOOK: type: QUERY
-POSTHOOK: query: -- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
- Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
- Stage: Stage-1
- Tez
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+lst strct
+["a","b","c"] {"a":"one","b":"two"}
+["d","e","f"] {"a":"three","b":"four"}
+["g","h","i"] {"a":"five","b":"six"}
+PREHOOK: query: SELECT str, count(val) FROM orc_create_complex GROUP BY str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
- Edges:
- Reducer 2 <- Map 1 (SIMPLE_EDGE)
+POSTHOOK: query: SELECT str, count(val) FROM orc_create_complex GROUP BY str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
- Vertices:
- Map 1
- Map Operator Tree:
- TableScan
- alias: orc_create_complex
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: str (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- Execution mode: vectorized, llap
- LLAP IO: all inputs
- Reducer 2
- Execution mode: vectorized, llap
- Reduce Operator Tree:
- Select Operator
- expressions: KEY.reducesinkkey0 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
- Stage: Stage-0
- Fetch Operator
- limit: -1
- Processor Tree:
- ListSink
-
-PREHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+str c1
+line1 4501
+line2 4501
+line3 4501
+PREHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
PREHOOK: type: QUERY
PREHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
-POSTHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+POSTHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
POSTHOOK: type: QUERY
POSTHOOK: Input: default@orc_create_complex
#### A masked pattern was here ####
-str
-line1
-line2
-line3
+strct.b _c1
+four 4501
+six 4501
+two 4501
+PREHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct mp lst str c4
+{"a":"one","b":"two"} {"key11":"value11","key12":"value12","key13":"value13"} ["a","b","c"] line1 4501
+{"a":"three","b":"four"} {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] line2 4501
+{"a":"five","b":"six"} {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] line3 4501
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 97d5642..133b8ef 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -90,7 +90,7 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: map<int,string>)
Execution mode: llap
- LLAP IO: no inputs
+ LLAP IO: all inputs
Stage: Stage-0
Fetch Operator
@@ -211,7 +211,7 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: a (type: array<int>)
Execution mode: llap
- LLAP IO: no inputs
+ LLAP IO: all inputs
Stage: Stage-0
Fetch Operator