You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/10 02:36:40 UTC

[1/3] hive git commit: HIVE-15831 : LLAP: Fix a problem of the output of LlapDump (Takanobu Asanuma, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 050e679df -> 8f273cc53


HIVE-15831 : LLAP: Fix a problem of the output of LlapDump (Takanobu Asanuma, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fbe9b05f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fbe9b05f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fbe9b05f

Branch: refs/heads/master
Commit: fbe9b05f00bc54f012754a1eeb4f404bea3a69e7
Parents: 050e679
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Feb 9 18:24:14 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Feb 9 18:24:14 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/llap/LlapDump.java   | 30 +++-----------------
 1 file changed, 4 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fbe9b05f/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
index 08ad1f5..4a83141 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -17,17 +17,7 @@
  */
 package org.apache.hadoop.hive.llap;
 
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import org.slf4j.Logger;
@@ -39,23 +29,10 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
 
 /**
  * Utility to test query and data retrieval via the LLAP input format.
@@ -160,10 +137,11 @@ public class LlapDump {
   private static void printRow(Row row) {
     Schema schema = row.getSchema();
     StringBuilder sb = new StringBuilder();
-    for (int idx = 0; idx < schema.getColumns().size(); ++idx) {
-      if (idx > 0) {
+    int length = schema.getColumns().size();
+    for (int idx = 0; idx < length; ++idx) {
+      sb.append(row.getValue(idx));
+      if (idx != length - 1) {
         sb.append(", ");
-        sb.append(row.getValue(idx));
       }
     }
     System.out.println(sb.toString());


[2/3] hive git commit: HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index a434763..c21327f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -18,14 +18,14 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import org.apache.orc.impl.RunLengthByteReader;
-import org.apache.orc.impl.SchemaEvolution;
-import org.apache.orc.impl.StreamName;
 
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.TypeDescription.Category;
@@ -54,20 +54,40 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _secondsStream;
     private SettableUncompressedStream _nanosStream;
+    private List<ColumnVector> vectors;
+    private int vectorIndex = 0;
 
     private TimestampStreamReader(int columnId, SettableUncompressedStream present,
                                   SettableUncompressedStream data, SettableUncompressedStream nanos,
                                   boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
-                                  TreeReaderFactory.Context context) throws IOException {
+                                  TreeReaderFactory.Context context,
+                                  List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, nanos, encoding, context);
       this.isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._secondsStream = data;
       this._nanosStream = nanos;
+      this.vectors = vectors;
+    }
+
+    @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      // Note: we assume that batchSize is consistent with the vectors passed in; this is
+      // not verified. This is rather brittle; the other readers make the same assumption.
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (isFileCompressed) {
           index.getNext();
@@ -95,6 +115,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      // Currently, here and in the other readers, setBuffers is never called in the SerDe
+      // reader case, and the SerDe reader case is the only one that uses vectors. When readers
+      // are created with vectors, the streams are not created at all. So, if a reader were
+      // given a set of vectors and then a set of buffers, we would be in trouble here; that
+      // would need to be implemented if such a scenario is ever supported.
+      assert vectors == null;
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -119,6 +145,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
       private TreeReaderFactory.Context context;
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -170,7 +197,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new TimestampStreamReader(columnIndex, present, data, nanos,
-            isFileCompressed, columnEncoding, context);
+            isFileCompressed, columnEncoding, context, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -187,12 +219,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _dataStream;
     private SettableUncompressedStream _lengthStream;
     private SettableUncompressedStream _dictionaryStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private StringStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, SettableUncompressedStream length,
         SettableUncompressedStream dictionary,
         boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
-        TreeReaderFactory.Context context) throws IOException {
+        TreeReaderFactory.Context context, List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, length, dictionary, encoding, context);
       this._isDictionaryEncoding = dictionary != null;
       this._isFileCompressed = isFileCompressed;
@@ -200,6 +234,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       this._dataStream = data;
       this._lengthStream = length;
       this._dictionaryStream = dictionary;
+      this.vectors = vectors;
     }
 
     @Override
@@ -210,6 +245,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -250,8 +286,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -284,9 +334,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
-
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
         return this;
@@ -345,7 +395,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new StringStreamReader(columnIndex, present, data, length, dictionary,
-            isFileCompressed, columnEncoding, context);
+            isFileCompressed, columnEncoding, context, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -359,19 +414,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private ShortStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding,
-        TreeReaderFactory.Context context) throws IOException {
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, encoding, context);
       this.isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (isFileCompressed) {
           index.getNext();
@@ -390,8 +449,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -407,6 +480,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -450,8 +524,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new ShortStreamReader(columnIndex, present, data, isFileCompressed,
-            columnEncoding, context);
+            columnEncoding, context, vectors);
       }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
+      }
+
     }
 
     public static StreamReaderBuilder builder() {
@@ -463,19 +543,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+    private int vectorIndex = 0;
 
     private LongStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
-        ) throws IOException {
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, encoding, context);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -494,8 +578,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -512,6 +610,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
       private TreeReaderFactory.Context context;
+      private List<ColumnVector> vectors;
 
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -555,8 +654,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new LongStreamReader(columnIndex, present, data, isFileCompressed,
-            columnEncoding, context);
+            columnEncoding, context, vectors);
       }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
+      }
+
     }
 
     public static StreamReaderBuilder builder() {
@@ -568,19 +673,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private IntStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
-        ) throws IOException {
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, encoding, context);
       this._isFileCompressed = isFileCompressed;
       this._dataStream = data;
       this._presentStream = present;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -599,8 +708,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -616,6 +739,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -659,8 +783,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new IntStreamReader(columnIndex, present, data, isFileCompressed,
-            columnEncoding, context);
+            columnEncoding, context, vectors);
       }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
+      }
+
     }
 
     public static StreamReaderBuilder builder() {
@@ -673,17 +803,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private FloatStreamReader(int columnId, SettableUncompressedStream present,
-        SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+        SettableUncompressedStream data, boolean isFileCompressed,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -702,8 +837,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -718,7 +867,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData presentStream;
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
-
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -750,7 +899,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
                 dataStream);
 
         boolean isFileCompressed = compressionCodec != null;
-        return new FloatStreamReader(columnIndex, present, data, isFileCompressed);
+        return new FloatStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -764,17 +918,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private DoubleStreamReader(int columnId, SettableUncompressedStream present,
-        SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+        SettableUncompressedStream data, boolean isFileCompressed,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -793,8 +952,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -809,6 +982,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData presentStream;
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -846,7 +1020,13 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
                 dataStream);
 
         boolean isFileCompressed = compressionCodec != null;
-        return new DoubleStreamReader(columnIndex, present, data, isFileCompressed);
+        // TODO: why doesn't this use context?
+        return new DoubleStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -860,22 +1040,26 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _valueStream;
     private SettableUncompressedStream _scaleStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private DecimalStreamReader(int columnId, int precision, int scale,
         SettableUncompressedStream presentStream,
         SettableUncompressedStream valueStream, SettableUncompressedStream scaleStream,
         boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
-        ) throws IOException {
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, presentStream, valueStream, scaleStream, encoding, context);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = presentStream;
       this._valueStream = valueStream;
       this._scaleStream = scaleStream;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -901,8 +1085,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -924,9 +1122,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private int precision;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
-
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
         return this;
@@ -985,7 +1183,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         boolean isFileCompressed = compressionCodec != null;
         return new DecimalStreamReader(columnIndex, precision, scale, presentInStream,
             valueInStream,
-            scaleInStream, isFileCompressed, columnEncoding, context);
+            scaleInStream, isFileCompressed, columnEncoding, context, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -998,19 +1201,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private DateStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context, List<ColumnVector> vectors
         ) throws IOException {
       super(columnId, present, data, encoding, context);
       this.isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (isFileCompressed) {
           index.getNext();
@@ -1029,8 +1236,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1046,6 +1267,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -1078,6 +1300,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
+      }
+
       public DateStreamReader build() throws IOException {
         SettableUncompressedStream present = StreamUtils
             .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
@@ -1090,7 +1317,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new DateStreamReader(columnIndex, present, data, isFileCompressed,
-            columnEncoding, context);
+            columnEncoding, context, vectors);
       }
     }
 
@@ -1106,11 +1333,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _dataStream;
     private SettableUncompressedStream _lengthStream;
     private SettableUncompressedStream _dictionaryStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private CharStreamReader(int columnId, int maxLength,
         SettableUncompressedStream present, SettableUncompressedStream data,
         SettableUncompressedStream length, SettableUncompressedStream dictionary,
-        boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
+        boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, maxLength, present, data, length,
           dictionary, encoding);
       this._isDictionaryEncoding = dictionary != null;
@@ -1119,6 +1349,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       this._dataStream = data;
       this._lengthStream = length;
       this._dictionaryStream = dictionary;
+      this.vectors = vectors;
     }
 
     @Override
@@ -1129,6 +1360,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -1169,8 +1401,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1204,7 +1450,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
-
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -1264,7 +1510,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new CharStreamReader(columnIndex, maxLength, present, data, length,
-            dictionary, isFileCompressed, columnEncoding);
+            dictionary, isFileCompressed, columnEncoding, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -1281,11 +1532,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _dataStream;
     private SettableUncompressedStream _lengthStream;
     private SettableUncompressedStream _dictionaryStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private VarcharStreamReader(int columnId, int maxLength,
         SettableUncompressedStream present, SettableUncompressedStream data,
         SettableUncompressedStream length, SettableUncompressedStream dictionary,
-        boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
+        boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, maxLength, present, data, length,
           dictionary, encoding);
       this._isDictionaryEncoding = dictionary != null;
@@ -1294,6 +1548,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       this._dataStream = data;
       this._lengthStream = length;
       this._dictionaryStream = dictionary;
+      this.vectors = vectors;
     }
 
     @Override
@@ -1304,6 +1559,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -1344,8 +1600,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1379,7 +1649,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
-
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -1439,7 +1709,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new VarcharStreamReader(columnIndex, maxLength, present, data, length,
-            dictionary, isFileCompressed, columnEncoding);
+            dictionary, isFileCompressed, columnEncoding, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -1453,17 +1728,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private ByteStreamReader(int columnId, SettableUncompressedStream present,
-        SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+        SettableUncompressedStream data, boolean isFileCompressed,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -1482,8 +1762,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1498,7 +1792,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData presentStream;
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
-
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -1530,7 +1824,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
                 dataStream);
 
         boolean isFileCompressed = compressionCodec != null;
-        return new ByteStreamReader(columnIndex, present, data, isFileCompressed);
+        return new ByteStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -1544,20 +1843,37 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
     private SettableUncompressedStream _lengthsStream;
+    private List<ColumnVector> vectors;
+      private int vectorIndex = 0;
 
     private BinaryStreamReader(int columnId, SettableUncompressedStream present,
         SettableUncompressedStream data, SettableUncompressedStream length,
         boolean isFileCompressed,
-        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context) throws IOException {
+        OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context, List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data, length, encoding, context);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
       this._lengthsStream = length;
+      this.vectors = vectors;
+    }
+
+    @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -1585,6 +1901,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1604,6 +1921,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
+      private List<ColumnVector> vectors;
       private TreeReaderFactory.Context context;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -1653,7 +1971,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
         boolean isFileCompressed = compressionCodec != null;
         return new BinaryStreamReader(columnIndex, present, data, length, isFileCompressed,
-            columnEncoding, context);
+            columnEncoding, context, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -1666,17 +1989,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     private boolean _isFileCompressed;
     private SettableUncompressedStream _presentStream;
     private SettableUncompressedStream _dataStream;
+    private List<ColumnVector> vectors;
+    private int vectorIndex = 0;
 
     private BooleanStreamReader(int columnId, SettableUncompressedStream present,
-        SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+        SettableUncompressedStream data, boolean isFileCompressed,
+        List<ColumnVector> vectors) throws IOException {
       super(columnId, present, data);
       this._isFileCompressed = isFileCompressed;
       this._presentStream = present;
       this._dataStream = data;
+      this.vectors = vectors;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
+      if (vectors != null) return;
       if (present != null) {
         if (_isFileCompressed) {
           index.getNext();
@@ -1695,8 +2023,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
+    public void nextVector(
+        ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+      if (vectors == null) {
+        super.nextVector(previousVector, isNull, batchSize);
+        return;
+      }
+      vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+      if (vectorIndex == vectors.size()) {
+        vectors = null;
+      }
+    }
+
+    @Override
     public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
         throws IOException {
+      assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
       ColumnStreamData[] streamsData = batch.getColumnData(columnId);
       if (_presentStream != null) {
         _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1711,7 +2053,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private ColumnStreamData presentStream;
       private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
-
+      private List<ColumnVector> vectors;
 
       public StreamReaderBuilder setColumnIndex(int columnIndex) {
         this.columnIndex = columnIndex;
@@ -1743,7 +2085,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
                 dataStream);
 
         boolean isFileCompressed = compressionCodec != null;
-        return new BooleanStreamReader(columnIndex, present, data, isFileCompressed);
+        return new BooleanStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+      }
+
+      public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+        this.vectors = vectors;
+        return this;
       }
     }
 
@@ -1753,7 +2100,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
   }
 
   public static StructTreeReader createRootTreeReader(TypeDescription schema,
-       List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+       List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
        CompressionCodec codec, TreeReaderFactory.Context context, int[] columnMapping)
            throws IOException {
     if (schema.getCategory() != Category.STRUCT) {
@@ -1763,7 +2110,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     List<TypeDescription> children = schema.getChildren();
     int childCount = children.size(), includedCount = 0;
     for (int childIx = 0; childIx < childCount; ++childIx) {
-      if (!batch.hasData(children.get(childIx).getId())) {
+      int batchColIx = children.get(childIx).getId();
+      if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Column at " + childIx + " " + children.get(childIx).getId()
               + ":" + children.get(childIx).toString() + " has no data");
@@ -1774,7 +2122,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
     TreeReader[] childReaders = new TreeReader[includedCount];
     for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) {
-      if (!batch.hasData(children.get(schemaChildIx).getId())) continue;
+      int batchColIx = children.get(schemaChildIx).getId();
+      if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) continue;
       childReaders[++inclChildIx] = createEncodedTreeReader(
           schema.getChildren().get(schemaChildIx), encodings, batch, codec, context);
       columnMapping[inclChildIx] = schemaChildIx;
@@ -1790,10 +2139,18 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
 
 
   private static TreeReader createEncodedTreeReader(TypeDescription schema,
-      List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+      List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
       CompressionCodec codec, TreeReaderFactory.Context context) throws IOException {
-      int columnIndex = schema.getId();
-    ColumnStreamData[] streamBuffers = batch.getColumnData(columnIndex);
+    int columnIndex = schema.getId();
+    ColumnStreamData[] streamBuffers = null;
+    List<ColumnVector> vectors = null;
+    if (batch.hasData(columnIndex)) {
+      streamBuffers = batch.getColumnData(columnIndex);
+    } else if (batch.hasVectors(columnIndex)) {
+      vectors = batch.getColumnVectors(columnIndex);
+    } else {
+      throw new AssertionError("Batch has no data for " + columnIndex + ": " + batch);
+    }
 
     // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
     // But we need to know if the original data is compressed or not. This is used to skip
@@ -1804,20 +2161,26 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
 
     // stream buffers are arranged in enum order of stream kind
-    ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
-      data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
-      dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
-      lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
+    ColumnStreamData present = null, data = null, dictionary = null,
+        lengths = null, secondary = null;
+    if (streamBuffers != null) {
+      present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE];
+      data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE];
+      dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE];
+      lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE];
       secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
+    }
 
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
+      LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} vectors: {} columnEncoding: {}" +
           " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
-          columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
+          columnIndex, schema, streamBuffers == null ? 0 : streamBuffers.length,
+          vectors == null ? 0 : vectors.size(), columnEncoding, present != null,
           data, dictionary != null, lengths != null, secondary != null,
           context.getWriterTimezone());
     }
+    // TODO: get rid of the builders - they serve no purpose... just call ctors directly.
     switch (schema.getCategory()) {
       case BINARY:
       case BOOLEAN:
@@ -1833,9 +2196,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       case DECIMAL:
       case TIMESTAMP:
       case DATE:
-        return getPrimitiveTreeReaders(columnIndex, schema, codec, columnEncoding,
-            present, data, dictionary, lengths, secondary, context);
+        return getPrimitiveTreeReader(columnIndex, schema, codec, columnEncoding,
+            present, data, dictionary, lengths, secondary, context, vectors);
       case LIST:
+        assert vectors == null; // Not currently supported.
         TypeDescription elementType = schema.getChildren().get(0);
         TreeReader elementReader = createEncodedTreeReader(
             elementType, encodings, batch, codec, context);
@@ -1849,6 +2213,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setContext(context)
             .build();
       case MAP:
+        assert vectors == null; // Not currently supported.
         TypeDescription keyType = schema.getChildren().get(0);
         TypeDescription valueType = schema.getChildren().get(1);
         TreeReader keyReader = createEncodedTreeReader(
@@ -1866,6 +2231,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setContext(context)
             .build();
       case STRUCT: {
+        assert vectors == null; // Not currently supported.
         int childCount = schema.getChildren().size();
         TreeReader[] childReaders = new TreeReader[childCount];
         for (int i = 0; i < childCount; i++) {
@@ -1883,6 +2249,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .build();
       }
       case UNION: {
+        assert vectors == null; // Not currently supported.
         int childCount = schema.getChildren().size();
         TreeReader[] childReaders = new TreeReader[childCount];
         for (int i = 0; i < childCount; i++) {
@@ -1905,11 +2272,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
   }
 
-  private static TreeReader getPrimitiveTreeReaders(final int columnIndex,
+  private static TreeReader getPrimitiveTreeReader(final int columnIndex,
       TypeDescription columnType, CompressionCodec codec, OrcProto.ColumnEncoding columnEncoding,
       ColumnStreamData present, ColumnStreamData data, ColumnStreamData dictionary,
-      ColumnStreamData lengths, ColumnStreamData secondary, TreeReaderFactory.Context context)
-          throws IOException {
+      ColumnStreamData lengths, ColumnStreamData secondary, TreeReaderFactory.Context context,
+      List<ColumnVector> vectors) throws IOException {
     switch (columnType.getCategory()) {
       case BINARY:
         return BinaryStreamReader.builder()
@@ -1919,6 +2286,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setLengthStream(lengths)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case BOOLEAN:
@@ -1927,6 +2295,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setPresentStream(present)
             .setDataStream(data)
             .setCompressionCodec(codec)
+            .setVectors(vectors)
             .build();
       case BYTE:
         return ByteStreamReader.builder()
@@ -1934,6 +2303,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setPresentStream(present)
             .setDataStream(data)
             .setCompressionCodec(codec)
+            .setVectors(vectors)
             .build();
       case SHORT:
         return ShortStreamReader.builder()
@@ -1942,6 +2312,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDataStream(data)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case INT:
@@ -1951,6 +2322,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDataStream(data)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case LONG:
@@ -1960,6 +2332,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDataStream(data)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case FLOAT:
@@ -1968,6 +2341,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setPresentStream(present)
             .setDataStream(data)
             .setCompressionCodec(codec)
+            .setVectors(vectors)
             .build();
       case DOUBLE:
         return DoubleStreamReader.builder()
@@ -1975,6 +2349,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setPresentStream(present)
             .setDataStream(data)
             .setCompressionCodec(codec)
+            .setVectors(vectors)
             .build();
       case CHAR:
         return CharStreamReader.builder()
@@ -1986,6 +2361,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDictionaryStream(dictionary)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .build();
       case VARCHAR:
         return VarcharStreamReader.builder()
@@ -1997,6 +2373,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDictionaryStream(dictionary)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .build();
       case STRING:
         return StringStreamReader.builder()
@@ -2007,6 +2384,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDictionaryStream(dictionary)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .build();
       case DECIMAL:
         return DecimalStreamReader.builder()
@@ -2018,6 +2396,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setScaleStream(secondary)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case TIMESTAMP:
@@ -2028,6 +2407,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setNanosStream(secondary)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
       case DATE:
@@ -2037,6 +2417,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             .setDataStream(data)
             .setCompressionCodec(codec)
             .setColumnEncoding(columnEncoding)
+            .setVectors(vectors)
             .setContext(context)
             .build();
     default:

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 1c5f0e6..31b0609 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.orc.DataReader;
 import org.apache.orc.OrcProto;
 
@@ -33,6 +36,17 @@ import org.apache.orc.OrcProto;
  */
 public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
 
+  /**
+   * Creates the encoded reader.
+   * @param fileKey File ID to read, to use for cache lookups and such.
+   * @param dataCache Data cache to use for cache lookups.
+   * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
+   * @param pf Pool factory to create object pools.
+   * @return The reader.
+   */
+  EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
+      PoolFactory pf) throws IOException;
+
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
     <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper);
@@ -61,16 +75,49 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
     public void initOrcColumn(int colIx) {
       super.initColumn(colIx, MAX_DATA_STREAMS);
     }
-  }
 
-  /**
-   * Creates the encoded reader.
-   * @param fileKey File ID to read, to use for cache lookups and such.
-   * @param dataCache Data cache to use for cache lookups.
-   * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
-   * @param pf Pool factory to create object pools.
-   * @return The reader.
-   */
-  EncodedReader encodedReader(
-      Object fileKey, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException;
+    /**
+     * Same as columnData, but for the data that already comes as VRBs.
+     * The combination of the two contains all the necessary data.
+     */
+    protected List<ColumnVector>[] columnVectors;
+
+    @Override
+    public void reset() {
+      super.reset();
+      if (columnVectors == null) return;
+      Arrays.fill(columnVectors, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void initColumnWithVectors(int colIx, List<ColumnVector> data) {
+      if (columnVectors == null) {
+        columnVectors = new List[columnData.length];
+      }
+      columnVectors[colIx] = data;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void resetColumnArrays(int columnCount) {
+      super.resetColumnArrays(columnCount);
+      if (columnVectors != null && columnCount == columnVectors.length) {
+        Arrays.fill(columnVectors, null);
+        return;
+      } if (columnVectors != null) {
+        columnVectors = new List[columnCount];
+      } else {
+        columnVectors = null;
+      }
+    }
+
+    public boolean hasVectors(int colIx) {
+      return columnVectors != null && columnVectors[colIx] != null;
+    }
+
+    public List<ColumnVector> getColumnVectors(int colIx) {
+      if (!hasVectors(colIx)) throw new AssertionError("No data for column " + colIx);
+      return columnVectors[colIx];
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
index b894c11e..63084b9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
@@ -113,6 +113,7 @@ public class EncodedColumnBatch<BatchKey> {
     }
   }
 
+
   public void initColumn(int colIx, int streamCount) {
     hasData[colIx] = true;
     if (columnData[colIx] == null || columnData[colIx].length != streamCount) {

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index bbd9ca6..f914a22 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -472,4 +472,15 @@ public class BytesColumnVector extends ColumnVector {
       }
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    BytesColumnVector other = (BytesColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.nextFree = nextFree;
+    other.vector = vector;
+    other.start = start;
+    other.length = length;
+    other.buffer = buffer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 6f090a1..065c1fa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -214,4 +214,16 @@ public abstract class ColumnVector {
      */
     public abstract void stringifyValue(StringBuilder buffer,
                                         int row);
+
+    /**
+     * Shallow copy of the contents of this vector to the other vector;
+     * replaces other vector's values.
+     */
+    public void shallowCopyTo(ColumnVector otherCv) {
+      otherCv.isNull = isNull;
+      otherCv.noNulls = noNulls;
+      otherCv.isRepeating = isRepeating;
+      otherCv.preFlattenIsRepeating = preFlattenIsRepeating;
+      otherCv.preFlattenNoNulls = preFlattenNoNulls;
+    }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index e4f8d82..67076eb 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.math.BigInteger;
 
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.FastHiveDecimal;
 
 public class DecimalColumnVector extends ColumnVector {
 
@@ -142,4 +140,13 @@ public class DecimalColumnVector extends ColumnVector {
       vector[i] = new HiveDecimalWritable(0);  // Initially zero.
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    DecimalColumnVector other = (DecimalColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.scale = scale;
+    other.precision = precision;
+    other.vector = vector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index bd421f4..11409bd 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
 import java.util.Arrays;
 
 /**
@@ -175,4 +174,11 @@ public class DoubleColumnVector extends ColumnVector {
       }
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    DoubleColumnVector other = (DoubleColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.vector = vector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
index c4a6c0f..e876c05 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
@@ -364,4 +364,12 @@ public class IntervalDayTimeColumnVector extends ColumnVector {
       }
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    IntervalDayTimeColumnVector other = (IntervalDayTimeColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.totalSeconds = totalSeconds;
+    other.nanos = nanos;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index 80d4731..3ae6a33 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
 import java.util.Arrays;
 
 /**
@@ -221,4 +220,11 @@ public class LongColumnVector extends ColumnVector {
       }
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    LongColumnVector other = (LongColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.vector = vector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
index 1aeff83..892e8d8 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
@@ -147,4 +147,8 @@ public abstract class MultiValuedColumnVector extends ColumnVector {
     childCount = 0;
   }
 
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    throw new UnsupportedOperationException(); // Implement in future, if needed.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
index cf07bca..a361899 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
@@ -129,4 +129,9 @@ public class StructColumnVector extends ColumnVector {
       fields[i].setRepeating(isRepeating);
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    throw new UnsupportedOperationException(); // Implement if needed.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
index 28997a0..9d579ce 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
@@ -416,4 +416,12 @@ public class TimestampColumnVector extends ColumnVector {
       }
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    TimestampColumnVector other = (TimestampColumnVector)otherCv;
+    super.shallowCopyTo(other);
+    other.time = time;
+    other.nanos = nanos;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
index 0c61243..151f791 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
@@ -137,4 +137,9 @@ public class UnionColumnVector extends ColumnVector {
       fields[i].setRepeating(isRepeating);
     }
   }
+
+  @Override
+  public void shallowCopyTo(ColumnVector otherCv) {
+    throw new UnsupportedOperationException(); // Implement if needed.
+  }
 }


[3/3] hive git commit: HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)

Posted by se...@apache.org.
HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f273cc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f273cc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f273cc5

Branch: refs/heads/master
Commit: 8f273cc53f62c79f8ac30453e3ff94717d91b4dd
Parents: fbe9b05
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Feb 9 18:26:22 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Feb 9 18:26:22 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  13 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |  22 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java | 883 +++++++++++++------
 .../io/encoded/VertorDeserializeOrcWriter.java  | 285 +++++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  39 +
 .../orc/encoded/EncodedTreeReaderFactory.java   | 497 +++++++++--
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  69 +-
 .../common/io/encoded/EncodedColumnBatch.java   |   1 +
 .../hive/ql/exec/vector/BytesColumnVector.java  |  11 +
 .../hive/ql/exec/vector/ColumnVector.java       |  12 +
 .../ql/exec/vector/DecimalColumnVector.java     |  11 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |   8 +-
 .../vector/IntervalDayTimeColumnVector.java     |   8 +
 .../hive/ql/exec/vector/LongColumnVector.java   |   8 +-
 .../ql/exec/vector/MultiValuedColumnVector.java |   4 +
 .../hive/ql/exec/vector/StructColumnVector.java |   5 +
 .../ql/exec/vector/TimestampColumnVector.java   |   8 +
 .../hive/ql/exec/vector/UnionColumnVector.java  |   5 +
 19 files changed, 1504 insertions(+), 388 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e82758f..b27b663 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2922,6 +2922,9 @@ public class HiveConf extends Configuration {
         LLAP_ALLOCATOR_MAX_ALLOC + "."),
     LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED("hive.llap.io.encode.vector.serde.enabled", true,
         "Whether LLAP should use vectorized SerDe reader to read text data when re-encoding."),
+    LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED("hive.llap.io.encode.vector.serde.async.enabled",
+        true,
+        "Whether LLAP should use async mode in vectorized SerDe reader to read text data."),
     LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
         "Row count to use to separate cache slices when reading encoded data from row-based\n" +
         "inputs into LLAP cache, if this feature is enabled."),

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 85fae9a..4809398 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -113,7 +113,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
 
   public static final class StripeData {
     // In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
-    // positions, we wouldn't be able to account for torn rows correctly because the semantics of
+    // positions), we wouldn't be able to account for torn rows correctly because the semantics of
     // our "exact" reader positions, and inexact split boundaries, are different. We cannot even
     // tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
     // wrt first row when caching - we may produce incorrect result if we adjust the split
@@ -182,7 +182,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
           + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
     }
 
-    public static StripeData duplicateForResults(StripeData s) {
+    public static StripeData duplicateStructure(StripeData s) {
       return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
           s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
     }
@@ -389,14 +389,14 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Got stripe in cache " + cStripe);
     }
-    StripeData stripe = StripeData.duplicateForResults(cStripe);
+    StripeData stripe = StripeData.duplicateStructure(cStripe);
     result.stripes.add(stripe);
     boolean isMissed = false;
     for (int colIx = 0; colIx < cached.colCount; ++colIx) {
       if (!includes[colIx]) continue;
       if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
         if (cStripe.data[colIx] != null) {
-          assert false : cStripe;
+          throw new AssertionError(cStripe);
           // No encoding => must have no data.
         }
         isMissed = true;
@@ -419,9 +419,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
             LlapIoImpl.CACHE_LOGGER.info("Couldn't lock data for stripe at "
                 + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
 
+            handleRemovedColumnData(cColData);
             cColData = null;
             isMissed = true;
-            handleRemovedColumnData(cColData);
             if (gotAllData != null) {
               gotAllData.value = false;
             }
@@ -432,6 +432,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
       // At this point, we have arrived at the level where we need all the data, and the
       // arrays never change. So we will just do a shallow assignment here instead of copy.
       stripe.data[colIx] = cColData;
+      if (cColData == null) {
+        stripe.encodings[colIx] = null;
+      }
     }
     doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 4295c1c..8d96e7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -53,12 +52,8 @@ import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
 import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.orc.OrcProto;
 import org.apache.orc.impl.WriterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import org.apache.orc.OrcProto;
 
 
 public class OrcEncodedDataConsumer
@@ -134,7 +129,7 @@ public class OrcEncodedDataConsumer
             schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping);
         this.columnReaders = treeReader.getChildReaders();
         this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length);
-        positionInStreams(columnReaders, batch, stripeMetadata);
+        positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata);
       } else {
         repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata);
       }
@@ -225,8 +220,8 @@ public class OrcEncodedDataConsumer
   }
 
   private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
-    PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+      OrcBatchKey batchKey, ConsumerStripeMetadata stripeMetadata) throws IOException {
+    PositionProvider[] pps = createPositionProviders(columnReaders, batchKey, stripeMetadata);
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       columnReaders[i].seek(pps);
@@ -236,10 +231,13 @@ public class OrcEncodedDataConsumer
   private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
       EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
       ConsumerStripeMetadata stripeMetadata) throws IOException {
-    PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    PositionProvider[] pps = createPositionProviders(
+        columnReaders, batch.getBatchKey(), stripeMetadata);
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];
+      // Note: we assume this never happens for SerDe reader - the batch would never have vectors.
+      // That is always true now; if that ever changes, the below would throw in getColumnData.
       ((SettableTreeReader) reader).setBuffers(batch, sameStripe);
       // TODO: When hive moves to java8, make updateTimezone() as default method in
       // SettableTreeReader so that we can avoid this check.
@@ -268,7 +266,7 @@ public class OrcEncodedDataConsumer
   }
 
   private PositionProvider[] createPositionProviders(
-      TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+      TreeReaderFactory.TreeReader[] columnReaders, OrcBatchKey batchKey,
       ConsumerStripeMetadata stripeMetadata) throws IOException {
     if (columnReaders.length == 0) return null;
     PositionProvider[] pps = null;
@@ -279,7 +277,7 @@ public class OrcEncodedDataConsumer
         pps[i] = singleRgPp;
       }
     } else {
-      int rowGroupIndex = batch.getBatchKey().rgIx;
+      int rowGroupIndex = batchKey.rgIx;
       if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
         throw new IOException("Cannot position readers without RG information");
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 8d86d17..419043a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -52,6 +52,10 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.VertorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
@@ -158,19 +162,21 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final InputFormat<?, ?> sourceInputFormat;
   private final Reporter reporter;
   private final JobConf jobConf;
+  private final TypeDescription schema;
   private final int allocSize;
   private final int targetSliceRowCount;
   private final boolean isLrrEnabled;
 
   private final boolean[] writerIncludes;
-  private EncodingWriter writer = null;
-  private CacheWriter cacheWriter = null;
+  private FileReaderYieldReturn currentFileRead = null;
+
   /**
    * Data from cache currently being processed. We store it here so that we could decref
    * it in case of failures. We remove each slice from the data after it has been sent to
    * the consumer, at which point the consumer is responsible for it.
    */
   private FileData cachedData;
+  private List<VertorDeserializeOrcWriter> asyncWriters = new ArrayList<>();
 
   public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
       BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
@@ -210,6 +216,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;
     this.jobConf = jobConf;
+    this.schema = schema;
     this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
     SchemaEvolution evolution = new SchemaEvolution(schema,
         new Reader.Options(jobConf).include(writerIncludes));
@@ -243,7 +250,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     throw new UnsupportedOperationException();
   }
 
-  // TODO: move to base class?
+  // TODO: move to a base class?
   @Override
   protected Void callInternal() throws IOException, InterruptedException {
     return ugi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -254,21 +261,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     });
   }
 
-  public static class CacheOutStream extends OutStream {
-    private final CacheOutputReceiver receiver;
-    public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
-        CacheOutputReceiver receiver) throws IOException {
-      super(name, bufferSize, codec, receiver);
-      this.receiver = receiver;
-    }
-
-    @Override
-    public void clear() throws IOException {
-      super.clear();
-      receiver.clear();
-    }
-  }
-
   /** A row-based (Writable) reader that may also be able to report file offsets. */
   interface ReaderWithOffsets {
     /** Moves the reader to the next row. */
@@ -333,7 +325,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private CacheStripeData currentStripe;
     private final List<CacheStripeData> stripes = new ArrayList<>();
     private final BufferUsageManager bufferManager;
-    private final int bufferSize;
+    /**
+     * For !doesSourceHaveIncludes case, stores global column IDs to verify writer columns.
+     * For doesSourceHaveIncludes case, stores source column IDs used to map things.
+     */
     private final List<Integer> columnIds;
     private final boolean[] writerIncludes;
     // These are global since ORC reuses objects between stripes.
@@ -341,14 +336,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
     private final boolean doesSourceHaveIncludes;
 
-    public CacheWriter(BufferUsageManager bufferManager, int bufferSize,
-        List<Integer> columnIds, boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
+    public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
+        boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
       this.bufferManager = bufferManager;
-      this.bufferSize = bufferSize;
-      this.columnIds = columnIds;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
+      this.columnIds = columnIds;
       startStripe();
     }
 
@@ -456,18 +450,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
     @Override
     public void writeHeader() throws IOException {
-
     }
 
     @Override
-    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) throws IOException {
+    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
+        CompressionCodec codec) throws IOException {
       // TODO: right now we treat each slice as a stripe with a single RG and never bother
       //       with indexes. In phase 4, we need to add indexing and filtering.
     }
 
     @Override
-    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) throws IOException {
-
+    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom,
+        CompressionCodec codec) throws IOException {
     }
 
     @Override
@@ -545,9 +539,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
-    public void appendRawStripe(ByteBuffer stripe,
-                                OrcProto.StripeInformation.Builder dirEntry
-                                ) throws IOException {
+    public void appendRawStripe(
+        ByteBuffer stripe, OrcProto.StripeInformation.Builder dirEntry) throws IOException {
       throw new UnsupportedOperationException(); // Only used in ACID writer.
     }
 
@@ -586,7 +579,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public void suppress() {
       suppressed = true;
-      buffers = null;
       lastBufferPos = -1;
     }
 
@@ -639,6 +631,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private static class NullOutputReceiver implements OutputReceiver {
+    @SuppressWarnings("unused")
     private final StreamName name;
 
     public NullOutputReceiver(StreamName name) {
@@ -655,6 +648,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   protected Void performDataRead() throws IOException {
+    boolean isOk = false;
     try {
       try {
         long startTime = counters.startTimeCounter();
@@ -667,10 +661,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         try {
           isFromCache = readFileWithCache(startTime);
         } finally {
+          // Note that the code removes the data from the field as it's passed to the consumer,
+          // so we expect to have stuff remaining in there only in case of errors.
           if (cachedData != null && cachedData.getData() != null) {
             for (StripeData sd : cachedData.getData()) {
               unlockAllBuffers(sd);
             }
+            cachedData = null;
           }
         }
         if (isFromCache == null) return null; // Stop requested, and handled inside.
@@ -680,20 +677,24 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
         // Done with all the things.
         recordReaderTime(startTime);
+        if (LlapIoImpl.LOG.isTraceEnabled()) {
+          LlapIoImpl.LOG.trace("done processing {}", split);
+        }
       } catch (Throwable e) {
         LlapIoImpl.LOG.error("Exception while processing", e);
         consumer.setError(e);
         throw e;
       }
       consumer.setDone();
-
-      LlapIoImpl.LOG.trace("done processing {}", split);
+      isOk = true;
       return null;
     } finally {
-      cleanupReaders();
+      cleanup(!isOk);
+      // Do not clean up the writers - the callback should do it.
     }
   }
 
+
   private void unlockAllBuffers(StripeData si) {
     for (int i = 0; i < si.getData().length; ++i) {
       LlapDataBuffer[][] colData = si.getData()[i];
@@ -718,6 +719,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         // Make data consistent with encodings, don't store useless information.
         if (sd.getData()[i] == null) {
           encodings[i] = null;
+        } else if (encodings[i] == null) {
+          throw new AssertionError("Caching data without an encoding at " + i + ": " + sd);
         }
       }
       FileData fd = new FileData(fileKey, encodings.length);
@@ -732,7 +735,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     // This relies on sequence of calls to cacheFileData and sendEcb..
   }
 
-
   private void lockAllBuffers(StripeData sd) {
     for (int i = 0; i < sd.getData().length; ++i) {
       LlapDataBuffer[][] colData = sd.getData()[i];
@@ -755,7 +757,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     this.cachedData = cache.getFileData(fileKey, split.getStart(),
         endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
     if (cachedData == null) {
-      LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+      if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+      }
       return false;
     }
     String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
@@ -796,7 +800,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
-      // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+      // Note: we assume 0-length split is correct given how LRR interprets offsets (reading an
+      // extra row). Should we instead assume 1+ chars and add 1 for isUnfortunate?
       FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
           endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
       if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
@@ -806,101 +811,119 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   public boolean processOneFileSplit(FileSplit split, long startTime,
       Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
-    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
     LlapIoImpl.LOG.info("Processing one split {" + split.getPath() + ", "
         + split.getStart() + ", " + split.getLength() + "}");
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Cache data for the split is " + slice);
     }
-    boolean[] splitIncludes = writerIncludes;
-    boolean hasAllData = false;
-    if (cacheEncodings != null) {
-      hasAllData = true;
-      splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
-      for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
-        if (!splitIncludes[colIx]) continue;
-        assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
-        if (cacheEncodings[colIx] != null) {
-          splitIncludes[colIx] = false;
-        } else {
-          hasAllData = false;
-        }
-      }
-    }
-    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
-        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
-    }
+    boolean[] splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+    boolean hasAllData = slice != null
+        && determineSplitIncludes(slice, splitIncludes, writerIncludes);
+
     // We have 3 cases here:
     // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
     // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
     // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
-    if (!hasAllData) {
-      // This initializes cacheWriter with data.
-      readSplitFromFile(split, splitIncludes, slice);
-      assert cacheWriter != null;
-    }
-    if (slice != null) {
-      // If we had a cache range already, it should not have been split.
-      assert cacheWriter == null || cacheWriter.stripes.size() == 1;
-      CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+    if (hasAllData) {
+      // Everything comes from cache.
+      CacheWriter.CacheStripeData csd = null;
       boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
       ++stripeIxRef.value;
       return result;
-    } else {
-      for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
-        if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
-          return false;
+    }
+
+    boolean result = false;
+    // This initializes currentFileRead.
+    startReadSplitFromFile(split, splitIncludes, slice);
+    try {
+      if (slice != null) {
+        // If we had a cache range already, we expect a single matching disk slice.
+        Vectors vectors = currentFileRead.readNextSlice();
+        if (!vectors.isSupported()) {
+          // Not in VRB mode - the new cache data is ready, we should use it.
+          CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+          assert cacheWriter.stripes.size() == 1;
+          result = processOneSlice(
+              cacheWriter.stripes.get(0), splitIncludes, stripeIxRef.value, slice, startTime);
+        } else {
+          // VRB mode - process the VRBs with cache data; the new cache data is coming later.
+          result = processOneSlice(
+              vectors, splitIncludes, stripeIxRef.value, slice, startTime);
         }
+        assert null == currentFileRead.readNextSlice();
         ++stripeIxRef.value;
+      } else {
+        // All the data comes from disk. The reader may have split it into multiple slices.
+        Vectors vectors = currentFileRead.readNextSlice();
+        assert vectors != null;
+        result = true;
+        if (!vectors.isSupported()) {
+          // Not in VRB mode - the new cache data is (partially) ready, we should use it.
+          while (currentFileRead.readNextSlice() != null); // Force the rest of the data thru.
+          CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+          for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+            if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+              result = false;
+              break;
+            }
+            ++stripeIxRef.value;
+          }
+        } else {
+          // VRB mode - process the VRBs (no cache data here); the new cache data is coming later.
+          do {
+            assert vectors.isSupported();
+            if (!processOneSlice(vectors, splitIncludes, stripeIxRef.value, null, startTime)) {
+              result = false;
+              break;
+            }
+            ++stripeIxRef.value;
+          } while ((vectors = currentFileRead.readNextSlice()) != null);
+        }
       }
-      return true;
+    } finally {
+      cleanUpCurrentRead();
     }
+    return result;
   }
 
-  private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
-      int stripeIx, StripeData slice, long startTime) throws IOException {
-    String sliceStr = slice == null ? "null" : slice.toCoordinateString();
-    if (LlapIoImpl.LOG.isDebugEnabled()) {
-      LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
-        + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
-        + " disk data");
+  private static boolean determineSplitIncludes(
+      StripeData slice, boolean[] splitIncludes, boolean[] writerIncludes) {
+    ColumnEncoding[] cacheEncodings = slice.getEncodings();
+    assert cacheEncodings != null;
+    boolean hasAllData = true;
+    for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+      if (!splitIncludes[colIx]) continue;
+      if ((cacheEncodings[colIx] != null) != (slice.getData()[colIx] != null)) {
+        throw new AssertionError("Inconsistent cache slice " + slice);
+      }
+      if (cacheEncodings[colIx] != null) {
+        splitIncludes[colIx] = false;
+      } else {
+        hasAllData = false;
+      }
+    }
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
+        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
     }
+    return hasAllData;
+  }
 
-    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
-    LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
-    long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+  private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] splitIncludes,
+      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+    logProcessOneSlice(stripeIx, diskData, cacheData);
+
+    ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    long cacheRowCount = cacheData == null ? -1L : cacheData.getRowCount();
     SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
     StripeData sliceToCache = null;
-    boolean hasAllData = csd == null;
+    boolean hasAllData = diskData == null;
     if (!hasAllData) {
-      if (slice == null) {
-        sliceToCache = new StripeData(
-            csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
-            csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
-      } else {
-        if (csd.rowCount != slice.getRowCount()) {
-          throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
-              + slice.getRowCount() + " from " + csd + " and " + slice);
-        }
-        if (csd.encodings.size() != slice.getEncodings().length) {
-          throw new IOException("Column count mismatch; disk " + csd.encodings.size()
-              + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
-        }
-        if (LlapIoImpl.LOG.isDebugEnabled()) {
-          LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
-            + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
-        }
-        sliceToCache = StripeData.duplicateForResults(slice);
-        for (int i = 0; i < csd.encodings.size(); ++i) {
-          sliceToCache.getEncodings()[i] = csd.encodings.get(i);
-        }
-        sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
-      }
-      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
-      metadata.setRowCount(csd.rowCount);
+      sliceToCache = createSliceToCache(diskData, cacheData);
+      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, diskData.encodings));
+      metadata.setRowCount(diskData.rowCount);
     } else {
-      assert cacheWriter == null;
       metadata.setEncodings(Lists.newArrayList(cacheEncodings));
       metadata.setRowCount(cacheRowCount);
     }
@@ -916,46 +939,28 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
       if (!hasAllData && splitIncludes[colIx]) {
         // The column has been read from disk.
-        List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
-        if (LlapIoImpl.LOG.isTraceEnabled()) {
-          LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
-        }
-        LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
-            = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+        List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+        LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
         if (streams == null) continue; // Struct column, such as root?
         Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
         while (iter.hasNext()) {
           CacheWriter.CacheStreamData stream = iter.next();
           if (stream.isSuppressed) {
-            LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+            if (LlapIoImpl.LOG.isTraceEnabled()) {
+              LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+            }
             iter.remove();
             discardUncachedBuffers(stream.data);
             continue;
           }
-          // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+          int streamIx = setStreamDataToCache(newCacheDataForCol, stream);
           ColumnStreamData cb = CSD_POOL.take();
           cb.incRef();
-          int streamIx = stream.name.getKind().getNumber();
           cb.setCacheBuffers(stream.data);
-          // This is kinda hacky - we "know" these are LlapDataBuffer-s.
-          newCacheDataForCol[streamIx] = stream.data.toArray(
-              new LlapDataBuffer[stream.data.size()]);
           ecb.setStreamData(colIx, streamIx, cb);
         }
       } else {
-        // The column has been obtained from cache.
-        LlapDataBuffer[][] colData = cacheData[colIx];
-        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-          LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
-              + SerDeLowLevelCacheImpl.toString(colData));
-        }
-        for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
-          if (colData[streamIx] == null) continue;
-          ColumnStreamData cb = CSD_POOL.take();
-          cb.incRef();
-          cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
-          ecb.setStreamData(colIx, streamIx, cb);
-        }
+        processColumnCacheData(cacheBuffers, ecb, colIx);
       }
     }
     if (processStop()) {
@@ -969,7 +974,183 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       LlapIoImpl.CACHE_LOGGER.trace("Data to cache from the read " + sliceToCache);
     }
     cacheFileData(sliceToCache);
-    return sendEcbToConsumer(ecb, slice != null, csd);
+    return sendEcbToConsumer(ecb, cacheData != null, diskData);
+  }
+
+  private void validateCacheAndDisk(StripeData cacheData,
+      long rowCount, long encodingCount, Object diskDataLog) throws IOException {
+    if (rowCount != cacheData.getRowCount()) {
+      throw new IOException("Row count mismatch; disk " + rowCount + ", cache "
+          + cacheData.getRowCount() + " from " + diskDataLog + " and " + cacheData);
+    }
+    if (encodingCount > 0 && encodingCount != cacheData.getEncodings().length) {
+      throw new IOException("Column count mismatch; disk " + encodingCount + ", cache "
+          + cacheData.getEncodings().length + " from " + diskDataLog + " and " + cacheData);
+    }
+  }
+
+
+  /** Unlike the other overload of processOneSlice, doesn't cache data. */
+  private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes,
+      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+    if (diskData == null) {
+      throw new AssertionError(); // The other overload should have been used.
+    }
+    // LlapIoImpl.LOG.debug("diskData " + diskData);
+    logProcessOneSlice(stripeIx, diskData, cacheData);
+
+    if (cacheData == null && diskData.getRowCount() == 0) {
+      return true; // Nothing to process.
+    }
+    ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    if (cacheData != null) {
+      // Don't validate column count - no encodings for vectors.
+      validateCacheAndDisk(cacheData, diskData.getRowCount(), -1, diskData);
+    }
+    SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
+    metadata.setEncodings(Arrays.asList(cacheEncodings == null
+        ? new ColumnEncoding[splitIncludes.length] : cacheEncodings));
+    metadata.setRowCount(diskData.getRowCount());
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Derived stripe metadata for this split is " + metadata);
+    }
+    consumer.setStripeMetadata(metadata);
+
+    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+    int vectorsIx = 0;
+    for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+      if (!writerIncludes[colIx]) continue;
+      if (splitIncludes[colIx]) {
+        // Skip the 0-th column, since it won't have a vector after reading the text source.
+        if (colIx != 0 ) {
+          List<ColumnVector> vectors = diskData.getVectors(vectorsIx++);
+          if (LlapIoImpl.LOG.isTraceEnabled()) {
+            LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors);
+          }
+          ecb.initColumnWithVectors(colIx, vectors);
+        } else {
+          ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+        }
+      } else {
+        ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+        processColumnCacheData(cacheBuffers, ecb, colIx);
+      }
+    }
+    if (processStop()) {
+      recordReaderTime(startTime);
+      return false;
+    }
+    return sendEcbToConsumer(ecb, cacheData != null, null);
+  }
+
+
+  private void processAsyncCacheData(CacheWriter.CacheStripeData diskData,
+      boolean[] splitIncludes) throws IOException {
+    StripeData sliceToCache = new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+        diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+        diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+    for (int colIx = 0; colIx < splitIncludes.length; ++colIx) {
+      if (!splitIncludes[colIx]) continue;
+      // The column has been read from disk.
+      List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+      LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+      if (streams == null) continue; // Struct column, such as root?
+      Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+      while (iter.hasNext()) {
+        CacheWriter.CacheStreamData stream = iter.next();
+        if (stream.isSuppressed) {
+          if (LlapIoImpl.LOG.isTraceEnabled()) {
+            LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+          }
+          iter.remove();
+          discardUncachedBuffers(stream.data);
+          continue;
+        }
+        setStreamDataToCache(newCacheDataForCol, stream);
+      }
+    }
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Data to cache from async read " + sliceToCache);
+    }
+    try {
+      cacheFileData(sliceToCache);
+    } finally {
+      unlockAllBuffers(sliceToCache);
+    }
+  }
+
+  private StripeData createSliceToCache(
+      CacheWriter.CacheStripeData diskData, StripeData cacheData) throws IOException {
+    assert diskData != null;
+    if (cacheData == null) {
+      return new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+          diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+          diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+    } else {
+      long rowCount = diskData.rowCount, encodingCount = diskData.encodings.size();
+      validateCacheAndDisk(cacheData, rowCount, encodingCount, diskData);
+      if (LlapIoImpl.LOG.isDebugEnabled()) {
+        LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
+          + cacheData.toCoordinateString() + "; disk offsets were "
+          + diskData.toCoordinateString());
+      }
+      // Note: we could just do what we already do above from disk data, except for the validation
+      // that is not strictly necessary, and knownTornStart which is an optimization.
+      StripeData sliceToCache = StripeData.duplicateStructure(cacheData);
+      for (int i = 0; i < diskData.encodings.size(); ++i) {
+        sliceToCache.getEncodings()[i] = diskData.encodings.get(i);
+      }
+      sliceToCache.setKnownTornStart(Math.min(
+          diskData.knownTornStart, sliceToCache.getKnownTornStart()));
+      return sliceToCache;
+    }
+  }
+
+
+  private static LlapDataBuffer[][] createArrayToCache(
+      StripeData sliceToCache, int colIx, List<CacheWriter.CacheStreamData> streams) {
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
+    }
+    LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+        = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+    return newCacheDataForCol;
+  }
+
+  private static int setStreamDataToCache(
+      LlapDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
+    int streamIx = stream.name.getKind().getNumber();
+    // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+    newCacheDataForCol[streamIx] = stream.data.toArray(new LlapDataBuffer[stream.data.size()]);
+    return streamIx;
+  }
+
+  private void processColumnCacheData(LlapDataBuffer[][][] cacheBuffers,
+      OrcEncodedColumnBatch ecb, int colIx) {
+    // The column has been obtained from cache.
+    LlapDataBuffer[][] colData = cacheBuffers[colIx];
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
+          + SerDeLowLevelCacheImpl.toString(colData));
+    }
+    for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+      if (colData[streamIx] == null) continue;
+      ColumnStreamData cb = CSD_POOL.take();
+      cb.incRef();
+      cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+      ecb.setStreamData(colIx, streamIx, cb);
+    }
+  }
+
+  private void logProcessOneSlice(int stripeIx, Object diskData, StripeData cacheData) {
+    String sliceStr = cacheData == null ? "null" : cacheData.toCoordinateString();
+    if (LlapIoImpl.LOG.isDebugEnabled()) {
+      LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
+        + ((cacheData == null) ? " no" : "") + " cache data; has"
+        + ((diskData == null) ? " no" : "") + " disk data");
+    }
   }
 
   private void discardUncachedBuffers(List<MemoryBuffer> list) {
@@ -1003,126 +1184,309 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     return Lists.newArrayList(combinedEncodings);
   }
 
+  private static class Vectors {
+    private final List<ColumnVector>[] data;
+    private final boolean isSupported;
+    private final long rowCount;
+
+    @SuppressWarnings("unchecked")
+    public Vectors(List<VectorizedRowBatch> vrbs) {
+      if (vrbs == null) {
+        isSupported = false;
+        data = null;
+        rowCount = 0;
+        return;
+      }
+      isSupported = true;
+      if (vrbs.isEmpty()) {
+        data = null;
+        rowCount = 0;
+        return;
+      }
+      data = new List[vrbs.get(0).numCols];
+      for (int i = 0; i < data.length; ++i) {
+        data[i] = new ArrayList<>(vrbs.size());
+      }
+      int rowCount = 0;
+      for (VectorizedRowBatch vrb : vrbs) {
+        assert !vrb.selectedInUse;
+        rowCount += vrb.size;
+        for (int i = 0; i < vrb.cols.length; ++i) {
+          data[i].add(vrb.cols[i]);
+        }
+      }
+      this.rowCount = rowCount;
+    }
+
+    public List<ColumnVector> getVectors(int ix) {
+      return data[ix];
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    public boolean isSupported() {
+      return isSupported;
+    }
+
+    @Override
+    public String toString() {
+      return "Vectors {isSupported=" + isSupported + ", rowCount=" + rowCount
+          + ", data=" + Arrays.toString(data) + "}";
+    }
+  }
+
+  /**
+   * This class only exists because Java doesn't have yield return. The original method
+   * before this change only needed yield return-s sprinkled here and there; however,
+   * Java developers are usually paid by class, so here we go.
+   */
+  private static class FileReaderYieldReturn {
+    private ReaderWithOffsets offsetReader;
+    private int rowsPerSlice = 0;
+    private long currentKnownTornStart;
+    private long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+    private boolean hasUnsplittableData = false;
+    private final EncodingWriter writer;
+    private final boolean maySplitTheSplit;
+    private final int targetSliceRowCount;
+    private final FileSplit split;
+
+    public FileReaderYieldReturn(ReaderWithOffsets offsetReader, FileSplit split, EncodingWriter writer,
+        boolean maySplitTheSplit, int targetSliceRowCount) {
+      this.offsetReader = offsetReader;
+      currentKnownTornStart = split.getStart();
+      this.writer = writer;
+      this.maySplitTheSplit = maySplitTheSplit;
+      this.targetSliceRowCount = targetSliceRowCount;
+      this.split = split;
+    }
+
+    public CacheWriter getCacheWriter() throws IOException {
+      return writer.getCacheWriter();
+    }
+
+    public Vectors readNextSlice() throws IOException {
+      if (offsetReader == null) return null;
+      try {
+        while (offsetReader.next()) {
+          hasUnsplittableData = true;
+          Writable value = offsetReader.getCurrentRow();
+          lastStartOffset = offsetReader.getCurrentRowStartOffset();
+          if (firstStartOffset == Long.MIN_VALUE) {
+            firstStartOffset = lastStartOffset;
+          }
+          writer.writeOneRow(value);
+
+          if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+            assert offsetReader.hasOffsets();
+            writer.flushIntermediateData();
+            long fileOffset = offsetReader.getCurrentRowEndOffset();
+            // Must support offsets to be able to split.
+            if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+              throw new AssertionError("Unable to get offsets from "
+                  + offsetReader.getClass().getSimpleName());
+            }
+            writer.setCurrentStripeOffsets(
+                currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+            writer.writeIntermediateFooter();
+
+            // Split starting at row start will not read that row.
+            currentKnownTornStart = lastStartOffset;
+            // Row offsets will be determined from the reader (we could set the first from last).
+            lastStartOffset = Long.MIN_VALUE;
+            firstStartOffset = Long.MIN_VALUE;
+            rowsPerSlice = 0;
+            return new Vectors(writer.extractCurrentVrbs());
+          }
+        }
+        try {
+          Vectors result = null;
+          if (rowsPerSlice > 0 || (!maySplitTheSplit && hasUnsplittableData)) {
+            long fileOffset = -1;
+            if (!offsetReader.hasOffsets()) {
+              // The reader doesn't support offsets. We adjust offsets to match future splits.
+              // If cached split was starting at row start, that row would be skipped, so +1
+              firstStartOffset = split.getStart() + 1;
+              // Last row starting at the end of the split would be read.
+              lastStartOffset = split.getStart() + split.getLength();
+              // However, it must end after the split end, otherwise the next one would have been read.
+              fileOffset = lastStartOffset + 1;
+              if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
+                LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
+                  + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+              }
+            } else {
+              fileOffset = offsetReader.getCurrentRowEndOffset();
+              assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+            }
+            writer.setCurrentStripeOffsets(
+                currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+            // Close the writer to finalize the metadata.
+            writer.close();
+            result = new Vectors(writer.extractCurrentVrbs());
+          } else {
+            writer.close();
+          }
+          return result;
+        } finally {
+          closeOffsetReader();
+        }
+      } catch (Exception ex) {
+        closeOffsetReader();
+        throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+      }
+    }
+
+    private void closeOffsetReader() {
+      if (offsetReader == null) return;
+      try {
+        offsetReader.close();
+      } catch (Exception ex) {
+        LlapIoImpl.LOG.error("Failed to close source reader", ex);
+      }
+      offsetReader = null;
+    }
+  }
 
-  public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
-      throws IOException {
+  public void startReadSplitFromFile(
+      FileSplit split, boolean[] splitIncludes, StripeData slice) throws IOException {
     boolean maySplitTheSplit = slice == null;
     ReaderWithOffsets offsetReader = null;
     @SuppressWarnings("rawtypes")
     RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
     try {
       offsetReader = createOffsetReader(sourceReader);
-      maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
+      sourceReader = null;
+    } finally {
+      if (sourceReader != null) {
+        try {
+          sourceReader.close();
+        } catch (Exception ex) {
+          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+        }
+      }
+    }
+    maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
 
-      // writer writes to orcWriter which writes to cacheWriter
-      // TODO: in due course, writer will also propagate row batches if it's capable
+    try {
       StructObjectInspector originalOi = (StructObjectInspector)getOiFromSerDe();
-      writer = VertorDeserializeOrcWriter.create(sourceInputFormat, sourceSerDe, parts,
-          daemonConf, jobConf, split.getPath(), originalOi, columnIds);
-      cacheWriter = new CacheWriter(
-          bufferManager, allocSize, columnIds, splitIncludes, writer.hasIncludes());
-      writer.init(OrcFile.createWriter(split.getPath(),
-          createOrcWriterOptions(writer.getDestinationOi())));
-
-      int rowsPerSlice = 0;
-      long currentKnownTornStart = split.getStart();
-      long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
-      boolean hasData = false;
-      while (offsetReader.next()) {
-        hasData = true;
-        Writable value = offsetReader.getCurrentRow();
-        lastStartOffset = offsetReader.getCurrentRowStartOffset();
-        if (firstStartOffset == Long.MIN_VALUE) {
-          firstStartOffset = lastStartOffset;
-        }
-        writer.writeOneRow(value);
-
-        if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
-          assert offsetReader.hasOffsets();
-          writer.flushIntermediateData();
-          long fileOffset = offsetReader.getCurrentRowEndOffset();
-          // Must support offsets to be able to split.
-          if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
-            throw new AssertionError("Unable to get offsets from "
-                + offsetReader.getClass().getSimpleName());
-          }
-          cacheWriter.setCurrentStripeOffsets(
-              currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
-          // Split starting at row start will not read that row.
-          currentKnownTornStart = lastStartOffset;
-          // Row offsets will be determined from the reader (we could set the first from last).
-          lastStartOffset = Long.MIN_VALUE;
-          firstStartOffset = Long.MIN_VALUE;
-          rowsPerSlice = 0;
-          writer.writeIntermediateFooter();
-        }
+      List<Integer> splitColumnIds = OrcInputFormat.genIncludedColumnsReverse(
+          schema, splitIncludes, false);
+      // fileread writes to the writer, which writes to orcWriter, which writes to cacheWriter
+      EncodingWriter writer = VertorDeserializeOrcWriter.create(
+          sourceInputFormat, sourceSerDe, parts, daemonConf, jobConf, split.getPath(), originalOi,
+          splitColumnIds, splitIncludes, allocSize);
+      // TODO: move this into ctor? EW would need to create CacheWriter then
+      List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
+      writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
+          writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
+      if (writer instanceof VertorDeserializeOrcWriter) {
+        VertorDeserializeOrcWriter asyncWriter = (VertorDeserializeOrcWriter)writer;
+        asyncWriter.startAsync(new AsyncCacheDataCallback());
+        this.asyncWriters.add(asyncWriter);
       }
-      if (rowsPerSlice > 0 || (!maySplitTheSplit && hasData)) {
-        long fileOffset = -1;
-        if (!offsetReader.hasOffsets()) {
-          // The reader doesn't support offsets. We adjust offsets to match future splits.
-          // If cached split was starting at row start, that row would be skipped, so +1
-          firstStartOffset = split.getStart() + 1;
-          // Last row starting at the end of the split would be read.
-          lastStartOffset = split.getStart() + split.getLength();
-          // However, it must end after the split end, otherwise the next one would have been read.
-          fileOffset = lastStartOffset + 1;
-          if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
-            LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
-              + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
-          }
-        } else {
-          fileOffset = offsetReader.getCurrentRowEndOffset();
-          assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+      currentFileRead = new FileReaderYieldReturn(
+          offsetReader, split, writer, maySplitTheSplit, targetSliceRowCount);
+    } finally {
+      // Assignment is the last thing in the try, so if it happened we assume success.
+      if (currentFileRead != null) return;
+      if (offsetReader == null) return;
+      try {
+        offsetReader.close();
+      } catch (Exception ex) {
+        LlapIoImpl.LOG.error("Failed to close source reader", ex);
+      }
+    }
+  }
+
+  private class AsyncCacheDataCallback implements AsyncCallback {
+    @Override
+    public void onComplete(VertorDeserializeOrcWriter writer) {
+      CacheWriter cacheWriter = null;
+      try {
+        cacheWriter = writer.getCacheWriter();
+        // What we were reading from disk originally.
+        boolean[] cacheIncludes = writer.getOriginalCacheIncludes();
+        Iterator<CacheWriter.CacheStripeData> iter = cacheWriter.stripes.iterator();
+        while (iter.hasNext()) {
+          processAsyncCacheData(iter.next(), cacheIncludes);
+          iter.remove();
         }
-        cacheWriter.setCurrentStripeOffsets(
-            currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      } catch (IOException e) {
+        LlapIoImpl.LOG.error("Failed to cache async data", e);
+      } finally {
+        cacheWriter.discardData();
       }
-      // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
-      writer.close();
-      writer = null;
-    } finally {
-      // We don't need the source reader anymore.
-      if (offsetReader != null) {
+    }
+  }
+
+  // TODO: this interface is ugly. The two implementations are so far apart feature-wise
+  //       after all the perf changes that we might as well hardcode them separately.
+  static abstract class EncodingWriter {
+    protected Writer orcWriter;
+    protected CacheWriter cacheWriter;
+    protected final StructObjectInspector sourceOi;
+    private final int allocSize;
+
+    public EncodingWriter(StructObjectInspector sourceOi, int allocSize) {
+      this.sourceOi = sourceOi;
+      this.allocSize = allocSize;
+    }
+
+
+    public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+      this.orcWriter = createOrcWriter(cacheWriter, conf, path, sourceOi);
+      this.cacheWriter = cacheWriter;
+    }
+
+    public CacheWriter getCacheWriter() {
+      return cacheWriter;
+    }
+    public abstract boolean isOnlyWritingIncludedColumns();
+
+    public abstract void writeOneRow(Writable row) throws IOException;
+    public abstract void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset);
+    public abstract void flushIntermediateData() throws IOException;
+    public abstract void writeIntermediateFooter() throws IOException;
+    public abstract List<VectorizedRowBatch> extractCurrentVrbs();
+    public void close() throws IOException {
+      if (orcWriter != null) {
         try {
-          offsetReader.close();
+          orcWriter.close();
+          orcWriter = null;
         } catch (Exception ex) {
-          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+          LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
         }
-      } else {
-        assert sourceReader != null;
+      }
+      if (cacheWriter != null) {
         try {
-          sourceReader.close();
+          cacheWriter.discardData();
+          cacheWriter = null;
         } catch (Exception ex) {
-          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+          LlapIoImpl.LOG.error("Failed to close cache writer", ex);
         }
       }
     }
-  }
 
-  interface EncodingWriter {
-    void writeOneRow(Writable row) throws IOException;
-    StructObjectInspector getDestinationOi();
-    void init(Writer orcWriter);
-    boolean hasIncludes();
-    void writeIntermediateFooter() throws IOException;
-    void flushIntermediateData() throws IOException;
-    void close() throws IOException;
+    protected Writer createOrcWriter(CacheWriter cacheWriter, Configuration conf,
+        Path path, StructObjectInspector oi) throws IOException {
+      // TODO: this is currently broken. We need to set memory manager to a bogus implementation
+      //       to avoid problems with memory manager actually tracking the usage.
+      return OrcFile.createWriter(path, createOrcWriterOptions(
+          sourceOi, conf, cacheWriter, allocSize));
+    }
   }
 
-  static class DeserialerOrcWriter implements EncodingWriter {
-    private Writer orcWriter;
+  static class DeserializerOrcWriter extends EncodingWriter {
     private final Deserializer sourceSerDe;
-    private final StructObjectInspector sourceOi;
 
-    public DeserialerOrcWriter(Deserializer sourceSerDe, StructObjectInspector sourceOi) {
+    public DeserializerOrcWriter(
+        Deserializer sourceSerDe, StructObjectInspector sourceOi, int allocSize) {
+      super(sourceOi, allocSize);
       this.sourceSerDe = sourceSerDe;
-      this.sourceOi = sourceOi;
-    }
-
-    @Override
-    public void init(Writer orcWriter) {
-      this.orcWriter = orcWriter;
     }
 
     @Override
@@ -1152,22 +1516,30 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
-    public boolean hasIncludes() {
+    public boolean isOnlyWritingIncludedColumns() {
       return false; // LazySimpleSerDe doesn't support projection.
     }
 
     @Override
-    public StructObjectInspector getDestinationOi() {
-      return sourceOi;
+    public void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset) {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+    }
+
+    @Override
+    public List<VectorizedRowBatch> extractCurrentVrbs() {
+      return null; // Doesn't support creating VRBs.
     }
   }
 
-  private WriterOptions createOrcWriterOptions(ObjectInspector sourceOi) throws IOException {
-    return OrcFile.writerOptions(daemonConf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+  static WriterOptions createOrcWriterOptions(ObjectInspector sourceOi,
+      Configuration conf, CacheWriter cacheWriter, int allocSize) throws IOException {
+    return OrcFile.writerOptions(conf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
         .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
         .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
         .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi)
-        .physicalWriter(cacheWriter);
+        .physicalWriter(cacheWriter).bufferSize(allocSize);
   }
 
   private ObjectInspector getOiFromSerDe() throws IOException {
@@ -1178,7 +1550,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
   }
 
-  private ReaderWithOffsets createOffsetReader(RecordReader sourceReader) {
+  private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader) {
     if (LlapIoImpl.LOG.isDebugEnabled()) {
       LlapIoImpl.LOG.debug("Using " + sourceReader.getClass().getSimpleName() + " to read data");
     }
@@ -1206,9 +1578,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
-      boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+      boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) {
     if (ecb == null) { // This basically means stop has been called.
-      cleanupReaders();
+      cleanup(true);
       return false;
     }
     LlapIoImpl.LOG.trace("Sending a batch over to consumer");
@@ -1216,29 +1588,32 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     if (hasCachedSlice) {
       cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
     }
-    if (writerData != null) {
-      writerData.colStreams.clear();
+    if (diskData != null) {
+      diskData.colStreams.clear();
     }
     return true;
   }
 
-
-  private void cleanupReaders() {
-    if (writer != null) {
+  private void cleanup(boolean isError) {
+    cleanUpCurrentRead();
+    if (!isError) return;
+    for (VertorDeserializeOrcWriter asyncWriter : asyncWriters) {
       try {
-        writer.close();
-        writer = null;
+        asyncWriter.interrupt();
       } catch (Exception ex) {
-        LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
+        LlapIoImpl.LOG.warn("Failed to interrupt an async writer", ex);
       }
     }
-    if (cacheWriter != null) {
-      try {
-        cacheWriter.discardData();
-        cacheWriter = null;
-      } catch (Exception ex) {
-        LlapIoImpl.LOG.error("Failed to close cache writer", ex);
-      }
+    asyncWriters.clear();
+  }
+
+  private void cleanUpCurrentRead() {
+    if (currentFileRead == null) return;
+    try {
+      currentFileRead.closeOffsetReader();
+      currentFileRead = null;
+    } catch (Exception ex) {
+      LlapIoImpl.LOG.error("Failed to close current file reader", ex);
     }
   }
 
@@ -1249,7 +1624,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private boolean processStop() {
     if (!isStopped) return false;
     LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
-    cleanupReaders();
+    cleanup(true);
     return true;
   }
 
@@ -1262,11 +1637,11 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
   }
 
-  // TODO: move to a superclass?
   @Override
   public void returnData(OrcEncodedColumnBatch ecb) {
     for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
       if (!ecb.hasData(colIx)) continue;
+      // TODO: reuse columnvector-s on hasBatch - save the array by column? take apart each list.
       ColumnStreamData[] datas = ecb.getColumnData(colIx);
       for (ColumnStreamData data : datas) {
         if (data == null || data.decRef() != 0) continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
index 63a3be2..86d9ecc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.llap.io.encoded;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserialerOrcWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter;
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
@@ -57,73 +60,90 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 /** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */
-class VertorDeserializeOrcWriter implements EncodingWriter {
+class VertorDeserializeOrcWriter extends EncodingWriter implements Runnable {
+  private final VectorizedRowBatchCtx vrbCtx;
   private Writer orcWriter;
   private final LazySimpleDeserializeRead deserializeRead;
   private final VectorDeserializeRow<?> vectorDeserializeRow;
-  private final VectorizedRowBatch sourceBatch, destinationBatch;
-  private final boolean hasIncludes;
   private final StructObjectInspector destinationOi;
+  private final boolean usesSourceIncludes;
+  private final List<Integer> sourceIncludes;
+
+  private final boolean isAsync;
+  private final Thread orcThread;
+  private final ConcurrentLinkedQueue<WriteOperation> queue;
+  private AsyncCallback completion;
+
+  // Stored here only as async operation context.
+  private final boolean[] cacheIncludes;
+
+  private VectorizedRowBatch sourceBatch, destinationBatch;
+  private List<VectorizedRowBatch> currentBatches;
 
   // TODO: if more writers are added, separate out an EncodingWriterFactory
   public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe,
-      Map<Path, PartitionDesc> parts, Configuration daemonConf,
-      Configuration jobConf, Path splitPath, StructObjectInspector sourceOi,
-      List<Integer> includes) throws IOException {
+      Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf,
+      Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes,
+      boolean[] cacheIncludes, int allocSize) throws IOException {
     // Vector SerDe can be disabled both on client and server side.
     if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
         || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
         || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) {
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath);
     PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
         parts, path, null);
     if (partDesc == null) {
       LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path);
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     Properties tblProps = partDesc.getTableDesc().getProperties();
     if ("true".equalsIgnoreCase(tblProps.getProperty(
         serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) {
       LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to "
         + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     for (StructField sf : sourceOi.getAllStructFieldRefs()) {
       Category c = sf.getFieldObjectInspector().getCategory();
       if (c != Category.PRIMITIVE) {
         LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported");
-        return new DeserialerOrcWriter(serDe, sourceOi);
+        return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
       }
     }
     LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path);
-    return new VertorDeserializeOrcWriter(daemonConf, tblProps, sourceOi, includes);
+    return new VertorDeserializeOrcWriter(
+        daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize);
   }
 
   private VertorDeserializeOrcWriter(Configuration conf, Properties tblProps,
-      StructObjectInspector sourceOi, List<Integer> columnIds) throws IOException {
+      StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes,
+      int allocSize) throws IOException {
+    super(sourceOi, allocSize);
     // See also: the usage of VectorDeserializeType, for binary. For now, we only want text.
-    VectorizedRowBatchCtx vrbCtx = createVrbCtx(sourceOi);
+    this.vrbCtx = createVrbCtx(sourceOi);
+    this.sourceIncludes = sourceIncludes;
+    this.cacheIncludes = cacheIncludes;
     this.sourceBatch = vrbCtx.createVectorizedRowBatch();
     deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(),
         /* useExternalBuffer */ true, createSerdeParams(conf, tblProps));
     vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead);
     int colCount = vrbCtx.getRowColumnTypeInfos().length;
     boolean[] includes = null;
-    this.hasIncludes = columnIds.size() < colCount;
-    if (hasIncludes) {
+    this.usesSourceIncludes = sourceIncludes.size() < colCount;
+    if (usesSourceIncludes) {
       // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the
       // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them.
       // In any case, we would also need to build a new OI for OrcWriter config.
       // This is why OrcWriter is created after this writer, by the way.
-      this.destinationBatch = new VectorizedRowBatch(columnIds.size());
+      this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
       includes = new boolean[colCount];
       int inclBatchIx = 0;
-      List<String> childNames = new ArrayList<>(columnIds.size());
-      List<ObjectInspector> childOis = new ArrayList<>(columnIds.size());
+      List<String> childNames = new ArrayList<>(sourceIncludes.size());
+      List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size());
       List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs();
-      for (Integer columnId : columnIds) {
+      for (Integer columnId : sourceIncludes) {
         includes[columnId] = true;
         assert inclBatchIx <= columnId;
         // Note that we use the same vectors in both batches. Clever, very clever.
@@ -135,7 +155,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
       // This is only used by ORC to derive the structure. Most fields are unused.
       destinationOi = new LazySimpleStructObjectInspector(
           childNames, childOis, null, (byte)0, null);
-      destinationBatch.setPartitionInfo(columnIds.size(), 0);
+      destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
       if (LlapIoImpl.LOG.isDebugEnabled()) {
         LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes));
       }
@@ -154,6 +174,23 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
         throw new IOException(e);
       }
     }
+    this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED);
+    if (isAsync) {
+      currentBatches = new LinkedList<>();
+      queue = new ConcurrentLinkedQueue<>();
+      orcThread = new Thread(this);
+      orcThread.setDaemon(true);
+      orcThread.setName(Thread.currentThread().getName() + "-OrcEncode");
+    } else {
+      queue = null;
+      orcThread = null;
+      currentBatches = null;
+    }
+  }
+
+  public void startAsync(AsyncCallback callback) {
+    this.completion = callback;
+    this.orcThread.start();
   }
 
   private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException {
@@ -176,13 +213,57 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
   }
 
   @Override
-  public boolean hasIncludes() {
-    return hasIncludes;
+  public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+    this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi);
+    this.cacheWriter = cacheWriter;
+  }
+
+  public interface AsyncCallback {
+    void onComplete(VertorDeserializeOrcWriter writer);
   }
 
   @Override
-  public StructObjectInspector getDestinationOi() {
-    return destinationOi;
+  public void run() {
+    while (true) {
+      WriteOperation op = null;
+      int fallbackMs = 8;
+      while (true) {
+        op = queue.poll();
+        if (op != null) break;
+        if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.
+          LlapIoImpl.LOG.error("ORC encoder timed out waiting for input");
+          discardData();
+          return;
+        }
+        try {
+          Thread.sleep(fallbackMs);
+        } catch (InterruptedException e) {
+          LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input");
+          discardData();
+          return;
+        }
+        fallbackMs <<= 1;
+      }
+      try {
+        if (op.apply(orcWriter, cacheWriter)) {
+          LlapIoImpl.LOG.info("ORC encoder received a exit event");
+          completion.onComplete(this);
+          return;
+        }
+      } catch (Exception e) {
+        LlapIoImpl.LOG.error("ORC encoder failed", e);
+        discardData();
+        return;
+      }
+    }
+  }
+
+  private void discardData() {
+    try {
+      cacheWriter.discardData();
+    } catch (Exception ex) {
+      LlapIoImpl.LOG.error("Failed to close an async cache writer", ex);
+    }
   }
 
   @Override
@@ -196,7 +277,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
     // Deserialize and append new row using the current batch size as the index.
     try {
-      // TODO: can we use ByRef? Probably not, need to see text record reader.
+      // Not using ByRef now since it's unsafe for text readers. Might be safe for others.
       vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++);
     } catch (Exception e) {
       throw new IOException("DeserializeRead detail: "
@@ -206,19 +287,36 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   private void flushBatch() throws IOException {
     addBatchToWriter();
-
-    for (int c = 0; c < sourceBatch.cols.length; ++c) {
-      // This resets vectors in both batches.
-      ColumnVector colVector = sourceBatch.cols[c];
-      if (colVector != null) {
-        colVector.reset();
-        colVector.init();
+    if (!isAsync) {
+      for (int c = 0; c < sourceBatch.cols.length; ++c) {
+        // This resets vectors in both batches.
+        ColumnVector colVector = sourceBatch.cols[c];
+        if (colVector != null) {
+          colVector.reset();
+          colVector.init();
+        }
+      }
+      sourceBatch.selectedInUse = false;
+      sourceBatch.size = 0;
+      sourceBatch.endOfFile = false;
+      propagateSourceBatchFieldsToDest();
+    } else {
+      // In addBatchToWriter, we have passed the batch to both ORC and operator pipeline
+      // (neither ever changes the vectors). We need a fresh batch of vectors to write to.
+      // TODO: for now, create this from scratch. Ideally we should return the vectors from ops.
+      //       We could also have the ORC thread create it for us in its spare time...
+      this.sourceBatch = vrbCtx.createVectorizedRowBatch();
+      if (usesSourceIncludes) {
+        this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
+        int inclBatchIx = 0;
+        for (Integer columnId : sourceIncludes) {
+          destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
+        }
+        destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
+      } else {
+        this.destinationBatch = sourceBatch;
       }
     }
-    sourceBatch.selectedInUse = false;
-    sourceBatch.size = 0;
-    sourceBatch.endOfFile = false;
-    propagateSourceBatchFieldsToDest();
   }
 
   private void propagateSourceBatchFieldsToDest() {
@@ -230,8 +328,12 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   private void addBatchToWriter() throws IOException {
     propagateSourceBatchFieldsToDest();
-    // LlapIoImpl.LOG.info("Writing includeOnlyBatch " + s + "; data "+ includeOnlyBatch);
-    orcWriter.addRowBatch(destinationBatch);
+    if (!isAsync) {
+      orcWriter.addRowBatch(destinationBatch);
+    } else {
+      currentBatches.add(destinationBatch);
+      addWriteOp(new VrbOperation(destinationBatch));
+    }
   }
 
   @Override
@@ -243,7 +345,28 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   @Override
   public void writeIntermediateFooter() throws IOException {
-    orcWriter.writeIntermediateFooter();
+    if (isAsync) {
+      addWriteOp(new IntermediateFooterOperation());
+    } else {
+      orcWriter.writeIntermediateFooter();
+    }
+  }
+
+  private void addWriteOp(WriteOperation wo) throws AssertionError {
+    if (queue.offer(wo)) return;
+    throw new AssertionError("Queue full"); // This should never happen with linked list queue.
+  }
+
+  @Override
+  public void setCurrentStripeOffsets(long currentKnownTornStart,
+      long firstStartOffset, long lastStartOffset, long fileOffset) {
+    if (isAsync) {
+      addWriteOp(new SetStripeDataOperation(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset));
+    } else {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+    }
   }
 
   @Override
@@ -251,11 +374,85 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
     if (sourceBatch.size > 0) {
       addBatchToWriter();
     }
-    orcWriter.close();
+    if (!isAsync) {
+      orcWriter.close();
+    } else {
+      addWriteOp(new CloseOperation());
+    }
+  }
+
+  public List<VectorizedRowBatch> extractCurrentVrbs() {
+    if (!isAsync) return null;
+    List<VectorizedRowBatch> result = currentBatches;
+    currentBatches = new LinkedList<>();
+    return result;
+  }
+
+  private static interface WriteOperation {
+    boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
+  }
+
+  private static class VrbOperation implements WriteOperation {
+    private VectorizedRowBatch batch;
+
+    public VrbOperation(VectorizedRowBatch batch) {
+      // LlapIoImpl.LOG.debug("Adding batch " + batch);
+      this.batch = batch;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      // LlapIoImpl.LOG.debug("Writing batch " + batch);
+      writer.addRowBatch(batch);
+      return false;
+    }
+  }
+
+  private static class IntermediateFooterOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.writeIntermediateFooter();
+      return false;
+    }
+  }
+
+  private static class SetStripeDataOperation implements WriteOperation {
+    private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset;
+    public SetStripeDataOperation(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset) {
+      this.currentKnownTornStart = currentKnownTornStart;
+      this.firstStartOffset = firstStartOffset;
+      this.lastStartOffset = lastStartOffset;
+      this.fileOffset = fileOffset;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      return false;
+    }
+  }
+
+  private static class CloseOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.close();
+      return true; // The thread should stop after this. 
+    }
+  }
+
+  public boolean[] getOriginalCacheIncludes() {
+    return cacheIncludes;
   }
 
   @Override
-  public void init(Writer orcWriter) {
-    this.orcWriter = orcWriter;
+  public boolean isOnlyWritingIncludedColumns() {
+    return usesSourceIncludes;
+  }
+
+  public void interrupt() {
+    assert orcThread != null;
+    orcThread.interrupt();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 99cc506..369584b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -350,6 +350,45 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   /**
+   * Reverses genIncludedColumns; produces the table column indexes from ORC included columns.
+   * @param readerSchema The ORC reader schema for the table.
+   * @param included The included ORC columns, indexed by ORC column ID; asserted non-null.
+   * @param isFullColumnMatch Whether full column match should be enforced (i.e. whether to expect
+   *          that all the sub-columns of a complex type column are included or excluded
+   *          together in the included array). If false, any sub-column being included for a complex
+   *          type is sufficient for the entire complex column to be included in the result.
+   * @return The list of table column indexes, in ascending order.
+   */
+  public static List<Integer> genIncludedColumnsReverse(
+      TypeDescription readerSchema, boolean[] included, boolean isFullColumnMatch) {
+    assert included != null;
+    List<Integer> result = new ArrayList<>();
+    List<TypeDescription> children = readerSchema.getChildren();
+    for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
+      TypeDescription child = children.get(columnNumber);
+      int id = child.getId();
+      int maxId = child.getMaximumId();
+      if (id >= included.length || maxId >= included.length) {
+        throw new AssertionError("Inconsistent includes: " + included.length
+            + " elements; found column ID " + id);
+      } // NOTE(review): the message reports id even when only maxId is out of range.
+      boolean isIncluded = included[id];
+      for (int col = id + 1; col <= maxId; ++col) {
+        if (isFullColumnMatch && included[col] != isIncluded) {
+          throw new AssertionError("Inconsistent includes: root column IDs are [" + id + ", "
+              + maxId + "]; included[" + col + "] = " + included[col] + ", which is different "
+              + " from the previous IDs of the same root column.");
+        } // NOTE(review): the message above gets a doubled space between different and from.
+        isIncluded = isIncluded || included[col]; // Any included sub-column includes the root.
+      }
+      if (isIncluded) {
+        result.add(columnNumber);
+      }
+    }
+    return result;
+  }
+
+  /**
    * Take the configuration and figure out which columns we need to include.
    * @param readerSchema the types for the reader
    * @param conf the configuration