Posted to commits@hive.apache.org by se...@apache.org on 2017/02/10 02:36:42 UTC

[3/3] hive git commit: HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)

HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f273cc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f273cc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f273cc5

Branch: refs/heads/master
Commit: 8f273cc53f62c79f8ac30453e3ff94717d91b4dd
Parents: fbe9b05
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Feb 9 18:26:22 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Feb 9 18:26:22 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  13 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |  22 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java | 883 +++++++++++++------
 .../io/encoded/VertorDeserializeOrcWriter.java  | 285 +++++-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  39 +
 .../orc/encoded/EncodedTreeReaderFactory.java   | 497 +++++++++--
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  69 +-
 .../common/io/encoded/EncodedColumnBatch.java   |   1 +
 .../hive/ql/exec/vector/BytesColumnVector.java  |  11 +
 .../hive/ql/exec/vector/ColumnVector.java       |  12 +
 .../ql/exec/vector/DecimalColumnVector.java     |  11 +-
 .../hive/ql/exec/vector/DoubleColumnVector.java |   8 +-
 .../vector/IntervalDayTimeColumnVector.java     |   8 +
 .../hive/ql/exec/vector/LongColumnVector.java   |   8 +-
 .../ql/exec/vector/MultiValuedColumnVector.java |   4 +
 .../hive/ql/exec/vector/StructColumnVector.java |   5 +
 .../ql/exec/vector/TimestampColumnVector.java   |   8 +
 .../hive/ql/exec/vector/UnionColumnVector.java  |   5 +
 19 files changed, 1504 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
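
For readers skimming the diff below: the core change is that the vectorized SerDe reader can now hand VectorizedRowBatch data straight to the consumer while the ORC re-encoding for the LLAP cache runs in the background and is stored from a callback (see AsyncCacheDataCallback in SerDeEncodedDataReader). The standalone sketch below illustrates only that hand-off pattern; every name in it is hypothetical and it is not Hive code.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Minimal sketch of the async hand-off; all names are made up for illustration.
    public class AsyncEncodeSketch {

      // Fired when the background re-encoding of one slice has finished.
      interface AsyncCallback {
        void onComplete(String encodedSliceId);
      }

      // The query thread hands a slice off and keeps streaming rows to the consumer.
      static class AsyncSliceEncoder {
        private final ExecutorService pool = Executors.newSingleThreadExecutor();

        void startAsync(String sliceId, AsyncCallback cb) {
          pool.submit(() -> {
            // ... re-encode the text slice into cacheable streams here ...
            cb.onComplete(sliceId); // put the encoded data into the cache
          });
        }

        void finish() throws InterruptedException {
          pool.shutdown();
          pool.awaitTermination(10, TimeUnit.SECONDS);
        }
      }

      public static void main(String[] args) throws Exception {
        List<String> cachePuts = new CopyOnWriteArrayList<>();
        AsyncSliceEncoder encoder = new AsyncSliceEncoder();
        // The first query returns row batches right away; caching happens behind it.
        encoder.startAsync("slice-0", cachePuts::add);
        encoder.startAsync("slice-1", cachePuts::add);
        encoder.finish();
        System.out.println("Cached slices: " + cachePuts);
      }
    }

The point is that the first query no longer blocks on writing encoded data to the cache; the callback stores each slice whenever the background encoder finishes with it.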


http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e82758f..b27b663 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2922,6 +2922,9 @@ public class HiveConf extends Configuration {
         LLAP_ALLOCATOR_MAX_ALLOC + "."),
     LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED("hive.llap.io.encode.vector.serde.enabled", true,
         "Whether LLAP should use vectorized SerDe reader to read text data when re-encoding."),
+    LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED("hive.llap.io.encode.vector.serde.async.enabled",
+        true,
+        "Whether LLAP should use async mode in vectorized SerDe reader to read text data."),
     LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
         "Row count to use to separate cache slices when reading encoded data from row-based\n" +
         "inputs into LLAP cache, if this feature is enabled."),

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 85fae9a..4809398 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -113,7 +113,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
 
   public static final class StripeData {
     // In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
-    // positions, we wouldn't be able to account for torn rows correctly because the semantics of
+    // positions), we wouldn't be able to account for torn rows correctly because the semantics of
     // our "exact" reader positions, and inexact split boundaries, are different. We cannot even
     // tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
     // wrt first row when caching - we may produce incorrect result if we adjust the split
@@ -182,7 +182,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
           + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
     }
 
-    public static StripeData duplicateForResults(StripeData s) {
+    public static StripeData duplicateStructure(StripeData s) {
       return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
           s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
     }
@@ -389,14 +389,14 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Got stripe in cache " + cStripe);
     }
-    StripeData stripe = StripeData.duplicateForResults(cStripe);
+    StripeData stripe = StripeData.duplicateStructure(cStripe);
     result.stripes.add(stripe);
     boolean isMissed = false;
     for (int colIx = 0; colIx < cached.colCount; ++colIx) {
       if (!includes[colIx]) continue;
       if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
         if (cStripe.data[colIx] != null) {
-          assert false : cStripe;
+          throw new AssertionError(cStripe);
           // No encoding => must have no data.
         }
         isMissed = true;
@@ -419,9 +419,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
             LlapIoImpl.CACHE_LOGGER.info("Couldn't lock data for stripe at "
                 + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
 
+            handleRemovedColumnData(cColData);
             cColData = null;
             isMissed = true;
-            handleRemovedColumnData(cColData);
             if (gotAllData != null) {
               gotAllData.value = false;
             }
@@ -432,6 +432,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
       // At this point, we have arrived at the level where we need all the data, and the
       // arrays never change. So we will just do a shallow assignment here instead of copy.
       stripe.data[colIx] = cColData;
+      if (cColData == null) {
+        stripe.encodings[colIx] = null;
+      }
     }
     doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
   }
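
The last hunk above enforces the invariant that a column's cached data and its encoding are null together (the reader side of this patch asserts the reverse direction when caching: data without an encoding is an error). A minimal, self-contained illustration of that consistency rule, using placeholder arrays rather than the actual cache classes:

    import java.util.Arrays;

    // Illustrative only: per-column data and encoding entries must be null/non-null together.
    public class SliceConsistencyCheck {
      static void normalize(Object[] data, Object[] encodings) {
        for (int col = 0; col < data.length; ++col) {
          if (data[col] == null) {
            encodings[col] = null; // no data => drop the encoding too
          } else if (encodings[col] == null) {
            throw new AssertionError("Data without an encoding at column " + col);
          }
        }
      }

      public static void main(String[] args) {
        Object[] data = { null, "stream-bytes" };
        Object[] encodings = { "DIRECT", "DIRECT" };
        normalize(data, encodings);
        System.out.println(Arrays.toString(encodings)); // [null, DIRECT]
      }
    }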

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 4295c1c..8d96e7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -53,12 +52,8 @@ import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
 import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.orc.OrcProto;
 import org.apache.orc.impl.WriterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import org.apache.orc.OrcProto;
 
 
 public class OrcEncodedDataConsumer
@@ -134,7 +129,7 @@ public class OrcEncodedDataConsumer
             schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping);
         this.columnReaders = treeReader.getChildReaders();
         this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length);
-        positionInStreams(columnReaders, batch, stripeMetadata);
+        positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata);
       } else {
         repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata);
       }
@@ -225,8 +220,8 @@ public class OrcEncodedDataConsumer
   }
 
   private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
-      EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
-    PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+      OrcBatchKey batchKey, ConsumerStripeMetadata stripeMetadata) throws IOException {
+    PositionProvider[] pps = createPositionProviders(columnReaders, batchKey, stripeMetadata);
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       columnReaders[i].seek(pps);
@@ -236,10 +231,13 @@ public class OrcEncodedDataConsumer
   private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
       EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
       ConsumerStripeMetadata stripeMetadata) throws IOException {
-    PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+    PositionProvider[] pps = createPositionProviders(
+        columnReaders, batch.getBatchKey(), stripeMetadata);
     if (pps == null) return;
     for (int i = 0; i < columnReaders.length; i++) {
       TreeReader reader = columnReaders[i];
+      // Note: we assume this never happens for the SerDe reader - the batch would never have vectors.
+      // That is always true now; but if it ever stops being true, the code below would throw in getColumnData.
       ((SettableTreeReader) reader).setBuffers(batch, sameStripe);
       // TODO: When hive moves to java8, make updateTimezone() as default method in
       // SettableTreeReader so that we can avoid this check.
@@ -268,7 +266,7 @@ public class OrcEncodedDataConsumer
   }
 
   private PositionProvider[] createPositionProviders(
-      TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+      TreeReaderFactory.TreeReader[] columnReaders, OrcBatchKey batchKey,
       ConsumerStripeMetadata stripeMetadata) throws IOException {
     if (columnReaders.length == 0) return null;
     PositionProvider[] pps = null;
@@ -279,7 +277,7 @@ public class OrcEncodedDataConsumer
         pps[i] = singleRgPp;
       }
     } else {
-      int rowGroupIndex = batch.getBatchKey().rgIx;
+      int rowGroupIndex = batchKey.rgIx;
       if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
         throw new IOException("Cannot position readers without RG information");
       }
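
The refactor in this file narrows positionInStreams and createPositionProviders from taking the whole EncodedColumnBatch to taking only its OrcBatchKey, since only the row-group index from the key is needed to build position providers. A generic sketch of that narrowing, with illustrative types rather than the ORC reader classes:

    // Illustrative: pass the helper only what it needs (the key), not the whole batch.
    public class NarrowParamSketch {
      static final int ALL_RGS = -1;

      static class BatchKey {
        final int rgIx;
        BatchKey(int rgIx) { this.rgIx = rgIx; }
      }

      static class Batch {
        final BatchKey key;
        // ... stream buffers would live here, but the helper never needs them ...
        Batch(BatchKey key) { this.key = key; }
      }

      // Before: int pickRowGroup(Batch batch) { return batch.key.rgIx; }
      static int pickRowGroup(BatchKey key) {
        if (key.rgIx == ALL_RGS) {
          throw new IllegalStateException("Cannot position readers without RG information");
        }
        return key.rgIx;
      }

      public static void main(String[] args) {
        Batch batch = new Batch(new BatchKey(3));
        System.out.println(pickRowGroup(batch.key)); // 3
      }
    }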

http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 8d86d17..419043a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -52,6 +52,10 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.VertorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
@@ -158,19 +162,21 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final InputFormat<?, ?> sourceInputFormat;
   private final Reporter reporter;
   private final JobConf jobConf;
+  private final TypeDescription schema;
   private final int allocSize;
   private final int targetSliceRowCount;
   private final boolean isLrrEnabled;
 
   private final boolean[] writerIncludes;
-  private EncodingWriter writer = null;
-  private CacheWriter cacheWriter = null;
+  private FileReaderYieldReturn currentFileRead = null;
+
   /**
    * Data from cache currently being processed. We store it here so that we could decref
    * it in case of failures. We remove each slice from the data after it has been sent to
    * the consumer, at which point the consumer is responsible for it.
    */
   private FileData cachedData;
+  private List<VertorDeserializeOrcWriter> asyncWriters = new ArrayList<>();
 
   public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
       BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
@@ -210,6 +216,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;
     this.jobConf = jobConf;
+    this.schema = schema;
     this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
     SchemaEvolution evolution = new SchemaEvolution(schema,
         new Reader.Options(jobConf).include(writerIncludes));
@@ -243,7 +250,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     throw new UnsupportedOperationException();
   }
 
-  // TODO: move to base class?
+  // TODO: move to a base class?
   @Override
   protected Void callInternal() throws IOException, InterruptedException {
     return ugi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -254,21 +261,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     });
   }
 
-  public static class CacheOutStream extends OutStream {
-    private final CacheOutputReceiver receiver;
-    public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
-        CacheOutputReceiver receiver) throws IOException {
-      super(name, bufferSize, codec, receiver);
-      this.receiver = receiver;
-    }
-
-    @Override
-    public void clear() throws IOException {
-      super.clear();
-      receiver.clear();
-    }
-  }
-
   /** A row-based (Writable) reader that may also be able to report file offsets. */
   interface ReaderWithOffsets {
     /** Moves the reader to the next row. */
@@ -333,7 +325,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private CacheStripeData currentStripe;
     private final List<CacheStripeData> stripes = new ArrayList<>();
     private final BufferUsageManager bufferManager;
-    private final int bufferSize;
+    /**
+     * For !doesSourceHaveIncludes case, stores global column IDs to verify writer columns.
+     * For doesSourceHaveIncludes case, stores source column IDs used to map things.
+     */
     private final List<Integer> columnIds;
     private final boolean[] writerIncludes;
     // These are global since ORC reuses objects between stripes.
@@ -341,14 +336,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
     private final boolean doesSourceHaveIncludes;
 
-    public CacheWriter(BufferUsageManager bufferManager, int bufferSize,
-        List<Integer> columnIds, boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
+    public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
+        boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
       this.bufferManager = bufferManager;
-      this.bufferSize = bufferSize;
-      this.columnIds = columnIds;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
+      this.columnIds = columnIds;
       startStripe();
     }
 
@@ -456,18 +450,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
     @Override
     public void writeHeader() throws IOException {
-
     }
 
     @Override
-    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) throws IOException {
+    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
+        CompressionCodec codec) throws IOException {
       // TODO: right now we treat each slice as a stripe with a single RG and never bother
       //       with indexes. In phase 4, we need to add indexing and filtering.
     }
 
     @Override
-    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) throws IOException {
-
+    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom,
+        CompressionCodec codec) throws IOException {
     }
 
     @Override
@@ -545,9 +539,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
-    public void appendRawStripe(ByteBuffer stripe,
-                                OrcProto.StripeInformation.Builder dirEntry
-                                ) throws IOException {
+    public void appendRawStripe(
+        ByteBuffer stripe, OrcProto.StripeInformation.Builder dirEntry) throws IOException {
       throw new UnsupportedOperationException(); // Only used in ACID writer.
     }
 
@@ -586,7 +579,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public void suppress() {
       suppressed = true;
-      buffers = null;
       lastBufferPos = -1;
     }
 
@@ -639,6 +631,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private static class NullOutputReceiver implements OutputReceiver {
+    @SuppressWarnings("unused")
     private final StreamName name;
 
     public NullOutputReceiver(StreamName name) {
@@ -655,6 +648,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   protected Void performDataRead() throws IOException {
+    boolean isOk = false;
     try {
       try {
         long startTime = counters.startTimeCounter();
@@ -667,10 +661,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         try {
           isFromCache = readFileWithCache(startTime);
         } finally {
+          // Note that the code removes the data from the field as it's passed to the consumer,
+          // so we expect to have stuff remaining in there only in case of errors.
           if (cachedData != null && cachedData.getData() != null) {
             for (StripeData sd : cachedData.getData()) {
               unlockAllBuffers(sd);
             }
+            cachedData = null;
           }
         }
         if (isFromCache == null) return null; // Stop requested, and handled inside.
@@ -680,20 +677,24 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
         // Done with all the things.
         recordReaderTime(startTime);
+        if (LlapIoImpl.LOG.isTraceEnabled()) {
+          LlapIoImpl.LOG.trace("done processing {}", split);
+        }
       } catch (Throwable e) {
         LlapIoImpl.LOG.error("Exception while processing", e);
         consumer.setError(e);
         throw e;
       }
       consumer.setDone();
-
-      LlapIoImpl.LOG.trace("done processing {}", split);
+      isOk = true;
       return null;
     } finally {
-      cleanupReaders();
+      cleanup(!isOk);
+      // Do not clean up the writers - the callback should do it.
     }
   }
 
+
   private void unlockAllBuffers(StripeData si) {
     for (int i = 0; i < si.getData().length; ++i) {
       LlapDataBuffer[][] colData = si.getData()[i];
@@ -718,6 +719,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         // Make data consistent with encodings, don't store useless information.
         if (sd.getData()[i] == null) {
           encodings[i] = null;
+        } else if (encodings[i] == null) {
+          throw new AssertionError("Caching data without an encoding at " + i + ": " + sd);
         }
       }
       FileData fd = new FileData(fileKey, encodings.length);
@@ -732,7 +735,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     // This relies on sequence of calls to cacheFileData and sendEcb..
   }
 
-
   private void lockAllBuffers(StripeData sd) {
     for (int i = 0; i < sd.getData().length; ++i) {
       LlapDataBuffer[][] colData = sd.getData()[i];
@@ -755,7 +757,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     this.cachedData = cache.getFileData(fileKey, split.getStart(),
         endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
     if (cachedData == null) {
-      LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+      if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+      }
       return false;
     }
     String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
@@ -796,7 +800,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
-      // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+      // Note: we assume a 0-length split is correct given how LRR interprets offsets (reading an
+      // extra row). Should we instead assume 1+ chars and add 1 for isUnfortunate?
       FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
           endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
       if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
@@ -806,101 +811,119 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   public boolean processOneFileSplit(FileSplit split, long startTime,
       Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
-    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
     LlapIoImpl.LOG.info("Processing one split {" + split.getPath() + ", "
         + split.getStart() + ", " + split.getLength() + "}");
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Cache data for the split is " + slice);
     }
-    boolean[] splitIncludes = writerIncludes;
-    boolean hasAllData = false;
-    if (cacheEncodings != null) {
-      hasAllData = true;
-      splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
-      for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
-        if (!splitIncludes[colIx]) continue;
-        assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
-        if (cacheEncodings[colIx] != null) {
-          splitIncludes[colIx] = false;
-        } else {
-          hasAllData = false;
-        }
-      }
-    }
-    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
-        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
-    }
+    boolean[] splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+    boolean hasAllData = slice != null
+        && determineSplitIncludes(slice, splitIncludes, writerIncludes);
+
     // We have 3 cases here:
     // 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
     // 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
     // 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
-    if (!hasAllData) {
-      // This initializes cacheWriter with data.
-      readSplitFromFile(split, splitIncludes, slice);
-      assert cacheWriter != null;
-    }
-    if (slice != null) {
-      // If we had a cache range already, it should not have been split.
-      assert cacheWriter == null || cacheWriter.stripes.size() == 1;
-      CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+    if (hasAllData) {
+      // Everything comes from cache.
+      CacheWriter.CacheStripeData csd = null;
       boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
       ++stripeIxRef.value;
       return result;
-    } else {
-      for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
-        if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
-          return false;
+    }
+
+    boolean result = false;
+    // This initializes currentFileRead.
+    startReadSplitFromFile(split, splitIncludes, slice);
+    try {
+      if (slice != null) {
+        // If we had a cache range already, we expect a single matching disk slice.
+        Vectors vectors = currentFileRead.readNextSlice();
+        if (!vectors.isSupported()) {
+          // Not in VRB mode - the new cache data is ready, we should use it.
+          CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+          assert cacheWriter.stripes.size() == 1;
+          result = processOneSlice(
+              cacheWriter.stripes.get(0), splitIncludes, stripeIxRef.value, slice, startTime);
+        } else {
+          // VRB mode - process the VRBs with cache data; the new cache data is coming later.
+          result = processOneSlice(
+              vectors, splitIncludes, stripeIxRef.value, slice, startTime);
         }
+        assert null == currentFileRead.readNextSlice();
         ++stripeIxRef.value;
+      } else {
+        // All the data comes from disk. The reader may have split it into multiple slices.
+        Vectors vectors = currentFileRead.readNextSlice();
+        assert vectors != null;
+        result = true;
+        if (!vectors.isSupported()) {
+          // Not in VRB mode - the new cache data is (partially) ready, we should use it.
+          while (currentFileRead.readNextSlice() != null); // Force the rest of the data thru.
+          CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+          for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+            if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+              result = false;
+              break;
+            }
+            ++stripeIxRef.value;
+          }
+        } else {
+          // VRB mode - process the VRBs with cache data; the new cache data is coming later.
+          do {
+            assert vectors.isSupported();
+            if (!processOneSlice(vectors, splitIncludes, stripeIxRef.value, null, startTime)) {
+              result = false;
+              break;
+            }
+            ++stripeIxRef.value;
+          } while ((vectors = currentFileRead.readNextSlice()) != null);
+        }
       }
-      return true;
+    } finally {
+      cleanUpCurrentRead();
     }
+    return result;
   }
 
-  private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
-      int stripeIx, StripeData slice, long startTime) throws IOException {
-    String sliceStr = slice == null ? "null" : slice.toCoordinateString();
-    if (LlapIoImpl.LOG.isDebugEnabled()) {
-      LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
-        + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
-        + " disk data");
+  private static boolean determineSplitIncludes(
+      StripeData slice, boolean[] splitIncludes, boolean[] writerIncludes) {
+    ColumnEncoding[] cacheEncodings = slice.getEncodings();
+    assert cacheEncodings != null;
+    boolean hasAllData = true;
+    for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+      if (!splitIncludes[colIx]) continue;
+      if ((cacheEncodings[colIx] != null) != (slice.getData()[colIx] != null)) {
+        throw new AssertionError("Inconsistent cache slice " + slice);
+      }
+      if (cacheEncodings[colIx] != null) {
+        splitIncludes[colIx] = false;
+      } else {
+        hasAllData = false;
+      }
+    }
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
+        writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
     }
+    return hasAllData;
+  }
 
-    ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
-    LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
-    long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+  private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] splitIncludes,
+      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+    logProcessOneSlice(stripeIx, diskData, cacheData);
+
+    ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    long cacheRowCount = cacheData == null ? -1L : cacheData.getRowCount();
     SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
     StripeData sliceToCache = null;
-    boolean hasAllData = csd == null;
+    boolean hasAllData = diskData == null;
     if (!hasAllData) {
-      if (slice == null) {
-        sliceToCache = new StripeData(
-            csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
-            csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
-      } else {
-        if (csd.rowCount != slice.getRowCount()) {
-          throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
-              + slice.getRowCount() + " from " + csd + " and " + slice);
-        }
-        if (csd.encodings.size() != slice.getEncodings().length) {
-          throw new IOException("Column count mismatch; disk " + csd.encodings.size()
-              + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
-        }
-        if (LlapIoImpl.LOG.isDebugEnabled()) {
-          LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
-            + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
-        }
-        sliceToCache = StripeData.duplicateForResults(slice);
-        for (int i = 0; i < csd.encodings.size(); ++i) {
-          sliceToCache.getEncodings()[i] = csd.encodings.get(i);
-        }
-        sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
-      }
-      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
-      metadata.setRowCount(csd.rowCount);
+      sliceToCache = createSliceToCache(diskData, cacheData);
+      metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, diskData.encodings));
+      metadata.setRowCount(diskData.rowCount);
     } else {
-      assert cacheWriter == null;
       metadata.setEncodings(Lists.newArrayList(cacheEncodings));
       metadata.setRowCount(cacheRowCount);
     }
@@ -916,46 +939,28 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
       if (!hasAllData && splitIncludes[colIx]) {
         // The column has been read from disk.
-        List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
-        if (LlapIoImpl.LOG.isTraceEnabled()) {
-          LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
-        }
-        LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
-            = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+        List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+        LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
         if (streams == null) continue; // Struct column, such as root?
         Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
         while (iter.hasNext()) {
           CacheWriter.CacheStreamData stream = iter.next();
           if (stream.isSuppressed) {
-            LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+            if (LlapIoImpl.LOG.isTraceEnabled()) {
+              LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+            }
             iter.remove();
             discardUncachedBuffers(stream.data);
             continue;
           }
-          // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+          int streamIx = setStreamDataToCache(newCacheDataForCol, stream);
           ColumnStreamData cb = CSD_POOL.take();
           cb.incRef();
-          int streamIx = stream.name.getKind().getNumber();
           cb.setCacheBuffers(stream.data);
-          // This is kinda hacky - we "know" these are LlapDataBuffer-s.
-          newCacheDataForCol[streamIx] = stream.data.toArray(
-              new LlapDataBuffer[stream.data.size()]);
           ecb.setStreamData(colIx, streamIx, cb);
         }
       } else {
-        // The column has been obtained from cache.
-        LlapDataBuffer[][] colData = cacheData[colIx];
-        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-          LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
-              + SerDeLowLevelCacheImpl.toString(colData));
-        }
-        for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
-          if (colData[streamIx] == null) continue;
-          ColumnStreamData cb = CSD_POOL.take();
-          cb.incRef();
-          cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
-          ecb.setStreamData(colIx, streamIx, cb);
-        }
+        processColumnCacheData(cacheBuffers, ecb, colIx);
       }
     }
     if (processStop()) {
@@ -969,7 +974,183 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       LlapIoImpl.CACHE_LOGGER.trace("Data to cache from the read " + sliceToCache);
     }
     cacheFileData(sliceToCache);
-    return sendEcbToConsumer(ecb, slice != null, csd);
+    return sendEcbToConsumer(ecb, cacheData != null, diskData);
+  }
+
+  private void validateCacheAndDisk(StripeData cacheData,
+      long rowCount, long encodingCount, Object diskDataLog) throws IOException {
+    if (rowCount != cacheData.getRowCount()) {
+      throw new IOException("Row count mismatch; disk " + rowCount + ", cache "
+          + cacheData.getRowCount() + " from " + diskDataLog + " and " + cacheData);
+    }
+    if (encodingCount > 0 && encodingCount != cacheData.getEncodings().length) {
+      throw new IOException("Column count mismatch; disk " + encodingCount + ", cache "
+          + cacheData.getEncodings().length + " from " + diskDataLog + " and " + cacheData);
+    }
+  }
+
+
+  /** Unlike the other overload of processOneSlice, doesn't cache data. */
+  private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes,
+      int stripeIx, StripeData cacheData, long startTime) throws IOException {
+    if (diskData == null) {
+      throw new AssertionError(); // The other overload should have been used.
+    }
+    // LlapIoImpl.LOG.debug("diskData " + diskData);
+    logProcessOneSlice(stripeIx, diskData, cacheData);
+
+    if (cacheData == null && diskData.getRowCount() == 0) {
+      return true; // Nothing to process.
+    }
+    ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    if (cacheData != null) {
+      // Don't validate column count - no encodings for vectors.
+      validateCacheAndDisk(cacheData, diskData.getRowCount(), -1, diskData);
+    }
+    SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
+    metadata.setEncodings(Arrays.asList(cacheEncodings == null
+        ? new ColumnEncoding[splitIncludes.length] : cacheEncodings));
+    metadata.setRowCount(diskData.getRowCount());
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Derived stripe metadata for this split is " + metadata);
+    }
+    consumer.setStripeMetadata(metadata);
+
+    OrcEncodedColumnBatch ecb = ECB_POOL.take();
+    ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+    int vectorsIx = 0;
+    for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+      if (!writerIncludes[colIx]) continue;
+      if (splitIncludes[colIx]) {
+        // Skip the 0-th column, since it won't have a vector after reading the text source.
+        if (colIx != 0 ) {
+          List<ColumnVector> vectors = diskData.getVectors(vectorsIx++);
+          if (LlapIoImpl.LOG.isTraceEnabled()) {
+            LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors);
+          }
+          ecb.initColumnWithVectors(colIx, vectors);
+        } else {
+          ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+        }
+      } else {
+        ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+        processColumnCacheData(cacheBuffers, ecb, colIx);
+      }
+    }
+    if (processStop()) {
+      recordReaderTime(startTime);
+      return false;
+    }
+    return sendEcbToConsumer(ecb, cacheData != null, null);
+  }
+
+
+  private void processAsyncCacheData(CacheWriter.CacheStripeData diskData,
+      boolean[] splitIncludes) throws IOException {
+    StripeData sliceToCache = new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+        diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+        diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+    for (int colIx = 0; colIx < splitIncludes.length; ++colIx) {
+      if (!splitIncludes[colIx]) continue;
+      // The column has been read from disk.
+      List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+      LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+      if (streams == null) continue; // Struct column, such as root?
+      Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+      while (iter.hasNext()) {
+        CacheWriter.CacheStreamData stream = iter.next();
+        if (stream.isSuppressed) {
+          if (LlapIoImpl.LOG.isTraceEnabled()) {
+            LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+          }
+          iter.remove();
+          discardUncachedBuffers(stream.data);
+          continue;
+        }
+        setStreamDataToCache(newCacheDataForCol, stream);
+      }
+    }
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Data to cache from async read " + sliceToCache);
+    }
+    try {
+      cacheFileData(sliceToCache);
+    } finally {
+      unlockAllBuffers(sliceToCache);
+    }
+  }
+
+  private StripeData createSliceToCache(
+      CacheWriter.CacheStripeData diskData, StripeData cacheData) throws IOException {
+    assert diskData != null;
+    if (cacheData == null) {
+      return new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+          diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+          diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+    } else {
+      long rowCount = diskData.rowCount, encodingCount = diskData.encodings.size();
+      validateCacheAndDisk(cacheData, rowCount, encodingCount, diskData);
+      if (LlapIoImpl.LOG.isDebugEnabled()) {
+        LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
+          + cacheData.toCoordinateString() + "; disk offsets were "
+          + diskData.toCoordinateString());
+      }
+      // Note: we could just do what we already do above from disk data, except for the validation
+      // that is not strictly necessary, and knownTornStart which is an optimization.
+      StripeData sliceToCache = StripeData.duplicateStructure(cacheData);
+      for (int i = 0; i < diskData.encodings.size(); ++i) {
+        sliceToCache.getEncodings()[i] = diskData.encodings.get(i);
+      }
+      sliceToCache.setKnownTornStart(Math.min(
+          diskData.knownTornStart, sliceToCache.getKnownTornStart()));
+      return sliceToCache;
+    }
+  }
+
+
+  private static LlapDataBuffer[][] createArrayToCache(
+      StripeData sliceToCache, int colIx, List<CacheWriter.CacheStreamData> streams) {
+    if (LlapIoImpl.LOG.isTraceEnabled()) {
+      LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
+    }
+    LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+        = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+    return newCacheDataForCol;
+  }
+
+  private static int setStreamDataToCache(
+      LlapDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
+    int streamIx = stream.name.getKind().getNumber();
+    // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+    newCacheDataForCol[streamIx] = stream.data.toArray(new LlapDataBuffer[stream.data.size()]);
+    return streamIx;
+  }
+
+  private void processColumnCacheData(LlapDataBuffer[][][] cacheBuffers,
+      OrcEncodedColumnBatch ecb, int colIx) {
+    // The column has been obtained from cache.
+    LlapDataBuffer[][] colData = cacheBuffers[colIx];
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
+          + SerDeLowLevelCacheImpl.toString(colData));
+    }
+    for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+      if (colData[streamIx] == null) continue;
+      ColumnStreamData cb = CSD_POOL.take();
+      cb.incRef();
+      cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+      ecb.setStreamData(colIx, streamIx, cb);
+    }
+  }
+
+  private void logProcessOneSlice(int stripeIx, Object diskData, StripeData cacheData) {
+    String sliceStr = cacheData == null ? "null" : cacheData.toCoordinateString();
+    if (LlapIoImpl.LOG.isDebugEnabled()) {
+      LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
+        + ((cacheData == null) ? " no" : "") + " cache data; has"
+        + ((diskData == null) ? " no" : "") + " disk data");
+    }
   }
 
   private void discardUncachedBuffers(List<MemoryBuffer> list) {
@@ -1003,126 +1184,309 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     return Lists.newArrayList(combinedEncodings);
   }
 
+  private static class Vectors {
+    private final List<ColumnVector>[] data;
+    private final boolean isSupported;
+    private final long rowCount;
+
+    @SuppressWarnings("unchecked")
+    public Vectors(List<VectorizedRowBatch> vrbs) {
+      if (vrbs == null) {
+        isSupported = false;
+        data = null;
+        rowCount = 0;
+        return;
+      }
+      isSupported = true;
+      if (vrbs.isEmpty()) {
+        data = null;
+        rowCount = 0;
+        return;
+      }
+      data = new List[vrbs.get(0).numCols];
+      for (int i = 0; i < data.length; ++i) {
+        data[i] = new ArrayList<>(vrbs.size());
+      }
+      int rowCount = 0;
+      for (VectorizedRowBatch vrb : vrbs) {
+        assert !vrb.selectedInUse;
+        rowCount += vrb.size;
+        for (int i = 0; i < vrb.cols.length; ++i) {
+          data[i].add(vrb.cols[i]);
+        }
+      }
+      this.rowCount = rowCount;
+    }
+
+    public List<ColumnVector> getVectors(int ix) {
+      return data[ix];
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    public boolean isSupported() {
+      return isSupported;
+    }
+
+    @Override
+    public String toString() {
+      return "Vectors {isSupported=" + isSupported + ", rowCount=" + rowCount
+          + ", data=" + Arrays.toString(data) + "}";
+    }
+  }
+
+  /**
+   * This class only exists because Java doesn't have yield return. The original method
+   * before this change only needed yield return-s sprinkled here and there; however,
+   * Java developers are usually paid by class, so here we go.
+   */
+  private static class FileReaderYieldReturn {
+    private ReaderWithOffsets offsetReader;
+    private int rowsPerSlice = 0;
+    private long currentKnownTornStart;
+    private long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+    private boolean hasUnsplittableData = false;
+    private final EncodingWriter writer;
+    private final boolean maySplitTheSplit;
+    private final int targetSliceRowCount;
+    private final FileSplit split;
+
+    public FileReaderYieldReturn(ReaderWithOffsets offsetReader, FileSplit split, EncodingWriter writer,
+        boolean maySplitTheSplit, int targetSliceRowCount) {
+      this.offsetReader = offsetReader;
+      currentKnownTornStart = split.getStart();
+      this.writer = writer;
+      this.maySplitTheSplit = maySplitTheSplit;
+      this.targetSliceRowCount = targetSliceRowCount;
+      this.split = split;
+    }
+
+    public CacheWriter getCacheWriter() throws IOException {
+      return writer.getCacheWriter();
+    }
+
+    public Vectors readNextSlice() throws IOException {
+      if (offsetReader == null) return null;
+      try {
+        while (offsetReader.next()) {
+          hasUnsplittableData = true;
+          Writable value = offsetReader.getCurrentRow();
+          lastStartOffset = offsetReader.getCurrentRowStartOffset();
+          if (firstStartOffset == Long.MIN_VALUE) {
+            firstStartOffset = lastStartOffset;
+          }
+          writer.writeOneRow(value);
+
+          if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+            assert offsetReader.hasOffsets();
+            writer.flushIntermediateData();
+            long fileOffset = offsetReader.getCurrentRowEndOffset();
+            // Must support offsets to be able to split.
+            if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+              throw new AssertionError("Unable to get offsets from "
+                  + offsetReader.getClass().getSimpleName());
+            }
+            writer.setCurrentStripeOffsets(
+                currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+            writer.writeIntermediateFooter();
+
+            // Split starting at row start will not read that row.
+            currentKnownTornStart = lastStartOffset;
+            // Row offsets will be determined from the reader (we could set the first from last).
+            lastStartOffset = Long.MIN_VALUE;
+            firstStartOffset = Long.MIN_VALUE;
+            rowsPerSlice = 0;
+            return new Vectors(writer.extractCurrentVrbs());
+          }
+        }
+        try {
+          Vectors result = null;
+          if (rowsPerSlice > 0 || (!maySplitTheSplit && hasUnsplittableData)) {
+            long fileOffset = -1;
+            if (!offsetReader.hasOffsets()) {
+              // The reader doesn't support offsets. We adjust offsets to match future splits.
+              // If cached split was starting at row start, that row would be skipped, so +1
+              firstStartOffset = split.getStart() + 1;
+              // Last row starting at the end of the split would be read.
+              lastStartOffset = split.getStart() + split.getLength();
+              // However, it must end after the split end, otherwise the next one would have been read.
+              fileOffset = lastStartOffset + 1;
+              if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
+                LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
+                  + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+              }
+            } else {
+              fileOffset = offsetReader.getCurrentRowEndOffset();
+              assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+            }
+            writer.setCurrentStripeOffsets(
+                currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+            // Close the writer to finalize the metadata.
+            writer.close();
+            result = new Vectors(writer.extractCurrentVrbs());
+          } else {
+            writer.close();
+          }
+          return result;
+        } finally {
+          closeOffsetReader();
+        }
+      } catch (Exception ex) {
+        closeOffsetReader();
+        throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+      }
+    }
+
+    private void closeOffsetReader() {
+      if (offsetReader == null) return;
+      try {
+        offsetReader.close();
+      } catch (Exception ex) {
+        LlapIoImpl.LOG.error("Failed to close source reader", ex);
+      }
+      offsetReader = null;
+    }
+  }
 
-  public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
-      throws IOException {
+  public void startReadSplitFromFile(
+      FileSplit split, boolean[] splitIncludes, StripeData slice) throws IOException {
     boolean maySplitTheSplit = slice == null;
     ReaderWithOffsets offsetReader = null;
     @SuppressWarnings("rawtypes")
     RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
     try {
       offsetReader = createOffsetReader(sourceReader);
-      maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
+      sourceReader = null;
+    } finally {
+      if (sourceReader != null) {
+        try {
+          sourceReader.close();
+        } catch (Exception ex) {
+          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+        }
+      }
+    }
+    maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
 
-      // writer writes to orcWriter which writes to cacheWriter
-      // TODO: in due course, writer will also propagate row batches if it's capable
+    try {
       StructObjectInspector originalOi = (StructObjectInspector)getOiFromSerDe();
-      writer = VertorDeserializeOrcWriter.create(sourceInputFormat, sourceSerDe, parts,
-          daemonConf, jobConf, split.getPath(), originalOi, columnIds);
-      cacheWriter = new CacheWriter(
-          bufferManager, allocSize, columnIds, splitIncludes, writer.hasIncludes());
-      writer.init(OrcFile.createWriter(split.getPath(),
-          createOrcWriterOptions(writer.getDestinationOi())));
-
-      int rowsPerSlice = 0;
-      long currentKnownTornStart = split.getStart();
-      long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
-      boolean hasData = false;
-      while (offsetReader.next()) {
-        hasData = true;
-        Writable value = offsetReader.getCurrentRow();
-        lastStartOffset = offsetReader.getCurrentRowStartOffset();
-        if (firstStartOffset == Long.MIN_VALUE) {
-          firstStartOffset = lastStartOffset;
-        }
-        writer.writeOneRow(value);
-
-        if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
-          assert offsetReader.hasOffsets();
-          writer.flushIntermediateData();
-          long fileOffset = offsetReader.getCurrentRowEndOffset();
-          // Must support offsets to be able to split.
-          if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
-            throw new AssertionError("Unable to get offsets from "
-                + offsetReader.getClass().getSimpleName());
-          }
-          cacheWriter.setCurrentStripeOffsets(
-              currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
-          // Split starting at row start will not read that row.
-          currentKnownTornStart = lastStartOffset;
-          // Row offsets will be determined from the reader (we could set the first from last).
-          lastStartOffset = Long.MIN_VALUE;
-          firstStartOffset = Long.MIN_VALUE;
-          rowsPerSlice = 0;
-          writer.writeIntermediateFooter();
-        }
+      List<Integer> splitColumnIds = OrcInputFormat.genIncludedColumnsReverse(
+          schema, splitIncludes, false);
+      // fileread writes to the writer, which writes to orcWriter, which writes to cacheWriter
+      EncodingWriter writer = VertorDeserializeOrcWriter.create(
+          sourceInputFormat, sourceSerDe, parts, daemonConf, jobConf, split.getPath(), originalOi,
+          splitColumnIds, splitIncludes, allocSize);
+      // TODO: move this into ctor? EW would need to create CacheWriter then
+      List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
+      writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
+          writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
+      if (writer instanceof VertorDeserializeOrcWriter) {
+        VertorDeserializeOrcWriter asyncWriter = (VertorDeserializeOrcWriter)writer;
+        asyncWriter.startAsync(new AsyncCacheDataCallback());
+        this.asyncWriters.add(asyncWriter);
       }
-      if (rowsPerSlice > 0 || (!maySplitTheSplit && hasData)) {
-        long fileOffset = -1;
-        if (!offsetReader.hasOffsets()) {
-          // The reader doesn't support offsets. We adjust offsets to match future splits.
-          // If cached split was starting at row start, that row would be skipped, so +1
-          firstStartOffset = split.getStart() + 1;
-          // Last row starting at the end of the split would be read.
-          lastStartOffset = split.getStart() + split.getLength();
-          // However, it must end after the split end, otherwise the next one would have been read.
-          fileOffset = lastStartOffset + 1;
-          if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
-            LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
-              + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
-          }
-        } else {
-          fileOffset = offsetReader.getCurrentRowEndOffset();
-          assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+      currentFileRead = new FileReaderYieldReturn(
+          offsetReader, split, writer, maySplitTheSplit, targetSliceRowCount);
+    } finally {
+      // Assignment is the last thing in the try, so if it happened, we assume success.
+      if (currentFileRead != null) return;
+      if (offsetReader == null) return;
+      try {
+        offsetReader.close();
+      } catch (Exception ex) {
+        LlapIoImpl.LOG.error("Failed to close source reader", ex);
+      }
+    }
+  }
+
+  private class AsyncCacheDataCallback implements AsyncCallback {
+    @Override
+    public void onComplete(VertorDeserializeOrcWriter writer) {
+      CacheWriter cacheWriter = null;
+      try {
+        cacheWriter = writer.getCacheWriter();
+        // What we were reading from disk originally.
+        boolean[] cacheIncludes = writer.getOriginalCacheIncludes();
+        Iterator<CacheWriter.CacheStripeData> iter = cacheWriter.stripes.iterator();
+        while (iter.hasNext()) {
+          processAsyncCacheData(iter.next(), cacheIncludes);
+          iter.remove();
         }
-        cacheWriter.setCurrentStripeOffsets(
-            currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      } catch (IOException e) {
+        LlapIoImpl.LOG.error("Failed to cache async data", e);
+      } finally {
+        cacheWriter.discardData();
       }
-      // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
-      writer.close();
-      writer = null;
-    } finally {
-      // We don't need the source reader anymore.
-      if (offsetReader != null) {
+    }
+  }
+
+  // TODO: this interface is ugly. The two implementations are so far apart feature-wise
+  //       after all the perf changes that we might as well hardcode them separately.
+  static abstract class EncodingWriter {
+    protected Writer orcWriter;
+    protected CacheWriter cacheWriter;
+    protected final StructObjectInspector sourceOi;
+    private final int allocSize;
+
+    public EncodingWriter(StructObjectInspector sourceOi, int allocSize) {
+      this.sourceOi = sourceOi;
+      this.allocSize = allocSize;
+    }
+
+
+    public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+      this.orcWriter = createOrcWriter(cacheWriter, conf, path, sourceOi);
+      this.cacheWriter = cacheWriter;
+    }
+
+    public CacheWriter getCacheWriter() {
+      return cacheWriter;
+    }
+    public abstract boolean isOnlyWritingIncludedColumns();
+
+    public abstract void writeOneRow(Writable row) throws IOException;
+    public abstract void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset);
+    public abstract void flushIntermediateData() throws IOException;
+    public abstract void writeIntermediateFooter() throws IOException;
+    public abstract List<VectorizedRowBatch> extractCurrentVrbs();
+    public void close() throws IOException {
+      if (orcWriter != null) {
         try {
-          offsetReader.close();
+          orcWriter.close();
+          orcWriter = null;
         } catch (Exception ex) {
-          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+          LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
         }
-      } else {
-        assert sourceReader != null;
+      }
+      if (cacheWriter != null) {
         try {
-          sourceReader.close();
+          cacheWriter.discardData();
+          cacheWriter = null;
         } catch (Exception ex) {
-          LlapIoImpl.LOG.error("Failed to close source reader", ex);
+          LlapIoImpl.LOG.error("Failed to close cache writer", ex);
         }
       }
     }
-  }
 
-  interface EncodingWriter {
-    void writeOneRow(Writable row) throws IOException;
-    StructObjectInspector getDestinationOi();
-    void init(Writer orcWriter);
-    boolean hasIncludes();
-    void writeIntermediateFooter() throws IOException;
-    void flushIntermediateData() throws IOException;
-    void close() throws IOException;
+    protected Writer createOrcWriter(CacheWriter cacheWriter, Configuration conf,
+        Path path, StructObjectInspector oi) throws IOException {
+      // TODO: this is currently broken. We need to set the memory manager to a bogus implementation
+      //       to avoid problems with the memory manager actually tracking the usage.
+      return OrcFile.createWriter(path, createOrcWriterOptions(
+          sourceOi, conf, cacheWriter, allocSize));
+    }
   }
 
-  static class DeserialerOrcWriter implements EncodingWriter {
-    private Writer orcWriter;
+  static class DeserializerOrcWriter extends EncodingWriter {
     private final Deserializer sourceSerDe;
-    private final StructObjectInspector sourceOi;
 
-    public DeserialerOrcWriter(Deserializer sourceSerDe, StructObjectInspector sourceOi) {
+    public DeserializerOrcWriter(
+        Deserializer sourceSerDe, StructObjectInspector sourceOi, int allocSize) {
+      super(sourceOi, allocSize);
       this.sourceSerDe = sourceSerDe;
-      this.sourceOi = sourceOi;
-    }
-
-    @Override
-    public void init(Writer orcWriter) {
-      this.orcWriter = orcWriter;
     }
 
     @Override
@@ -1152,22 +1516,30 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
-    public boolean hasIncludes() {
+    public boolean isOnlyWritingIncludedColumns() {
       return false; // LazySimpleSerDe doesn't support projection.
     }
 
     @Override
-    public StructObjectInspector getDestinationOi() {
-      return sourceOi;
+    public void setCurrentStripeOffsets(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset) {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+    }
+
+    @Override
+    public List<VectorizedRowBatch> extractCurrentVrbs() {
+      return null; // Doesn't support creating VRBs.
     }
   }
 
-  private WriterOptions createOrcWriterOptions(ObjectInspector sourceOi) throws IOException {
-    return OrcFile.writerOptions(daemonConf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+  static WriterOptions createOrcWriterOptions(ObjectInspector sourceOi,
+      Configuration conf, CacheWriter cacheWriter, int allocSize) throws IOException {
+    return OrcFile.writerOptions(conf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
         .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
         .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
         .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi)
-        .physicalWriter(cacheWriter);
+        .physicalWriter(cacheWriter).bufferSize(allocSize);
   }
 
   private ObjectInspector getOiFromSerDe() throws IOException {
@@ -1178,7 +1550,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     }
   }
 
-  private ReaderWithOffsets createOffsetReader(RecordReader sourceReader) {
+  private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader) {
     if (LlapIoImpl.LOG.isDebugEnabled()) {
       LlapIoImpl.LOG.debug("Using " + sourceReader.getClass().getSimpleName() + " to read data");
     }
@@ -1206,9 +1578,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
-      boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+      boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) {
     if (ecb == null) { // This basically means stop has been called.
-      cleanupReaders();
+      cleanup(true);
       return false;
     }
     LlapIoImpl.LOG.trace("Sending a batch over to consumer");
@@ -1216,29 +1588,32 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     if (hasCachedSlice) {
       cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
     }
-    if (writerData != null) {
-      writerData.colStreams.clear();
+    if (diskData != null) {
+      diskData.colStreams.clear();
     }
     return true;
   }
 
-
-  private void cleanupReaders() {
-    if (writer != null) {
+  private void cleanup(boolean isError) {
+    cleanUpCurrentRead();
+    if (!isError) return;
+    for (VertorDeserializeOrcWriter asyncWriter : asyncWriters) {
       try {
-        writer.close();
-        writer = null;
+        asyncWriter.interrupt();
       } catch (Exception ex) {
-        LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
+        LlapIoImpl.LOG.warn("Failed to interrupt an async writer", ex);
       }
     }
-    if (cacheWriter != null) {
-      try {
-        cacheWriter.discardData();
-        cacheWriter = null;
-      } catch (Exception ex) {
-        LlapIoImpl.LOG.error("Failed to close cache writer", ex);
-      }
+    asyncWriters.clear();
+  }
+
+  private void cleanUpCurrentRead() {
+    if (currentFileRead == null) return;
+    try {
+      currentFileRead.closeOffsetReader();
+      currentFileRead = null;
+    } catch (Exception ex) {
+      LlapIoImpl.LOG.error("Failed to close current file reader", ex);
     }
   }
 
@@ -1249,7 +1624,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private boolean processStop() {
     if (!isStopped) return false;
     LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
-    cleanupReaders();
+    cleanup(true);
     return true;
   }
 
@@ -1262,11 +1637,11 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
   }
 
-  // TODO: move to a superclass?
   @Override
   public void returnData(OrcEncodedColumnBatch ecb) {
     for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
       if (!ecb.hasData(colIx)) continue;
+      // TODO: reuse columnvector-s on hasBatch - save the array by column? take apart each list.
       ColumnStreamData[] datas = ecb.getColumnData(colIx);
       for (ColumnStreamData data : datas) {
         if (data == null || data.decRef() != 0) continue;

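The returnData() loop above releases column stream buffers by reference count: every consumer calls decRef(), and only the call that drops the count to zero proceeds to release the buffers (the release itself is outside this hunk). A minimal standalone sketch of that pattern, using a hypothetical CountedBuffer rather than the real ColumnStreamData API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {
  static class CountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);
    int incRef() { return refCount.incrementAndGet(); }
    int decRef() { return refCount.decrementAndGet(); }
  }

  public static void main(String[] args) {
    List<CountedBuffer> pool = new ArrayList<>();
    CountedBuffer data = new CountedBuffer();
    data.incRef(); // a second consumer (e.g. the cache) also holds the buffer

    if (data.decRef() == 0) pool.add(data); // consumer 1 done: 2 -> 1, not released
    if (data.decRef() == 0) pool.add(data); // consumer 2 done: 1 -> 0, returned to pool

    System.out.println(pool.size()); // prints 1
  }
}
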
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
index 63a3be2..86d9ecc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.llap.io.encoded;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserialerOrcWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter;
 import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
@@ -57,73 +60,90 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 /** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */
-class VertorDeserializeOrcWriter implements EncodingWriter {
+class VertorDeserializeOrcWriter extends EncodingWriter implements Runnable {
+  private final VectorizedRowBatchCtx vrbCtx;
   private Writer orcWriter;
   private final LazySimpleDeserializeRead deserializeRead;
   private final VectorDeserializeRow<?> vectorDeserializeRow;
-  private final VectorizedRowBatch sourceBatch, destinationBatch;
-  private final boolean hasIncludes;
   private final StructObjectInspector destinationOi;
+  private final boolean usesSourceIncludes;
+  private final List<Integer> sourceIncludes;
+
+  private final boolean isAsync;
+  private final Thread orcThread;
+  private final ConcurrentLinkedQueue<WriteOperation> queue;
+  private AsyncCallback completion;
+
+  // Stored here only as async operation context.
+  private final boolean[] cacheIncludes;
+
+  private VectorizedRowBatch sourceBatch, destinationBatch;
+  private List<VectorizedRowBatch> currentBatches;
 
   // TODO: if more writers are added, separate out an EncodingWriterFactory
   public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe,
-      Map<Path, PartitionDesc> parts, Configuration daemonConf,
-      Configuration jobConf, Path splitPath, StructObjectInspector sourceOi,
-      List<Integer> includes) throws IOException {
+      Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf,
+      Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes,
+      boolean[] cacheIncludes, int allocSize) throws IOException {
     // Vector SerDe can be disabled both on client and server side.
     if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
         || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
         || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) {
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath);
     PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
         parts, path, null);
     if (partDesc == null) {
       LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path);
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     Properties tblProps = partDesc.getTableDesc().getProperties();
     if ("true".equalsIgnoreCase(tblProps.getProperty(
         serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) {
       LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to "
         + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
-      return new DeserialerOrcWriter(serDe, sourceOi);
+      return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
     }
     for (StructField sf : sourceOi.getAllStructFieldRefs()) {
       Category c = sf.getFieldObjectInspector().getCategory();
       if (c != Category.PRIMITIVE) {
         LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported");
-        return new DeserialerOrcWriter(serDe, sourceOi);
+        return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
       }
     }
     LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path);
-    return new VertorDeserializeOrcWriter(daemonConf, tblProps, sourceOi, includes);
+    return new VertorDeserializeOrcWriter(
+        daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize);
   }
 
   private VertorDeserializeOrcWriter(Configuration conf, Properties tblProps,
-      StructObjectInspector sourceOi, List<Integer> columnIds) throws IOException {
+      StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes,
+      int allocSize) throws IOException {
+    super(sourceOi, allocSize);
     // See also: the usage of VectorDeserializeType, for binary. For now, we only want text.
-    VectorizedRowBatchCtx vrbCtx = createVrbCtx(sourceOi);
+    this.vrbCtx = createVrbCtx(sourceOi);
+    this.sourceIncludes = sourceIncludes;
+    this.cacheIncludes = cacheIncludes;
     this.sourceBatch = vrbCtx.createVectorizedRowBatch();
     deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(),
         /* useExternalBuffer */ true, createSerdeParams(conf, tblProps));
     vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead);
     int colCount = vrbCtx.getRowColumnTypeInfos().length;
     boolean[] includes = null;
-    this.hasIncludes = columnIds.size() < colCount;
-    if (hasIncludes) {
+    this.usesSourceIncludes = sourceIncludes.size() < colCount;
+    if (usesSourceIncludes) {
       // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the
       // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them.
       // In any case, we would also need to build a new OI for OrcWriter config.
       // This is why OrcWriter is created after this writer, by the way.
-      this.destinationBatch = new VectorizedRowBatch(columnIds.size());
+      this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
       includes = new boolean[colCount];
       int inclBatchIx = 0;
-      List<String> childNames = new ArrayList<>(columnIds.size());
-      List<ObjectInspector> childOis = new ArrayList<>(columnIds.size());
+      List<String> childNames = new ArrayList<>(sourceIncludes.size());
+      List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size());
       List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs();
-      for (Integer columnId : columnIds) {
+      for (Integer columnId : sourceIncludes) {
         includes[columnId] = true;
         assert inclBatchIx <= columnId;
         // Note that we use the same vectors in both batches. Clever, very clever.
@@ -135,7 +155,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
       // This is only used by ORC to derive the structure. Most fields are unused.
       destinationOi = new LazySimpleStructObjectInspector(
           childNames, childOis, null, (byte)0, null);
-      destinationBatch.setPartitionInfo(columnIds.size(), 0);
+      destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
       if (LlapIoImpl.LOG.isDebugEnabled()) {
         LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes));
       }
@@ -154,6 +174,23 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
         throw new IOException(e);
       }
     }
+    this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED);
+    if (isAsync) {
+      currentBatches = new LinkedList<>();
+      queue = new ConcurrentLinkedQueue<>();
+      orcThread = new Thread(this);
+      orcThread.setDaemon(true);
+      orcThread.setName(Thread.currentThread().getName() + "-OrcEncode");
+    } else {
+      queue = null;
+      orcThread = null;
+      currentBatches = null;
+    }
+  }
+
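The constructor above builds a sparse source batch (one slot per table column, only the included ones populated) and a dense destination batch that references the same ColumnVector objects, so VectorDeserializeRow fills the source batch while the ORC writer sees a dense batch with no copying. A minimal standalone sketch of that vector sharing, with a hypothetical four-column layout:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class SparseDenseBatchSketch {
  public static void main(String[] args) {
    // Source schema has 4 columns; only columns 1 and 3 are included.
    int colCount = 4;
    int[] sourceIncludes = { 1, 3 };

    VectorizedRowBatch sourceBatch = new VectorizedRowBatch(colCount);
    VectorizedRowBatch destinationBatch = new VectorizedRowBatch(sourceIncludes.length);

    int inclBatchIx = 0;
    for (int columnId : sourceIncludes) {
      // The dense destination batch references the same vectors as the sparse source batch,
      // so rows deserialized into sourceBatch are visible to the ORC writer with no copy.
      sourceBatch.cols[columnId] = new LongColumnVector();
      destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
    }

    ((LongColumnVector) sourceBatch.cols[1]).vector[0] = 42L;
    sourceBatch.size = 1;
    destinationBatch.size = sourceBatch.size; // mirrors propagateSourceBatchFieldsToDest()

    LongColumnVector shared = (LongColumnVector) destinationBatch.cols[0];
    System.out.println(shared.vector[0]); // 42 - same vector object, seen "densely" at index 0
  }
}
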
+  public void startAsync(AsyncCallback callback) {
+    this.completion = callback;
+    this.orcThread.start();
   }
 
   private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException {
@@ -176,13 +213,57 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
   }
 
   @Override
-  public boolean hasIncludes() {
-    return hasIncludes;
+  public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+    this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi);
+    this.cacheWriter = cacheWriter;
+  }
+
+  public interface AsyncCallback {
+    void onComplete(VertorDeserializeOrcWriter writer);
   }
 
   @Override
-  public StructObjectInspector getDestinationOi() {
-    return destinationOi;
+  public void run() {
+    while (true) {
+      WriteOperation op = null;
+      int fallbackMs = 8;
+      while (true) {
+        op = queue.poll();
+        if (op != null) break;
+        if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.
+          LlapIoImpl.LOG.error("ORC encoder timed out waiting for input");
+          discardData();
+          return;
+        }
+        try {
+          Thread.sleep(fallbackMs);
+        } catch (InterruptedException e) {
+          LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input");
+          discardData();
+          return;
+        }
+        fallbackMs <<= 1;
+      }
+      try {
+        if (op.apply(orcWriter, cacheWriter)) {
+          LlapIoImpl.LOG.info("ORC encoder received an exit event");
+          completion.onComplete(this);
+          return;
+        }
+      } catch (Exception e) {
+        LlapIoImpl.LOG.error("ORC encoder failed", e);
+        discardData();
+        return;
+      }
+    }
+  }
+
+  private void discardData() {
+    try {
+      cacheWriter.discardData();
+    } catch (Exception ex) {
+      LlapIoImpl.LOG.error("Failed to close an async cache writer", ex);
+    }
   }
 
   @Override
@@ -196,7 +277,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
     // Deserialize and append new row using the current batch size as the index.
     try {
-      // TODO: can we use ByRef? Probably not, need to see text record reader.
+      // Not using ByRef now since it's unsafe for text readers. Might be safe for others.
       vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++);
     } catch (Exception e) {
       throw new IOException("DeserializeRead detail: "
@@ -206,19 +287,36 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   private void flushBatch() throws IOException {
     addBatchToWriter();
-
-    for (int c = 0; c < sourceBatch.cols.length; ++c) {
-      // This resets vectors in both batches.
-      ColumnVector colVector = sourceBatch.cols[c];
-      if (colVector != null) {
-        colVector.reset();
-        colVector.init();
+    if (!isAsync) {
+      for (int c = 0; c < sourceBatch.cols.length; ++c) {
+        // This resets vectors in both batches.
+        ColumnVector colVector = sourceBatch.cols[c];
+        if (colVector != null) {
+          colVector.reset();
+          colVector.init();
+        }
+      }
+      sourceBatch.selectedInUse = false;
+      sourceBatch.size = 0;
+      sourceBatch.endOfFile = false;
+      propagateSourceBatchFieldsToDest();
+    } else {
+      // In addBatchToWriter, we have passed the batch to both ORC and the operator pipeline
+      // (neither ever changes the vectors). We need a new batch (set of vectors) to write to.
+      // TODO: for now, create this from scratch. Ideally we should return the vectors from ops.
+      //       We could also have the ORC thread create it for us in its spare time...
+      this.sourceBatch = vrbCtx.createVectorizedRowBatch();
+      if (usesSourceIncludes) {
+        this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
+        int inclBatchIx = 0;
+        for (Integer columnId : sourceIncludes) {
+          destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
+        }
+        destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
+      } else {
+        this.destinationBatch = sourceBatch;
       }
     }
-    sourceBatch.selectedInUse = false;
-    sourceBatch.size = 0;
-    sourceBatch.endOfFile = false;
-    propagateSourceBatchFieldsToDest();
   }
 
   private void propagateSourceBatchFieldsToDest() {
@@ -230,8 +328,12 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   private void addBatchToWriter() throws IOException {
     propagateSourceBatchFieldsToDest();
-    // LlapIoImpl.LOG.info("Writing includeOnlyBatch " + s + "; data "+ includeOnlyBatch);
-    orcWriter.addRowBatch(destinationBatch);
+    if (!isAsync) {
+      orcWriter.addRowBatch(destinationBatch);
+    } else {
+      currentBatches.add(destinationBatch);
+      addWriteOp(new VrbOperation(destinationBatch));
+    }
   }
 
   @Override
@@ -243,7 +345,28 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
 
   @Override
   public void writeIntermediateFooter() throws IOException {
-    orcWriter.writeIntermediateFooter();
+    if (isAsync) {
+      addWriteOp(new IntermediateFooterOperation());
+    } else {
+      orcWriter.writeIntermediateFooter();
+    }
+  }
+
+  private void addWriteOp(WriteOperation wo) throws AssertionError {
+    if (queue.offer(wo)) return;
+    throw new AssertionError("Queue full"); // This should never happen with a linked-list queue.
+  }
+
+  @Override
+  public void setCurrentStripeOffsets(long currentKnownTornStart,
+      long firstStartOffset, long lastStartOffset, long fileOffset) {
+    if (isAsync) {
+      addWriteOp(new SetStripeDataOperation(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset));
+    } else {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+    }
   }
 
   @Override
@@ -251,11 +374,85 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
     if (sourceBatch.size > 0) {
       addBatchToWriter();
     }
-    orcWriter.close();
+    if (!isAsync) {
+      orcWriter.close();
+    } else {
+      addWriteOp(new CloseOperation());
+    }
+  }
+
+  public List<VectorizedRowBatch> extractCurrentVrbs() {
+    if (!isAsync) return null;
+    List<VectorizedRowBatch> result = currentBatches;
+    currentBatches = new LinkedList<>();
+    return result;
+  }
+
+  private static interface WriteOperation {
+    boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
+  }
+
+  private static class VrbOperation implements WriteOperation {
+    private VectorizedRowBatch batch;
+
+    public VrbOperation(VectorizedRowBatch batch) {
+      // LlapIoImpl.LOG.debug("Adding batch " + batch);
+      this.batch = batch;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      // LlapIoImpl.LOG.debug("Writing batch " + batch);
+      writer.addRowBatch(batch);
+      return false;
+    }
+  }
+
+  private static class IntermediateFooterOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.writeIntermediateFooter();
+      return false;
+    }
+  }
+
+  private static class SetStripeDataOperation implements WriteOperation {
+    private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset;
+    public SetStripeDataOperation(long currentKnownTornStart,
+        long firstStartOffset, long lastStartOffset, long fileOffset) {
+      this.currentKnownTornStart = currentKnownTornStart;
+      this.firstStartOffset = firstStartOffset;
+      this.lastStartOffset = lastStartOffset;
+      this.fileOffset = fileOffset;
+    }
+
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      cacheWriter.setCurrentStripeOffsets(
+          currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+      return false;
+    }
+  }
+
+  private static class CloseOperation implements WriteOperation {
+    @Override
+    public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+      writer.close();
+      return true; // The thread should stop after this. 
+    }
+  }
+
+  public boolean[] getOriginalCacheIncludes() {
+    return cacheIncludes;
   }
 
   @Override
-  public void init(Writer orcWriter) {
-    this.orcWriter = orcWriter;
+  public boolean isOnlyWritingIncludedColumns() {
+    return usesSourceIncludes;
+  }
+
+  public void interrupt() {
+    assert orcThread != null;
+    orcThread.interrupt();
   }
-}
\ No newline at end of file
+}

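The async path above is a single-producer, single-consumer command queue: the read loop enqueues WriteOperation objects and the ORC thread polls the queue with exponential backoff, exiting when an operation (CloseOperation) returns true. A standalone sketch of the same pattern with simplified, generically named operations (no cache writer, no ORC):

import java.util.concurrent.ConcurrentLinkedQueue;

public class WriteQueueSketch {
  interface Op { boolean apply() throws Exception; } // true = consumer should exit

  public static void main(String[] args) throws InterruptedException {
    ConcurrentLinkedQueue<Op> queue = new ConcurrentLinkedQueue<>();
    Thread consumer = new Thread(() -> {
      while (true) {
        Op op;
        int fallbackMs = 8;
        while ((op = queue.poll()) == null) {
          if (fallbackMs > 262144) return; // give up, mirroring the timeout above
          try {
            Thread.sleep(fallbackMs);
          } catch (InterruptedException e) {
            return; // interrupt is treated as an error signal
          }
          fallbackMs <<= 1;
        }
        try {
          if (op.apply()) return; // a "close" operation ends the thread
        } catch (Exception e) {
          return; // a real implementation would discard buffered data here
        }
      }
    });
    consumer.setDaemon(true);
    consumer.start();

    queue.offer(() -> { System.out.println("write batch"); return false; });
    queue.offer(() -> { System.out.println("close"); return true; });
    consumer.join();
  }
}
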
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 99cc506..369584b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -350,6 +350,45 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   }
 
   /**
+   * Reverses genIncludedColumns; produces the table column indexes from the ORC included columns.
+   * @param readerSchema The ORC reader schema for the table.
+   * @param included The included ORC columns.
+   * @param isFullColumnMatch Whether full column match should be enforced (i.e. whether to expect
+   *          that all the sub-columns of a complex type column are included or excluded together
+   *          in the included array). If false, any sub-column being included for a complex
+   *          type is sufficient for the entire complex column to be included in the result.
+   * @return The list of table column indexes.
+   */
+  public static List<Integer> genIncludedColumnsReverse(
+      TypeDescription readerSchema, boolean[] included, boolean isFullColumnMatch) {
+    assert included != null;
+    List<Integer> result = new ArrayList<>();
+    List<TypeDescription> children = readerSchema.getChildren();
+    for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
+      TypeDescription child = children.get(columnNumber);
+      int id = child.getId();
+      int maxId = child.getMaximumId();
+      if (id >= included.length || maxId >= included.length) {
+        throw new AssertionError("Inconsistent includes: " + included.length
+            + " elements; found column ID " + id);
+      }
+      boolean isIncluded = included[id];
+      for (int col = id + 1; col <= maxId; ++col) {
+        if (isFullColumnMatch && included[col] != isIncluded) {
+          throw new AssertionError("Inconsistent includes: root column IDs are [" + id + ", "
+              + maxId + "]; included[" + col + "] = " + included[col] + ", which differs"
+              + " from the previous IDs of the same root column.");
+        }
+        isIncluded = isIncluded || included[col];
+      }
+      if (isIncluded) {
+        result.add(columnNumber);
+      }
+    }
+    return result;
+  }
+
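A small worked example of the new helper (illustrative only; assumes the standard org.apache.orc.TypeDescription builder API). With isFullColumnMatch == false, any included sub-column pulls its root column into the result; with true, disagreeing sub-columns trigger the AssertionError above:

import java.util.List;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.orc.TypeDescription;

public class ReverseIncludesExample {
  public static void main(String[] args) {
    // struct<a:int, b:struct<c:int, d:int>>  ->  ORC column IDs: root=0, a=1, b=2, c=3, d=4
    TypeDescription schema = TypeDescription.createStruct()
        .addField("a", TypeDescription.createInt())
        .addField("b", TypeDescription.createStruct()
            .addField("c", TypeDescription.createInt())
            .addField("d", TypeDescription.createInt()));
    boolean[] included = { false, true, false, true, false }; // a and c are included
    // a (ID 1) maps to table column 0; c (ID 3) pulls in its root b, i.e. table column 1.
    List<Integer> cols = OrcInputFormat.genIncludedColumnsReverse(schema, included, false);
    System.out.println(cols); // [0, 1]
    // With isFullColumnMatch == true the same call throws, because included[2] (b itself)
    // and included[3] (c) disagree for the same root column.
  }
}
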
+  /**
    * Take the configuration and figure out which columns we need to include.
    * @param readerSchema the types for the reader
    * @param conf the configuration