You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/01/06 21:30:52 UTC

[1/2] hive git commit: HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/branch-2.0 410b0dd3d -> 3133ac5a3
  refs/heads/master 20c549908 -> 8069b59a0


HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8069b59a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8069b59a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8069b59a

Branch: refs/heads/master
Commit: 8069b59a0580c53dbcad0acffadb659104651f0d
Parents: 20c5499
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 6 12:29:55 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 6 12:29:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  28 +++--
 data/conf/hive-site.xml                         |   2 +-
 data/conf/llap/hive-site.xml                    |   2 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  10 +-
 .../hive/llap/cache/BufferUsageManager.java     |  39 +++++++
 .../hadoop/hive/llap/cache/LowLevelCache.java   |  14 ---
 .../hive/llap/cache/LowLevelCacheImpl.java      |   8 +-
 .../llap/cache/LowLevelCacheMemoryManager.java  |   8 +-
 .../llap/cache/LowLevelLrfuCachePolicy.java     |   4 +-
 .../hadoop/hive/llap/cache/SimpleAllocator.java | 116 +++++++++++++++++++
 .../hive/llap/cache/SimpleBufferManager.java    |  78 +++++++++++++
 .../hadoop/hive/llap/cli/LlapServiceDriver.java |  12 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  12 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  59 ++++++----
 .../llap/io/decode/OrcColumnVectorProducer.java |  11 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  44 ++++---
 .../hive/llap/cache/TestBuddyAllocator.java     |   8 +-
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |   6 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java |   4 +-
 19 files changed, 358 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ffe0d9a..479fa46 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2337,21 +2337,25 @@ public class HiveConf extends Configuration {
         true,
         "Updates tez job execution progress in-place in the terminal."),
     LLAP_IO_ENABLED("hive.llap.io.enabled", false, "Whether the LLAP IO layer is enabled."),
-    LLAP_LOW_LEVEL_CACHE("hive.llap.io.use.lowlevel.cache", true, "Must always be true for now"),
-    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.io.cache.orc.alloc.min", 128 * 1024,
-        "Minimum allocation possible from LLAP low-level cache for ORC. Allocations below that\n" +
-        "will be padded to minimum allocation. Should generally be the same as expected ORC\n" +
-        "compression buffer size, or next lowest power of 2. Must be power of 2."),
-    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
-        "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
-        "the largest expected ORC compression buffer size. Must be power of 2."),
-    LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+    LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
+        new StringSet("cache", "allocator", "none"),
+        "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
+        "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
+        "'none' doesn't use either (this mode may result in significant performance degradation)"),
+    LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", 128 * 1024,
+        "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
+        "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
+        "compression buffer size, or next lowest power of 2. Must be a power of 2."),
+    LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", 16 * 1024 * 1024,
+        "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
+        "the largest expected ORC compression buffer size. Must be a power of 2."),
+    LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
         "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
         "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
         "not the case, an adjusted size will be used. Using powers of 2 is recommended."),
-    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
-        "Maximum size for ORC low-level cache; must be a multiple of arena size."),
-    LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
+    LLAP_IO_MEMORY_MAX_SIZE("hive.llap.io.memory.size", 1024L * 1024 * 1024,
+        "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
+    LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
         "Whether ORC low-level cache should use direct allocation."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", false,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 44f75bf..cbb5546 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -293,7 +293,7 @@
 
 
 <property>
-  <name>hive.llap.io.cache.direct</name>
+  <name>hive.llap.io.allocator.direct</name>
   <value>false</value>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index f768601..c2bef58 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -264,7 +264,7 @@
 </property>
 
 <property>
-  <name>hive.llap.io.cache.direct</name>
+  <name>hive.llap.io.allocator.direct</name>
   <value>false</value>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 0c96efa..ab4df5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -46,11 +46,11 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private static final int MAX_ARENA_SIZE = 1024*1024*1024;
   public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
       LlapDaemonCacheMetrics metrics) {
-    isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
-    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
-    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
-    long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC);
+    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT);
+    long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
     int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
     arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
     if (LlapIoImpl.LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
new file mode 100644
index 0000000..a441dc4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+/**
+ * The buffer usage (refcount) tracker. Buffer usage can either be tracked by a cache, which
+ * will handle the coordination with eviction, or by a simple refcount manager to facilitate
+ * sharing when the buffer contains data for multiple readers.
+ */
+public interface BufferUsageManager {
+  Allocator getAllocator(); // The allocator this manager was set up with.
+
+  void decRefBuffer(MemoryBuffer buffer); // Drops one reference held on the buffer.
+
+  void decRefBuffers(List<MemoryBuffer> buffers); // Same as decRefBuffer, for each list element.
+
+  boolean incRefBuffer(MemoryBuffer buffer); // False when the buffer could not be locked.
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index b17edb5..17d9fdf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.List;
-
-import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -62,15 +59,4 @@ public interface LowLevelCache {
    */
   long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
-
-  Allocator getAllocator();
-
-  /**
-   * Releases the buffer returned by getFileData.
-   */
-  void releaseBuffer(MemoryBuffer buffer);
-
-  void releaseBuffers(List<MemoryBuffer> cacheBuffers);
-
-  boolean reuseBuffer(MemoryBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index c2a130a..1132171 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
+public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
   private final EvictionAwareAllocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -334,12 +334,12 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
   }
 
   @Override
-  public void releaseBuffer(MemoryBuffer buffer) {
+  public void decRefBuffer(MemoryBuffer buffer) {
     unlockBuffer((LlapDataBuffer)buffer, true);
   }
 
   @Override
-  public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
     for (MemoryBuffer b : cacheBuffers) {
       unlockBuffer((LlapDataBuffer)b, true);
     }
@@ -502,7 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
   }
 
   @Override
-  public boolean reuseBuffer(MemoryBuffer buffer) {
+  public boolean incRefBuffer(MemoryBuffer buffer) {
     // notifyReused implies that buffer is already locked; it's also called once for new
     // buffers that are not cached yet. Don't notify cache policy.
     return lockBuffer(((LlapDataBuffer)buffer), false);

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 8788e15..992da8e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -39,13 +39,14 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor,
       LlapDaemonCacheMetrics metrics) {
-    this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
     this.evictor = evictor;
     this.usedMemory = new AtomicLong(0);
     this.metrics = metrics;
     metrics.setCacheCapacityTotal(maxSize);
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
-      LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
+      LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and "
+          + ((evictor == null) ? "no " : "") + "ability to evict blocks");
     }
   }
 
@@ -65,6 +66,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
         }
         continue;
       }
+      if (evictor == null) return false;
       // TODO: for one-block case, we could move notification for the last block out of the loop.
       long evicted = evictor.evictSomeBlocks(remainingToReserve);
       if (evicted == 0) {
@@ -107,6 +109,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   @Override
   public void forceReservedMemory(int memoryToEvict) {
+    if (evictor == null) return;
     while (memoryToEvict > 0) {
       long evicted = evictor.evictSomeBlocks(memoryToEvict);
       if (evicted == 0) return;
@@ -126,6 +129,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   @Override
   public String debugDumpForOom() {
+    if (evictor == null) return null;
     return "cache state\n" + evictor.debugDumpForOom();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 40cb92d..84910d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -67,8 +67,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private LlapOomDebugDump parentDebugDump;
 
   public LowLevelLrfuCachePolicy(Configuration conf) {
-    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+    int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
     int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
     int maxHeapSize = -1;

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
new file mode 100644
index 0000000..526ff22
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import sun.misc.Cleaner;
+
+public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
+  private final boolean isDirect; // From LLAP_ALLOCATOR_DIRECT: direct vs heap ByteBuffers.
+  private static Field cleanerField; // DirectByteBuffer "cleaner" field; null if unavailable.
+  static {
+    ByteBuffer tmp = ByteBuffer.allocateDirect(1); // Only used to get the direct buffer class.
+    try {
+      cleanerField = tmp.getClass().getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
+      cleanerField = null;
+    }
+  }
+
+  public SimpleAllocator(Configuration conf) {
+    isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("Simple allocator with " + (isDirect ? "direct" : "byte") + " buffers");
+    }
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size) {
+    for (int i = 0; i < dest.length; ++i) {
+      LlapDataBuffer buf = null;
+      if (dest[i] == null) {
+        dest[i] = buf = createUnallocated(); // Fill empty slots; reuse objects already present.
+      } else {
+        buf = (LlapDataBuffer)dest[i];
+      }
+      ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+      buf.initialize(0, bb, 0, size);
+    }
+  }
+
+  @Override
+  public void deallocate(MemoryBuffer buffer) {
+    LlapDataBuffer buf = (LlapDataBuffer)buffer;
+    ByteBuffer bb = buf.byteBuffer;
+    buf.byteBuffer = null;
+    Field field = cleanerField; // Local copy; the catch block below may null the static field.
+    if (field == null || bb == null || !bb.isDirect()) return; // Heap buffers are left to GC.
+    try {
+      ((Cleaner)field.get(bb)).clean();
+    } catch (Throwable t) {
+      LlapIoImpl.LOG.warn("Error using DirectByteBuffer cleaner; stopping its use", t);
+      cleanerField = null;
+    }
+  }
+
+  @Override
+  public boolean isDirectAlloc() {
+    return isDirect;
+  }
+
+  @Override
+  public LlapDataBuffer createUnallocated() {
+    return new LlapDataBuffer();
+  }
+
+  // BuddyAllocatorMXBean: fixed values, as this allocator has no arenas or size limits.
+  @Override
+  public boolean getIsDirect() {
+    return isDirect;
+  }
+
+  @Override
+  public int getMinAllocation() {
+    return 0;
+  }
+
+  @Override
+  public int getMaxAllocation() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public int getArenaSize() {
+    return -1;
+  }
+
+  @Override
+  public long getMaxCacheSize() {
+    return Integer.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
new file mode 100644
index 0000000..734a5c0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+
+public class SimpleBufferManager implements BufferUsageManager {
+  private final Allocator allocator;
+  private final LlapDaemonCacheMetrics metrics;
+
+  public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) {
+    if (LlapIoImpl.LOGL.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("Simple buffer manager");
+    }
+    this.allocator = allocator;
+    this.metrics = metrics;
+  }
+
+  private boolean lockBuffer(LlapDataBuffer buffer) {
+    int rc = buffer.incRef();
+    if (rc <= 0) return false; // Non-positive count: report failure, leave the metric untouched.
+    metrics.incrCacheNumLockedBuffers();
+    return true;
+  }
+
+  private void unlockBuffer(LlapDataBuffer buffer) {
+    if (buffer.decRef() == 0) { // Count reached zero: hand the buffer back to the allocator.
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+      }
+      allocator.deallocate(buffer);
+    }
+    metrics.decrCacheNumLockedBuffers(); // Runs on every unlock, whether or not memory was freed.
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapDataBuffer)buffer);
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) {
+      unlockBuffer((LlapDataBuffer)b);
+    }
+  }
+
+  @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    return lockBuffer((LlapDataBuffer)buffer);
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 8e5377f..4b330f4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -145,7 +145,7 @@ public class LlapServiceDriver {
         Preconditions.checkArgument(options.getXmx() < options.getSize(),
             "Working memory has to be smaller than the container sizing");
       }
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) {
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
         Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
             "Working memory + cache has to be smaller than the containing sizing ");
       }
@@ -165,7 +165,7 @@ public class LlapServiceDriver {
     }
 
     if (options.getCache() != -1) {
-      conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, options.getCache());
+      conf.setLong(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, options.getCache());
     }
 
     if (options.getXmx() != -1) {
@@ -278,11 +278,11 @@ public class LlapServiceDriver {
     configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
         ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
 
-    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
-        HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+    configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+        HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
 
-    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+    configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
 
     configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
         ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 467ab71..b3057c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -290,14 +290,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
           .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
       long executorMemoryBytes = HiveConf.getIntVar(
           daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
-      long cacheMemoryBytes =
-          HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-      boolean isDirectCache =
-          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
+
+      long ioMemoryBytes = HiveConf.getLongVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+      boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
       boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
-      llapDaemon =
-          new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
-              cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+      llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled,
+              isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index b38f472..d2c1907 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,8 +28,10 @@ import javax.management.ObjectName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
@@ -38,7 +40,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
-import org.apache.hadoop.hive.llap.cache.NoopCache;
+import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
+import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
@@ -59,19 +62,21 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class);
   public static final LogLevels LOGL = new LogLevels(LOG);
+  private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
 
   private final ColumnVectorProducer cvp;
   private final ListeningExecutorService executor;
   private LlapDaemonCacheMetrics cacheMetrics;
   private LlapDaemonQueueMetrics queueMetrics;
   private ObjectName buddyAllocatorMXBean;
-  private EvictionAwareAllocator allocator;
+  private Allocator allocator;
 
   private LlapIoImpl(Configuration conf) throws IOException {
-    boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
-    // High-level cache not supported yet.
+    String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
+    boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
+        useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
     if (LOGL.isInfoEnabled()) {
-      LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : ""));
+      LOG.info("Initializing LLAP IO in " + ioMode + " mode");
     }
 
     String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
@@ -86,35 +91,47 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     LOG.info("Started llap daemon metrics with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
-    boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
-    LowLevelCachePolicy cachePolicy =
-        useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
-    LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
-        conf, cachePolicy, cacheMetrics);
-    // Memory manager uses cache policy to trigger evictions.
-    OrcMetadataCache metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+    Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
+
+    OrcMetadataCache metadataCache = null;
     LowLevelCacheImpl orcCache = null;
+    BufferUsageManager bufferManager = null;
     if (useLowLevelCache) {
-      // Allocator uses memory manager to request memory.
-      allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
-      // Cache uses allocator to allocate and deallocate.
+      // Memory manager uses cache policy to trigger evictions, so create the policy first.
+      boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
+      LowLevelCachePolicy cachePolicy =
+          useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+      // Allocator uses memory manager to request memory, so create the manager next.
+      LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+          conf, cachePolicy, cacheMetrics);
+      // Cache uses allocator to allocate and deallocate, so create allocator and then caches.
+      EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+      this.allocator = allocator;
       orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+      metadataCache = new OrcMetadataCache(memManager, cachePolicy);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
       cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
       cachePolicy.setParentDebugDumper(orcCache);
-      orcCache.init();
+      orcCache.init(); // Start the cache threads.
+      bufferManager = orcCache; // Cache also serves as buffer manager.
     } else {
-      cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));
+      if (useAllocOnly) {
+        LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+            conf, null, cacheMetrics);
+        allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+      } else {
+        allocator = new SimpleAllocator(conf);
+      }
+      bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
     }
-    // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?)
+    // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
 
     // TODO: this should depends on input format and be in a map, or something.
-    this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics,
-        queueMetrics);
+    this.cvp = new OrcColumnVectorProducer(
+        metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
     if (LOGL.isInfoEnabled()) {
       LOG.info("LLAP IO initialized");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index e156eaa..18191da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -42,20 +43,22 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
   private final OrcMetadataCache metadataCache;
   private final Cache<OrcCacheKey> cache;
   private final LowLevelCache lowLevelCache;
+  private final BufferUsageManager bufferManager;
   private final Configuration conf;
   private boolean _skipCorrupt; // TODO: get rid of this
   private LlapDaemonCacheMetrics cacheMetrics;
   private LlapDaemonQueueMetrics queueMetrics;
 
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
-      LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
-      LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
+      LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+      Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Initializing ORC column vector producer");
     }
 
     this.metadataCache = metadataCache;
     this.lowLevelCache = lowLevelCache;
+    this.bufferManager = bufferManager;
     this.cache = cache;
     this.conf = conf;
     this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
@@ -71,8 +74,8 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, queueMetrics);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
-        conf, split, columnIds, sarg, columnNames, edc, counters);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+        metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
     return edc;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 58d2ac8..3ddfc29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -133,6 +134,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
   private final OrcMetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
+  private final BufferUsageManager bufferManager;
   private final Configuration conf;
   private final Cache<OrcCacheKey> cache;
   private final FileSplit split;
@@ -160,12 +162,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
-  public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
-      OrcMetadataCache metadataCache, Configuration conf, FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+  public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
+      Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
+      FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
       OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
+    this.bufferManager = bufferManager;
     this.cache = cache;
     this.conf = conf;
     this.split = split;
@@ -382,7 +385,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         if (stripeMetadatas != null) {
           stripeMetadata = stripeMetadatas.get(stripeIxMod);
         } else {
-          if (hasFileId) {
+          if (hasFileId && metadataCache != null) {
             stripeKey.stripeIx = stripeIx;
             stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
           }
@@ -394,7 +397,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             stripeMetadata = new OrcStripeMetadata(
                 stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
             counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
-            if (hasFileId) {
+            if (hasFileId && metadataCache != null) {
               stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
               if (DebugUtils.isTraceOrcEnabled()) {
                 LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -510,11 +513,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private void validateFileMetadata() throws IOException {
     if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
     int bufferSize = fileMetadata.getCompressionBufferSize();
-    int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
     if (bufferSize < minAllocSize) {
       LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
             + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
-            + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory");
+            + HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.toString() + " to avoid wasting memory");
     }
   }
 
@@ -617,18 +620,20 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    */
   private OrcFileMetadata getOrReadFileMetadata() throws IOException {
     OrcFileMetadata metadata = null;
-    if (fileId != null) {
+    if (fileId != null && metadataCache != null) {
       metadata = metadataCache.getFileMetadata(fileId);
       if (metadata != null) {
         counters.incrCounter(Counter.METADATA_CACHE_HIT);
         return metadata;
+      } else {
+        counters.incrCounter(Counter.METADATA_CACHE_MISS);
       }
-      counters.incrCounter(Counter.METADATA_CACHE_MISS);
     }
     ensureOrcReader();
     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
     metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
-    return (fileId == null) ? metadata : metadataCache.putFileMetadata(metadata);
+    if (fileId == null || metadataCache == null) return metadata;
+    return metadataCache.putFileMetadata(metadata);
   }
 
   /**
@@ -643,7 +648,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
       OrcStripeMetadata value = null;
       int stripeIx = stripeIxMod + stripeIxFrom;
-      if (hasFileId) {
+      if (hasFileId && metadataCache != null) {
         stripeKey.stripeIx = stripeIx;
         value = metadataCache.getStripeMetadata(stripeKey);
       }
@@ -655,7 +660,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           long startTime = counters.startTimeCounter();
           value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
           counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
-          if (hasFileId) {
+          if (hasFileId && metadataCache != null) {
             value = metadataCache.putStripeMetadata(value);
             if (DebugUtils.isTraceOrcEnabled()) {
               LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -701,7 +706,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
           }
         }
-        lowLevelCache.releaseBuffers(data.getCacheBuffers());
+        bufferManager.decRefBuffers(data.getCacheBuffers());
         CSD_POOL.offer(data);
       }
     }
@@ -939,7 +944,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     public DataWrapperForOrc() {
       boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
-      if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
+      if (useZeroCopy && !getAllocator().isDirectAlloc()) {
         throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
             + "buffers; either disable zero-copy or enable direct cache allocation");
       }
@@ -949,30 +954,31 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public DiskRangeList getFileData(long fileId, DiskRangeList range,
         long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
-      return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+      return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
+          fileId, range, baseOffset, factory, counters, gotAllData);
     }
 
     @Override
     public long[] putFileData(long fileId, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
-      return lowLevelCache.putFileData(
+      return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
           fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
     }
 
     @Override
     public void releaseBuffer(MemoryBuffer buffer) {
-      lowLevelCache.releaseBuffer(buffer);
+      bufferManager.decRefBuffer(buffer);
     }
 
     @Override
     public void reuseBuffer(MemoryBuffer buffer) {
-      boolean isReused = lowLevelCache.reuseBuffer(buffer);
+      boolean isReused = bufferManager.incRefBuffer(buffer);
       assert isReused;
     }
 
     @Override
     public Allocator getAllocator() {
-      return lowLevelCache.getAllocator();
+      return bufferManager.getAllocator();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6375996..fc014a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -282,10 +282,10 @@ public class TestBuddyAllocator {
 
   private Configuration createConf(int min, int max, int arena, int total) {
     Configuration conf = new Configuration();
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
-    conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_MAX_ALLOC.varname, max);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_ARENA_COUNT.varname, total/arena);
+    conf.setLong(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, total);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 8324b21..0846db9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -348,7 +348,7 @@ public class TestLowLevelCacheImpl {
                 ++gets;
                 LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
                 assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
-                cache.releaseBuffer(result);
+                cache.decRefBuffer(result);
                 iter = iter.next;
               }
             } else {
@@ -378,7 +378,7 @@ public class TestLowLevelCacheImpl {
                   assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
                 }
                 maskVal >>= 1;
-                cache.releaseBuffer(buf);
+                cache.decRefBuffer(buf);
               }
             }
           }
@@ -411,7 +411,7 @@ public class TestLowLevelCacheImpl {
             DiskRange r = results[index];
             if (r instanceof CacheChunk) {
               LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
-              cache.releaseBuffer(result);
+              cache.decRefBuffer(result);
               if (victim == null && result.invalidate()) {
                 ++evictions;
                 victim = result;

http://git-wip-us.apache.org/repos/asf/hive/blob/8069b59a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index d0abfa3..46e9547 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -145,8 +145,8 @@ public class TestLowLevelLrfuCachePolicy {
 
   private Configuration createConf(int min, int heapSize, Double lambda) {
     Configuration conf = new Configuration();
-    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
-    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize);
+    conf.setInt(HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+    conf.setInt(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, heapSize);
     if (lambda != null) {
       conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, lambda.doubleValue());
     }


[2/2] hive git commit: HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-12597 : LLAP - allow using elevator without cache (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3133ac5a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3133ac5a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3133ac5a

Branch: refs/heads/branch-2.0
Commit: 3133ac5a37d7a558278284bc031924b0cb98d51e
Parents: 410b0dd
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Jan 6 12:29:55 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Jan 6 12:30:16 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  28 +++--
 data/conf/hive-site.xml                         |   2 +-
 data/conf/llap/hive-site.xml                    |   2 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  10 +-
 .../hive/llap/cache/BufferUsageManager.java     |  39 +++++++
 .../hadoop/hive/llap/cache/LowLevelCache.java   |  14 ---
 .../hive/llap/cache/LowLevelCacheImpl.java      |   8 +-
 .../llap/cache/LowLevelCacheMemoryManager.java  |   8 +-
 .../llap/cache/LowLevelLrfuCachePolicy.java     |   4 +-
 .../hadoop/hive/llap/cache/SimpleAllocator.java | 116 +++++++++++++++++++
 .../hive/llap/cache/SimpleBufferManager.java    |  78 +++++++++++++
 .../hadoop/hive/llap/cli/LlapServiceDriver.java |  12 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  12 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  59 ++++++----
 .../llap/io/decode/OrcColumnVectorProducer.java |  11 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  44 ++++---
 .../hive/llap/cache/TestBuddyAllocator.java     |   8 +-
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |   6 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java |   4 +-
 19 files changed, 358 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a831d09..caf7842 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2325,21 +2325,25 @@ public class HiveConf extends Configuration {
         true,
         "Updates tez job execution progress in-place in the terminal."),
     LLAP_IO_ENABLED("hive.llap.io.enabled", false, "Whether the LLAP IO layer is enabled."),
-    LLAP_LOW_LEVEL_CACHE("hive.llap.io.use.lowlevel.cache", true, "Must always be true for now"),
-    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.io.cache.orc.alloc.min", 128 * 1024,
-        "Minimum allocation possible from LLAP low-level cache for ORC. Allocations below that\n" +
-        "will be padded to minimum allocation. Should generally be the same as expected ORC\n" +
-        "compression buffer size, or next lowest power of 2. Must be power of 2."),
-    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.io.cache.orc.alloc.max", 16 * 1024 * 1024,
-        "Maximum allocation possible from LLAP low-level cache for ORC. Should be as large as\n" +
-        "the largest expected ORC compression buffer size. Must be power of 2."),
-    LLAP_ORC_CACHE_ARENA_COUNT("hive.llap.io.cache.orc.arena.count", 8,
+    LLAP_IO_MEMORY_MODE("hive.llap.io.memory.mode", "cache",
+        new StringSet("cache", "allocator", "none"),
+        "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
+        "custom off-heap allocator, 'allocator' uses the custom allocator without the caches,\n" +
+        "'none' doesn't use either (this mode may result in significant performance degradation)"),
+    LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", 128 * 1024,
+        "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
+        "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
+        "compression buffer size, or next lowest power of 2. Must be a power of 2."),
+    LLAP_ALLOCATOR_MAX_ALLOC("hive.llap.io.allocator.alloc.max", 16 * 1024 * 1024,
+        "Maximum allocation possible from LLAP buddy allocator. For ORC, should be as large as\n" +
+        "the largest expected ORC compression buffer size. Must be a power of 2."),
+    LLAP_ALLOCATOR_ARENA_COUNT("hive.llap.io.allocator.arena.count", 8,
         "Arena count for LLAP low-level cache; cache will be allocated in the steps of\n" +
         "(size/arena_count) bytes. This size must be <= 1Gb and >= max allocation; if it is\n" +
         "not the case, an adjusted size will be used. Using powers of 2 is recommended."),
-    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.io.cache.orc.size", 1024L * 1024 * 1024,
-        "Maximum size for ORC low-level cache; must be a multiple of arena size."),
-    LLAP_ORC_CACHE_ALLOCATE_DIRECT("hive.llap.io.cache.direct", true,
+    LLAP_IO_MEMORY_MAX_SIZE("hive.llap.io.memory.size", 1024L * 1024 * 1024,
+        "Maximum size for IO allocator or ORC low-level cache.", "hive.llap.io.cache.orc.size"),
+    LLAP_ALLOCATOR_DIRECT("hive.llap.io.allocator.direct", true,
         "Whether ORC low-level cache should use direct allocation."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", false,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 5a790e7..d5e950d 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -276,7 +276,7 @@
 
 
 <property>
-  <name>hive.llap.io.cache.direct</name>
+  <name>hive.llap.io.allocator.direct</name>
   <value>false</value>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/data/conf/llap/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml
index f768601..c2bef58 100644
--- a/data/conf/llap/hive-site.xml
+++ b/data/conf/llap/hive-site.xml
@@ -264,7 +264,7 @@
 </property>
 
 <property>
-  <name>hive.llap.io.cache.direct</name>
+  <name>hive.llap.io.allocator.direct</name>
   <value>false</value>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 0c96efa..ab4df5d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -46,11 +46,11 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private static final int MAX_ARENA_SIZE = 1024*1024*1024;
   public BuddyAllocator(Configuration conf, MemoryManager memoryManager,
       LlapDaemonCacheMetrics metrics) {
-    isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
-    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
-    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_COUNT);
-    long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    isDirect = HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC);
+    int arenaCount = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT);
+    long maxSizeVal = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
     int arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : (int)(maxSizeVal / arenaCount);
     arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
     if (LlapIoImpl.LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
new file mode 100644
index 0000000..a441dc4
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BufferUsageManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+
+/**
+ * The buffer usage (refcount) tracker. Buffer usage can either be tracked by a cache, which
+ * will handle the coordination with eviction, or by a simple refcount manager to facilitate
+ * sharing when the buffer contains data for multiple readers.
+ */
+public interface BufferUsageManager {
+  Allocator getAllocator();
+
+  void decRefBuffer(MemoryBuffer buffer);
+
+  void decRefBuffers(List<MemoryBuffer> buffers);
+
+  boolean incRefBuffer(MemoryBuffer buffer);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index b17edb5..17d9fdf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.List;
-
-import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -62,15 +59,4 @@ public interface LowLevelCache {
    */
   long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
-
-  Allocator getAllocator();
-
-  /**
-   * Releases the buffer returned by getFileData.
-   */
-  void releaseBuffer(MemoryBuffer buffer);
-
-  void releaseBuffers(List<MemoryBuffer> cacheBuffers);
-
-  boolean reuseBuffer(MemoryBuffer buffer);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index c2a130a..1132171 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
+public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
   private final EvictionAwareAllocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -334,12 +334,12 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
   }
 
   @Override
-  public void releaseBuffer(MemoryBuffer buffer) {
+  public void decRefBuffer(MemoryBuffer buffer) {
     unlockBuffer((LlapDataBuffer)buffer, true);
   }
 
   @Override
-  public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
     for (MemoryBuffer b : cacheBuffers) {
       unlockBuffer((LlapDataBuffer)b, true);
     }
@@ -502,7 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
   }
 
   @Override
-  public boolean reuseBuffer(MemoryBuffer buffer) {
+  public boolean incRefBuffer(MemoryBuffer buffer) {
     // notifyReused implies that buffer is already locked; it's also called once for new
     // buffers that are not cached yet. Don't notify cache policy.
     return lockBuffer(((LlapDataBuffer)buffer), false);

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 8788e15..992da8e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -39,13 +39,14 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   public LowLevelCacheMemoryManager(Configuration conf, LowLevelCachePolicy evictor,
       LlapDaemonCacheMetrics metrics) {
-    this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    this.maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
     this.evictor = evictor;
     this.usedMemory = new AtomicLong(0);
     this.metrics = metrics;
     metrics.setCacheCapacityTotal(maxSize);
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
-      LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
+      LlapIoImpl.LOG.info("Memory manager initialized with max size " + maxSize + " and "
+          + ((evictor == null) ? "no " : "") + "ability to evict blocks");
     }
   }
 
@@ -65,6 +66,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
         }
         continue;
       }
+      if (evictor == null) return false;
       // TODO: for one-block case, we could move notification for the last block out of the loop.
       long evicted = evictor.evictSomeBlocks(remainingToReserve);
       if (evicted == 0) {
@@ -107,6 +109,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   @Override
   public void forceReservedMemory(int memoryToEvict) {
+    if (evictor == null) return;
     while (memoryToEvict > 0) {
       long evicted = evictor.evictSomeBlocks(memoryToEvict);
       if (evicted == 0) return;
@@ -126,6 +129,7 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
 
   @Override
   public String debugDumpForOom() {
+    if (evictor == null) return null;
     return "cache state\n" + evictor.debugDumpForOom();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 40cb92d..84910d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -67,8 +67,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private LlapOomDebugDump parentDebugDump;
 
   public LowLevelLrfuCachePolicy(Configuration conf) {
-    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+    int minBufferSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
     int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
     int maxHeapSize = -1;

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
new file mode 100644
index 0000000..526ff22
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import sun.misc.Cleaner;
+
+public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
+  // Hands out one plain JVM ByteBuffer per request and releases direct ones eagerly on free.
+  private final boolean isDirect; // True when buffers come from ByteBuffer.allocateDirect.
+  private static Field cleanerField; // DirectByteBuffer "cleaner" field; null when unusable.
+  static {
+    ByteBuffer tmp = ByteBuffer.allocateDirect(1); // Only used to get at the concrete class.
+    try {
+      cleanerField = tmp.getClass().getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
+      cleanerField = null; // deallocate() will then only drop the buffer reference.
+    }
+  }
+
+  public SimpleAllocator(Configuration conf) {
+    isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+    if (LlapIoImpl.LOG.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("Simple allocator with " + (isDirect ? "direct" : "byte") + " buffers");
+    }
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size) {
+    for (int i = 0; i < dest.length; ++i) {
+      LlapDataBuffer buf = (LlapDataBuffer)dest[i];
+      if (buf == null) { // Empty slot: create the wrapper; otherwise reuse the caller's.
+        dest[i] = buf = createUnallocated();
+      }
+      ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+      buf.initialize(0, bb, 0, size);
+    }
+  }
+
+  @Override
+  public void deallocate(MemoryBuffer buffer) {
+    LlapDataBuffer buf = (LlapDataBuffer)buffer;
+    ByteBuffer bb = buf.byteBuffer;
+    buf.byteBuffer = null;
+    Field field = cleanerField;
+    // Heap buffers have no cleaner; reflecting on one would throw and disable cleaning for all.
+    if (field == null || bb == null || !bb.isDirect()) return;
+    try {
+      ((Cleaner)field.get(bb)).clean();
+    } catch (Throwable t) {
+      LlapIoImpl.LOG.warn("Error using DirectByteBuffer cleaner; stopping its use", t);
+      cleanerField = null;
+    }
+  }
+
+  @Override
+  public boolean isDirectAlloc() {
+    return isDirect;
+  }
+
+  @Override
+  public LlapDataBuffer createUnallocated() {
+    return new LlapDataBuffer();
+  }
+
+  // BuddyAllocatorMXBean: fixed placeholder values, since no arenas or limits exist here.
+  @Override
+  public boolean getIsDirect() {
+    return isDirect;
+  }
+
+  @Override
+  public int getMinAllocation() {
+    return 0;
+  }
+
+  @Override
+  public int getMaxAllocation() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public int getArenaSize() {
+    return -1;
+  }
+
+  @Override
+  public long getMaxCacheSize() {
+    return Integer.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
new file mode 100644
index 0000000..734a5c0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+
+public class SimpleBufferManager implements BufferUsageManager { // Refcounts only; no cache.
+  private final Allocator allocator; // Receives buffers whose reference count reaches zero.
+  private final LlapDaemonCacheMetrics metrics; // Used here for the locked-buffer counter.
+
+  public SimpleBufferManager(Allocator allocator, LlapDaemonCacheMetrics metrics) {
+    if (LlapIoImpl.LOGL.isInfoEnabled()) {
+      LlapIoImpl.LOG.info("Simple buffer manager");
+    }
+    this.allocator = allocator;
+    this.metrics = metrics;
+  }
+
+  private boolean lockBuffer(LlapDataBuffer buffer) {
+    int rc = buffer.incRef();
+    if (rc <= 0) return false; // Non-positive result is treated as "not locked"; no metric.
+    metrics.incrCacheNumLockedBuffers();
+    return true;
+  }
+
+  private void unlockBuffer(LlapDataBuffer buffer) {
+    if (buffer.decRef() == 0) { // Count hit zero: hand the memory back to the allocator.
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Deallocating " + buffer + " that was not cached");
+      }
+      allocator.deallocate(buffer);
+    }
+    metrics.decrCacheNumLockedBuffers(); // Runs on every unlock, freed or not.
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapDataBuffer)buffer); // Callers must pass LlapDataBuffer instances.
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) { // Each element is unlocked independently.
+      unlockBuffer((LlapDataBuffer)b);
+    }
+  }
+
+  @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    return lockBuffer((LlapDataBuffer)buffer); // False when the reference was not taken.
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 8e5377f..4b330f4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -145,7 +145,7 @@ public class LlapServiceDriver {
         Preconditions.checkArgument(options.getXmx() < options.getSize(),
             "Working memory has to be smaller than the container sizing");
       }
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) {
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)) {
         Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
             "Working memory + cache has to be smaller than the containing sizing ");
       }
@@ -165,7 +165,7 @@ public class LlapServiceDriver {
     }
 
     if (options.getCache() != -1) {
-      conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, options.getCache());
+      conf.setLong(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, options.getCache());
     }
 
     if (options.getXmx() != -1) {
@@ -278,11 +278,11 @@ public class LlapServiceDriver {
     configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
         ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
 
-    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
-        HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+    configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+        HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
 
-    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+    configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
 
     configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
         ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 467ab71..b3057c3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -290,14 +290,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
           .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
       long executorMemoryBytes = HiveConf.getIntVar(
           daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
-      long cacheMemoryBytes =
-          HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-      boolean isDirectCache =
-          HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT);
+
+      long ioMemoryBytes = HiveConf.getLongVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+      boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
       boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED);
-      llapDaemon =
-          new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache,
-              cacheMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+      llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled,
+              isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
 
       LOG.info("Adding shutdown hook for LlapDaemon");
       ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index b38f472..d2c1907 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,8 +28,10 @@ import javax.management.ObjectName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
@@ -38,7 +40,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
 import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
-import org.apache.hadoop.hive.llap.cache.NoopCache;
+import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
+import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
@@ -59,19 +62,21 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   public static final Logger LOG = LoggerFactory.getLogger(LlapIoImpl.class);
   public static final LogLevels LOGL = new LogLevels(LOG);
+  private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
 
   private final ColumnVectorProducer cvp;
   private final ListeningExecutorService executor;
   private LlapDaemonCacheMetrics cacheMetrics;
   private LlapDaemonQueueMetrics queueMetrics;
   private ObjectName buddyAllocatorMXBean;
-  private EvictionAwareAllocator allocator;
+  private Allocator allocator;
 
   private LlapIoImpl(Configuration conf) throws IOException {
-    boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
-    // High-level cache not supported yet.
+    String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
+    boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode),
+        useAllocOnly = !useLowLevelCache && LlapIoImpl.MODE_ALLOCATOR.equalsIgnoreCase(ioMode);
     if (LOGL.isInfoEnabled()) {
-      LOG.info("Initializing LLAP IO" + (useLowLevelCache ? " with low level cache" : ""));
+      LOG.info("Initializing LLAP IO in " + ioMode + " mode");
     }
 
     String displayName = "LlapDaemonCacheMetrics-" + MetricsUtils.getHostName();
@@ -86,35 +91,47 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     LOG.info("Started llap daemon metrics with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
-    boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
-    LowLevelCachePolicy cachePolicy =
-        useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
-    LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
-        conf, cachePolicy, cacheMetrics);
-    // Memory manager uses cache policy to trigger evictions.
-    OrcMetadataCache metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+    Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
+
+    OrcMetadataCache metadataCache = null;
     LowLevelCacheImpl orcCache = null;
+    BufferUsageManager bufferManager = null;
     if (useLowLevelCache) {
-      // Allocator uses memory manager to request memory.
-      allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
-      // Cache uses allocator to allocate and deallocate.
+      // Memory manager uses cache policy to trigger evictions, so create the policy first.
+      boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
+      LowLevelCachePolicy cachePolicy =
+          useLrfu ? new LowLevelLrfuCachePolicy(conf) : new LowLevelFifoCachePolicy(conf);
+      // Allocator uses memory manager to request memory, so create the manager next.
+      LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+          conf, cachePolicy, cacheMetrics);
+      // Cache uses allocator to allocate and deallocate, create allocator and then caches.
+      EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+      this.allocator = allocator;
       orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+      metadataCache = new OrcMetadataCache(memManager, cachePolicy);
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
       cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
       cachePolicy.setParentDebugDumper(orcCache);
-      orcCache.init();
+      orcCache.init(); // Start the cache threads.
+      bufferManager = orcCache; // Cache also serves as buffer manager.
     } else {
-      cachePolicy.setEvictionListener(new EvictionDispatcher(null, metadataCache));
+      if (useAllocOnly) {
+        LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+            conf, null, cacheMetrics);
+        allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+      } else {
+        allocator = new SimpleAllocator(conf);
+      }
+      bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
     }
-    // Arbitrary thread pool. Listening is used for unhandled errors for now (TODO: remove?)
+    // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
         new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
 
     // TODO: this should depends on input format and be in a map, or something.
-    this.cvp = new OrcColumnVectorProducer(metadataCache, orcCache, cache, conf, cacheMetrics,
-        queueMetrics);
+    this.cvp = new OrcColumnVectorProducer(
+        metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
     if (LOGL.isInfoEnabled()) {
       LOG.info("LLAP IO initialized");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index e156eaa..18191da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -42,20 +43,22 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
   private final OrcMetadataCache metadataCache;
   private final Cache<OrcCacheKey> cache;
   private final LowLevelCache lowLevelCache;
+  private final BufferUsageManager bufferManager;
   private final Configuration conf;
   private boolean _skipCorrupt; // TODO: get rid of this
   private LlapDaemonCacheMetrics cacheMetrics;
   private LlapDaemonQueueMetrics queueMetrics;
 
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
-      LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
-      LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
+      LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+      Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Initializing ORC column vector producer");
     }
 
     this.metadataCache = metadataCache;
     this.lowLevelCache = lowLevelCache;
+    this.bufferManager = bufferManager;
     this.cache = cache;
     this.conf = conf;
     this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
@@ -71,8 +74,8 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, queueMetrics);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
-        conf, split, columnIds, sarg, columnNames, edc, counters);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+        metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
     return edc;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 58d2ac8..3ddfc29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
@@ -133,6 +134,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
   private final OrcMetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
+  private final BufferUsageManager bufferManager;
   private final Configuration conf;
   private final Cache<OrcCacheKey> cache;
   private final FileSplit split;
@@ -160,12 +162,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   @SuppressWarnings("unused")
   private volatile boolean isPaused = false;
 
-  public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
-      OrcMetadataCache metadataCache, Configuration conf, FileSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+  public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
+      Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
+      FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
       OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
+    this.bufferManager = bufferManager;
     this.cache = cache;
     this.conf = conf;
     this.split = split;
@@ -382,7 +385,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         if (stripeMetadatas != null) {
           stripeMetadata = stripeMetadatas.get(stripeIxMod);
         } else {
-          if (hasFileId) {
+          if (hasFileId && metadataCache != null) {
             stripeKey.stripeIx = stripeIx;
             stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
           }
@@ -394,7 +397,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             stripeMetadata = new OrcStripeMetadata(
                 stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
             counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
-            if (hasFileId) {
+            if (hasFileId && metadataCache != null) {
               stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
               if (DebugUtils.isTraceOrcEnabled()) {
                 LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -510,11 +513,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private void validateFileMetadata() throws IOException {
     if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
     int bufferSize = fileMetadata.getCompressionBufferSize();
-    int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
     if (bufferSize < minAllocSize) {
       LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
             + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
-            + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory");
+            + HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.toString() + " to avoid wasting memory");
     }
   }
 
@@ -617,18 +620,20 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    */
   private OrcFileMetadata getOrReadFileMetadata() throws IOException {
     OrcFileMetadata metadata = null;
-    if (fileId != null) {
+    if (fileId != null && metadataCache != null) {
       metadata = metadataCache.getFileMetadata(fileId);
       if (metadata != null) {
         counters.incrCounter(Counter.METADATA_CACHE_HIT);
         return metadata;
+      } else {
+        counters.incrCounter(Counter.METADATA_CACHE_MISS);
       }
-      counters.incrCounter(Counter.METADATA_CACHE_MISS);
     }
     ensureOrcReader();
     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
     metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
-    return (fileId == null) ? metadata : metadataCache.putFileMetadata(metadata);
+    if (fileId == null || metadataCache == null) return metadata;
+    return metadataCache.putFileMetadata(metadata);
   }
 
   /**
@@ -643,7 +648,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
       OrcStripeMetadata value = null;
       int stripeIx = stripeIxMod + stripeIxFrom;
-      if (hasFileId) {
+      if (hasFileId && metadataCache != null) {
         stripeKey.stripeIx = stripeIx;
         value = metadataCache.getStripeMetadata(stripeKey);
       }
@@ -655,7 +660,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           long startTime = counters.startTimeCounter();
           value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
           counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
-          if (hasFileId) {
+          if (hasFileId && metadataCache != null) {
             value = metadataCache.putStripeMetadata(value);
             if (DebugUtils.isTraceOrcEnabled()) {
               LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
@@ -701,7 +706,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
           }
         }
-        lowLevelCache.releaseBuffers(data.getCacheBuffers());
+        bufferManager.decRefBuffers(data.getCacheBuffers());
         CSD_POOL.offer(data);
       }
     }
@@ -939,7 +944,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     public DataWrapperForOrc() {
       boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
-      if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
+      if (useZeroCopy && !getAllocator().isDirectAlloc()) {
         throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
             + "buffers; either disable zero-copy or enable direct cache allocation");
       }
@@ -949,30 +954,31 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public DiskRangeList getFileData(long fileId, DiskRangeList range,
         long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
-      return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+      return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
+          fileId, range, baseOffset, factory, counters, gotAllData);
     }
 
     @Override
     public long[] putFileData(long fileId, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
-      return lowLevelCache.putFileData(
+      return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
           fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
     }
 
     @Override
     public void releaseBuffer(MemoryBuffer buffer) {
-      lowLevelCache.releaseBuffer(buffer);
+      bufferManager.decRefBuffer(buffer);
     }
 
     @Override
     public void reuseBuffer(MemoryBuffer buffer) {
-      boolean isReused = lowLevelCache.reuseBuffer(buffer);
+      boolean isReused = bufferManager.incRefBuffer(buffer);
       assert isReused;
     }
 
     @Override
     public Allocator getAllocator() {
-      return lowLevelCache.getAllocator();
+      return bufferManager.getAllocator();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 6375996..fc014a7 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -282,10 +282,10 @@ public class TestBuddyAllocator {
 
   private Configuration createConf(int min, int max, int arena, int total) {
     Configuration conf = new Configuration();
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
-    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_COUNT.varname, total/arena);
-    conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_MAX_ALLOC.varname, max);
+    conf.setInt(ConfVars.LLAP_ALLOCATOR_ARENA_COUNT.varname, total/arena);
+    conf.setLong(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, total);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 8324b21..0846db9 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -348,7 +348,7 @@ public class TestLowLevelCacheImpl {
                 ++gets;
                 LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
                 assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
-                cache.releaseBuffer(result);
+                cache.decRefBuffer(result);
                 iter = iter.next;
               }
             } else {
@@ -378,7 +378,7 @@ public class TestLowLevelCacheImpl {
                   assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
                 }
                 maskVal >>= 1;
-                cache.releaseBuffer(buf);
+                cache.decRefBuffer(buf);
               }
             }
           }
@@ -411,7 +411,7 @@ public class TestLowLevelCacheImpl {
             DiskRange r = results[index];
             if (r instanceof CacheChunk) {
               LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
-              cache.releaseBuffer(result);
+              cache.decRefBuffer(result);
               if (victim == null && result.invalidate()) {
                 ++evictions;
                 victim = result;

http://git-wip-us.apache.org/repos/asf/hive/blob/3133ac5a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index d0abfa3..46e9547 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -145,8 +145,8 @@ public class TestLowLevelLrfuCachePolicy {
 
   private Configuration createConf(int min, int heapSize, Double lambda) {
     Configuration conf = new Configuration();
-    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
-    conf.setInt(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, heapSize);
+    conf.setInt(HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.varname, min);
+    conf.setInt(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, heapSize);
     if (lambda != null) {
       conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, lambda.doubleValue());
     }