You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:12 UTC

[4/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)

HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/682a3c7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/682a3c7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/682a3c7b

Branch: refs/heads/master-15147
Commit: 682a3c7b46aec9e43275551698fa6ba9c7ac5d7c
Parents: 7f46c8d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 16 18:57:28 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 16 18:57:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    7 +
 .../apache/hadoop/hive/llap/io/api/LlapIo.java  |    3 +-
 .../llap/IncrementalObjectSizeEstimator.java    |    2 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |    3 +-
 .../hive/llap/cache/EvictionDispatcher.java     |   11 +-
 .../hadoop/hive/llap/cache/FileCache.java       |  107 ++
 .../hive/llap/cache/FileCacheCleanupThread.java |  104 ++
 .../hadoop/hive/llap/cache/LlapDataBuffer.java  |    7 +
 .../hive/llap/cache/LowLevelCacheImpl.java      |  218 +--
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  716 ++++++++++
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  348 +----
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   26 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java |  335 +++++
 .../llap/io/decode/ColumnVectorProducer.java    |   11 +-
 .../llap/io/decode/EncodedDataConsumer.java     |    7 +
 .../io/decode/GenericColumnVectorProducer.java  |  201 +++
 .../llap/io/decode/LlapTextInputFormat.java     |   33 +
 .../llap/io/decode/OrcColumnVectorProducer.java |   12 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   95 +-
 .../hive/llap/io/decode/ReadPipeline.java       |    2 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java | 1248 ++++++++++++++++++
 .../llap/io/metadata/ConsumerFileMetadata.java  |   31 +
 .../io/metadata/ConsumerStripeMetadata.java     |   35 +
 .../hive/llap/io/metadata/OrcFileMetadata.java  |    8 +-
 .../hive/llap/io/metadata/OrcMetadataCache.java |    2 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |   13 +-
 orc/src/java/org/apache/orc/OrcUtils.java       |   83 ++
 .../org/apache/orc/impl/PhysicalWriter.java     |    1 -
 .../org/apache/orc/impl/RecordReaderImpl.java   |    5 +
 .../org/apache/orc/impl/TreeReaderFactory.java  |    1 +
 .../java/org/apache/orc/impl/WriterImpl.java    |   99 +-
 .../org/apache/hadoop/hive/llap/DebugUtils.java |    1 +
 .../hadoop/hive/ql/exec/FetchOperator.java      |    2 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |    4 +-
 .../hive/ql/io/CombineHiveRecordReader.java     |   55 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   37 +-
 .../io/LlapWrappableInputFormatInterface.java   |    6 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |   10 +
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |   20 +-
 .../hive/ql/io/orc/encoded/CacheChunk.java      |    4 +
 .../orc/encoded/EncodedTreeReaderFactory.java   |    2 +-
 .../hive/ql/io/orc/encoded/StreamUtils.java     |   13 +-
 .../ql/io/rcfile/stats/PartialScanMapper.java   |    5 +-
 ql/src/test/queries/clientpositive/llap_text.q  |   62 +
 .../test/results/clientpositive/llap_text.q.out |  502 +++++++
 .../hadoop/hive/common/io/DiskRangeList.java    |    6 +
 .../common/io/encoded/EncodedColumnBatch.java   |   17 +
 47 files changed, 3898 insertions(+), 622 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..9806105 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2900,6 +2900,13 @@ public class HiveConf extends Configuration {
     LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
         "Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
         "cases of file overwrites. This is supported on HDFS."),
+    LLAP_IO_ENCODE_ALLOC_SIZE("hive.llap.io.encode.alloc.size", "256Kb", new SizeValidator(),
+        "Allocation size for the buffers used to cache encoded data from non-ORC files. Must\n" +
+        "be a power of two between " + LLAP_ALLOCATOR_MIN_ALLOC + " and\n" +
+        LLAP_ALLOCATOR_MAX_ALLOC + "."),
+    LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
+        "Row count to use to separate cache slices when caching encoded data from row-based\n" +
+        "inputs into LLAP cache."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
     LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index d82757f..e5ab601 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 
 public interface LlapIo<T> {
-  InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat);
+  InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde);
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index 3efbcc2..ff6e7ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
  * during the actual estimation. TODO: clean up
  */
 public class IncrementalObjectSizeEstimator {
-  private static final JavaDataModel memoryModel = JavaDataModel.get();
+  public static final JavaDataModel memoryModel = JavaDataModel.get();
   private enum FieldType { PRIMITIVE_ARRAY, OBJECT_ARRAY, COLLECTION, MAP, OTHER };
 
   public static HashMap<Class<?>, ObjectEstimator> createEstimators(Object rootObj) {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index d9d407d..8d7f0d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -611,7 +611,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       assert data != null;
       int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
           freeListIx = freeListFromHeader(headers[headerIx]);
-      assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2);
+      assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
+          : buffer.allocSize + " " + freeListIx;
       while (true) {
         FreeList freeList = freeLists[freeListIx];
         int bHeaderIx = headerIx ^ (1 << freeListIx);

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index b6fd3e3..2d3197c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -27,11 +27,16 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
  */
 public final class EvictionDispatcher implements EvictionListener {
   private final LowLevelCache dataCache;
+  private final SerDeLowLevelCacheImpl serdeCache;
   private final OrcMetadataCache metadataCache;
+  private final EvictionAwareAllocator allocator;
 
-  public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
+  public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
+      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
+    this.serdeCache = serdeCache;
+    this.allocator = allocator;
   }
 
   @Override
@@ -40,7 +45,11 @@ public final class EvictionDispatcher implements EvictionListener {
   }
 
   public void notifyEvicted(LlapDataBuffer buffer) {
+    // Note: we don't know which cache this is from, so we notify both. They can noop if they
+    //       want to find the buffer in their structures and can't.
     dataCache.notifyEvicted(buffer);
+    serdeCache.notifyEvicted(buffer);
+    allocator.deallocateEvicted(buffer);
   }
 
   public void notifyEvicted(OrcFileMetadata buffer) {

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
new file mode 100644
index 0000000..44b71c9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Function;
+
+/** Class used for a single file in LowLevelCacheImpl, etc. */
+class FileCache<T> {
+  private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+  private final T cache;
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  private FileCache(T value) {
+    this.cache = value;
+  }
+
+  public T getCache() {
+    return cache;
+  }
+
+  boolean incRef() {
+    while (true) {
+      int value = refCount.get();
+      if (value == EVICTED_REFCOUNT) return false;
+      if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+      assert value >= 0;
+      if (refCount.compareAndSet(value, value + 1)) return true;
+    }
+  }
+
+  void decRef() {
+    int value = refCount.decrementAndGet();
+    if (value < 0) {
+      throw new AssertionError("Unexpected refCount " + value);
+    }
+  }
+
+  boolean startEvicting() {
+    while (true) {
+      int value = refCount.get();
+      if (value != 1) return false;
+      if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+    }
+  }
+
+  void commitEvicting() {
+    boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+    assert result;
+  }
+
+  void abortEvicting() {
+    boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+    assert result;
+  }
+
+  /**
+   * All this mess is necessary because we want to be able to remove sub-caches for fully
+   * evicted files. It may actually be better to have non-nested map with object keys?
+   */
+  public static <T> FileCache<T> getOrAddFileSubCache(
+      ConcurrentHashMap<Object, FileCache<T>> cache, Object fileKey,
+      Function<Void, T> createFunc) {
+    FileCache<T> newSubCache = null;
+    while (true) { // Overwhelmingly executes once.
+      FileCache<T> subCache = cache.get(fileKey);
+      if (subCache != null) {
+        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+        if (newSubCache == null) {
+          newSubCache = new FileCache<T>(createFunc.apply(null));
+          newSubCache.incRef();
+        }
+        // Found a stale value we cannot incRef; try to replace it with new value.
+        if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
+        continue; // Someone else replaced/removed a stale value, try again.
+      }
+      // No value found.
+      if (newSubCache == null) {
+        newSubCache = new FileCache<T>(createFunc.apply(null));
+        newSubCache.incRef();
+      }
+      FileCache<T> oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
+      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+      // Someone created one in parallel and then it went stale.
+      if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
+      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
new file mode 100644
index 0000000..17c7ee6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hive.common.util.Ref;
+
+/** Class used to slowly clean up a map of FileCache-s. */
+abstract class FileCacheCleanupThread<T> extends Thread {
+  private final long approxCleanupIntervalSec;
+  private final AtomicInteger newEvictions;
+  private final ConcurrentHashMap<Object, FileCache<T>> fileMap;
+
+  public FileCacheCleanupThread(String name, ConcurrentHashMap<Object, FileCache<T>> fileMap,
+      AtomicInteger newEvictions, long cleanupInterval) {
+    super(name);
+    this.fileMap = fileMap;
+    this.newEvictions = newEvictions;
+    this.approxCleanupIntervalSec = cleanupInterval;
+    setDaemon(true);
+    setPriority(1);
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      try {
+        doOneCleanupRound();
+      } catch (InterruptedException ex) {
+        LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+        Thread.currentThread().interrupt();
+        break;
+      } catch (Throwable t) {
+        LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+        break;
+      }
+    }
+  }
+
+  private void doOneCleanupRound() throws InterruptedException {
+    while (true) {
+      int evictionsSinceLast = newEvictions.getAndSet(0);
+      if (evictionsSinceLast > 0) break;
+      synchronized (newEvictions) {
+        newEvictions.wait(10000);
+      }
+    }
+    // Duration is an estimate; if the size of the map changes, it can be very different.
+    long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
+    int leftToCheck = 0; // approximate
+    for (FileCache<T> fc : fileMap.values()) {
+      leftToCheck += getCacheSize(fc);
+    }
+    // Iterate thru all the filecaches. This is best-effort.
+    // If these super-long-lived iterators affect the map in some bad way,
+    // we'd need to sleep once per round instead.
+    Iterator<Map.Entry<Object, FileCache<T>>> iter = fileMap.entrySet().iterator();
+    Ref<Boolean> isPastEndTime = Ref.from(false);
+    while (iter.hasNext()) {
+      FileCache<T> fc = iter.next().getValue();
+      if (!fc.incRef()) {
+        throw new AssertionError("Something other than cleanup is removing elements from map");
+      }
+      leftToCheck = cleanUpOneFileCache(fc, leftToCheck, endTime, isPastEndTime);
+      if (getCacheSize(fc) > 0) {
+        fc.decRef();
+        continue;
+      }
+      // FileCache might be empty; see if we can remove it. "tryWriteLock"
+      if (!fc.startEvicting()) continue;
+      if (getCacheSize(fc) == 0) {
+        fc.commitEvicting();
+        iter.remove();
+      } else {
+        fc.abortEvicting();
+      }
+    }
+  }
+
+  protected abstract int getCacheSize(FileCache<T> fc);
+
+  protected abstract int cleanUpOneFileCache(FileCache<T> fc, int leftToCheck, long endTime,
+      Ref<Boolean> isPastEndTime) throws InterruptedException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index d1a961c..7d5c101 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -139,4 +139,11 @@ public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryB
     int refCount = this.refCount.get();
     return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
   }
+
+  public static String toDataString(MemoryBuffer s) {
+    if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
+    byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
+    int i = (b < 0) ? -b : b;
+    return s + " (0x" + Integer.toHexString(i) + ")";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index ea458ca..72980ae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -32,33 +32,45 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 
 public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
-  private final EvictionAwareAllocator allocator;
+  private final Allocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
   private Thread cleanupThread = null;
-  private final ConcurrentHashMap<Object, FileCache> cache =
-      new ConcurrentHashMap<Object, FileCache>();
+  // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+  //       In fact, CSLM has slow single-threaded operation, and one file is probably often read
+  //       by just one (or few) threads, so a much more simple DS with locking might be better.
+  //       Let's use CSLM for now, since it's available.
+  private final ConcurrentHashMap<Object,
+      FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> cache = new ConcurrentHashMap<>();
   private final LowLevelCachePolicy cachePolicy;
   private final long cleanupInterval;
   private final LlapDaemonCacheMetrics metrics;
   private final boolean doAssumeGranularBlocks;
 
+  private static final Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>> CACHE_CTOR =
+      new Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>>() {
+        @Override
+        public ConcurrentSkipListMap<Long, LlapDataBuffer> apply(Void input) {
+          return new ConcurrentSkipListMap<>();
+        }
+      };
+
   public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
-      EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks) {
+      Allocator allocator, boolean doAssumeGranularBlocks) {
     this(metrics, cachePolicy, allocator, doAssumeGranularBlocks, DEFAULT_CLEANUP_INTERVAL);
   }
 
   @VisibleForTesting
   LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
-      EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
-
+      Allocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
     LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval);
     this.cachePolicy = cachePolicy;
     this.allocator = allocator;
@@ -69,7 +81,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   public void startThreads() {
     if (cleanupInterval < 0) return;
-    cleanupThread = new CleanupThread(cleanupInterval);
+    cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
     cleanupThread.start();
   }
 
@@ -78,7 +90,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
     if (ranges == null) return null;
     DiskRangeList prev = ranges.prev;
-    FileCache subCache = cache.get(fileKey);
+    FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache = cache.get(fileKey);
     if (subCache == null || !subCache.incRef()) {
       long totalMissed = ranges.getTotalLength();
       metrics.incrCacheRequestedBytes(totalMissed);
@@ -102,7 +114,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         metrics.incrCacheRequestedBytes(current.getLength());
         // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
         DiskRangeList next = current.next;
-        getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
+        getOverlappingRanges(baseOffset, current, subCache.getCache(), factory, gotAllData);
         current = next;
       }
     } finally {
@@ -234,7 +246,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
     long[] result = null;
     assert buffers.length == ranges.length;
-    FileCache subCache = getOrAddFileSubCache(fileKey);
+    FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
+        FileCache.getOrAddFileSubCache(cache, fileKey, CACHE_CTOR);
     try {
       for (int i = 0; i < ranges.length; ++i) {
         LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -247,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
         buffer.declaredCachedLength = ranges[i].getLength();
         while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
-          LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+          LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
           if (oldVal == null) {
             // Cached successfully, add to policy.
             cachePolicy.cache(buffer, priority);
@@ -287,7 +300,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
             break;
           }
           // We found some old value but couldn't incRef it; remove it.
-          subCache.cache.remove(offset, oldVal);
+          subCache.getCache().remove(offset, oldVal);
         }
       }
     } finally {
@@ -296,38 +309,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
     return result;
   }
 
-  /**
-   * All this mess is necessary because we want to be able to remove sub-caches for fully
-   * evicted files. It may actually be better to have non-nested map with object keys?
-   */
-  private FileCache getOrAddFileSubCache(Object fileKey) {
-    FileCache newSubCache = null;
-    while (true) { // Overwhelmingly executes once.
-      FileCache subCache = cache.get(fileKey);
-      if (subCache != null) {
-        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
-        if (newSubCache == null) {
-          newSubCache = new FileCache();
-          newSubCache.incRef();
-        }
-        // Found a stale value we cannot incRef; try to replace it with new value.
-        if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
-        continue; // Someone else replaced/removed a stale value, try again.
-      }
-      // No value found.
-      if (newSubCache == null) {
-        newSubCache = new FileCache();
-        newSubCache.incRef();
-      }
-      FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
-      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
-      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
-      // Someone created one in parallel and then it went stale.
-      if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
-      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
-    }
-  }
-
   private static int align64(int number) {
     return ((number + 63) & ~63);
   }
@@ -370,134 +351,44 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   @Override
   public final void notifyEvicted(MemoryBuffer buffer) {
-    allocator.deallocateEvicted(buffer);
     newEvictions.incrementAndGet();
   }
 
-  private static class FileCache {
-    private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
-    // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
-    //       In fact, CSLM has slow single-threaded operation, and one file is probably often read
-    //       by just one (or few) threads, so a much more simple DS with locking might be better.
-    //       Let's use CSLM for now, since it's available.
-    private final ConcurrentSkipListMap<Long, LlapDataBuffer> cache
-      = new ConcurrentSkipListMap<Long, LlapDataBuffer>();
-    private final AtomicInteger refCount = new AtomicInteger(0);
-
-    boolean incRef() {
-      while (true) {
-        int value = refCount.get();
-        if (value == EVICTED_REFCOUNT) return false;
-        if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
-        assert value >= 0;
-        if (refCount.compareAndSet(value, value + 1)) return true;
-      }
-    }
-
-    void decRef() {
-      int value = refCount.decrementAndGet();
-      if (value < 0) {
-        throw new AssertionError("Unexpected refCount " + value);
-      }
-    }
-
-    boolean startEvicting() {
-      while (true) {
-        int value = refCount.get();
-        if (value != 1) return false;
-        if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
-      }
-    }
-
-    void commitEvicting() {
-      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
-      assert result;
-    }
-
-    void abortEvicting() {
-      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
-      assert result;
-    }
-  }
-
-  private final class CleanupThread extends Thread {
-    private final long approxCleanupIntervalSec;
+  private static final class CleanupThread
+    extends FileCacheCleanupThread<ConcurrentSkipListMap<Long, LlapDataBuffer>> {
 
-    public CleanupThread(long cleanupInterval) {
-      super("Llap low level cache cleanup thread");
-      this.approxCleanupIntervalSec = cleanupInterval;
-      setDaemon(true);
-      setPriority(1);
+    public CleanupThread(ConcurrentHashMap<Object,
+        FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> fileMap,
+        AtomicInteger newEvictions, long cleanupInterval) {
+      super("Llap low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
     }
 
     @Override
-    public void run() {
-      while (true) {
-        try {
-          doOneCleanupRound();
-        } catch (InterruptedException ex) {
-          LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
-          Thread.currentThread().interrupt();
-          break;
-        } catch (Throwable t) {
-          LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
-          break;
-        }
-      }
+    protected int getCacheSize( FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc) {
+      return fc.getCache().size();
     }
 
-    private void doOneCleanupRound() throws InterruptedException {
-      while (true) {
-        int evictionsSinceLast = newEvictions.getAndSet(0);
-        if (evictionsSinceLast > 0) break;
-        synchronized (newEvictions) {
-          newEvictions.wait(10000);
-        }
-      }
-      // Duration is an estimate; if the size of the map changes, it can be very different.
-      long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
-      int leftToCheck = 0; // approximate
-      for (FileCache fc : cache.values()) {
-        leftToCheck += fc.cache.size();
-      }
-      // Iterate thru all the filecaches. This is best-effort.
-      // If these super-long-lived iterators affect the map in some bad way,
-      // we'd need to sleep once per round instead.
-      Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
-      boolean isPastEndTime = false;
-      while (iter.hasNext()) {
-        FileCache fc = iter.next().getValue();
-        if (!fc.incRef()) {
-          throw new AssertionError("Something other than cleanup is removing elements from map");
-        }
-        // Iterate thru the file cache. This is best-effort.
-        Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.cache.entrySet().iterator();
-        boolean isEmpty = true;
-        while (subIter.hasNext()) {
-          long time = -1;
-          isPastEndTime = isPastEndTime || ((time = System.nanoTime()) >= endTime);
-          Thread.sleep(((leftToCheck <= 0) || isPastEndTime)
-              ? 1 : (endTime - time) / (1000000L * leftToCheck));
-          if (subIter.next().getValue().isInvalid()) {
-            subIter.remove();
-          } else {
-            isEmpty = false;
-          }
-          --leftToCheck;
-        }
-        if (!isEmpty) {
-          fc.decRef();
-          continue;
-        }
-        // FileCache might be empty; see if we can remove it. "tryWriteLock"
-        if (!fc.startEvicting()) continue;
-        if (fc.cache.isEmpty()) {
-          fc.commitEvicting();
-          iter.remove();
+    @Override
+    public int cleanUpOneFileCache(
+        FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc,
+        int leftToCheck, long endTime, Ref<Boolean> isPastEndTime)
+        throws InterruptedException {
+      // Iterate thru the file cache. This is best-effort.
+      Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
+      boolean isEmpty = true;
+      while (subIter.hasNext()) {
+        long time = -1;
+        isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
+        Thread.sleep(((leftToCheck <= 0) || isPastEndTime.value)
+            ? 1 : (endTime - time) / (1000000L * leftToCheck));
+        if (subIter.next().getValue().isInvalid()) {
+          subIter.remove();
         } else {
-          fc.abortEvicting();
+          isEmpty = false;
         }
+        --leftToCheck;
       }
+      return leftToCheck;
     }
   }
 
@@ -516,11 +407,12 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   @Override
   public String debugDumpForOom() {
     StringBuilder sb = new StringBuilder("File cache state ");
-    for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
+    for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
+      cache.entrySet()) {
       if (!e.getValue().incRef()) continue;
       try {
         sb.append("\n  file " + e.getKey());
-        for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+        for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
           if (e2.getValue().incRef() < 0) continue;
           try {
             sb.append("\n    [").append(e2.getKey()).append(", ")

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
new file mode 100644
index 0000000..53e3275
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+
+import com.google.common.base.Function;
+
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
+  private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+  private final Allocator allocator;
+  private final AtomicInteger newEvictions = new AtomicInteger(0);
+  private Thread cleanupThread = null;
+  private final ConcurrentHashMap<Object, FileCache<FileData>> cache = new ConcurrentHashMap<>();
+  private final LowLevelCachePolicy cachePolicy;
+  private final long cleanupInterval;
+  private final LlapDaemonCacheMetrics metrics;
+
+  private static final class StripeInfoComparator implements
+      Comparator<StripeData> {
+    @Override
+    public int compare(StripeData o1, StripeData o2) {
+      int starts = Long.compare(o1.knownTornStart, o2.knownTornStart);
+      if (starts != 0) return starts;
+      starts = Long.compare(o1.firstStart, o2.firstStart);
+      if (starts != 0) return starts;
+      assert (o1.lastStart == o2.lastStart) == (o1.lastEnd == o2.lastEnd);
+      return Long.compare(o1.lastStart, o2.lastStart);
+    }
+  }
+
+  public static class FileData {
+    /**
+     * RW lock ensures we have a consistent view of the file data, which is important given that
+     * we generate "stripe" boundaries arbitrarily. Reading buffer data itself doesn't require
+     * that this lock is held; however, everything else in stripes list does.
+     * TODO: make more granular? We only care that each one reader sees consistent boundaries.
+     *       So, we could shallow-copy the stripes list, then have individual locks inside each.
+     */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Object fileKey;
+    private final int colCount;
+    private ArrayList<StripeData> stripes;
+
+    public FileData(Object fileKey, int colCount) {
+      this.fileKey = fileKey;
+      this.colCount = colCount;
+    }
+
+    public void toString(StringBuilder sb) {
+      sb.append("File data for ").append(fileKey).append(" with ").append(colCount)
+        .append(" columns: ").append(stripes);
+    }
+
+    public int getColCount() {
+      return colCount;
+    }
+
+    public ArrayList<StripeData> getData() {
+      return stripes;
+    }
+
+    public void addStripe(StripeData sd) {
+      if (stripes == null) {
+        stripes = new ArrayList<>();
+      }
+      stripes.add(sd);
+    }
+
+    @Override
+    public String toString() {
+      return "[fileKey=" + fileKey + ", colCount=" + colCount + ", stripes=" + stripes + "]";
+    }
+  }
+
+  public static final class StripeData {
+    // In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
+    // positions, we wouldn't be able to account for torn rows correctly because the semantics of
+    // our "exact" reader positions, and inexact split boundaries, are different. We cannot even
+    // tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
+    // wrt first row when caching - we may produce incorrect result if we adjust the split
+    // boundary, and also if we don't adjust it, depending where it falls. At best, we'd end up
+    // with spurious disk reads if we cache on row boundaries but splits include torn rows.
+    // This structure implies that when reading a split, we skip the first torn row but fully
+    // read the last torn row (as LineRecordReader does). If we want to support a different scheme,
+    // we'd need to store more offsets and make logic account for that.
+    private long knownTornStart; // This can change based on new splits.
+    private final long firstStart, lastStart, lastEnd;
+    // TODO: we can actually consider storing ALL the delta encoded row offsets - not a lot of
+    //       overhead compared to the data itself, and with row offsets, we could use columnar
+    //       blocks for inconsistent splits. We are not optimizing for inconsistent splits for now.
+
+    private final long rowCount;
+    private final OrcProto.ColumnEncoding[] encodings;
+    private LlapDataBuffer[][][] data; // column index, stream type, buffers
+
+    public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
+        long rowCount, ColumnEncoding[] encodings) {
+      this.knownTornStart = knownTornStart;
+      this.firstStart = firstStart;
+      this.lastStart = lastStart;
+      this.lastEnd = lastEnd;
+      this.encodings = encodings;
+      this.rowCount = rowCount;
+      this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+    }
+ 
+    @Override
+    public String toString() {
+      return toCoordinateString() + " with encodings [" + Arrays.toString(encodings)
+          .replace('\n', ' ') + "] and data " + SerDeLowLevelCacheImpl.toString(data);
+    }
+
+    public long getKnownTornStart() {
+      return knownTornStart;
+    }
+
+    public long getFirstStart() {
+      return firstStart;
+    }
+
+    public long getLastStart() {
+      return lastStart;
+    }
+
+    public long getLastEnd() {
+      return lastEnd;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    public OrcProto.ColumnEncoding[] getEncodings() {
+      return encodings;
+    }
+
+    public LlapDataBuffer[][][] getData() {
+      return data;
+    }
+
+    public String toCoordinateString() {
+      return "stripe kts " + knownTornStart + " from "
+          + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
+    }
+
+    public static StripeData duplicateForResults(StripeData s) {
+      return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
+          s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
+    }
+
+    public void setKnownTornStart(long value) {
+      knownTornStart = value;
+    }
+  }
+
+  public static String toString(LlapDataBuffer[][][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (int i = 0; i < data.length; ++i) {
+      LlapDataBuffer[][] colData = data[i];
+      if (colData == null) {
+        sb.append("null, ");
+        continue;
+      }
+      sb.append("[");
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) {
+          sb.append("null, ");
+          continue;
+        }
+        sb.append("[");
+        for (int k = 0; k < streamData.length; ++k) {
+          LlapDataBuffer s = streamData[k];
+          sb.append(LlapDataBuffer.toDataString(s));
+        }
+        sb.append("], ");
+      }
+      sb.append("], ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+  
+
+  public static String toString(LlapDataBuffer[][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (int j = 0; j < data.length; ++j) {
+      LlapDataBuffer[] streamData = data[j];
+      if (streamData == null) {
+        sb.append("null, ");
+        continue;
+      }
+      sb.append("[");
+      for (int k = 0; k < streamData.length; ++k) {
+        LlapDataBuffer s = streamData[k];
+        sb.append(LlapDataBuffer.toDataString(s));
+      }
+      sb.append("], ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public SerDeLowLevelCacheImpl(
+      LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, Allocator allocator) {
+    this.cachePolicy = cachePolicy;
+    this.allocator = allocator;
+    this.cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
+    this.metrics = metrics;
+    LlapIoImpl.LOG.info("SerDe low-level level cache; cleanup interval {} sec", cleanupInterval);
+  }
+
+  public void startThreads() {
+    if (cleanupInterval < 0) return;
+    cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
+    cleanupThread.start();
+  }
+
+  public FileData getFileData(Object fileKey, long start, long end, boolean[] includes,
+      DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData)
+          throws IOException {
+    FileCache<FileData> subCache = cache.get(fileKey);
+    if (subCache == null || !subCache.incRef()) {
+      LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find cache for " + fileKey + " in " + cache);
+      markAllAsMissed(start, end, qfCounters, gotAllData);
+      return null;
+    }
+
+    try {
+      FileData cached = subCache.getCache();
+      cached.rwLock.readLock().lock();
+      LlapIoImpl.CACHE_LOGGER.info(("TODO# cache for " + fileKey + " is " + subCache.getCache()).replace('\n', ' '));
+      try {
+        if (cached.stripes == null) {
+          LlapIoImpl.CACHE_LOGGER.info("TODO# cannot find any stripes for " + fileKey);
+          markAllAsMissed(start, end, qfCounters, gotAllData);
+          return null;
+        }
+        if (includes.length > cached.colCount) {
+          throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
+              + cached.colCount + " columns");
+        }
+        FileData result = new FileData(cached.fileKey, cached.colCount);
+        if (gotAllData != null) {
+          gotAllData.value = true;
+        }
+        // We will adjust start and end so that we could record the metrics; save the originals.
+        long origStart = start, origEnd = end;
+        // startIx is inclusive, endIx is exclusive.
+        int startIx = Integer.MIN_VALUE, endIx = Integer.MIN_VALUE;
+        LlapIoImpl.CACHE_LOGGER.info("TODO# Looking for data between " + start + " and " + end);
+        for (int i = 0; i < cached.stripes.size() && endIx == Integer.MIN_VALUE; ++i) {
+          StripeData si = cached.stripes.get(i);
+          LlapIoImpl.CACHE_LOGGER.info("TODO# looking at " + si.toCoordinateString());
+
+          if (startIx == i) {
+            // The start of the split was in the middle of the previous slice.
+            start = si.knownTornStart;
+          } else if (startIx == Integer.MIN_VALUE) {
+            // Determine if we need to read this slice for the split.
+            if (si.lastEnd <= start) continue; // Slice before the start of the split.
+            // Start of the split falls somewhere within or before this slice.
+            // Note the ">=" - LineRecordReader will skip the first row even if we start
+            // directly at its start, because it cannot know if it's the start or not.
+            // Unless it's 0; note that we DO give 0 special treatment here, unlike the EOF below,
+            // because zero is zero. Need to mention it in Javadoc.
+            if (start == 0 && si.firstStart == 0) {
+              startIx = i;
+            } else if (start >= si.firstStart) {
+              // If the start of the split points into the middle of the cached slice, we cannot
+              // use the cached block - it's encoded and columnar, so we cannot map the file
+              // offset to some "offset" in "middle" of the slice (but see TODO for firstStart).
+              startIx = i + 1;
+              // continue;
+            } else {
+              // Start of the split is before this slice.
+              startIx = i; // Simple case - we will read cache from the split start offset.
+              start = si.knownTornStart;
+            }
+          }
+
+          // Determine if this (or previous) is the last slice we need to read for this split.
+          if (startIx != Integer.MIN_VALUE && endIx == Integer.MIN_VALUE) {
+            if (si.lastEnd <= end) {
+              // The entire current slice is part of the split. Note that if split end EQUALS
+              // lastEnd, the split would also read the next row, so we do need to look at the
+              // next slice, if any (although we'd probably find we cannot use it).
+              // Note also that we DO NOT treat end-of-file differently here, cause we do not know
+              // of any such thing. The caller must handle lastEnd vs end of split vs end of file
+              // match correctly in terms of how LRR handles them. See above for start-of-file.
+              if (i + 1 != cached.stripes.size()) continue;
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else if (si.lastStart <= end) {
+              // The split ends within (and would read) the last row of this slice. Exact match.
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else {
+              // Either the slice comes entirely after the end of split (following a gap in cached
+              // data); or the split ends in the middle of the slice, so it's the same as in the
+              // startIx logic w.r.t. the partial match; so, we either don't want to, or cannot,
+              // use this. There's no need to distinguish these two cases for now.
+              endIx = i;
+              end = (endIx > 0) ? cached.stripes.get(endIx - 1).lastEnd : start;
+            }
+          }
+        }
+        LlapIoImpl.CACHE_LOGGER.info("TODO# determined stripe indexes " + startIx + ", " + endIx);
+        if (endIx <= startIx) {
+          if (gotAllData != null) {
+            gotAllData.value = false;
+          }
+          return null;  // No data for the split, or it fits in the middle of one or two slices.
+        }
+        if (start > origStart || end < origEnd) {
+          if (gotAllData != null) {
+            gotAllData.value = false;
+          }
+          long totalMiss = Math.max(0, origEnd - end) + Math.max(0, start - origStart);
+          metrics.incrCacheRequestedBytes(totalMiss);
+          if (qfCounters != null) {
+            qfCounters.recordCacheMiss(totalMiss);
+          }
+        }
+
+        result.stripes = new ArrayList<>(endIx - startIx);
+        for (int stripeIx = startIx; stripeIx < endIx; ++stripeIx) {
+          getCacheDataForOneSlice(stripeIx, cached, result, gotAllData, includes, qfCounters);
+        }
+        return result;
+      } finally {
+        cached.rwLock.readLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+
+  private void getCacheDataForOneSlice(int stripeIx, FileData cached, FileData result,
+      BooleanRef gotAllData, boolean[] includes, LowLevelCacheCounters qfCounters) {
+    StripeData cStripe = cached.stripes.get(stripeIx);
+    LlapIoImpl.CACHE_LOGGER.info("TODO# got stripe in cache " + cStripe);
+    StripeData stripe = StripeData.duplicateForResults(cStripe);
+    result.stripes.add(stripe);
+    boolean isMissed = false;
+    for (int colIx = 0; colIx < cached.colCount; ++colIx) {
+      if (!includes[colIx]) continue;
+      if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
+        assert cStripe.data[colIx] == null; // No encoding => must have no data.
+        isMissed = true;
+        if (gotAllData != null) {
+          gotAllData.value = false;
+        }
+        continue;
+      }
+      stripe.encodings[colIx] = cStripe.encodings[colIx];
+      LlapDataBuffer[][] cColData = cStripe.data[colIx];
+      assert cColData != null;
+      for (int streamIx = 0;
+          cColData != null && streamIx < cColData.length; ++streamIx) {
+        LlapDataBuffer[] streamData = cColData[streamIx];
+        // Note: this relies on the fact that we always evict the entire column, so if
+        //       we have the column data, we assume we have all the streams we need.
+        if (streamData == null) continue;
+        for (int i = 0; i < streamData.length; ++i) { // Finally, we are going to use "i"!
+          if (!lockBuffer(streamData[i], true)) {
+            LlapIoImpl.CACHE_LOGGER.info("TODO# couldn't lock data for stripe at "
+                + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
+
+            cColData = null;
+            isMissed = true;
+            handleRemovedColumnData(cColData);
+            if (gotAllData != null) {
+              gotAllData.value = false;
+            }
+            break;
+          }
+        }
+      }
+      // At this point, we have arrived at the level where we need all the data, and the
+      // arrays never change. So we will just do a shallow assignment here instead of copy.
+      stripe.data[colIx] = cColData;
+    }
+    doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
+  }
+
+
+  private void doMetricsStuffForOneSlice(
+      LowLevelCacheCounters qfCounters, StripeData stripe, boolean isMissed) {
+    // Slice boundaries may not match split boundaries due to torn rows in either direction,
+    // so this counter may not be consistent with splits. This is also why we increment
+    // requested bytes here, instead of based on the split - we don't want the metrics to be
+    // inconsistent with each other. No matter what we determine here, at least we'll account
+    // for both in the same manner.
+    long bytes = stripe.lastEnd - stripe.knownTornStart;
+    metrics.incrCacheRequestedBytes(bytes);
+    if (!isMissed) {
+      metrics.incrCacheHitBytes(bytes);
+    }
+    if (qfCounters != null) {
+      if (isMissed) {
+        qfCounters.recordCacheMiss(bytes);
+      } else {
+        qfCounters.recordCacheHit(bytes);
+      }
+    }
+  }
+
+  private void markAllAsMissed(long from, long to,
+      LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+    if (qfCounters != null) {
+      metrics.incrCacheRequestedBytes(to - from);
+      qfCounters.recordCacheMiss(to - from);
+    }
+    if (gotAllData != null) {
+      gotAllData.value = false;
+    }
+  }
+
+  private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+      cachePolicy.notifyLock(buffer);
+    }
+    return rc > 0;
+  }
+
+  public void putFileData(final FileData data, Priority priority,
+      LowLevelCacheCounters qfCounters) {
+    // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
+    //       overhead from the java objects to memory manager and remove it when discarding file.
+    if (data.stripes == null || data.stripes.isEmpty()) {
+      LlapIoImpl.LOG.warn("Trying to cache FileData with no data for " + data.fileKey);
+      return;
+    }
+    FileCache<FileData> subCache = null;
+    FileData cached = null;
+    data.rwLock.writeLock().lock();
+    try {
+      subCache = FileCache.getOrAddFileSubCache(
+          cache, data.fileKey, new Function<Void, FileData>() {
+        @Override
+        public FileData apply(Void input) {
+          return data; // If we don't have a file cache, we will add this one as is.
+        }
+      });
+      cached = subCache.getCache();
+    } finally {
+      if (data != cached) {
+        data.rwLock.writeLock().unlock();
+      }
+    }
+    try {
+      if (data != cached) {
+        cached.rwLock.writeLock().lock();
+      }
+      try {
+        for (StripeData si : data.stripes) {
+          lockAllBuffersForPut(si, priority);
+        }
+        if (data == cached) {
+          LlapIoImpl.CACHE_LOGGER.info(("TODO# cached new data " + data).replace('\n', ' '));
+          return;
+        }
+        LlapIoImpl.CACHE_LOGGER.info(("TODO# merging old " + cached + " and new " + data).replace('\n', ' '));
+        ArrayList<StripeData> combined = new ArrayList<>(
+            cached.stripes.size() + data.stripes.size());
+        combined.addAll(cached.stripes);
+        combined.addAll(data.stripes);
+        Collections.sort(combined, new StripeInfoComparator());
+        int lastIx = combined.size() - 1;
+        for (int ix = 0; ix < lastIx; ++ix) {
+          StripeData cur = combined.get(ix), next = combined.get(ix + 1);
+          if (cur.lastEnd <= next.firstStart) continue; // All good.
+          if (cur.firstStart == next.firstStart && cur.lastEnd == next.lastEnd) {
+            mergeStripeInfos(cur, next);
+            combined.remove(ix + 1);
+            --lastIx;
+            // Don't recheck with next, only 2 lists each w/o collisions.
+            continue;
+          }
+          // The original lists do not contain collisions, so only one is 'old'.
+          boolean isCurOriginal = cached.stripes.contains(cur);
+          handleRemovedStripeInfo(combined.remove(isCurOriginal ? ix : ix + 1));
+          --ix;
+          --lastIx;
+        }
+        cached.stripes = combined;
+        LlapIoImpl.CACHE_LOGGER.info(("TODO# new cache data is " + combined).replace('\n', ' '));
+
+      } finally {
+        cached.rwLock.writeLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+  private void lockAllBuffersForPut(StripeData si, Priority priority) {
+    for (int i = 0; i < si.data.length; ++i) {
+      LlapDataBuffer[][] colData = si.data[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
+          assert canLock;
+          /*LlapIoImpl.LOG.info("TODO# Calling cache on "
+              + System.identityHashCode(streamData[k]) + ": " + i + ", " + j + ", " + k);*/
+          cachePolicy.cache(streamData[k], priority);
+          streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+        }
+      }
+    }
+  }
+
+  private void handleRemovedStripeInfo(StripeData removed) {
+    for (LlapDataBuffer[][] colData : removed.data) {
+      handleRemovedColumnData(colData);
+    }
+  }
+
+  private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+    // TODO: could we tell the policy that we don't care about these and have them evicted? or we
+    //       could just deallocate them when unlocked, and free memory + handle that in eviction.
+    //       For now, just abandon the blocks - eventually, they'll get evicted.
+  }
+
+  private void mergeStripeInfos(StripeData to, StripeData from) {
+    LlapIoImpl.CACHE_LOGGER.info("TODO# merging slices data: old " + to + " and new " + from);
+    to.knownTornStart = Math.min(to.knownTornStart, from.knownTornStart);
+    if (from.encodings.length != to.encodings.length) {
+      throw new RuntimeException("Different encodings " + from + "; " + to);
+    }
+    for (int colIx = 0; colIx < from.encodings.length; ++colIx) {
+      if (to.encodings[colIx] == null) {
+        to.encodings[colIx] = from.encodings[colIx];
+      } else if (from.encodings[colIx] != null
+          && !to.encodings[colIx].equals(from.encodings[colIx])) {
+        throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
+      }
+      LlapDataBuffer[][] fromColData = from.data[colIx];
+      if (fromColData != null) {
+        if (to.data[colIx] != null) {
+          // Note: we assume here that the data that was returned to the caller from cache will not
+          // be passed back in via put. Right now it's safe since we don't do anything. But if we
+          // evict proactively, we will have to compare objects all the way down.
+          handleRemovedColumnData(to.data[colIx]);
+        }
+        to.data[colIx] = fromColData;
+      }
+    } 
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapDataBuffer)buffer, true);
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) {
+      unlockBuffer((LlapDataBuffer)b, true);
+    }
+  }
+
+  private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+    boolean isLastDecref = (buffer.decRef() == 0);
+    if (handleLastDecRef && isLastDecref) {
+      // This is kind of not pretty, but this is how we detect whether buffer was cached.
+      // We would always set this for lookups at put time.
+      if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+        cachePolicy.notifyUnlock(buffer);
+      } else {
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
+        }
+        allocator.deallocate(buffer);
+      }
+    }
+    metrics.decrCacheNumLockedBuffers();
+  }
+
+  private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
+  public static LlapDataBuffer allocateFake() {
+    LlapDataBuffer fake = new LlapDataBuffer();
+    fake.initialize(-1, fakeBuf, 0, 1);
+    return fake;
+  }
+
+  public final void notifyEvicted(MemoryBuffer buffer) {
+    newEvictions.incrementAndGet();
+  }
+
+  private final class CleanupThread extends FileCacheCleanupThread<FileData> {
+
+    public CleanupThread(ConcurrentHashMap<Object, FileCache<FileData>> fileMap,
+        AtomicInteger newEvictions, long cleanupInterval) {
+      super("Llap serde low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
+    }
+
+    @Override
+    protected int getCacheSize(FileCache<FileData> fc) {
+      return 1; // Each iteration cleans the file cache as a single unit (unlike the ORC cache).
+    }
+
+    @Override
+    public int cleanUpOneFileCache(FileCache<FileData> fc, int leftToCheck, long endTime,
+        Ref<Boolean> isPastEndTime) throws InterruptedException {
+      FileData fd = fc.getCache();
+      fd.rwLock.writeLock().lock();
+      try {
+        for (StripeData sd : fd.stripes) {
+          for (int colIx = 0; colIx < sd.data.length; ++colIx) {
+            LlapDataBuffer[][] colData = sd.data[colIx];
+            if (colData == null) continue;
+            boolean hasAllData = true;
+            for (int j = 0; (j < colData.length) && hasAllData; ++j) {
+              LlapDataBuffer[] streamData = colData[j];
+              if (streamData == null) continue;
+              for (int k = 0; k < streamData.length; ++k) {
+                LlapDataBuffer buf = streamData[k];
+                hasAllData = hasAllData && lockBuffer(buf, false);
+                if (!hasAllData) break;
+                unlockBuffer(buf, true);
+              }
+            }
+            if (!hasAllData) {
+              handleRemovedColumnData(colData);
+              sd.data[colIx] = null;
+            }
+          }
+        }
+      } finally {
+        fd.rwLock.writeLock().unlock();
+      }
+      return leftToCheck - 1;
+    }
+  }
+
+  @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    // notifyReused implies that buffer is already locked; it's also called once for new
+    // buffers that are not cached yet. Don't notify cache policy.
+    return lockBuffer(((LlapDataBuffer)buffer), false);
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    StringBuilder sb = new StringBuilder("File cache state ");
+    for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
+      if (!e.getValue().incRef()) continue;
+      try {
+        sb.append("\n  file " + e.getKey());
+        sb.append("\n    [");
+        e.getValue().getCache().toString(sb);
+        sb.append("]");
+      } finally {
+        e.getValue().decRef();
+      }
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 290624d..ac9c1da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,21 +27,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -53,15 +46,13 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.NullWritable;
@@ -89,65 +80,77 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
   @SuppressWarnings("rawtypes")
   private final InputFormat sourceInputFormat;
   private final AvoidSplitCombination sourceASC;
-  private final ColumnVectorProducer cvp;
-  private final ExecutorService executor;
+  @SuppressWarnings("deprecation")
+  private final Deserializer sourceSerDe;
+  final ColumnVectorProducer cvp;
+  final ExecutorService executor;
   private final String hostName;
 
   @SuppressWarnings("rawtypes")
-  LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
-      ExecutorService executor) {
-    // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
-    //       We'd need to plumb it thru and use it to get data to cache/etc.
-    assert sourceInputFormat instanceof OrcInputFormat;
+  LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
+      ColumnVectorProducer cvp, ExecutorService executor) {
     this.executor = executor;
     this.cvp = cvp;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
         ? (AvoidSplitCombination)sourceInputFormat : null;
+    this.sourceSerDe = sourceSerDe;
     this.hostName = HiveStringUtils.getHostname();
   }
 
   @Override
   public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    boolean useLlapIo = true;
-    if (split instanceof LlapAwareSplit) {
-      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
-    }
+    RecordReader<NullWritable, VectorizedRowBatch> noLlap = checkLlapSplit(split, job, reporter);
+    if (noLlap != null) return noLlap;
+
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
 
-    if (!useLlapIo) {
-      LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
-      return sourceInputFormat.getRecordReader(split, job, reporter);
-    }
     FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
       List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
           ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-
+      RecordReader<?, ?> sourceRr = null;
+      LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName, cvp,
+          executor, sourceInputFormat, sourceSerDe, reporter);
       if (!rr.init()) {
         return sourceInputFormat.getRecordReader(split, job, reporter);
       }
 
-      // vectorized row batch reader
-      if (isVectorized) {
-        return rr;
-      }
-
-      // row batch to row-by-row reader
-      if (sourceInputFormat instanceof BatchToRowInputFormat) {
-        return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
-            rr, rr.getVectorizedRowBatchCtx(), includedCols));
-      }
-
-      return sourceInputFormat.getRecordReader(split, job, reporter);
+      return wrapLlapReader(isVectorized, includedCols, rr, split, job, reporter);
     } catch (Exception ex) {
       throw new IOException(ex);
     }
   }
 
+  public RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
+      boolean isVectorized, List<Integer> includedCols, LlapRecordReader rr,
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    // vectorized row batch reader
+    if (isVectorized) {
+      return rr;
+    } else if (sourceInputFormat instanceof BatchToRowInputFormat) {
+      return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
+          rr, rr.getVectorizedRowBatchCtx(), includedCols));
+    } else {
+      LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+      return sourceInputFormat.getRecordReader(split, job, reporter);
+    }
+  }
+
+  public RecordReader<NullWritable, VectorizedRowBatch> checkLlapSplit(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    boolean useLlapIo = true;
+    if (split instanceof LlapAwareSplit) {
+      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
+    }
+    if (useLlapIo) return null;
+
+    LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+    return sourceInputFormat.getRecordReader(split, job, reporter);
+  }
+
   // Returning either a vectorized or non-vectorized reader from the same call requires breaking
   // generics... this is how vectorization currently works.
   @SuppressWarnings("unchecked")
@@ -160,276 +163,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
     return sourceInputFormat.getSplits(job, numSplits);
   }
 
-  private class LlapRecordReader
-      implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
-    private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
-    private final FileSplit split;
-    private final List<Integer> columnIds;
-    private final SearchArgument sarg;
-    private final String[] columnNames;
-    private final VectorizedRowBatchCtx rbCtx;
-    private final Object[] partitionValues;
-
-    private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
-    private ColumnVectorBatch lastCvb = null;
-    private boolean isFirst = true;
-
-    private Throwable pendingError = null;
-    /** Vector that is currently being processed by our user. */
-    private boolean isDone = false;
-    private final boolean isClosed = false;
-    private final ConsumerFeedback<ColumnVectorBatch> feedback;
-    private final QueryFragmentCounters counters;
-    private long firstReturnTime;
-
-    private final JobConf jobConf;
-    private final TypeDescription fileSchema;
-    private final boolean[] includedColumns;
-    private final ReadPipeline rp;
-
-    public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
-        String hostName) throws IOException, HiveException {
-      this.jobConf = job;
-      this.split = split;
-      this.columnIds = includedCols;
-      this.sarg = ConvertAstToSearchArg.createFromConf(job);
-      this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      final String fragmentId = LlapTezUtils.getFragmentId(job);
-      final String dagId = LlapTezUtils.getDagId(job);
-      final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
-      MDC.put("dagId", dagId);
-      MDC.put("queryId", queryId);
-      TezCounters taskCounters = null;
-      if (fragmentId != null) {
-        MDC.put("fragmentId", fragmentId);
-        taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
-        LOG.info("Received fragment id: {}", fragmentId);
-      } else {
-        LOG.warn("Not using tez counters as fragment id string is null");
-      }
-      this.counters = new QueryFragmentCounters(job, taskCounters);
-      this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
-
-      MapWork mapWork = Utilities.getMapWork(job);
-      VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
-      rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
-
-      int partitionColumnCount = rbCtx.getPartitionColumnCount();
-      if (partitionColumnCount > 0) {
-        partitionValues = new Object[partitionColumnCount];
-        VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
-      } else {
-        partitionValues = null;
-      }
-      boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(job, isAcidScan, Integer.MAX_VALUE);
-
-      // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-      rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema);
-      feedback = rp;
-      fileSchema = rp.getFileSchema();
-      includedColumns = rp.getIncludedColumns();
-    }
-
-    /**
-     * Starts the data read pipeline
-     */
-    public boolean init() {
-      SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema,
-          rp.getReaderSchema(), includedColumns);
-      for (Integer colId : columnIds) {
-        if (!schemaEvolution.isPPDSafeConversion(colId)) {
-          LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
-          return false;
-        }
-      }
-
-      // perform the data read asynchronously
-      if (executor instanceof StatsRecordingThreadPool) {
-        // Every thread created by this thread pool will use the same handler
-        ((StatsRecordingThreadPool) executor)
-            .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
-      }
-      executor.submit(rp.getReadCallable());
-      return true;
-    }
-
-    @Override
-    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-      assert value != null;
-      if (isClosed) {
-        throw new AssertionError("next called after close");
-      }
-      // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
-      boolean wasFirst = isFirst;
-      if (isFirst) {
-        if (partitionValues != null) {
-          rbCtx.addPartitionColsToBatch(value, partitionValues);
-        }
-        isFirst = false;
-      }
-      ColumnVectorBatch cvb = null;
-      try {
-        cvb = nextCvb();
-      } catch (InterruptedException e) {
-        // Query might have been canceled. Stop the background processing.
-        feedback.stop();
-        throw new IOException(e);
-      }
-      if (cvb == null) {
-        if (wasFirst) {
-          firstReturnTime = counters.startTimeCounter();
-        }
-        counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
-        return false;
-      }
-      if (columnIds.size() != cvb.cols.length) {
-        throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
-            + " included, but the reader returned " + cvb.cols.length);
-      }
-      // VRB was created from VrbCtx, so we already have pre-allocated column vectors
-      for (int i = 0; i < cvb.cols.length; ++i) {
-        // Return old CVs (if any) to caller. We assume these things all have the same schema.
-        cvb.swapColumnVector(i, value.cols, columnIds.get(i));
-      }
-      value.selectedInUse = false;
-      value.size = cvb.size;
-      if (wasFirst) {
-        firstReturnTime = counters.startTimeCounter();
-      }
-      return true;
-    }
-
-    public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
-      return rbCtx;
-    }
-
-    private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
-      @Override
-      public void uncaughtException(final Thread t, final Throwable e) {
-        LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
-            " Message: {}", t.getName(), t.getId(), e.getMessage());
-        setError(e);
-      }
-    }
-
-    ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
-      boolean isFirst = (lastCvb == null);
-      if (!isFirst) {
-        feedback.returnData(lastCvb);
-      }
-      synchronized (pendingData) {
-        // We are waiting for next block. Either we will get it, or be told we are done.
-        boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
-        if (doLogBlocking) {
-          LlapIoImpl.LOG.trace("next will block");
-        }
-        while (isNothingToReport()) {
-          pendingData.wait(100);
-        }
-        if (doLogBlocking) {
-          LlapIoImpl.LOG.trace("next is unblocked");
-        }
-        rethrowErrorIfAny();
-        lastCvb = pendingData.poll();
-      }
-      if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
-        LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
-      }
-      return lastCvb;
-    }
-
-    private boolean isNothingToReport() {
-      return !isDone && pendingData.isEmpty() && pendingError == null;
-    }
-
-    @Override
-    public NullWritable createKey() {
-      return NullWritable.get();
-    }
-
-    @Override
-    public VectorizedRowBatch createValue() {
-      return rbCtx.createVectorizedRowBatch();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
-      feedback.stop();
-      rethrowErrorIfAny();
-      MDC.clear();
-    }
-
-    private void rethrowErrorIfAny() throws IOException {
-      if (pendingError == null) return;
-      if (pendingError instanceof IOException) {
-        throw (IOException)pendingError;
-      }
-      throw new IOException(pendingError);
-    }
-
-    @Override
-    public void setDone() {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      synchronized (pendingData) {
-        isDone = true;
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public void consumeData(ColumnVectorBatch data) {
-      if (LlapIoImpl.LOG.isTraceEnabled()) {
-        LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
-            isClosed, isDone, pendingError, pendingData.size());
-      }
-      synchronized (pendingData) {
-        if (isClosed) {
-          return;
-        }
-        pendingData.add(data);
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public void setError(Throwable t) {
-      counters.incrCounter(LlapIOCounters.NUM_ERRORS);
-      LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
-          isClosed, isDone, pendingError, pendingData.size());
-      assert t != null;
-      synchronized (pendingData) {
-        pendingError = t;
-        pendingData.notifyAll();
-      }
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      // TODO: plumb progress info thru the reader if we can get metadata from loader first.
-      return 0.0f;
-    }
-  }
-
   @Override
   public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
     return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf);
   }
 
-  private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
+  static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
     // This is based on Vectorizer code, minus the validation.
 
     // Add all non-virtual columns from the TableScan operator.
@@ -477,5 +216,4 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
     }
     return tableScanOperator;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 8048624..7cfd133 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
 import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -74,7 +78,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
   private static final String MODE_CACHE = "cache";
 
-  private final ColumnVectorProducer cvp;
+  // TODO: later, we may have a map
+  private final ColumnVectorProducer orcCvp, genericCvp;
   private final ExecutorService executor;
   private final LlapDaemonCacheMetrics cacheMetrics;
   private final LlapDaemonIOMetrics ioMetrics;
@@ -110,6 +115,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
     OrcMetadataCache metadataCache = null;
     LowLevelCache cache = null;
+    SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
     BufferUsageManager bufferManager = null;
     if (useLowLevelCache) {
       // Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -124,11 +130,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       this.allocator = allocator;
       LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
           cacheMetrics, cachePolicy, allocator, true);
+      SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
+          cacheMetrics, cachePolicy, allocator);
       cache = cacheImpl;
+      serdeCache = serdeCacheImpl;
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
       metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
-      cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+      cachePolicy.setEvictionListener(new EvictionDispatcher(
+          cache, serdeCacheImpl, metadataCache, allocator));
       cachePolicy.setParentDebugDumper(cacheImpl);
       cacheImpl.startThreads(); // Start the cache threads.
       bufferManager = cacheImpl; // Cache also serves as buffer manager.
@@ -145,8 +155,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
         new LinkedBlockingQueue<Runnable>(),
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
     // TODO: this should depends on input format and be in a map, or something.
-    this.cvp = new OrcColumnVectorProducer(
+    this.orcCvp = new OrcColumnVectorProducer(
         metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
+    this.genericCvp = new GenericColumnVectorProducer(
+        serdeCache, bufferManager, conf, cacheMetrics, ioMetrics);
     LOG.info("LLAP IO initialized");
 
     registerMXBeans();
@@ -159,8 +171,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   @SuppressWarnings("rawtypes")
   @Override
   public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
-      InputFormat sourceInputFormat) {
-    return new LlapInputFormat(sourceInputFormat, cvp, executor);
+      InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+    ColumnVectorProducer cvp = genericCvp;
+    if (sourceInputFormat instanceof OrcInputFormat) {
+      cvp = orcCvp; // Special-case for ORC.
+    }
+    return new LlapInputFormat(sourceInputFormat, sourceSerDe, cvp, executor);
   }
 
   @Override