Posted to commits@hive.apache.org by se...@apache.org on 2016/11/18 19:07:36 UTC

[3/4] hive git commit: HIVE-14089 : complex type support in LLAP IO is broken (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
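For context, the central API change in EncodedTreeReaderFactory below is that SettableTreeReader.setBuffers now receives the whole EncodedColumnBatch (instead of a flat ColumnStreamData[]), so the new nested readers for LIST/MAP/STRUCT/UNION columns can each look up their own column's streams by id, and the flat per-column factory is replaced by createRootTreeReader, which builds a single root StructStreamReader over the included top-level columns. A minimal usage sketch under stated assumptions follows; the enclosing caller, the stripeEncodings/rowIndexPositions/sameStripe locals, and the surrounding wiring are illustrative, not code from this patch:

    // Sketch only: how a caller might drive the new root-reader API; local names are assumed.
    int[] columnMapping = new int[schema.getChildren().size()];
    TreeReaderFactory.StructTreeReader root = EncodedTreeReaderFactory.createRootTreeReader(
        schema, stripeEncodings, batch, codec, /* skipCorrupt */ false, writerTimezone, columnMapping);
    // For later batches from the same stripe, the existing readers are reused and only the
    // buffers are re-pointed; the whole batch is passed so nested readers find their own columns.
    ((EncodedTreeReaderFactory.SettableTreeReader) root).setBuffers(batch, sameStripe);
    // Seek every included column to the batch's row group, then read vectors as usual.
    root.seek(rowIndexPositions);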

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index ebbdf8d..d5f5f9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -17,18 +17,26 @@
  */
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
+import org.apache.orc.impl.RunLengthByteReader;
+import org.apache.orc.impl.StreamName;
+
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.TypeDescription.Category;
 import org.apache.orc.impl.PositionProvider;
 import org.apache.orc.impl.SettableUncompressedStream;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.OrcProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EncodedTreeReaderFactory extends TreeReaderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(EncodedTreeReaderFactory.class);
   /**
    * We choose to use a toy programming language, so we cannot use multiple inheritance.
    * If we could, we could have this inherit TreeReader to contain the common impl, and then
@@ -36,7 +44,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
    * Instead, we have a settable interface that the caller will cast to and call setBuffers.
    */
   public interface SettableTreeReader {
-    void setBuffers(ColumnStreamData[] streamBuffers, boolean sameStripe) throws IOException;
+    void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe) throws IOException;
   }
 
   public static class TimestampStreamReader extends TimestampTreeReader
@@ -84,8 +92,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -198,6 +207,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      // This string reader should simply redirect to its own seek (what other types already do).
+      this.seek(index[columnId]);
+    }
+
+    @Override
     public void seek(PositionProvider index) throws IOException {
       if (present != null) {
         if (_isFileCompressed) {
@@ -211,7 +226,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
-        if (_dataStream.available() > 0) {
+        if (_dataStream != null && _dataStream.available() > 0) {
           if (_isFileCompressed) {
             index.getNext();
           }
@@ -222,14 +237,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         // data stream could be empty stream or already reached end of stream before present stream.
         // This can happen if all values in stream are nulls or last row group values are all null.
-        if (_dataStream.available() > 0) {
+        if (_dataStream != null && _dataStream.available() > 0) {
           if (_isFileCompressed) {
             index.getNext();
           }
           ((StringDirectTreeReader) reader).getStream().seek(index);
         }
 
-        if (_lengthStream.available() > 0) {
+        if (_lengthStream != null && _lengthStream.available() > 0) {
           if (_isFileCompressed) {
             index.getNext();
           }
@@ -239,8 +254,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -371,8 +387,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -468,8 +485,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -571,8 +589,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -668,8 +687,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -758,8 +778,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -859,8 +880,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -979,8 +1001,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1065,6 +1088,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      // This string reader should simply redirect to its own seek (what other types already do).
+      this.seek(index[columnId]);
+    }
+
+    @Override
     public void seek(PositionProvider index) throws IOException {
       if (present != null) {
         if (_isFileCompressed) {
@@ -1106,8 +1135,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1233,6 +1263,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      // This string reader should simply redirect to its own seek (what other types already do).
+      this.seek(index[columnId]);
+    }
+
+    @Override
     public void seek(PositionProvider index) throws IOException {
       if (present != null) {
         if (_isFileCompressed) {
@@ -1274,8 +1310,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1411,8 +1448,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1511,8 +1549,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1617,8 +1656,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe)
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
       }
@@ -1673,177 +1713,760 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
-  public static TreeReader[] createEncodedTreeReader(int numCols,
-                                                     List<OrcProto.Type> types,
-                                                     List<OrcProto.ColumnEncoding> encodings,
-                                                     EncodedColumnBatch<OrcBatchKey> batch,
-                                                     CompressionCodec codec, boolean skipCorrupt,
-                                                     String writerTimezone) throws IOException {
-    TreeReader[] treeReaders = new TreeReader[numCols];
-    for (int i = 0; i < numCols; i++) {
-      int columnIndex = batch.getColumnIxs()[i];
-      ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
-      OrcProto.Type columnType = types.get(columnIndex);
-
-      // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
-      // But we need to know if the original data is compressed or not. This is used to skip
-      // positions in row index properly. If the file is originally compressed,
-      // then 1st position (compressed offset) in row index should be skipped to get
-      // uncompressed offset, else 1st position should not be skipped.
-      // TODO: there should be a better way to do this, code just needs to be modified
-      OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
-
-      // stream buffers are arranged in enum order of stream kind
-      ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
-        data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
-        dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
-        lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
-        secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
-
-      switch (columnType.getKind()) {
-        case BINARY:
-          treeReaders[i] = BinaryStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case BOOLEAN:
-          treeReaders[i] = BooleanStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case BYTE:
-          treeReaders[i] = ByteStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case SHORT:
-          treeReaders[i] = ShortStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case INT:
-          treeReaders[i] = IntStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case LONG:
-          treeReaders[i] = LongStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .skipCorrupt(skipCorrupt)
-              .build();
-          break;
-        case FLOAT:
-          treeReaders[i] = FloatStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case DOUBLE:
-          treeReaders[i] = DoubleStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case CHAR:
-          treeReaders[i] = CharStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setMaxLength(columnType.getMaximumLength())
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setDictionaryStream(dictionary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case VARCHAR:
-          treeReaders[i] = VarcharStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setMaxLength(columnType.getMaximumLength())
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setDictionaryStream(dictionary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case STRING:
-          treeReaders[i] = StringStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setDictionaryStream(dictionary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case DECIMAL:
-          treeReaders[i] = DecimalStreamReader.builder()
-              .setColumnIndex(columnIndex)
-              .setPrecision(columnType.getPrecision())
-              .setScale(columnType.getScale())
-              .setPresentStream(present)
-              .setValueStream(data)
-              .setScaleStream(secondary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case TIMESTAMP:
-          treeReaders[i] = TimestampStreamReader.builder()
+  public static StructTreeReader createRootTreeReader(TypeDescription schema,
+       List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+       CompressionCodec codec, boolean skipCorrupt, String tz, int[] columnMapping)
+           throws IOException {
+    if (schema.getCategory() != Category.STRUCT) {
+      throw new AssertionError("Schema is not a struct: " + schema);
+    }
+    // Some child types may be excluded. Note that this can only happen at root level.
+    List<TypeDescription> children = schema.getChildren();
+    int childCount = children.size(), includedCount = 0;
+    for (int childIx = 0; childIx < childCount; ++childIx) {
+      if (!batch.hasData(children.get(childIx).getId())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Column at " + childIx + " " + children.get(childIx).getId()
+              + ":" + children.get(childIx).toString() + " has no data");
+        }
+        continue;
+      }
+      ++includedCount;
+    }
+    TreeReader[] childReaders = new TreeReader[includedCount];
+    for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) {
+      if (!batch.hasData(children.get(schemaChildIx).getId())) continue;
+      childReaders[++inclChildIx] = createEncodedTreeReader(
+          schema.getChildren().get(schemaChildIx), encodings, batch, codec, skipCorrupt, tz);
+      columnMapping[inclChildIx] = schemaChildIx;
+    }
+    return StructStreamReader.builder()
+        .setColumnIndex(0)
+        .setCompressionCodec(codec)
+        .setColumnEncoding(encodings.get(0))
+        .setChildReaders(childReaders)
+        .build();
+  }
+
+
+  private static TreeReader createEncodedTreeReader(TypeDescription schema,
+      List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+      CompressionCodec codec, boolean skipCorrupt, String tz) throws IOException {
+    int columnIndex = schema.getId();
+    ColumnStreamData[] streamBuffers = batch.getColumnData(columnIndex);
+
+    // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
+    // But we need to know if the original data is compressed or not. This is used to skip
+    // positions in row index properly. If the file is originally compressed,
+    // then 1st position (compressed offset) in row index should be skipped to get
+    // uncompressed offset, else 1st position should not be skipped.
+    // TODO: there should be a better way to do this, code just needs to be modified
+    OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
+
+    // stream buffers are arranged in enum order of stream kind
+    ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
+      data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
+      dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
+      lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
+      secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
+
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
+          " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
+          columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
+          data != null, dictionary != null, lengths != null, secondary != null, tz);
+    }
+    switch (schema.getCategory()) {
+      case BINARY:
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+      case DECIMAL:
+      case TIMESTAMP:
+      case DATE:
+        return getPrimitiveTreeReaders(columnIndex, schema, codec, columnEncoding,
+            present, data, dictionary, lengths, secondary, skipCorrupt, tz);
+      case LIST:
+        TypeDescription elementType = schema.getChildren().get(0);
+        TreeReader elementReader = createEncodedTreeReader(
+            elementType, encodings, batch, codec, skipCorrupt, tz);
+        return ListStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setColumnEncoding(columnEncoding)
+            .setCompressionCodec(codec)
+            .setPresentStream(present)
+            .setLengthStream(lengths)
+            .setElementReader(elementReader)
+            .build();
+      case MAP:
+        TypeDescription keyType = schema.getChildren().get(0);
+        TypeDescription valueType = schema.getChildren().get(1);
+        TreeReader keyReader = createEncodedTreeReader(
+            keyType, encodings, batch, codec, skipCorrupt, tz);
+        TreeReader valueReader = createEncodedTreeReader(
+            valueType, encodings, batch, codec, skipCorrupt, tz);
+        return MapStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setColumnEncoding(columnEncoding)
+            .setCompressionCodec(codec)
+            .setPresentStream(present)
+            .setLengthStream(lengths)
+            .setKeyReader(keyReader)
+            .setValueReader(valueReader)
+            .build();
+      case STRUCT: {
+        int childCount = schema.getChildren().size();
+        TreeReader[] childReaders = new TreeReader[childCount];
+        for (int i = 0; i < childCount; i++) {
+          TypeDescription childType = schema.getChildren().get(i);
+          childReaders[i] = createEncodedTreeReader(
+              childType, encodings, batch, codec, skipCorrupt, tz);
+        }
+        return StructStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .setPresentStream(present)
+            .setChildReaders(childReaders)
+            .build();
+      }
+      case UNION: {
+        int childCount = schema.getChildren().size();
+        TreeReader[] childReaders = new TreeReader[childCount];
+        for (int i = 0; i < childCount; i++) {
+          TypeDescription childType = schema.getChildren().get(i);
+          childReaders[i] = createEncodedTreeReader(
+              childType, encodings, batch, codec, skipCorrupt, tz);
+        }
+        return UnionStreamReader.builder()
               .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setSecondsStream(data)
-              .setNanosStream(secondary)
               .setCompressionCodec(codec)
               .setColumnEncoding(columnEncoding)
-              .setWriterTimezone(writerTimezone)
-              .skipCorrupt(skipCorrupt)
-              .build();
-          break;
-        case DATE:
-          treeReaders[i] = DateStreamReader.builder()
-              .setColumnIndex(columnIndex)
               .setPresentStream(present)
               .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
+              .setChildReaders(childReaders)
               .build();
-          break;
-        default:
-          throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
+      }
+      default:
+        throw new UnsupportedOperationException("Data type not supported: " + schema);
+    }
+  }
+
+  private static TreeReader getPrimitiveTreeReaders(final int columnIndex,
+      TypeDescription columnType, CompressionCodec codec, OrcProto.ColumnEncoding columnEncoding,
+      ColumnStreamData present, ColumnStreamData data, ColumnStreamData dictionary,
+      ColumnStreamData lengths, ColumnStreamData secondary, boolean skipCorrupt, String tz)
+          throws IOException {
+    switch (columnType.getCategory()) {
+      case BINARY:
+        return BinaryStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setLengthStream(lengths)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case BOOLEAN:
+        return BooleanStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .build();
+      case BYTE:
+        return ByteStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .build();
+      case SHORT:
+        return ShortStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case INT:
+        return IntStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case LONG:
+        return LongStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .skipCorrupt(skipCorrupt)
+            .build();
+      case FLOAT:
+        return FloatStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .build();
+      case DOUBLE:
+        return DoubleStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .build();
+      case CHAR:
+        return CharStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setMaxLength(columnType.getMaxLength())
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setLengthStream(lengths)
+            .setDictionaryStream(dictionary)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case VARCHAR:
+        return VarcharStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setMaxLength(columnType.getMaxLength())
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setLengthStream(lengths)
+            .setDictionaryStream(dictionary)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case STRING:
+        return StringStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setLengthStream(lengths)
+            .setDictionaryStream(dictionary)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case DECIMAL:
+        return DecimalStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPrecision(columnType.getPrecision())
+            .setScale(columnType.getScale())
+            .setPresentStream(present)
+            .setValueStream(data)
+            .setScaleStream(secondary)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      case TIMESTAMP:
+        return TimestampStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setSecondsStream(data)
+            .setNanosStream(secondary)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .setWriterTimezone(tz)
+            .skipCorrupt(skipCorrupt)
+            .build();
+      case DATE:
+        return DateStreamReader.builder()
+            .setColumnIndex(columnIndex)
+            .setPresentStream(present)
+            .setDataStream(data)
+            .setCompressionCodec(codec)
+            .setColumnEncoding(columnEncoding)
+            .build();
+      default:
+        throw new AssertionError("Not a primitive category: " + columnType.getCategory());
+    }
+  }
+
+  protected static class ListStreamReader extends ListTreeReader implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _lengthStream;
+
+    public ListStreamReader(final int columnIndex,
+        final SettableUncompressedStream present, final SettableUncompressedStream lengthStream,
+        final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+        final TreeReader elementReader) throws IOException {
+      super(columnIndex, present, lengthStream, columnEncoding, elementReader);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._lengthStream = lengthStream;
+    }
+
+    @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      PositionProvider ownIndex = index[columnId];
+      if (present != null) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        present.seek(ownIndex);
+      }
+
+      // lengths stream could be empty stream or already reached end of stream before present stream.
+      // This can happen if all values in stream are nulls or last row group values are all null.
+      if (_lengthStream.available() > 0) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        lengths.seek(ownIndex);
+        elementReader.seek(index);
       }
     }
 
-    return treeReaders;
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      // Only our parent class can call this.
+      throw new IOException("Should never be called");
+    }
+
+    @Override
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+        throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+      if (_presentStream != null) {
+        _presentStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+      }
+      if (_lengthStream != null) {
+        _lengthStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE]));
+      }
+
+      if (elementReader != null) {
+        ((SettableTreeReader) elementReader).setBuffers(batch, sameStripe);
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData lengthStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+      private TreeReader elementReader;
+
+
+      public ListStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public ListStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
+        this.lengthStream = lengthStream;
+        return this;
+      }
+
+      public ListStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public ListStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public ListStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public ListStreamReader.StreamReaderBuilder setElementReader(TreeReader elementReader) {
+        this.elementReader = elementReader;
+        return this;
+      }
+
+      public ListStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                presentStream);
+
+        SettableUncompressedStream length = StreamUtils
+            .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(),
+                lengthStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new ListStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed,
+            elementReader);
+      }
+    }
+
+    public static ListStreamReader.StreamReaderBuilder builder() {
+      return new ListStreamReader.StreamReaderBuilder();
+    }
+  }
+
+  protected static class MapStreamReader extends MapTreeReader implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _lengthStream;
+
+    public MapStreamReader(final int columnIndex,
+        final SettableUncompressedStream present, final SettableUncompressedStream lengthStream,
+        final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+        final TreeReader keyReader, final TreeReader valueReader) throws IOException {
+      super(columnIndex, present, lengthStream, columnEncoding, keyReader, valueReader);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._lengthStream = lengthStream;
+    }
+
+    @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      // We are not calling super.seek since we handle the present stream differently.
+      PositionProvider ownIndex = index[columnId];
+      if (present != null) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        present.seek(ownIndex);
+      }
+
+      // lengths stream could be empty stream or already reached end of stream before present stream.
+      // This can happen if all values in stream are nulls or last row group values are all null.
+      if (_lengthStream.available() > 0) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        lengths.seek(ownIndex);
+        keyReader.seek(index);
+        valueReader.seek(index);
+      }
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      // Only our parent class can call this.
+      throw new IOException("Should never be called");
+    }
+
+    @Override
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+        throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+      if (_presentStream != null) {
+        _presentStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+      }
+      if (_lengthStream != null) {
+        _lengthStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE]));
+      }
+
+      if (keyReader != null) {
+        ((SettableTreeReader) keyReader).setBuffers(batch, sameStripe);
+      }
+
+      if (valueReader != null) {
+        ((SettableTreeReader) valueReader).setBuffers(batch, sameStripe);
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData lengthStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+      private TreeReader keyReader;
+      private TreeReader valueReader;
+
+
+      public MapStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
+        this.lengthStream = lengthStream;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setKeyReader(TreeReader keyReader) {
+        this.keyReader = keyReader;
+        return this;
+      }
+
+      public MapStreamReader.StreamReaderBuilder setValueReader(TreeReader valueReader) {
+        this.valueReader = valueReader;
+        return this;
+      }
+
+      public MapStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                presentStream);
+
+        SettableUncompressedStream length = StreamUtils
+            .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(),
+                lengthStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new MapStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed,
+            keyReader, valueReader);
+      }
+    }
+
+    public static MapStreamReader.StreamReaderBuilder builder() {
+      return new MapStreamReader.StreamReaderBuilder();
+    }
+  }
+
+  protected static class StructStreamReader extends StructTreeReader
+      implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+
+    public StructStreamReader(final int columnIndex,
+        final SettableUncompressedStream present,
+        final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+        final TreeReader[] childReaders) throws IOException {
+      super(columnIndex, present, columnEncoding, childReaders);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+    }
+
+    @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      PositionProvider ownIndex = index[columnId];
+      if (present != null) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        present.seek(ownIndex);
+      }
+      if (fields != null) {
+        for (TreeReader child : fields) {
+          child.seek(index);
+        }
+      }
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      // Only our parent class can call this.
+      throw new IOException("Should never be called");
+    }
+
+
+    @Override
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+        throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+      if (_presentStream != null) {
+        _presentStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+      }
+      if (fields != null) {
+        for (TreeReader child : fields) {
+          ((SettableTreeReader) child).setBuffers(batch, sameStripe);
+        }
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+      private TreeReader[] childReaders;
+
+
+      public StructStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public StructStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public StructStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public StructStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public StructStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) {
+        this.childReaders = childReaders;
+        return this;
+      }
+
+      public StructStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils
+            .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
+                presentStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new StructStreamReader(columnIndex, present, columnEncoding, isFileCompressed,
+            childReaders);
+      }
+    }
+
+    public static StructStreamReader.StreamReaderBuilder builder() {
+      return new StructStreamReader.StreamReaderBuilder();
+    }
+  }
+
+  protected static class UnionStreamReader extends UnionTreeReader implements SettableTreeReader {
+    private boolean _isFileCompressed;
+    private SettableUncompressedStream _presentStream;
+    private SettableUncompressedStream _dataStream;
+
+    public UnionStreamReader(final int columnIndex,
+        final SettableUncompressedStream present, final SettableUncompressedStream dataStream,
+        final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed,
+        final TreeReader[] childReaders) throws IOException {
+      super(columnIndex, present, columnEncoding, childReaders);
+      this._isFileCompressed = isFileCompressed;
+      this._presentStream = present;
+      this._dataStream = dataStream;
+      // Note: other parent readers init everything in ctor, but union does it in startStripe.
+      this.tags = new RunLengthByteReader(dataStream);
+    }
+
+    @Override
+    public void seek(PositionProvider[] index) throws IOException {
+      PositionProvider ownIndex = index[columnId];
+      if (present != null) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        present.seek(ownIndex);
+      }
+
+      // The data (tags) stream could be an empty stream, or it may have reached end of stream
+      // before the present stream. This can happen if all values in the stream are nulls or the
+      // last row group values are all null.
+      if (_dataStream.available() > 0) {
+        if (_isFileCompressed) {
+          ownIndex.getNext();
+        }
+        tags.seek(ownIndex);
+        if (fields != null) {
+          for (TreeReader child : fields) {
+            child.seek(index);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      // Only our parent class can call this.
+      throw new IOException("Should never be called");
+    }
+
+    @Override
+    public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
+        throws IOException {
+      ColumnStreamData[] streamsData = batch.getColumnData(columnId);
+      if (_presentStream != null) {
+        _presentStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
+      }
+      if (_dataStream != null) {
+        _dataStream.setBuffers(
+            StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.DATA_VALUE]));
+      }
+      if (fields != null) {
+        for (TreeReader child : fields) {
+          ((SettableTreeReader) child).setBuffers(batch, sameStripe);
+        }
+      }
+    }
+
+    public static class StreamReaderBuilder {
+      private int columnIndex;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private CompressionCodec compressionCodec;
+      private OrcProto.ColumnEncoding columnEncoding;
+      private TreeReader[] childReaders;
+
+
+      public UnionStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) {
+        this.columnIndex = columnIndex;
+        return this;
+      }
+
+      public UnionStreamReader.StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
+        this.dataStream = dataStream;
+        return this;
+      }
+
+      public UnionStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
+        this.presentStream = presentStream;
+        return this;
+      }
+
+      public UnionStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+        this.columnEncoding = encoding;
+        return this;
+      }
+
+      public UnionStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+        this.compressionCodec = compressionCodec;
+        return this;
+      }
+
+      public UnionStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) {
+        this.childReaders = childReaders;
+        return this;
+      }
+
+      public UnionStreamReader build() throws IOException {
+        SettableUncompressedStream present = StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.PRESENT.name(), presentStream);
+
+        SettableUncompressedStream data = StreamUtils.createSettableUncompressedStream(
+            OrcProto.Stream.Kind.DATA.name(), dataStream);
+
+        boolean isFileCompressed = compressionCodec != null;
+        return new UnionStreamReader(columnIndex, present, data,
+            columnEncoding, isFileCompressed, childReaders);
+      }
+    }
+
+    public static UnionStreamReader.StreamReaderBuilder builder() {
+      return new UnionStreamReader.StreamReaderBuilder();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 4405232..1c5f0e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
     public static final int ALL_RGS = -1;
     /**
      * All the previous streams are data streams, this and the next ones are index streams.
-     * We assume the sort will stay the same for backward compat.
+     * We assume the order will stay the same for backward compat.
      */
     public static final int MAX_DATA_STREAMS = OrcProto.Stream.Kind.ROW_INDEX.getNumber();
     public void init(Object fileKey, int stripeIx, int rgIx, int columnCount) {
@@ -57,6 +57,10 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
       }
       resetColumnArrays(columnCount);
     }
+
+    public void initOrcColumn(int colIx) {
+      super.initColumn(colIx, MAX_DATA_STREAMS);
+    }
   }
 
   /**

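One detail behind the MAX_DATA_STREAMS constant above: ORC numbers the per-column data stream kinds (PRESENT, DATA, LENGTH, DICTIONARY_DATA, DICTIONARY_COUNT, SECONDARY) below ROW_INDEX, so a stream slot can be classified as data vs. index purely by its kind number. A small illustrative check; the helper name is hypothetical and not part of the patch:

    // Hypothetical helper: slots numbered below ROW_INDEX hold data streams, ROW_INDEX and
    // above hold index streams (the ordering the comment assumes stays stable for backward compat).
    static boolean isIndexStream(OrcProto.Stream.Kind kind) {
      return kind.getNumber() >= OrcProto.Stream.Kind.ROW_INDEX.getNumber(); // == MAX_DATA_STREAMS
    }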
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 5cc3663..601324a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -278,30 +278,35 @@ public class MapWork extends BaseWork {
     }
 
     // check if the column types that are read are supported by LLAP IO
-    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
-      if (hasLlap) {
-        final String alias = entry.getKey();
-        Operator<? extends OperatorDesc> op = entry.getValue();
-        PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
-        if (op instanceof TableScanOperator && partitionDesc != null &&
-            partitionDesc.getTableDesc() != null) {
-          final TableScanOperator tsOp = (TableScanOperator) op;
-          final List<String> readColumnNames = tsOp.getNeededColumns();
-          final Properties props = partitionDesc.getTableDesc().getProperties();
-          final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
-              props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
-          final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
-          final List<String> allColumnNames = Utilities.getColumnNames(props);
-          hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames,
-              allColumnTypes);
-        }
-      }
+    if (hasLlap) {
+      // TODO: not needed for now: hasLlap = checkVectorizerSupportedTypes(hasLlap);
     }
 
     llapIoDesc = deriveLlapIoDescString(
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
   }
 
+  private boolean checkVectorizerSupportedTypes(boolean hasLlap) {
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+      final String alias = entry.getKey();
+      Operator<? extends OperatorDesc> op = entry.getValue();
+      PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
+      if (op instanceof TableScanOperator && partitionDesc != null &&
+          partitionDesc.getTableDesc() != null) {
+        final TableScanOperator tsOp = (TableScanOperator) op;
+        final List<String> readColumnNames = tsOp.getNeededColumns();
+        final Properties props = partitionDesc.getTableDesc().getProperties();
+        final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
+            props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+        final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+        final List<String> allColumnNames = Utilities.getColumnNames(props);
+        hasLlap = Utilities.checkVectorizerSupportedTypes(readColumnNames, allColumnNames,
+            allColumnTypes);
+      }
+    }
+    return hasLlap;
+  }
+
   private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
       boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) {
     if (!isLlapOn) return null; // LLAP IO is off, don't output.

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/queries/clientpositive/vector_complex_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_complex_all.q b/ql/src/test/queries/clientpositive/vector_complex_all.q
index 91a7368..b71ac62 100644
--- a/ql/src/test/queries/clientpositive/vector_complex_all.q
+++ b/ql/src/test/queries/clientpositive/vector_complex_all.q
@@ -1,9 +1,11 @@
 set hive.compute.query.using.stats=false;
-set hive.compute.query.using.stats=false;
+set hive.strict.checks.cartesian.product=false;
 set hive.cli.print.header=true;
 set hive.explain.user=false;
 set hive.fetch.task.conversion=none;
 SET hive.vectorized.execution.enabled=true;
+set hive.llap.io.enabled=false;
+set hive.mapred.mode=nonstrict;
 
 CREATE TABLE orc_create_staging (
   str STRING,
@@ -21,25 +23,45 @@ CREATE TABLE orc_create_complex (
   str STRING,
   mp  MAP<STRING,STRING>,
   lst ARRAY<STRING>,
-  strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC;
+  strct STRUCT<A:STRING,B:STRING>,
+  val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000");
 
-INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging;
+INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging;
 
--- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex;
+set hive.llap.io.enabled=true;
 
 SELECT * FROM orc_create_complex;
 
--- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex;
+SELECT str FROM orc_create_complex;
+
+SELECT strct, mp, lst FROM orc_create_complex;
+
+SELECT lst, str FROM orc_create_complex;
+
+SELECT mp, str FROM orc_create_complex;
+
+SELECT strct, str FROM orc_create_complex;
+
+SELECT strct.B, str FROM orc_create_complex;
+
+set hive.llap.io.enabled=false;
+
+INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2;
+
+select count(*) from orc_create_complex;
+
+set hive.llap.io.enabled=true;
+
+SELECT distinct lst, strct FROM orc_create_complex;
+
+SELECT str, count(val)  FROM orc_create_complex GROUP BY str;
+
+SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B;
+
+SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str;
 
-SELECT COUNT(*) FROM orc_create_complex;
 
--- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str;
 
-SELECT str FROM orc_create_complex ORDER BY str;
\ No newline at end of file
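The rewritten test above pins down the data layout deliberately: the tiny orc.row.index.stride / orc.stripe.size / orc.compress.size values force the table into many row groups and stripes, and the second INSERT fans the three staging rows out through cross joins. Assuming the standard 500-row src fixture, that insert adds 3 * 500 * 3 * 3 = 13,500 rows, so the expected output below shows count(*) = 3 + 13,500 = 13,503 and 13,503 / 3 = 4,501 rows per distinct str value in the GROUP BY results.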

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
index 08d49bc..f16bb16 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out
@@ -34,8 +34,9 @@ PREHOOK: query: CREATE TABLE orc_create_complex (
   str STRING,
   mp  MAP<STRING,STRING>,
   lst ARRAY<STRING>,
-  strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC
+  strct STRUCT<A:STRING,B:STRING>,
+  val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000")
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@orc_create_complex
@@ -43,16 +44,19 @@ POSTHOOK: query: CREATE TABLE orc_create_complex (
   str STRING,
   mp  MAP<STRING,STRING>,
   lst ARRAY<STRING>,
-  strct STRUCT<A:STRING,B:STRING>
-) STORED AS ORC
+  strct STRUCT<A:STRING,B:STRING>,
+  val string
+) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000")
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@orc_create_complex
-PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging
+PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_staging
 PREHOOK: Output: default@orc_create_complex
-POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging
+POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex
+SELECT orc_create_staging.*, '0' FROM orc_create_staging
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_staging
 POSTHOOK: Output: default@orc_create_complex
@@ -60,199 +64,166 @@ POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create
 POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map<string,string>, comment:null), ]
 POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ]
 POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct<A:string,B:string>, comment:null), ]
-orc_create_staging.str	orc_create_staging.mp	orc_create_staging.lst	orc_create_staging.strct
-PREHOOK: query: -- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex
+POSTHOOK: Lineage: orc_create_complex.val SIMPLE []
+orc_create_staging.str	orc_create_staging.mp	orc_create_staging.lst	orc_create_staging.strct	c1
+PREHOOK: query: SELECT * FROM orc_create_complex
 PREHOOK: type: QUERY
-POSTHOOK: query: -- Since complex types are not supported, this query should not vectorize.
-EXPLAIN
-SELECT * FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1 
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: str (type: string), mp (type: map<string,string>), lst (type: array<string>), strct (type: struct<a:string,b:string>)
-                    outputColumnNames: _col0, _col1, _col2, _col3
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                    File Output Operator
-                      compressed: false
-                      Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                      table:
-                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-            Execution mode: llap
-            LLAP IO: no inputs
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT * FROM orc_create_complex
+orc_create_complex.str	orc_create_complex.mp	orc_create_complex.lst	orc_create_complex.strct	orc_create_complex.val
+line1	{"key13":"value13","key11":"value11","key12":"value12"}	["a","b","c"]	{"a":"one","b":"two"}	0
+line2	{"key21":"value21","key22":"value22","key23":"value23"}	["d","e","f"]	{"a":"three","b":"four"}	0
+line3	{"key31":"value31","key32":"value32","key33":"value33"}	["g","h","i"]	{"a":"five","b":"six"}	0
+PREHOOK: query: SELECT str FROM orc_create_complex
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT * FROM orc_create_complex
+POSTHOOK: query: SELECT str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+str
+line1
+line2
+line3
+PREHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-orc_create_complex.str	orc_create_complex.mp	orc_create_complex.lst	orc_create_complex.strct
-line1	{"key13":"value13","key11":"value11","key12":"value12"}	["a","b","c"]	{"a":"one","b":"two"}
-line2	{"key21":"value21","key22":"value22","key23":"value23"}	["d","e","f"]	{"a":"three","b":"four"}
-line3	{"key31":"value31","key32":"value32","key33":"value33"}	["g","h","i"]	{"a":"five","b":"six"}
-PREHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+strct	mp	lst
+{"a":"one","b":"two"}	{"key13":"value13","key11":"value11","key12":"value12"}	["a","b","c"]
+{"a":"three","b":"four"}	{"key21":"value21","key22":"value22","key23":"value23"}	["d","e","f"]
+{"a":"five","b":"six"}	{"key31":"value31","key32":"value32","key33":"value33"}	["g","h","i"]
+PREHOOK: query: SELECT lst, str FROM orc_create_complex
 PREHOOK: type: QUERY
-POSTHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT lst, str FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+lst	str
+["a","b","c"]	line1
+["d","e","f"]	line2
+["g","h","i"]	line3
+PREHOOK: query: SELECT mp, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1 
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
-                      mode: hash
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col0 (type: bigint)
-            Execution mode: vectorized, llap
-            LLAP IO: all inputs
-        Reducer 2 
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: count(VALUE._col0)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT mp, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+mp	str
+{"key13":"value13","key11":"value11","key12":"value12"}	line1
+{"key21":"value21","key22":"value22","key23":"value23"}	line2
+{"key31":"value31","key32":"value32","key33":"value33"}	line3
+PREHOOK: query: SELECT strct, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct	str
+{"a":"one","b":"two"}	line1
+{"a":"three","b":"four"}	line2
+{"a":"five","b":"six"}	line3
+PREHOOK: query: SELECT strct.B, str FROM orc_create_complex
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT strct.B, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+b	str
+two	line1
+four	line2
+six	line3
+Warning: Shuffle Join MERGEJOIN[15][tables = [$hdt$_1, $hdt$_2, $hdt$_3, $hdt$_0]] in Stage 'Reducer 2' is a cross product
+PREHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_staging
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_create_complex
+POSTHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_staging
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_create_complex
+POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:lst, type:array<string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map<string,string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct<A:string,B:string>, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.val SIMPLE [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+orc_create_staging.str	orc_create_staging.mp	orc_create_staging.lst	orc_create_staging.strct	src1.key
+PREHOOK: query: select count(*) from orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_create_complex
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
 c0
-3
-PREHOOK: query: -- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str
+13503
+PREHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
 PREHOOK: type: QUERY
-POSTHOOK: query: -- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+lst	strct
+["a","b","c"]	{"a":"one","b":"two"}
+["d","e","f"]	{"a":"three","b":"four"}
+["g","h","i"]	{"a":"five","b":"six"}
+PREHOOK: query: SELECT str, count(val)  FROM orc_create_complex GROUP BY str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+POSTHOOK: query: SELECT str, count(val)  FROM orc_create_complex GROUP BY str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1 
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: str (type: string)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      sort order: +
-                      Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: vectorized, llap
-            LLAP IO: all inputs
-        Reducer 2 
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Select Operator
-                expressions: KEY.reducesinkkey0 (type: string)
-                outputColumnNames: _col0
-                Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+str	c1
+line1	4501
+line2	4501
+line3	4501
+PREHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+POSTHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-str
-line1
-line2
-line3
+strct.b	_c1
+four	4501
+six	4501
+two	4501
+PREHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct	mp	lst	str	c4
+{"a":"one","b":"two"}	{"key11":"value11","key12":"value12","key13":"value13"}	["a","b","c"]	line1	4501
+{"a":"three","b":"four"}	{"key21":"value21","key22":"value22","key23":"value23"}	["d","e","f"]	line2	4501
+{"a":"five","b":"six"}	{"key31":"value31","key32":"value32","key33":"value33"}	["g","h","i"]	line3	4501

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 97d5642..133b8ef 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -90,7 +90,7 @@ STAGE PLANS:
                         Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: map<int,string>)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
@@ -211,7 +211,7 @@ STAGE PLANS:
                       Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                       value expressions: a (type: array<int>)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator