Posted to commits@hive.apache.org by se...@apache.org on 2015/07/27 22:48:43 UTC

[2/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index e66955d..e9c32ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -26,58 +28,60 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheListHelper;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /**
  * Encoded reader implementation.
  *
  * Note about refcounts on cache blocks.
- * When we get or put blocks into cache, they are "locked" (refcount++). After that, we send the
- * blocks out to processor as part of RG data; one block can be used for multiple RGs. In some
- * cases, one block is sent for ALL rgs (e.g. a dictionary for string column). This is how we deal
- * with this:
- * For non-dictionary case:
- * 1) At all times, every buffer has +1 refcount for each time we sent this block to processing.
- * 2) When processor is done with an RG, it decrefs for all the blocks involved.
- * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we send
- *    the block to processor, and the latter decrefs it, the block won't be evicted when we want
- *    to reuse it for some other RG, forcing us to do an extra disk read or cache lookup.
- * 4) As we read (we always read RGs in order, assume they are stored in physical order in the
- *    file, plus that CBs are not shared between streams, AND that we read each stream from the
- *    beginning), we note which blocks cannot possibly be reused anymore (next RG starts in the
- *    next CB). We decref for the refcount from (3) in such case.
- * 5) Given that RG end boundary in ORC is an estimate, so we can request data from cache and then
- *    not use it, at the end we go thru all the blocks, and release those not released by (4).
+ * When we get or put blocks into cache, they are "locked" (refcount++), so they cannot be evicted.
+ * We send the MemoryBuffer-s to the caller as part of RG data; one MemoryBuffer can be used for
+ * many RGs (e.g. a dictionary, or multiple RGs per block). We also want to "unlock" MemoryBuffer-s
+ * in cache as soon as possible. Here is how we deal with this:
+ *
  * For dictionary case:
- * 1) We have a separate refcount on the ColumnBuffer object we send to the processor (in the
- *    non-dictionary case, it's always 1, so when processor is done it goes directly to decrefing
- *    cache buffers).
- * 2) In the dictionary case, it's increased per RG, and processors don't touch cache buffers if
- *    they do not happen to decref this refcount to 0.
- * 3) This is done because dictionary can have many buffers; decrefing all of them for all RGs
- *    is more expensive; plus, decrefing in cache may be more expensive due to cache policy/etc.
+ * 1) There's a separate refcount on the ColumnStreamData object we send to the caller. In the
+ *    dictionary case, it's increased per RG, and callers don't release the MBs if the containing
+ *    ColumnStreamData is not yet ready to be released. This is done because a dictionary can have
+ *    many buffers; decrefing all of them for all RGs is more expensive; plus, decrefing in cache
+ *    may be more expensive due to cache policy/etc.
+ *
+ * For non-dictionary case:
+ * 1) All the ColumnStreamData-s for normal data always have refcount 1, because we return them
+ *    only once.
+ * 2) At all times, every MB in such cases has a +1 refcount for each time we return it as part of
+ *    a ColumnStreamData.
+ * 3) When the caller is done, it therefore decrefs the ColumnStreamData to 0, and decrefs all the
+ *    MBs involved.
+ * 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
+ *    the MB to the caller and the caller decrefs it, the MB can't be evicted and will be there if
+ *    we want to reuse it for some other RG.
+ * 5) As we read (we always read RGs in order and forward in each stream; we assume they are stored
+ *    physically in order in the file; AND that CBs are not shared between streams), we note which
+ *    MBs cannot possibly be reused anymore (next RG starts in the next CB). We decref the refcount
+ *    from (4) in such case.
+ * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
+ *    not use it; thus, at the end we go thru all the MBs, and release those not released by (5).
  */
 public class EncodedReaderImpl implements EncodedReader {
   public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
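
To make the refcount protocol described in the class comment concrete, here is a minimal caller-side sketch; it is only an illustration and not code from this patch. DataCache.releaseBuffer and ColumnStreamData.getCacheBuffers appear in the diff, while a decRef() counterpart to the incRef() used in createRgStreamBuffer is assumed.

    // A minimal sketch of caller-side cleanup under the refcount scheme above.
    // Assumes ColumnStreamData has a decRef() returning the new count; only
    // getCacheBuffers() and DataCache.releaseBuffer() are taken from the patch.
    import java.util.List;

    import org.apache.hadoop.hive.common.io.storage_api.DataCache;
    import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
    import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

    class StreamDataReleaser {
      // Called once per RG per stream when the caller has consumed the data.
      static void releaseStreamData(DataCache cache, ColumnStreamData csd) {
        if (csd.decRef() > 0) {
          return; // Dictionary case: other RGs still reference this stream's buffers.
        }
        List<MemoryBuffer> buffers = csd.getCacheBuffers();
        if (buffers == null) return;
        for (MemoryBuffer buffer : buffers) {
          cache.releaseBuffer(buffer); // Undo the +1 taken when the buffer was returned.
        }
      }
    }
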
@@ -114,52 +118,67 @@ public class EncodedReaderImpl implements EncodedReader {
           t.reset();
         }
       });
-  public static final FixedSizedObjectPool<StreamBuffer> SB_POOL =
-      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<StreamBuffer>() {
+  public static final FixedSizedObjectPool<ColumnStreamData> SB_POOL =
+      new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+        @Override
+        protected ColumnStreamData create() {
+          return new ColumnStreamData();
+        }
+        @Override
+        protected void resetBeforeOffer(ColumnStreamData t) {
+          t.reset();
+        }
+      });
+  private static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+        @Override
+        protected CacheChunk create() {
+          return new CacheChunk();
+        }
+        @Override
+        protected void resetBeforeOffer(CacheChunk t) {
+          t.reset();
+        }
+      });
+  private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
+      new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
         @Override
-        protected StreamBuffer create() {
-          return new StreamBuffer();
+        protected ProcCacheChunk create() {
+          return new ProcCacheChunk();
         }
         @Override
-        protected void resetBeforeOffer(StreamBuffer t) {
+        protected void resetBeforeOffer(ProcCacheChunk t) {
           t.reset();
         }
       });
+  private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+        @Override
+        public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+          CacheChunk tcc = TCC_POOL.take();
+          tcc.init(buffer, offset, end);
+          return tcc;
+        }
+      };
   private final long fileId;
-  private FSDataInputStream file;
+  private final DataReader dataReader;
+  private boolean isDataReaderOpen = false;
   private final CompressionCodec codec;
   private final int bufferSize;
   private final List<OrcProto.Type> types;
-  private ZeroCopyReaderShim zcr;
   private final long rowIndexStride;
-  private final LowLevelCache cache;
+  private final DataCache cache;
   private ByteBufferAllocatorPool pool;
-  // For now, one consumer for all calls.
-  private final Consumer<OrcEncodedColumnBatch> consumer;
-  private final LowLevelCacheCounters qfCounters;
-  private final FileSystem fs;
-  private final Path path;
-  private final boolean useZeroCopy;
-
-  public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy,
-      List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
-      LowLevelCache cache, LowLevelCacheCounters qfCounters,
-      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
+  private boolean isDebugTracingEnabled;
+
+  public EncodedReaderImpl(long fileId, List<OrcProto.Type> types, CompressionCodec codec,
+      int bufferSize, long strideRate, DataCache cache, DataReader dataReader) throws IOException {
     this.fileId = fileId;
-    this.fs = fileSystem;
-    this.path = path;
     this.codec = codec;
     this.types = types;
     this.bufferSize = bufferSize;
     this.rowIndexStride = strideRate;
     this.cache = cache;
-    this.qfCounters = qfCounters;
-    this.consumer = consumer;
-    this.useZeroCopy = useZeroCopy;
-    if (useZeroCopy && !cache.getAllocator().isDirectAlloc()) {
-      throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
-          + "buffers; either disable zero-copy or enable direct cache allocation");
-    }
+    this.dataReader = dataReader;
   }
 
   /** Helper context for each column being read */
@@ -228,7 +247,7 @@ public class EncodedReaderImpl implements EncodedReader {
     /** Iterators for the buffers; used to maintain position in per-rg reading. */
     DiskRangeList bufferIter;
     /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
-    StreamBuffer stripeLevelStream;
+    ColumnStreamData stripeLevelStream;
 
     @Override
     public String toString() {
@@ -249,17 +268,15 @@ public class EncodedReaderImpl implements EncodedReader {
       } else {
         batchKey.set(fileId, stripeIx, rgIx);
       }
-      if (columnIxs == null || columnCount != columnIxs.length) {
-        columnIxs = new int[columnCount];
-        columnData = new StreamBuffer[columnCount][];
-      }
+      resetColumnArrays(columnCount);
     }
   }
 
   @Override
   public void readEncodedColumns(int stripeIx, StripeInformation stripe,
       RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
-      boolean[] included, boolean[][] colRgs) throws IOException {
+      boolean[] included, boolean[][] colRgs,
+      Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if we throw.
     // We are also not supposed to call setDone, since we are only part of the operation.
     long stripeOffset = stripe.getOffset();
@@ -267,8 +284,8 @@ public class EncodedReaderImpl implements EncodedReader {
     long offset = 0; // Stream offset in relation to the stripe.
     // 1.1. Figure out which columns have a present stream
     boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
-    if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
+    if (isDebugTracingEnabled) {
+      LOG.info("The following columns have PRESENT streams: " + arrayToString(hasNull));
     }
 
     // We assume stream list is sorted by column and that non-data
@@ -278,7 +295,7 @@ public class EncodedReaderImpl implements EncodedReader {
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
     boolean isCompressed = (codec != null);
-    DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
+    CreateHelper listToRead = new CreateHelper();
     boolean hasIndexOnlyCols = false;
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
@@ -288,7 +305,7 @@ public class EncodedReaderImpl implements EncodedReader {
         // We have a stream for included column, but in future it might have no data streams.
         // It's more like "has at least one column included that has an index stream".
         hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
-        if (DebugUtils.isTraceOrcEnabled()) {
+        if (isDebugTracingEnabled) {
           LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
         }
         offset += length;
@@ -302,7 +319,7 @@ public class EncodedReaderImpl implements EncodedReader {
         includedRgs = colRgs[colRgIx];
         ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
         ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
-        if (DebugUtils.isTraceOrcEnabled()) {
+        if (isDebugTracingEnabled) {
           LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
         }
       } else {
@@ -312,13 +329,13 @@ public class EncodedReaderImpl implements EncodedReader {
       int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
           types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
       ctx.addStream(offset, stream, indexIx);
-      if (DebugUtils.isTraceOrcEnabled()) {
+      if (isDebugTracingEnabled) {
         LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
             + ", " + length + ", index position " + indexIx);
       }
       if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
         RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
-        if (DebugUtils.isTraceOrcEnabled()) {
+        if (isDebugTracingEnabled) {
           LOG.info("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
         }
       } else {
@@ -344,35 +361,23 @@ public class EncodedReaderImpl implements EncodedReader {
     }
 
     // 2. Now, read all of the ranges from cache or disk.
-    CacheListHelper toRead = new CacheListHelper(listToRead.get());
-    if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-        && LOG.isInfoEnabled()) {
+    DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
+    if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
       LOG.info("Resulting disk ranges to read (file " + fileId + "): "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
-    cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY, qfCounters);
-    if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-        && LOG.isInfoEnabled()) {
+    BooleanRef result = new BooleanRef();
+    cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, result);
+    if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
       LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
           + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
-    if (!toRead.didGetAllData) {
-      long startTime = qfCounters.startTimeCounter();
-      if (this.file == null) {
-        this.file = fs.open(path);
-        this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
-        this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
-      }
-      // Force direct buffers if we will be decompressing to direct cache.
-      RecordReaderUtils.readDiskRanges(
-          file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());
-      qfCounters.recordHdfsTime(startTime);
-      if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-          && LOG.isInfoEnabled()) {
-        LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset
-              + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+    if (!result.value) {
+      if (isDataReaderOpen) {
+        this.dataReader.open();
       }
+      dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc());
     }
 
     // 3. For uncompressed case, we need some special processing before read.
@@ -382,15 +387,14 @@ public class EncodedReaderImpl implements EncodedReader {
         ColumnReadContext ctx = colCtxs[colIxMod];
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
-          DiskRangeList newIter = InStream.preReadUncompressedStream(fileId, stripeOffset,
-              iter, sctx.offset, sctx.offset + sctx.length, zcr, cache, qfCounters);
+          DiskRangeList newIter = preReadUncompressedStream(
+              stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
           if (newIter != null) {
             iter = newIter;
           }
         }
       }
-      if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-          && LOG.isInfoEnabled()) {
+      if (isDebugTracingEnabled) {
         LOG.info("Disk ranges after pre-read (file " + fileId + ", base offset "
             + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
       }
@@ -418,10 +422,10 @@ public class EncodedReaderImpl implements EncodedReader {
         ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
-          StreamBuffer cb = null;
+          ColumnStreamData cb = null;
           if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-            if (DebugUtils.isTraceOrcEnabled()) {
+            if (isDebugTracingEnabled) {
               LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
                   + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
             }
@@ -436,9 +440,9 @@ public class EncodedReaderImpl implements EncodedReader {
               // For stripe-level streams we don't need the extra refcount on the block.
               // See class comment about refcounts.
               long unlockUntilCOffset = sctx.offset + sctx.length;
-              DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset, iter,
-                  sctx.offset, sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
-                  sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset, qfCounters);
+              DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
+                  sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
+                  unlockUntilCOffset, sctx.offset);
               if (lastCached != null) {
                 iter = lastCached;
               }
@@ -459,11 +463,11 @@ public class EncodedReaderImpl implements EncodedReader {
             cb = createRgStreamBuffer(
                 rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
             boolean isStartOfStream = sctx.bufferIter == null;
-            DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset,
-                (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, zcr, codec,
-                bufferSize, cache, cb, unlockUntilCOffset, sctx.offset, qfCounters);
+            DiskRangeList lastCached = readEncodedStream(stripeOffset,
+                (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
+                unlockUntilCOffset, sctx.offset);
             if (lastCached != null) {
-              sctx.bufferIter = iter = lastCached; // Reset iter just to ensure it's valid
+              sctx.bufferIter = iter = lastCached;
             }
           }
           ecb.setStreamData(colIxMod, streamIx, cb);
@@ -475,25 +479,35 @@ public class EncodedReaderImpl implements EncodedReader {
     }
     releaseContexts(colCtxs);
 
-    if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
-        && LOG.isInfoEnabled()) {
+    if (isDebugTracingEnabled) {
       LOG.info("Disk ranges after preparing all the data "
           + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
     // Release the unreleased buffers. See class comment about refcounts.
     releaseInitialRefcounts(toRead.next);
-    InStream.releaseCacheChunksIntoObjectPool(toRead.next);
+    releaseCacheChunksIntoObjectPool(toRead.next);
+  }
+
+
+  private static String arrayToString(boolean[] a) {
+    StringBuilder b = new StringBuilder();
+    b.append('[');
+    for (int i = 0; i < a.length; ++i) {
+      b.append(a[i] ? "1" : "0");
+    }
+    b.append(']');
+    return b.toString();
   }
 
 
-  private StreamBuffer createRgStreamBuffer(int rgIx, boolean isLastRg,
+  private ColumnStreamData createRgStreamBuffer(int rgIx, boolean isLastRg,
       int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
-    StreamBuffer cb;
+    ColumnStreamData cb;
     cb = SB_POOL.take();
     cb.init(sctx.kind.getNumber());
     cb.incRef();
-    if (DebugUtils.isTraceOrcEnabled()) {
+    if (isDebugTracingEnabled) {
       LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")
           + "RG " + rgIx + " stream " + sctx.kind  + " at " + sctx.offset + ", "
           + sctx.length + " index position " + sctx.streamIndexOffset + ": " +
@@ -521,31 +535,869 @@ public class EncodedReaderImpl implements EncodedReader {
     while (current != null) {
       DiskRangeList toFree = current;
       current = current.next;
-      if (!(toFree instanceof TrackedCacheChunk)) continue;
-      TrackedCacheChunk cc = (TrackedCacheChunk)toFree;
-      if (cc.isReleased) continue;
-      LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
-      if (DebugUtils.isTraceLockingEnabled()) {
-        LOG.info("Unlocking " + buffer + " for the fetching thread at the end");
-      }
+      if (!(toFree instanceof CacheChunk)) continue;
+      CacheChunk cc = (CacheChunk)toFree;
+      if (cc.getBuffer() == null) continue;
+      MemoryBuffer buffer = cc.getBuffer();
       cache.releaseBuffer(buffer);
-      cc.isReleased = true;
+      cc.setBuffer(null);
     }
   }
 
+  @Override
+  public void setDebugTracing(boolean isEnabled) {
+    this.isDebugTracingEnabled = isEnabled;
+  }
+
 
   @Override
   public void close() throws IOException {
-    if (file != null) {
-      try {
-        file.close();
-      } catch (IOException ex) {
-        // Tez might have closed our filesystem. Log and ignore error.
-        LOG.info("Failed to close file; ignoring: " + ex.getMessage());
-      }
-    }
+    dataReader.close();
     if (pool != null) {
       pool.clear();
     }
   }
+
+  /** DiskRange containing encoded, uncompressed data from cache. */
+  @VisibleForTesting
+  public static class CacheChunk extends DiskRangeList {
+    protected MemoryBuffer buffer;
+
+    public CacheChunk() {
+      super(-1, -1);
+    }
+
+    public void init(MemoryBuffer buffer, long offset, long end) {
+      this.buffer = buffer;
+      this.offset = offset;
+      this.end = end;
+    }
+
+    @Override
+    public boolean hasData() {
+      return buffer != null;
+    }
+
+    @Override
+    public ByteBuffer getData() {
+      // Callers duplicate the buffer; they have to for BufferChunk.
+      return buffer.getByteBufferRaw();
+    }
+
+    @Override
+    public String toString() {
+      return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
+    }
+
+    @Override
+    public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+      throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
+          + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
+    }
+
+    public MemoryBuffer getBuffer() {
+      return buffer;
+    }
+
+    public void setBuffer(MemoryBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    public void handleCacheCollision(DataCache cache,
+        MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+      throw new UnsupportedOperationException();
+    }
+
+    public void reset() {
+      init(null, -1, -1);
+    }
+  }
+
+  /**
+   * Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
+   * Makes assumptions about the preRead code; for example, we add chunks here while they are
+   * still in the linked list, without unlinking them. So, we record the first chunk's position in
+   * the original list and then, when the next contiguous chunk is added, we merely increase the
+   * number of elements one has to traverse from that position to get the whole list.
+   */
+  private static class UncompressedCacheChunk extends CacheChunk {
+    private BufferChunk chunk;
+    private int count;
+
+    public UncompressedCacheChunk(BufferChunk bc) {
+      super();
+      init(null, bc.getOffset(), bc.getEnd());
+      chunk = bc;
+      count = 1;
+    }
+
+    public void addChunk(BufferChunk bc) {
+      assert bc.getOffset() == this.getEnd();
+      this.end = bc.getEnd();
+      ++count;
+    }
+
+    public BufferChunk getChunk() {
+      return chunk;
+    }
+
+    public int getCount() {
+      return count;
+    }
+
+    @Override
+    public void handleCacheCollision(DataCache cache,
+        MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+      assert cacheBuffers == null;
+      // This is done at pre-read stage where there's nothing special w/refcounts. Just release.
+      cache.getAllocator().deallocate(getBuffer());
+      // Replace the buffer in our big range list, as well as in current results.
+      this.setBuffer(replacementBuffer);
+    }
+
+    public void clear() {
+      this.chunk = null;
+      this.count = -1;
+    }
+  }
+
+  /**
+   * CacheChunk that is pre-created for new cache data; initially, it contains an original disk
+   * buffer and an unallocated MemoryBuffer object. Before we expose it, the MB is allocated,
+   * the data is decompressed, and the original compressed data is discarded. The chunk lives on
+   * in the DiskRange list created for the request, and everyone treats it like a regular CacheChunk.
+   */
+  private static class ProcCacheChunk extends CacheChunk {
+    public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+        ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
+      super.init(targetBuffer, cbStartOffset, cbEndOffset);
+      this.isOriginalDataCompressed = isCompressed;
+      this.originalData = originalData;
+      this.originalCbIndex = originalCbIndex;
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      this.originalData = null;
+    }
+
+    @Override
+    public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer,
+        List<MemoryBuffer> cacheBuffers) {
+      assert originalCbIndex >= 0;
+      // Had the put succeeded for our new buffer, it would have a refcount of 2: 1 from the put,
+      // and 1 from the earlier reuseBuffer call. The "old" buffer now has the 1 from the put; the
+      // new buffer is not in cache.
+      cache.getAllocator().deallocate(getBuffer());
+      cache.reuseBuffer(replacementBuffer);
+      // Replace the buffer in our big range list, as well as in current results.
+      this.buffer = replacementBuffer;
+      cacheBuffers.set(originalCbIndex, replacementBuffer);
+      originalCbIndex = -1; // This can only happen once at decompress time.
+    }
+
+    /** Original data that will be turned into encoded cache data in this.buffer and reset. */
+    private ByteBuffer originalData = null;
+    /** Whether originalData is compressed. */
+    private boolean isOriginalDataCompressed;
+    /** Index of the MemoryBuffer corresponding to this object inside the result list. If we
+     * hit a cache collision, we will replace this memory buffer with the one from cache at
+     * this index, without having to look for it. */
+    private int originalCbIndex;
+  }
+
+  /**
+   * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
+   * and remove what we have returned. We keep the iterator as a "hint" position.
+   * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
+   * @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
+   * @param cOffset Start offset to decompress.
+   * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
+   * @param streamBuffer Stream buffer to which the results are added.
+   * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
+   *                           they will not be used in future calls (see the class comment in
+   *                           EncodedReaderImpl about refcounts).
+   * @return Last buffer cached during decompression. Cache buffers are never removed from
+   *         the master list, so they are safe to keep as iterators for various streams.
+   */
+  public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
+      long endCOffset, ColumnStreamData streamBuffer, long unlockUntilCOffset, long streamOffset)
+          throws IOException {
+    if (streamBuffer.getCacheBuffers() == null) {
+      streamBuffer.setCacheBuffers(new ArrayList<MemoryBuffer>());
+    } else {
+      streamBuffer.getCacheBuffers().clear();
+    }
+    if (cOffset == endCOffset) return null;
+    boolean isCompressed = codec != null;
+    List<ProcCacheChunk> toDecompress = null;
+    List<ByteBuffer> toRelease = null;
+    if (isCompressed) {
+      toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+      toDecompress = new ArrayList<ProcCacheChunk>();
+    }
+
+    // 1. Find our bearings in the stream. Normally, iter will already point either to where we
+    // want to be, or just before. However, RGs can overlap due to encoding, so we may have
+    // to return to a previous block.
+    DiskRangeList current = findExactPosition(start, cOffset);
+    if (isDebugTracingEnabled) {
+      LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
+    }
+
+    CacheChunk lastUncompressed = null;
+
+    // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
+    lastUncompressed = isCompressed ?
+        prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+            unlockUntilCOffset, current, streamBuffer, toRelease, toDecompress)
+      : prepareRangesForUncompressedRead(
+          cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, streamBuffer);
+
+    // 3. Allocate the buffers, prepare cache keys.
+    // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
+    // data and some unallocated membufs for decompression. toDecompress contains all the work we
+    // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
+    // has also been adjusted to point to these buffers instead of compressed data for the ranges.
+    if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
+
+    MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
+    DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
+    int ix = 0;
+    for (ProcCacheChunk chunk : toDecompress) {
+      cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+      targetBuffers[ix] = chunk.getBuffer();
+      ++ix;
+    }
+    cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+
+    // 4. Now decompress (or copy) the data into cache buffers.
+    for (ProcCacheChunk chunk : toDecompress) {
+      ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+      if (chunk.isOriginalDataCompressed) {
+        decompressChunk(chunk.originalData, codec, dest);
+      } else {
+        copyUncompressedChunk(chunk.originalData, dest);
+      }
+
+      chunk.originalData = null;
+      if (isDebugTracingEnabled) {
+        LOG.info("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+      }
+      cache.reuseBuffer(chunk.getBuffer());
+    }
+
+    // 5. Release original compressed buffers to zero-copy reader if needed.
+    if (toRelease != null) {
+      assert dataReader.isTrackingDiskRanges();
+      for (ByteBuffer buffer : toRelease) {
+        dataReader.releaseBuffer(buffer);
+      }
+    }
+
+    // 6. Finally, put uncompressed data to cache.
+    long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+    processCacheCollisions(collisionMask, toDecompress, targetBuffers, streamBuffer.getCacheBuffers());
+
+    // 7. It may happen that we know we won't use some compression buffers anymore.
+    //    Release initial refcounts.
+    for (ProcCacheChunk chunk : toDecompress) {
+      ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
+    }
+
+    return lastUncompressed;
+  }
+
+  private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer,
+      List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
+    if (cOffset > current.getOffset()) {
+      // Target compression block is in the middle of the range; slice the range in two.
+      current = current.split(cOffset).next;
+    }
+    long currentOffset = cOffset;
+    CacheChunk lastUncompressed = null;
+    while (true) {
+      DiskRangeList next = null;
+      if (current instanceof CacheChunk) {
+        // 2a. This is a decoded compression buffer, add as is.
+        CacheChunk cc = (CacheChunk)current;
+        if (isDebugTracingEnabled) {
+          LOG.info("Locking " + cc.getBuffer() + " due to reuse");
+        }
+        cache.reuseBuffer(cc.getBuffer());
+        streamBuffer.getCacheBuffers().add(cc.getBuffer());
+        currentOffset = cc.getEnd();
+        if (isDebugTracingEnabled) {
+          LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
+        }
+        ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
+        lastUncompressed = cc;
+        next = current.next;
+      } else if (current instanceof IncompleteCb)  {
+        // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
+        if (isDebugTracingEnabled) {
+          LOG.info("Cannot read " + current);
+        }
+        next = null;
+        currentOffset = -1;
+      } else {
+        // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
+        // several disk ranges, so we might need to combine them.
+        BufferChunk bc = (BufferChunk)current;
+        ProcCacheChunk newCached = addOneCompressionBuffer(
+            bc, streamBuffer.getCacheBuffers(), toDecompress, toRelease);
+        lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
+        next = (newCached != null) ? newCached.next : null;
+        currentOffset = (next != null) ? next.getOffset() : -1;
+      }
+
+      if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+        break;
+      }
+      current = next;
+    }
+    return lastUncompressed;
+  }
+
+  private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
+      long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer)
+          throws IOException {
+    long currentOffset = cOffset;
+    CacheChunk lastUncompressed = null;
+    boolean isFirst = true;
+    while (true) {
+      DiskRangeList next = null;
+      assert current instanceof CacheChunk;
+      lastUncompressed = (CacheChunk)current;
+      if (isDebugTracingEnabled) {
+        LOG.info("Locking " + lastUncompressed.getBuffer() + " due to reuse");
+      }
+      cache.reuseBuffer(lastUncompressed.getBuffer());
+      if (isFirst) {
+        streamBuffer.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
+        isFirst = false;
+      }
+      streamBuffer.getCacheBuffers().add(lastUncompressed.getBuffer());
+      currentOffset = lastUncompressed.getEnd();
+      if (isDebugTracingEnabled) {
+        LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
+      }
+      ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
+      next = current.next;
+      if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+        break;
+      }
+      current = next;
+    }
+    return lastUncompressed;
+  }
+
+  /**
+   * To achieve consistent cache boundaries, we cache streams deterministically: in segments
+   * starting at the stream start, each covering either the stream size or the maximum allocation,
+   * whichever is smaller. If we are not reading an entire segment's worth of data, we do not cache
+   * the partial RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too
+   * painful to handle just for this case.
+   * We could avoid the copy in the non-zcr case and manage a buffer that was not allocated by our
+   * allocator; however, the uncompressed case is not the mainline path, so let's not complicate it.
+   */
+  private DiskRangeList preReadUncompressedStream(long baseOffset,
+      DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+    if (streamOffset == streamEnd) return null;
+    List<UncompressedCacheChunk> toCache = null;
+    List<ByteBuffer> toRelease = null;
+
+    // 1. Find our bearings in the stream.
+    DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
+    if (isDebugTracingEnabled) {
+      LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
+    }
+
+    if (streamOffset > current.getOffset()) {
+      // Target compression block is in the middle of the range; slice the range in two.
+      current = current.split(streamOffset).next;
+    }
+    // Account for maximum cache buffer size.
+    long streamLen = streamEnd - streamOffset;
+    int partSize = cache.getAllocator().getMaxAllocation(),
+        partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
+    long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
+
+    CacheChunk lastUncompressed = null;
+    MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
+    for (int i = 0; i < partCount; ++i) {
+      long hasEntirePartTo = -1;
+      if (partOffset == current.getOffset()) {
+        hasEntirePartTo = partOffset;
+        // We assume cache chunks would always match the way we read, so check and skip it.
+        if (current instanceof CacheChunk) {
+          lastUncompressed = (CacheChunk)current;
+          assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+          partOffset = partEnd;
+          partEnd = Math.min(partOffset + partSize, streamEnd);
+          continue;
+        }
+      }
+      if (current.getOffset() >= partEnd) {
+        // We have no data at all for this part of the stream (could be unneeded), skip.
+        partOffset = partEnd;
+        partEnd = Math.min(partOffset + partSize, streamEnd);
+        continue;
+      }
+      if (toRelease == null && dataReader.isTrackingDiskRanges()) {
+        toRelease = new ArrayList<ByteBuffer>();
+      }
+      // We have some disk buffers... see if we have entire part, etc.
+      UncompressedCacheChunk candidateCached = null;
+      DiskRangeList next = current;
+      while (true) {
+        if (next == null || next.getOffset() >= partEnd) {
+          if (hasEntirePartTo < partEnd && candidateCached != null) {
+            // We are missing a section at the end of the part...
+            lastUncompressed = copyAndReplaceCandidateToNonCached(
+                candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+            candidateCached = null;
+          }
+          break;
+        }
+        current = next;
+        boolean wasSplit = (current.getEnd() > partEnd);
+        if (wasSplit) {
+          current = current.split(partEnd);
+        }
+        if (isDebugTracingEnabled) {
+          LOG.info("Processing uncompressed file data at ["
+              + current.getOffset() + ", " + current.getEnd() + ")");
+        }
+        BufferChunk bc = (BufferChunk)current;
+        if (!wasSplit && toRelease != null) {
+          toRelease.add(bc.chunk); // TODO: is it valid to give zcr the modified 2nd part?
+        }
+
+        // Track if we still have the entire part.
+        long hadEntirePartTo = hasEntirePartTo;
+        if (hasEntirePartTo != -1) {
+          hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+        }
+        if (candidateCached != null && hasEntirePartTo == -1) {
+          lastUncompressed = copyAndReplaceCandidateToNonCached(
+              candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+          candidateCached = null;
+        }
+
+        if (hasEntirePartTo != -1) {
+          // So far we have all the data from the beginning of the part.
+          if (candidateCached == null) {
+            candidateCached = new UncompressedCacheChunk(bc);
+          } else {
+            candidateCached.addChunk(bc);
+          }
+          // We will take care of this at the end of the part, or if we find a gap.
+          next = current.next;
+          continue;
+        }
+        // We don't have the entire part; just copy to an allocated buffer. We could try to
+        // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
+        lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
+        next = lastUncompressed.next;
+      }
+      if (candidateCached != null) {
+        if (toCache == null) {
+          toCache = new ArrayList<>(partCount - i);
+        }
+        toCache.add(candidateCached);
+      }
+    }
+
+    // 3. Allocate the buffers, prepare cache keys.
+    if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
+
+    MemoryBuffer[] targetBuffers =
+        toCache.size() == 1 ? singleAlloc : new MemoryBuffer[toCache.size()];
+    targetBuffers[0] = null;
+    DiskRange[] cacheKeys = new DiskRange[toCache.size()];
+    int ix = 0;
+    for (UncompressedCacheChunk chunk : toCache) {
+      cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+      ++ix;
+    }
+    cache.getAllocator().allocateMultiple(
+        targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
+
+    // 4. Now copy the data into cache buffers.
+    ix = 0;
+    for (UncompressedCacheChunk candidateCached : toCache) {
+      candidateCached.setBuffer(targetBuffers[ix]);
+      ByteBuffer dest = candidateCached.getBuffer().getByteBufferRaw();
+      copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
+      candidateCached.clear();
+      lastUncompressed = candidateCached;
+      ++ix;
+    }
+
+    // 5. Release original compressed buffers to zero-copy reader if needed.
+    if (toRelease != null) {
+      assert dataReader.isTrackingDiskRanges();
+      for (ByteBuffer buf : toRelease) {
+        dataReader.releaseBuffer(buf);
+      }
+    }
+
+    // 6. Finally, put uncompressed data to cache.
+    long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+    processCacheCollisions(collisionMask, toCache, targetBuffers, null);
+
+    return lastUncompressed;
+  }
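
To illustrate the deterministic partitioning that preReadUncompressedStream's comment describes, here is a small standalone example; the stream size and maximum allocation are made-up values, and only the arithmetic mirrors the method.

    // Shows how an uncompressed stream is split into cacheable parts of at most
    // the allocator's maximum allocation; sample sizes are invented for the demo.
    public class PartBoundariesDemo {
      public static void main(String[] args) {
        long streamOffset = 0, streamEnd = 10L << 20; // assume a 10 MB stream
        int partSize = 4 << 20;                       // assume getMaxAllocation() == 4 MB
        long streamLen = streamEnd - streamOffset;
        int partCount = (int) ((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
        long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
        for (int i = 0; i < partCount; ++i) {
          System.out.println("part " + i + ": [" + partOffset + ", " + partEnd + ")");
          partOffset = partEnd;
          partEnd = Math.min(partOffset + partSize, streamEnd);
        }
        // Prints three segments: [0, 4 MB), [4 MB, 8 MB), [8 MB, 10 MB).
      }
    }
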
+
+  private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
+    int startPos = dest.position(), startLim = dest.limit();
+    dest.put(src); // Copy uncompressed data to cache.
+    // Put moves position forward by the size of the data.
+    int newPos = dest.position();
+    if (newPos > startLim) {
+      throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+          + ") became [" + newPos + ", " + dest.limit() + ")");
+    }
+    dest.position(startPos);
+    dest.limit(newPos);
+  }
+
+
+  private static CacheChunk copyAndReplaceCandidateToNonCached(
+      UncompressedCacheChunk candidateCached, long partOffset,
+      long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) {
+    // We thought we had the entire part to cache, but we don't; convert start to
+    // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
+    singleAlloc[0] = null;
+    cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+
+    MemoryBuffer buffer = singleAlloc[0];
+    cache.reuseBuffer(buffer);
+    ByteBuffer dest = buffer.getByteBufferRaw();
+    CacheChunk tcc = TCC_POOL.take();
+    tcc.init(buffer, partOffset, candidateEnd);
+    copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
+    return tcc;
+  }
+
+  private static CacheChunk copyAndReplaceUncompressedToNonCached(
+      BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) {
+    singleAlloc[0] = null;
+    cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+    MemoryBuffer buffer = singleAlloc[0];
+    cache.reuseBuffer(buffer);
+    ByteBuffer dest = buffer.getByteBufferRaw();
+    CacheChunk tcc = TCC_POOL.take();
+    tcc.init(buffer, bc.getOffset(), bc.getEnd());
+    copyUncompressedChunk(bc.chunk, dest);
+    bc.replaceSelfWith(tcc);
+    return tcc;
+  }
+
+  private static void copyAndReplaceUncompressedChunks(
+      UncompressedCacheChunk candidateCached, ByteBuffer dest, CacheChunk tcc) {
+    int startPos = dest.position(), startLim = dest.limit();
+    BufferChunk chunk = candidateCached.getChunk();
+    for (int i = 0; i < candidateCached.getCount(); ++i) {
+      dest.put(chunk.getData());
+      BufferChunk next = (BufferChunk)(chunk.next);
+      if (i == 0) {
+        chunk.replaceSelfWith(tcc);
+      } else {
+        chunk.removeSelf();
+      }
+      chunk = next;
+    }
+    int newPos = dest.position();
+    if (newPos > startLim) {
+      throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+          + ") became [" + newPos + ", " + dest.limit() + ")");
+    }
+    dest.position(startPos);
+    dest.limit(newPos);
+  }
+
+  private static void decompressChunk(
+      ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
+    int startPos = dest.position(), startLim = dest.limit();
+    codec.decompress(src, dest);
+    // Codec resets the position to 0 and limit to correct limit.
+    dest.position(startPos);
+    int newLim = dest.limit();
+    if (newLim > startLim) {
+      throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+          + ") became [" + dest.position() + ", " + newLim + ")");
+    }
+  }
+
+  public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
+    while (current != null) {
+      if (current instanceof ProcCacheChunk) {
+        PCC_POOL.offer((ProcCacheChunk)current);
+      } else if (current instanceof CacheChunk) {
+        TCC_POOL.offer((CacheChunk)current);
+      }
+      current = current.next;
+    }
+  }
+
+  private void ponderReleaseInitialRefcount(
+      long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
+    if (cc.getEnd() > unlockUntilCOffset) return;
+    assert cc.getBuffer() != null;
+    releaseInitialRefcount(cc, false);
+    // Release all the previous buffers that we may not have been able to release due to reuse.
+    DiskRangeList prev = cc.prev;
+    while (true) {
+      if ((prev == null) || (prev.getEnd() <= streamStartOffset)
+          || !(prev instanceof CacheChunk)) break;
+      CacheChunk prevCc = (CacheChunk)prev;
+      if (prevCc.buffer == null) break;
+      releaseInitialRefcount(prevCc, true);
+      prev = prev.prev;
+    }
+  }
+
+  private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
+    // This is the last RG for which this buffer will be used. Remove the initial refcount
+    if (isDebugTracingEnabled) {
+      LOG.info("Unlocking " + cc.getBuffer() + " for the fetching thread"
+          + (isBacktracking ? "; backtracking" : ""));
+    }
+    cache.releaseBuffer(cc.getBuffer());
+    cc.setBuffer(null);
+  }
+
+  private void processCacheCollisions(long[] collisionMask,
+      List<? extends CacheChunk> toDecompress, MemoryBuffer[] targetBuffers,
+      List<MemoryBuffer> cacheBuffers) {
+    if (collisionMask == null) return;
+    assert collisionMask.length >= (toDecompress.size() >>> 6);
+    // There are some elements that were cached in parallel, take care of them.
+    long maskVal = -1;
+    for (int i = 0; i < toDecompress.size(); ++i) {
+      if ((i & 63) == 0) {
+        maskVal = collisionMask[i >>> 6];
+      }
+      if ((maskVal & 1) == 1) {
+        // Cache has found an old buffer for the key and put it into array instead of our new one.
+        CacheChunk replacedChunk = toDecompress.get(i);
+        MemoryBuffer replacementBuffer = targetBuffers[i];
+        if (isDebugTracingEnabled) {
+          LOG.info("Discarding data due to cache collision: " + replacedChunk.getBuffer()
+              + " replaced with " + replacementBuffer);
+        }
+        assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+            + "even though mask is [" + Long.toBinaryString(maskVal) + "]";
+        replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
+      }
+      maskVal >>= 1;
+    }
+  }
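
processCacheCollisions consumes a mask with one bit per chunk, packed 64 bits to a long, as returned by putFileData; below is a tiny standalone sketch of the same bit indexing with invented mask values.

    // Reads a bit-per-chunk collision mask packed into a long[].
    // Mask contents are made up; the bit indexing matches processCacheCollisions.
    public class CollisionMaskDemo {
      public static void main(String[] args) {
        int chunkCount = 70;                 // assume 70 chunks were offered to the cache
        long[] collisionMask = new long[2];  // ceil(70 / 64) longs
        collisionMask[0] = 0b101L;           // chunks 0 and 2 collided with concurrent puts
        collisionMask[1] = 0b1L;             // chunk 64 collided
        for (int i = 0; i < chunkCount; ++i) {
          long word = collisionMask[i >>> 6];                  // which long holds bit i
          boolean collided = ((word >>> (i & 63)) & 1L) == 1L; // test that bit
          if (collided) {
            System.out.println("chunk " + i + ": keep the buffer already in cache");
          }
        }
      }
    }
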
+
+
+  /** Finds compressed offset in a stream and makes sure iter points to its position.
+     This may be necessary for obscure combinations of compression and encoding boundaries. */
+  private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
+    if (offset < 0) return ranges;
+    return findIntersectingPosition(ranges, offset, offset);
+  }
+
+  private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
+    if (offset < 0) return ranges;
+    // We expect the offset to be valid TODO: rather, validate
+    while (ranges.getEnd() <= offset) {
+      ranges = ranges.next;
+    }
+    while (ranges.getOffset() > end) {
+      ranges = ranges.prev;
+    }
+    // We are now on some intersecting buffer, find the first intersecting buffer.
+    while (ranges.prev != null && ranges.prev.getEnd() > offset) {
+      ranges = ranges.prev;
+    }
+    return ranges;
+  }
+
+  private static class IncompleteCb extends DiskRangeList {
+    public IncompleteCb(long offset, long end) {
+      super(offset, end);
+    }
+
+    @Override
+    public String toString() {
+      return "incomplete CB start: " + offset + " end: " + end;
+    }
+  }
+
+  /**
+   * Reads one compression block from the source; handles compression blocks read from
+   * multiple ranges (usually, that would only happen with zcr).
+   * Adds stuff to cacheBuffers, toDecompress and toRelease (see below for what each does).
+   * @param current BufferChunk where the compression block starts.
+   * @param cacheBuffers The result buffer list to which the pre-allocated target cache buffer is added.
+   * @param toDecompress The list of work to decompress - pairs of compressed buffers and the 
+   *                     target buffers (same as the ones added to cacheBuffers).
+   * @param toRelease The list of buffers to release to zcr because they are no longer in use.
+   * @return The resulting cache chunk.
+   */
+  private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
+      List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
+      List<ByteBuffer> toRelease) throws IOException {
+    ByteBuffer slice = null;
+    ByteBuffer compressed = current.chunk;
+    long cbStartOffset = current.getOffset();
+    int b0 = compressed.get() & 0xff;
+    int b1 = compressed.get() & 0xff;
+    int b2 = compressed.get() & 0xff;
+    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+    if (chunkLength > bufferSize) {
+      throw new IllegalArgumentException("Buffer size too small. size = " +
+          bufferSize + " needed = " + chunkLength);
+    }
+    int consumedLength = chunkLength + OutStream.HEADER_SIZE;
+    long cbEndOffset = cbStartOffset + consumedLength;
+    boolean isUncompressed = ((b0 & 0x01) == 1);
+    if (isDebugTracingEnabled) {
+      LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+          + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
+    }
+    if (compressed.remaining() >= chunkLength) {
+      // Simple case - CB fits entirely in the disk range.
+      slice = compressed.slice();
+      slice.limit(chunkLength);
+      ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+          cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
+      if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+        toRelease.add(compressed);
+      }
+      return cc;
+    }
+    if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
+      addIncompleteCompressionBuffer(cbStartOffset, current, 0);
+      return null; // This is impossible to read from this chunk.
+    }
+
+    // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
+    // We need to consolidate 2 or more buffers into one to decompress.
+    ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+    int remaining = chunkLength - compressed.remaining();
+    int originalPos = compressed.position();
+    copy.put(compressed);
+    if (isDebugTracingEnabled) {
+      LOG.info("Removing partial CB " + current + " from ranges after copying its contents");
+    }
+    DiskRangeList next = current.next;
+    current.removeSelf();
+    if (dataReader.isTrackingDiskRanges()) {
+      if (originalPos == 0) {
+        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      } else {
+        toRelease.add(compressed); // There might be slices depending on this buffer.
+      }
+    }
+
+    int extraChunkCount = 0;
+    while (true) {
+      if (!(next instanceof BufferChunk)) {
+        throw new IOException("Trying to extend compressed block into uncompressed block " + next);
+      }
+      compressed = next.getData();
+      ++extraChunkCount;
+      if (compressed.remaining() >= remaining) {
+        // This is the last range for this compression block. Yay!
+        slice = compressed.slice();
+        slice.limit(remaining);
+        copy.put(slice);
+        ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
+            cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
+        if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+          dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+        }
+        return cc;
+      }
+      remaining -= compressed.remaining();
+      copy.put(compressed);
+      if (dataReader.isTrackingDiskRanges()) {
+        dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+      }
+      DiskRangeList tmp = next;
+      next = next.hasContiguousNext() ? next.next : null;
+      if (next != null) {
+        if (isDebugTracingEnabled) {
+          LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents");
+        }
+        tmp.removeSelf();
+      } else {
+        addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
+        return null; // This CB cannot be read from these chunks.
+      }
+    }
+  }
+
+  private void addIncompleteCompressionBuffer(
+      long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+    IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
+    if (isDebugTracingEnabled) {
+      LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
+          + icb + " in the buffers");
+    }
+    target.replaceSelfWith(icb);
+  }
+
+  /**
+   * Adds one buffer with compressed data to the results of addOneCompressionBuffer (see its javadoc).
+   * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
+   * @param isUncompressed Whether the data in the block is uncompressed.
+   * @param cbStartOffset Compressed start offset of the fCB.
+   * @param cbEndOffset Compressed end offset of the fCB.
+   * @param lastChunkLength The number of compressed bytes consumed from the last *chunk* into the fCB.
+   * @param lastChunk The chunk from which the last (or all) bytes of the fCB come.
+   * @param toDecompress See addOneCompressionBuffer.
+   * @param cacheBuffers See addOneCompressionBuffer.
+   * @return New cache buffer.
+   */
+  private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
+      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
+      BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
+    // Prepare future cache buffer.
+    MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated();
+    // Add it to result in order we are processing.
+    cacheBuffers.add(futureAlloc);
+    // Add it to the list of work to decompress.
+    ProcCacheChunk cc = PCC_POOL.take();
+    cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
+        fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
+    toDecompress.add(cc);
+    // Adjust the compression block position.
+    if (isDebugTracingEnabled) {
+      LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
+    }
+    lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
+    // Finally, put it in the ranges list for future use (if shared between RGs).
+    // Before anyone else accesses it, it would have been allocated and decompressed locally.
+    if (lastChunk.chunk.remaining() <= 0) {
+      if (isDebugTracingEnabled) {
+        LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
+      }
+      lastChunk.replaceSelfWith(cc);
+    } else {
+      if (isDebugTracingEnabled) {
+        LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
+      }
+      lastChunk.insertPartBefore(cc);
+    }
+    return cc;
+  }
+
+  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+    return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+  }
 }
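
For reference, the 3-byte compression chunk header that addOneCompressionBuffer decodes above
can be exercised in isolation: the low bit of the first byte flags an uncompressed ("original")
chunk, and the remaining 23 bits carry the chunk length. The following is a minimal standalone
sketch of that decode, not part of the patch; the class name and the sample length are illustrative.

import java.nio.ByteBuffer;

// Minimal sketch of the 3-byte ORC compression chunk header decode used by
// addOneCompressionBuffer above; the sample length of 100000 is arbitrary.
public class ChunkHeaderSketch {
  public static void main(String[] args) {
    int len = 100000;                                // sample compressed chunk length
    ByteBuffer header = ByteBuffer.wrap(new byte[] {
        (byte) (len << 1), (byte) (len >> 7), (byte) (len >> 15) }); // low flag bit clear
    int b0 = header.get() & 0xff;
    int b1 = header.get() & 0xff;
    int b2 = header.get() & 0xff;
    boolean isUncompressed = (b0 & 0x01) == 1;       // low bit of the first byte
    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
    System.out.println(chunkLength + " bytes, uncompressed=" + isUncompressed); // 100000 bytes, uncompressed=false
  }
}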

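The copy loop in addOneCompressionBuffer, which stitches a compression block spanning several
disk ranges into a single buffer before decompression, follows a plain ByteBuffer copy-then-slice
pattern. Below is a self-contained sketch of that pattern only; the class name and sample data
are illustrative and not taken from the patch.

import java.nio.ByteBuffer;

// Sketch: consolidate a block split across two contiguous ranges into one buffer,
// copying all of the first range and only the needed prefix of the second.
public class ConsolidateSketch {
  public static void main(String[] args) {
    byte[] block = "0123456789".getBytes();           // the full compression block
    ByteBuffer first = ByteBuffer.wrap(block, 0, 6);  // first disk range holds 6 bytes
    ByteBuffer second = ByteBuffer.wrap(block, 6, 4); // next range holds the rest

    int chunkLength = block.length;
    ByteBuffer copy = first.isDirect()
        ? ByteBuffer.allocateDirect(chunkLength) : ByteBuffer.allocate(chunkLength);
    int remaining = chunkLength - first.remaining();
    copy.put(first);                                  // drain what the first range has
    ByteBuffer slice = second.slice();                // take only the prefix of the next range
    slice.limit(remaining);
    copy.put(slice);
    copy.flip();                                      // ready to decompress
    System.out.println(copy.remaining());             // 10 - the block is now contiguous
  }
}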
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
index 9e2d281..c546e22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
@@ -20,8 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
 
 /**
  *
@@ -71,7 +72,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -88,9 +89,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
-      private EncodedColumnBatch.StreamBuffer nanosStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData nanosStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
       private boolean skipCorrupt;
@@ -105,17 +106,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setSecondsStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setSecondsStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
 
-      public StreamReaderBuilder setNanosStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+      public StreamReaderBuilder setNanosStream(ColumnStreamData secondaryStream) {
         this.nanosStream = secondaryStream;
         return this;
       }
@@ -222,7 +223,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -251,10 +252,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
-      private EncodedColumnBatch.StreamBuffer dictionaryStream;
-      private EncodedColumnBatch.StreamBuffer lengthStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData dictionaryStream;
+      private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -268,22 +269,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
 
-      public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
         this.lengthStream = lengthStream;
         return this;
       }
 
-      public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
         this.dictionaryStream = dictStream;
         return this;
       }
@@ -360,7 +361,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -374,8 +375,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -389,12 +390,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -463,7 +464,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -477,8 +478,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
       private boolean skipCorrupt;
@@ -493,12 +494,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -572,7 +573,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -586,8 +587,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -601,12 +602,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -675,7 +676,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -689,8 +690,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
 
       public StreamReaderBuilder setFileId(Long fileId) {
@@ -703,12 +704,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -771,7 +772,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -785,8 +786,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
 
       public StreamReaderBuilder setFileId(Long fileId) {
@@ -799,12 +800,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -878,7 +879,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -895,9 +896,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer valueStream;
-      private EncodedColumnBatch.StreamBuffer scaleStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData valueStream;
+      private ColumnStreamData scaleStream;
       private int scale;
       private int precision;
       private CompressionCodec compressionCodec;
@@ -923,17 +924,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setValueStream(EncodedColumnBatch.StreamBuffer valueStream) {
+      public StreamReaderBuilder setValueStream(ColumnStreamData valueStream) {
         this.valueStream = valueStream;
         return this;
       }
 
-      public StreamReaderBuilder setScaleStream(EncodedColumnBatch.StreamBuffer scaleStream) {
+      public StreamReaderBuilder setScaleStream(ColumnStreamData scaleStream) {
         this.scaleStream = scaleStream;
         return this;
       }
@@ -1004,7 +1005,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1018,8 +1019,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -1033,12 +1034,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -1138,7 +1139,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1168,10 +1169,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private Long fileId;
       private int columnIndex;
       private int maxLength;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
-      private EncodedColumnBatch.StreamBuffer dictionaryStream;
-      private EncodedColumnBatch.StreamBuffer lengthStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData dictionaryStream;
+      private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -1190,22 +1191,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
 
-      public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
         this.lengthStream = lengthStream;
         return this;
       }
 
-      public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
         this.dictionaryStream = dictStream;
         return this;
       }
@@ -1312,7 +1313,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1342,10 +1343,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       private Long fileId;
       private int columnIndex;
       private int maxLength;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
-      private EncodedColumnBatch.StreamBuffer dictionaryStream;
-      private EncodedColumnBatch.StreamBuffer lengthStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData dictionaryStream;
+      private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -1364,22 +1365,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
 
-      public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+      public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
         this.lengthStream = lengthStream;
         return this;
       }
 
-      public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+      public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
         this.dictionaryStream = dictStream;
         return this;
       }
@@ -1455,7 +1456,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1469,8 +1470,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
 
       public StreamReaderBuilder setFileId(Long fileId) {
@@ -1483,12 +1484,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -1561,7 +1562,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1578,9 +1579,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
-      private EncodedColumnBatch.StreamBuffer lengthStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
+      private ColumnStreamData lengthStream;
       private CompressionCodec compressionCodec;
       private OrcProto.ColumnEncoding columnEncoding;
 
@@ -1594,17 +1595,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
 
-      public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+      public StreamReaderBuilder setLengthStream(ColumnStreamData secondaryStream) {
         this.lengthStream = secondaryStream;
         return this;
       }
@@ -1673,7 +1674,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     }
 
     @Override
-    public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+    public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
         throws IOException {
       super.setBuffers(buffers, sameStripe);
       if (_presentStream != null) {
@@ -1687,8 +1688,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
     public static class StreamReaderBuilder {
       private Long fileId;
       private int columnIndex;
-      private EncodedColumnBatch.StreamBuffer presentStream;
-      private EncodedColumnBatch.StreamBuffer dataStream;
+      private ColumnStreamData presentStream;
+      private ColumnStreamData dataStream;
       private CompressionCodec compressionCodec;
 
       public StreamReaderBuilder setFileId(Long fileId) {
@@ -1701,12 +1702,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
         return this;
       }
 
-      public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
         this.presentStream = presentStream;
         return this;
       }
 
-      public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
         this.dataStream = dataStream;
         return this;
       }
@@ -1740,11 +1741,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       List<OrcProto.ColumnEncoding> encodings,
       EncodedColumnBatch<OrcBatchKey> batch,
       CompressionCodec codec, boolean skipCorrupt) throws IOException {
-    long file = batch.batchKey.file;
+    long file = batch.getBatchKey().file;
     TreeReader[] treeReaders = new TreeReader[numCols];
     for (int i = 0; i < numCols; i++) {
-      int columnIndex = batch.columnIxs[i];
-      EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+      int columnIndex = batch.getColumnIxs()[i];
+      ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
       OrcProto.Type columnType = types.get(columnIndex);
 
       // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
@@ -1756,13 +1757,13 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
       OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
 
       // stream buffers are arranged in enum order of stream kind
-      EncodedColumnBatch.StreamBuffer present = null;
-      EncodedColumnBatch.StreamBuffer data = null;
-      EncodedColumnBatch.StreamBuffer dictionary = null;
-      EncodedColumnBatch.StreamBuffer lengths = null;
-      EncodedColumnBatch.StreamBuffer secondary = null;
-      for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
-        switch (streamBuffer.streamKind) {
+      ColumnStreamData present = null;
+      ColumnStreamData data = null;
+      ColumnStreamData dictionary = null;
+      ColumnStreamData lengths = null;
+      ColumnStreamData secondary = null;
+      for (ColumnStreamData streamBuffer : streamBuffers) {
+        switch (streamBuffer.getStreamKind()) {
           case 0:
             // PRESENT stream
             present = streamBuffer;
@@ -1784,7 +1785,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
             secondary = streamBuffer;
             break;
           default:
-            throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+            throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
         }
       }
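
The switch above keys off ColumnStreamData.getStreamKind(), and the surrounding comment notes
that stream buffers are arranged in enum order of stream kind. The enum below is only a
stand-in for OrcProto.Stream.Kind (per the ORC protobuf definition), reproduced to make the
0..5 ordering explicit; it is not part of the patch.

// Stand-in for OrcProto.Stream.Kind, shown only to spell out the ordinal
// ordering the switch relies on (PRESENT=0 ... SECONDARY=5).
public class StreamKindSketch {
  enum Kind { PRESENT, DATA, LENGTH, DICTIONARY_DATA, DICTIONARY_COUNT, SECONDARY }

  public static void main(String[] args) {
    for (Kind k : Kind.values()) {
      System.out.println(k.ordinal() + " -> " + k);   // 0 -> PRESENT, 1 -> DATA, ...
    }
  }
}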