Posted to commits@hive.apache.org by se...@apache.org on 2015/01/14 03:47:09 UTC

svn commit: r1651559 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/conf/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/h...

Author: sershe
Date: Wed Jan 14 02:47:08 2015
New Revision: 1651559

URL: http://svn.apache.org/r1651559
Log:
Finish reworking LRFU policy for low-level cache (not clear if it's a good pick due to concurrency); tests; some pipeline adjustments

Added:
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
Removed:
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 14 02:47:08 2015
@@ -1969,10 +1969,11 @@ public class HiveConf extends Configurat
         "Updates tez job execution progress in-place in the terminal."),
 
     LLAP_ENABLED("hive.llap.enabled", true, ""),
-    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""),
-    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""),
-    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""),
-    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""),
+    LLAP_LOW_LEVEL_CACHE("hive.llap.use.lowlevel.cache", true, ""),
+    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.alloc.min", 128 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.alloc.max", 16 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.arena.size", 128 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.size", 1024L * 1024 * 1024, ""),
     LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""),
     LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""),
     LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "")
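
For reference, the four renamed keys describe the allocator geometry that LowLevelBuddyCache validates below; a minimal sketch of reading them, using only the HiveConf accessors already used in this patch (defaults in comments):

    Configuration conf = new HiveConf();
    int minAlloc  = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);   // 128KB
    int maxAlloc  = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);   // 16MB
    int arenaSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);  // 128MB
    long maxSize  = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE);   // 1GB
    // All must be powers of two with minAlloc <= maxAlloc <= arenaSize <= maxSize;
    // the cache then grows up to maxSize / arenaSize arenas (8 with these defaults).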

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Wed Jan 14 02:47:08 2015
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
 public class EncodedColumn<BatchKey> {
   // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
   //       generality, and ability to not copy data from underlying low-level cached buffers.
-  public static class ColumnBuffer {}
+  public static class ColumnBuffer {
+    // TODO: given how ORC will allocate, it might make sense to share array between all
+    //       returned encodedColumn-s, and store index and length in the array.
+    public LlapMemoryBuffer[] cacheBuffers;
+    public int firstOffset, lastLength;
+  }
   public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
     this.batchKey = batchKey;
     this.columnIndex = columnIndex;
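
A ColumnBuffer now spans a run of low-level cache buffers where only the ends are partial; a sketch of how a consumer might walk such a run (the byteBuffer/length fields on LlapMemoryBuffer are assumptions for illustration):

    void visit(EncodedColumn.ColumnBuffer cb) {
      for (int i = 0; i < cb.cacheBuffers.length; ++i) {
        LlapMemoryBuffer buf = cb.cacheBuffers[i];
        int from = (i == 0) ? cb.firstOffset : 0;                // first buffer starts late
        int to = (i == cb.cacheBuffers.length - 1)
            ? cb.lastLength : buf.length;                        // last buffer ends early
        // process bytes [from, to) of buf.byteBuffer
      }
    }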

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Wed Jan 14 02:47:08 2015
@@ -42,4 +42,7 @@ public interface LowLevelCache {
    * Allocate dest.length new blocks of size into dest.
    */
   void allocateMultiple(LlapMemoryBuffer[] dest, int size);
+
+  void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+
 }
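
releaseBuffers is the bulk counterpart of allocateMultiple; the ORC reader below pairs them around decode. A minimal usage sketch against this interface:

    LlapMemoryBuffer[] bufs = new LlapMemoryBuffer[4];
    cache.allocateMultiple(bufs, 256 * 1024); // four locked 256KB blocks
    try {
      // ... fill with ORC stream data, hand off to the decode pipeline ...
    } finally {
      cache.releaseBuffers(bufs); // drops refcounts; at 0 the policy may evict them
    }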

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Wed Jan 14 02:47:08 2015
@@ -41,8 +41,9 @@ public final class LlapCacheableBuffer e
   // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
   public double priority;
   public long lastUpdate = -1;
-  public int indexInHeap = -1;
-  public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe
+  public LlapCacheableBuffer prev = null, next = null;
+  public int indexInHeap = NOT_IN_CACHE;
+  public static final int IN_LIST = -2, NOT_IN_CACHE = -1;
 
   @Override
   public int hashCode() {
@@ -60,15 +61,16 @@ public final class LlapCacheableBuffer e
         && this.offset == other.offset && this.length == other.length;
   }
 
-  int lock() {
-    int oldRefCount = -1;
+  int incRef() {
+    int newRefCount = -1;
     while (true) {
-      oldRefCount = refCount.get();
+      int oldRefCount = refCount.get();
       if (oldRefCount == EVICTED_REFCOUNT) return -1;
       assert oldRefCount >= 0;
-      if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break;
+      newRefCount = oldRefCount + 1;
+      if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
     }
-    return oldRefCount;
+    return newRefCount;
   }
 
   public boolean isLocked() {
@@ -81,7 +83,7 @@ public final class LlapCacheableBuffer e
     return refCount.get() == EVICTED_REFCOUNT;
   }
 
-  int unlock() {
+  int decRef() {
     int newRefCount = refCount.decrementAndGet();
     if (newRefCount < 0) {
       throw new AssertionError("Unexpected refCount " + newRefCount);
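
The rename from lock/unlock to incRef/decRef also changes the return value: incRef now reports the new count, so callers can detect the 0 -> 1 transition (lockBuffer below notifies the policy only when rc == 1). The CAS loop itself is the usual refcount-with-sentinel pattern; condensed into isolation:

    // Sketch: a refcount that refuses to resurrect an evicted buffer.
    // EVICTED_REFCOUNT is the negative sentinel that invalidate() swaps in.
    static final int EVICTED_REFCOUNT = -1;
    static int incRef(java.util.concurrent.atomic.AtomicInteger refCount) {
      while (true) {
        int old = refCount.get();
        if (old == EVICTED_REFCOUNT) return -1; // lost the race to eviction
        if (refCount.compareAndSet(old, old + 1)) return old + 1;
        // CAS failed: another thread changed the count; re-read and retry.
      }
    }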

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Wed Jan 14 02:47:08 2015
@@ -33,8 +33,9 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
+// TODO: refactor the cache and allocator parts?
 public class LowLevelBuddyCache implements LowLevelCache, EvictionListener {
-  private final ArrayList<arena> arenas;
+  private final ArrayList<Arena> arenas;
   private AtomicInteger newEvictions = new AtomicInteger(0);
   private final Thread cleanupThread;
   private final ConcurrentHashMap<String, FileCache> cache =
@@ -44,21 +45,21 @@ public class LowLevelBuddyCache implemen
   // Config settings
   private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
 
-  private final int minAllocation, maxAllocation;
-  private final long maxSize, arenaSize;
-  
+  private final int minAllocation, maxAllocation, arenaSize;
+  private final long maxSize;
+
   public LowLevelBuddyCache(Configuration conf) {
     minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
     maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
-    arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
     maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
-    if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) {
+    if (maxSize < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
       throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
           + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
     }
     if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
         || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) {
-      // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc
+      // TODO: technically, arena size only needs to be divisible by maxAlloc
       throw new AssertionError("Allocation and arena sizes must be powers of two > 1: "
           + minAllocation + ", " + maxAllocation + ", " + arenaSize);
     }
@@ -70,11 +71,11 @@ public class LowLevelBuddyCache implemen
     maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
     arenaSizeLog2 = 31 - Integer.numberOfLeadingZeros(arenaSize);
     maxArenas = (int)(maxSize / arenaSize);
-    arenas = new ArrayList<arena>(maxArenas);
+    arenas = new ArrayList<Arena>(maxArenas);
     for (int i = 0; i < maxArenas; ++i) {
-      arenas.add(new arena());
+      arenas.add(new Arena());
     }
-    arenas.get(0).init();
+    arenas.get(0).init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
     cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
         ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
         : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
@@ -91,7 +92,7 @@ public class LowLevelBuddyCache implemen
     freeListIndex = Math.max(freeListIndex - minAllocLog2, 0);
     int allocationSize = 1 << (freeListIndex + minAllocLog2);
     int total = dest.length * allocationSize;
-    cachePolicy.reserveMemory(total);
+    cachePolicy.reserveMemory(total, true);
 
     int ix = 0;
     for (int i = 0; i < dest.length; ++i) {
@@ -99,27 +100,27 @@ public class LowLevelBuddyCache implemen
       dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects?
     }
     // TODO: instead of waiting, loop only ones we haven't tried w/tryLock?
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize);
       if (newIx == -1) break;
       if (newIx == dest.length) return;
       ix = newIx;
     }
     // Then try to split bigger blocks.
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize);
       if (newIx == -1) break;
       if (newIx == dest.length) return;
       ix = newIx;
     }
     // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-    for (arena block : arenas) {
+    for (Arena block : arenas) {
       ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize);
       if (ix == dest.length) return;
     }
   }
 
-  private int allocateFast(arena block,
+  private int allocateFast(Arena block,
       int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
     if (block.data == null) return -1; // not allocated yet
     FreeList freeList = block.freeLists[freeListIndex];
@@ -133,7 +134,7 @@ public class LowLevelBuddyCache implemen
   }
 
   private int allocateWithSplit(
-      arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+      Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
     if (arena.data == null) return -1; // not allocated yet
     FreeList freeList = arena.freeLists[freeListIndex];
     int remaining = -1;
@@ -206,7 +207,7 @@ public class LowLevelBuddyCache implemen
     return lastSplitNextHeader << minAllocLog2;
   }
 
-  public int allocateFromFreeListUnderLock(arena block, FreeList freeList,
+  public int allocateFromFreeListUnderLock(Arena block, FreeList freeList,
       int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
     int current = freeList.listHead;
     while (current >= 0 && ix < dest.length) {
@@ -222,15 +223,15 @@ public class LowLevelBuddyCache implemen
   }
 
   private int allocateWithExpand(
-      arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
-    if (block.data != null) return ix; // already allocated
-    synchronized (block) {
+      Arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+    if (arena.data != null) return ix; // already allocated
+    synchronized (arena) {
       // Never goes from non-null to null, so this is the only place we need sync.
-      if (block.data == null) {
-        block.init();
+      if (arena.data == null) {
+        arena.init(arenaSize, maxAllocation, arenaSizeLog2, minAllocLog2, maxAllocLog2);
       }
     }
-    return allocateWithSplit(block, freeListIndex, dest, ix, size);
+    return allocateWithSplit(arena, freeListIndex, dest, ix, size);
   }
 
   @Override
@@ -262,8 +263,8 @@ public class LowLevelBuddyCache implemen
   }
 
   private boolean lockBuffer(LlapCacheableBuffer buffer) {
-    int rc = buffer.lock();
-    if (rc == 0) {
+    int rc = buffer.incRef();
+    if (rc == 1) {
       cachePolicy.notifyLock(buffer);
     }
     return rc >= 0;
@@ -282,7 +283,11 @@ public class LowLevelBuddyCache implemen
         assert buffer.isLocked();
         while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
           LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
-          if (oldVal == null) break; // Cached successfully.
+          if (oldVal == null) {
+            // Cached successfully, add to policy.
+            cachePolicy.cache(buffer);
+            break;
+          }
           if (DebugUtils.isTraceCachingEnabled()) {
             LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
                 + fileName + "@" + offset  + "; old " + oldVal + ", new " + buffer);
@@ -297,7 +302,7 @@ public class LowLevelBuddyCache implemen
             result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value
             break;
           }
-          // We found some old value but couldn't lock it; remove it.
+          // We found some old value but couldn't incRef it; remove it.
           subCache.cache.remove(offset, oldVal);
         }
       }
@@ -349,15 +354,22 @@ public class LowLevelBuddyCache implemen
     releaseBufferInternal((LlapCacheableBuffer)buffer);
   }
 
+  @Override
+  public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
+    for (int i = 0; i < cacheBuffers.length; ++i) {
+      releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+    }
+  }
+
   public void releaseBufferInternal(LlapCacheableBuffer buffer) {
-    if (buffer.unlock() == 0) {
+    if (buffer.decRef() == 0) {
       cachePolicy.notifyUnlock(buffer);
       unblockEviction();
     }
   }
 
   public static LlapCacheableBuffer allocateFake() {
-    return new LlapCacheableBuffer(null, -1, -1);
+    return new LlapCacheableBuffer(null, -1, 1);
   }
 
   public void unblockEviction() {
@@ -446,9 +458,9 @@ public class LowLevelBuddyCache implemen
     }
   }
 
-  private class arena {
-    void init() {
-      data = ByteBuffer.allocateDirect(maxAllocation);
+  private static class Arena {
+    void init(int arenaSize, int maxAlloc, int arenaSizeLog2, int minAllocLog2, int maxAllocLog2) {
+      data = ByteBuffer.allocateDirect(arenaSize);
       int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
       headers = new byte[maxMinAllocs];
       int allocLog2Diff = maxAllocLog2 - minAllocLog2;
@@ -459,7 +471,7 @@ public class LowLevelBuddyCache implemen
       int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
           headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
       freeLists[maxAllocLog2 - 1].listHead = 0;
-      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAlloc) {
         // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
         headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
         data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
@@ -482,7 +494,6 @@ public class LowLevelBuddyCache implemen
     //       However, we are trying to increase fragmentation now, since we cater to single-size.
   }
 
-  // TODO##: separate the classes?
   private static class FileCache {
     private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
     // TODO: given the specific data, perhaps the nested thing should not be CHM
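
For orientation, the free-list arithmetic used by allocateMultiple above: sizes are bucketed by power of two between minAllocation and maxAllocation, so each arena keeps maxAllocLog2 - minAllocLog2 + 1 free lists. A worked instance with the defaults, assuming the (not shown) prefix of allocateMultiple rounds requests up to a power of two:

    int minAllocLog2 = 31 - Integer.numberOfLeadingZeros(128 * 1024);       // 17 (128KB)
    int maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(16 * 1024 * 1024); // 24 (16MB), 8 lists
    // A 300KB request rounds up to 512KB = 2^19, i.e. free list index 19 - 17 = 2;
    // allocationSize = 1 << (2 + minAllocLog2) = 512KB, matching the math in allocateMultiple.
    int allocationSize = 1 << (2 + minAllocLog2);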

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -22,5 +22,5 @@ public interface LowLevelCachePolicy {
   void cache(LlapCacheableBuffer buffer);
   void notifyLock(LlapCacheableBuffer buffer);
   void notifyUnlock(LlapCacheableBuffer buffer);
-  void reserveMemory(long total);
+  boolean reserveMemory(long memoryToReserve, boolean oneEviction);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Wed Jan 14 02:47:08 2015
@@ -32,7 +32,7 @@ public abstract class LowLevelCachePolic
   }
 
   @Override
-  public void reserveMemory(long memoryToReserve) {
+  public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
     // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
     while (memoryToReserve > 0) {
       long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
@@ -42,16 +42,18 @@ public abstract class LowLevelCachePolic
       }
       // TODO: for one-block case, we could move notification for the last block out of the loop.
       long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+      if (!waitForEviction && evicted == 0) return false;
       // Adjust the memory - we have to account for what we have just evicted.
       while (true) {
         long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
-        if (usedMemory.compareAndSet(usedMem, usedMem + reserveWithEviction)) {
+        if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
           memoryToReserve -= reserveWithEviction;
           break;
         }
         usedMem = usedMemory.get();
       }
     }
+    return true;
   }
 
   protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);
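
The accounting fix in the CAS above is easy to miss; a worked instance (assuming maxSize = 100 bytes for readability):

    long maxSize = 100, usedMem = 90, memoryToReserve = 20;
    long evicted = 10; // what evictSomeBlocks managed to free
    long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted); // 20
    long newUsedMem = usedMem - evicted + reserveWithEviction; // 90 - 10 + 20 = 100 == maxSize
    // The old formula, usedMem + reserveWithEviction, would store 110: the evicted bytes
    // would stay counted as used, and that slice of the budget would never be reusable.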

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -27,14 +27,11 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
- * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * Implementation of the algorithm from "On the Existence of a Spectrum of Policies
  * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
- * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
- * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
- **/
+ * Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
+ */
 public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
   private final double lambda;
   private final double f(long x) {
@@ -50,124 +47,162 @@ public class LowLevelLrfuCachePolicy ext
 
   private final AtomicLong timer = new AtomicLong(0);
   /**
-   * The heap. Currently synchronized on itself; there is a number of papers out there
-   * with various lock-free/efficient priority queues which we can use if needed.
+   * The heap and list. Currently synchronized on the object, which is not good. If this becomes
+   * a problem (which it probably will), we can partition the cache policy, or use some better
+   * structure. Heap should not be locked while holding the lock on list.
+   * As of now, eviction in most cases only needs the list; notifyLock does no heap work, and
+   * notifyUnlock is what actually makes the item evictable again. Since unlock happens after
+   * processing, this most expensive part (and, usually, the only heap access) doesn't block it.
+   * Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
+   * ONLY LIST REMOVAL is allowed under list lock.
    */
   private final LlapCacheableBuffer[] heap;
+  private final ReentrantLock listLock = new ReentrantLock();
+  private LlapCacheableBuffer listHead, listTail;
   /** Number of elements. */
   private int heapSize = 0;
 
   public LowLevelLrfuCachePolicy(Configuration conf,
       long minBufferSize, long maxCacheSize, EvictionListener listener) {
     super(maxCacheSize, listener);
-    heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+    int maxBuffers = (int)Math.ceil((maxCacheSize * 1.0) / minBufferSize);
+    int maxHeapSize = -1;
+    if (lambda == 0) {
+      maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
+    } else {
+      int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda);
+      maxHeapSize = Math.min(lrfuThreshold, maxBuffers);
+    }
+    heap = new LlapCacheableBuffer[maxHeapSize];
+    listHead = listTail = null;
   }
 
   @Override
   public void cache(LlapCacheableBuffer buffer) {
-    buffer.lastUpdate = timer.incrementAndGet();
-    buffer.priority = F0;
+    // LRFU cache policy doesn't store locked blocks. When we cache, the block is locked, so
+    // we simply do nothing here. The fact that it was never updated will allow us to add it
+    // properly on the first notifyUnlock.
     assert buffer.isLocked();
-    buffer.isLockedInHeap = true;
-    synchronized (heap) {
-      // Ensured by reserveMemory.
-      assert heapSize < heap.length : heap.length + " >= " + heapSize;
-      buffer.indexInHeap = heapSize;
-      heapifyUpUnderLock(buffer, buffer.lastUpdate);
-      if (DebugUtils.isTraceEnabled()) {
-        LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
-      }
-      ++heapSize;
-    }
   }
 
   @Override
   public void notifyLock(LlapCacheableBuffer buffer) {
-    long time = timer.get();
-    synchronized (heap) {
-      buffer.isLockedInHeap = true;
-      heapifyDownUnderLock(buffer, time);
-    }
+    // We do not proactively remove locked items from the heap, and opportunistically try to
+    // remove from the list (since eviction is mostly from the list). If eviction stumbles upon
+    // a locked item in either, it will remove it from cache; when we unlock, we are going to
+    // put it back or update it, depending on whether this has happened. This should cause
+    // most of the expensive cache update work to happen in unlock, not blocking processing.
+    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+    if (!listLock.tryLock()) return;
+    removeFromListAndUnlock(buffer);
   }
 
   @Override
   public void notifyUnlock(LlapCacheableBuffer buffer) {
     long time = timer.incrementAndGet();
+    if (DebugUtils.isTraceCachingEnabled()) {
+      LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+    }
     synchronized (heap) {
-      if (DebugUtils.isTraceCachingEnabled()) {
-        LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
-      }
-      buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+      // First, update buffer priority - we have just been using it.
+      buffer.priority = (buffer.lastUpdate == -1) ? F0
+          : touchPriority(time, buffer.lastUpdate, buffer.priority);
       buffer.lastUpdate = time;
-      buffer.isLockedInHeap = false;
-      // Buffer's priority just decreased from boosted lock priority, so move up.
-      heapifyUpUnderLock(buffer, time);
+      // Then, if the buffer was in the list, remove it.
+      if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
+        listLock.lock();
+        removeFromListAndUnlock(buffer);
+      }
+      // The only concurrent change that can happen when we hold the heap lock is list removal;
+      // we have just ensured the item is not in the list, so we have a definite state now.
+      if (buffer.indexInHeap >= 0) {
+        // The buffer has lived in the heap all along. Restore heap property.
+        heapifyDownUnderLock(buffer, time);
+      } else if (heapSize == heap.length) {
+        // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
+        LlapCacheableBuffer demoted = heap[0];
+        listLock.lock(); // must use lock(): synchronized(listLock) would not exclude lock() holders
+        try {
+          demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
+          demoted.prev = null;
+          if (listHead != null) {
+            demoted.next = listHead;
+            listHead.prev = demoted;
+            listHead = demoted;
+          } else {
+            listHead = listTail = demoted;
+            demoted.next = null;
+          }
+        } finally {
+          listLock.unlock();
+        }
+        // Now insert the buffer in its place and restore heap property.
+        buffer.indexInHeap = 0;
+        heapifyDownUnderLock(buffer, time);
+      } else {
+        // Heap is not full, add the buffer to the heap and restore heap property up.
+        assert heapSize < heap.length : heap.length + " < " + heapSize;
+        buffer.indexInHeap = heapSize;
+        heapifyUpUnderLock(buffer, time);
+        ++heapSize;
+      }
     }
   }
 
-  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
-    if (heapSize == 0) return null;
-    LlapCacheableBuffer result = heap[0];
-    if (!result.invalidate()) {
-      // We boost the priority of locked buffers to a very large value;
-      // this means entire heap is locked. TODO: need to work around that for small pools?
-      if (DebugUtils.isTraceCachingEnabled()) {
-        LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long evicted = 0;
+    // In normal case, we evict the items from the list.
+    LlapCacheableBuffer nextCandidate, firstCandidate;
+    listLock.lock();
+    try {
+      nextCandidate = firstCandidate = listTail;
+      while (evicted < memoryToReserve && nextCandidate != null) {
+        if (!nextCandidate.invalidate()) {
+          // Locked buffer was in the list - just drop it; will be re-added on unlock.
+          LlapCacheableBuffer lockedBuffer = nextCandidate;
+          nextCandidate = nextCandidate.prev;
+          removeFromListUnderLock(lockedBuffer);
+          continue;
+        }
+        // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
+        // TODO#: double check this is valid!
+        nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+        evicted += nextCandidate.length;
+        nextCandidate = nextCandidate.prev;
       }
-      return null;
-    }
-    if (DebugUtils.isTraceCachingEnabled()) {
-      LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
-    }
-    result.indexInHeap = -1;
-    --heapSize;
-    LlapCacheableBuffer newRoot = heap[heapSize];
-    newRoot.indexInHeap = 0;
-    if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
-      newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
-      newRoot.lastUpdate = time;
+      if (firstCandidate != nextCandidate) {
+        if (nextCandidate == null) {
+          listHead = listTail = null; // We have evicted the entire list.
+        } else {
+          // Splice the section that we have evicted out of the list.
+          removeFromListUnderLock(nextCandidate.next, firstCandidate);
+        }
+      }
+    } finally {
+      listLock.unlock();
     }
-    heapifyDownUnderLock(newRoot, time);
-    return result;
-  }
-
-  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
-    // Relative positions of the blocks don't change over time; priorities we expire can only
-    // decrease; we only have one block that could have broken heap rule and we always move it
-    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
-    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
-    // the priority for already has lower priority than that of its children.
-    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
-    int ix = buffer.indexInHeap;
-    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
-    while (true) {
-      int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
-      if (leftIx >= heapSize) break; // Buffer is at the leaf node.
-      LlapCacheableBuffer left = heap[leftIx], right = null;
-      if (rightIx < heapSize) {
-        right = heap[rightIx];
-      }
-      double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
-      if (priority <= leftPri && priority <= rightPri) break;
-      if (leftPri <= rightPri) { // prefer left, cause right might be missing
-        heap[ix] = left;
-        left.indexInHeap = ix;
-        ix = leftIx;
-      } else {
-        heap[ix] = right;
-        right.indexInHeap = ix;
-        ix = rightIx;
+    while (firstCandidate != nextCandidate) {
+      listener.notifyEvicted(firstCandidate);
+      firstCandidate = firstCandidate.prev;
+    }
+    if (evicted >= memoryToReserve) return evicted;
+    // This should not happen unless we are evicting a lot at once, or buffers are large (so
+    // there's a small number of buffers and they all live in the heap).
+    long time = timer.get();
+    while (evicted < memoryToReserve) {
+      LlapCacheableBuffer buffer = null;
+      synchronized (heap) {
+        buffer = evictFromHeapUnderLock(time);
       }
+      if (buffer == null) return evicted;
+      evicted += buffer.length;
+      listener.notifyEvicted(buffer);
     }
-    buffer.indexInHeap = ix;
-    heap[ix] = buffer;
+    return evicted;
   }
 
   private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
     // See heapifyDown comment.
     int ix = buffer.indexInHeap;
-    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    double priority = buffer.priority;
     while (true) {
       if (ix == 0) break; // Buffer is at the top of the heap.
       int parentIx = (ix - 1) >>> 1;
@@ -182,19 +217,140 @@ public class LowLevelLrfuCachePolicy ext
     heap[ix] = buffer;
   }
 
+  // Note: almost never called (unless buffers are very large or we evict a lot).
+  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+    while (true) {
+      if (heapSize == 0) return null;
+      LlapCacheableBuffer result = heap[0];
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+      }
+      result.indexInHeap = -1;
+      --heapSize;
+      boolean canEvict = result.invalidate();
+      if (heapSize > 0) {
+        LlapCacheableBuffer newRoot = heap[heapSize];
+        newRoot.indexInHeap = 0;
+        if (newRoot.lastUpdate != time) {
+          newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+          newRoot.lastUpdate = time;
+        }
+        heapifyDownUnderLock(newRoot, time);
+      }
+      if (canEvict) return result;
+      // Otherwise we just removed a locked item from heap; unlock will re-add it, we continue.
+    }
+  }
+
+  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+    // Relative positions of the blocks don't change over time; priorities we expire can only
+    // decrease; we only have one block that could have broken heap rule and we always move it
+    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+    // the priority for already has lower priority than that of its children.
+    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.priority;
+    while (true) {
+      int newIx = moveMinChildUp(ix, time, priority);
+      if (newIx == -1) break;
+      ix = newIx;
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  /**
+   * Moves the minimum child of targetPos block up to targetPos; optionally compares priorities
+   * and terminates if targetPos element has lesser value than either of its children.
+   * @return the index of the child that was moved up; -1 if nothing was moved due to absence
+   *         of the children, or a failed priority check.
+   */
+  private int moveMinChildUp(int targetPos, long time, double comparePri) {
+    int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1;
+    if (leftIx >= heapSize) return -1; // Buffer is at the leaf node.
+    LlapCacheableBuffer left = heap[leftIx], right = null;
+    if (rightIx < heapSize) {
+      right = heap[rightIx];
+    }
+    double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+    if (comparePri >= 0 && comparePri <= leftPri && comparePri <= rightPri) {
+      return -1;
+    }
+    if (leftPri <= rightPri) { // prefer left, cause right might be missing
+      heap[targetPos] = left;
+      left.indexInHeap = targetPos;
+      return leftIx;
+    } else {
+      heap[targetPos] = right;
+      right.indexInHeap = targetPos;
+      return rightIx;
+    }
+  }
+
   private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
-    if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
-    if (buf.lastUpdate != time) {
+    if (buf == null) return Double.MAX_VALUE;
+    if (buf.lastUpdate != time && time >= 0) {
       buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
       buf.lastUpdate = time;
     }
     return buf.priority;
   }
 
+  private void removeFromListAndUnlock(LlapCacheableBuffer buffer) {
+    try {
+      if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return; // removed concurrently
+      removeFromListUnderLock(buffer);
+      buffer.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+    } finally {
+      listLock.unlock();
+    }
+  }
+
+  private void removeFromListUnderLock(LlapCacheableBuffer buffer) {
+    if (buffer == listTail) {
+      listTail = buffer.prev;
+    } else {
+      buffer.next.prev = buffer.prev;
+    }
+    if (buffer == listHead) {
+      listHead = buffer.next;
+    } else {
+      buffer.prev.next = buffer.next;
+    }
+  }
+
+  private void removeFromListUnderLock(LlapCacheableBuffer from, LlapCacheableBuffer to) {
+    if (to == listTail) {
+      listTail = from.prev;
+    } else {
+      to.next.prev = from.prev;
+    }
+    if (from == listHead) {
+      listHead = to.next;
+    } else {
+      from.prev.next = to.next;
+    }
+  }
+
   public String debugDumpHeap() {
-    if (heapSize == 0) return "<empty>";
+    StringBuilder result = new StringBuilder("List: ");
+    if (listHead == null) {
+      result.append("<empty>");
+    } else {
+      LlapCacheableBuffer listItem = listHead;
+      while (listItem != null) {
+        result.append(listItem.toStringForCache()).append(" -> ");
+        listItem = listItem.next;
+      }
+    }
+    result.append("\nHeap:");
+    if (heapSize == 0) {
+      result.append(" <empty>\n");
+      return result.toString();
+    }
+    result.append("\n");
     int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
-    StringBuilder result = new StringBuilder();
     int ix = 0;
     int spacesCount = heap[0].toStringForCache().length() + 3;
     String full = StringUtils.repeat(" ", spacesCount),
@@ -230,23 +386,4 @@ public class LowLevelLrfuCachePolicy ext
     }
     return result.toString();
   }
-
-  @VisibleForTesting
-  public LlapCacheableBuffer evictOneMoreBlock() {
-    synchronized (heap) {
-      return evictFromHeapUnderLock(timer.get());
-    }
-  }
-
-  @Override
-  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
-    long evicted = 0;
-    while (evicted < memoryToReserve) {
-      LlapCacheableBuffer buffer = evictOneMoreBlock();
-      if (buffer == null) return evicted;
-      evicted += buffer.length;
-      listener.notifyEvicted(buffer);
-    }
-    return evicted;
-  }
 }
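
To make the heap sizing in the constructor concrete: per the referenced paper, each access sets priority to F0 + f(delta) * oldPriority with f(x) = 0.5^(lambda*x) (the standard LRFU weighting; the body of f is outside this hunk), so priorities are bounded and, past some age, every buffer decays below any freshly touched one. The lrfuThreshold expression finds that age; evaluated for the default lambda = 0.01:

    double lambda = 0.01;
    // 0.5^0.01 ~= 0.99309; log(1 - 0.99309)/log(0.5) ~= 7.18; divided by 0.01 ~= 717
    int lrfuThreshold = (int)((Math.log(1 - Math.pow(0.5, lambda)) / Math.log(0.5)) / lambda);
    // => 717: only the ~717 hottest buffers can be meaningfully ranked by priority, so the
    // heap is capped there and everything colder spills into the (roughly LRU) linked list.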

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Wed Jan 14 02:47:08 2015
@@ -33,7 +33,6 @@ public interface VectorReader {
   public static class ColumnVectorBatch {
     public ColumnVector[] cols;
     public int size;
-    public List<ColumnBuffer> lockedBuffers;
   }
   public ColumnVectorBatch next() throws InterruptedException, IOException;
   public void close() throws IOException;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Wed Jan 14 02:47:08 2015
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
 import org.apache.hadoop.hive.llap.cache.NoopCache;
@@ -52,8 +53,11 @@ public class LlapIoImpl implements LlapI
 
   private LlapIoImpl(Configuration conf) throws IOException {
     this.conf = conf;
-    Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet.
-    this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf);
+    boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
+    // High-level cache not supported yet.
+    Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
+    LowLevelBuddyCache orcCache = useLowLevelCache ? new LowLevelBuddyCache(conf) : null;
+    this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
     this.cvp = new OrcColumnVectorProducer(edp, conf);
   }
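
With the new flag, exactly one of the two caches is instantiated; OrcEncodedDataProducer below branches on whichever is null. Toggling it, for instance in a test:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE, false); // use the high-level path
    // the default (true) creates the LowLevelBuddyCache instead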
 

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Wed Jan 14 02:47:08 2015
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -105,6 +106,10 @@ public abstract class ColumnVectorProduc
       }
       if (0 == colsRemaining) {
         ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
+        // Batch has been decoded; unlock the buffers in cache
+        for (ColumnBuffer cb : targetBatch.columnDatas) {
+          upstreamFeedback.returnData(cb);
+        }
       }
     }
 
@@ -133,10 +138,7 @@ public abstract class ColumnVectorProduc
 
     @Override
     public void returnData(ColumnVectorBatch data) {
-      // TODO#: this should happen earlier, when data is decoded buffers are not needed
-      for (ColumnBuffer lockedBuffer : data.lockedBuffers) {
-        upstreamFeedback.returnData(lockedBuffer);
-      }
+      // TODO: column vectors could be added to object pool here
     }
 
     private void dicardPendingData(boolean isStopped) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Jan 14 02:47:08 2015
@@ -48,10 +48,11 @@ import org.apache.hadoop.mapred.InputSpl
 
 public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
   private FileSystem cachedFs = null;
-  private final LowLevelCache lowLevelCache;
   private Configuration conf;
   private OrcMetadataCache metadataCache;
+  // TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
   private final Cache<OrcCacheKey> cache;
+  private final LowLevelCache lowLevelCache;
 
   private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
     Consumer<EncodedColumn<OrcBatchKey>> {
@@ -112,11 +113,11 @@ public class OrcEncodedDataProducer impl
       }
       determineWhatToRead(stripes);
       if (isStopped) return;
-      List<Integer>[] stripeColumnsToRead = produceDataFromCache();
+      List<Integer>[] stripeColsToRead = produceDataFromCache();
       // readState now contains some 1s for column x rgs that were fetched from cache.
       // TODO: I/O threadpool would be here; for now, linear and inefficient
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
-        List<Integer> colsToRead = stripeColumnsToRead[stripeIxMod];
+        List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
         long[][] colRgs = readState[stripeIxMod];
         if (colsToRead == null) {
           colsToRead = columnIds;
@@ -139,8 +140,10 @@ public class OrcEncodedDataProducer impl
           orcReader = createOrcReader(split);
         }
         RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
-        // We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering.
-        stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache);
+        // In case if we have high-level cache, we will intercept the data and add it there;
+        // otherwise just pass the data directly to the consumer.
+        Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
+        stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
         stripeReader.close();
       }
 
@@ -152,13 +155,11 @@ public class OrcEncodedDataProducer impl
 
     @Override
     public void returnData(ColumnBuffer data) {
-      // TODO#: return the data to cache (unlock)
+      lowLevelCache.releaseBuffers(data.cacheBuffers);
     }
 
     private void determineWhatToRead(List<StripeInformation> stripes) {
-      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey). Note that we do not use
-      // SARG anywhere, because file-level filtering on sarg is already performed during split
-      // generation, and stripe-level filtering to get row groups is not very helpful right now.
+      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
       long offset = split.getStart(), maxOffset = offset + split.getLength();
       stripeIxFrom = stripeIxTo = -1;
       int stripeIx = 0;
@@ -208,6 +209,7 @@ public class OrcEncodedDataProducer impl
           readState[i][j] = new long[bitmaskSize];
         }
       }
+      // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
       rgsPerStripe = new int[stripeRgCounts.size()];
       for (int i = 0; i < rgsPerStripe.length; ++i) {
          rgsPerStripe[i] = stripeRgCounts.get(i);
@@ -215,11 +217,10 @@ public class OrcEncodedDataProducer impl
     }
 
     // TODO: split by stripe? we do everything by stripe, and it might be faster
-    // TODO: return type provisional depending on ORC API
     private List<Integer>[] produceDataFromCache() {
-      // Assumes none of the columns are fetched, because we always do this before reading.
+      if (cache == null) return null;
       OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
-      @SuppressWarnings("unchecked") // Grr, no generics arrays, "J" in "Java" stands for "joke".
+      @SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
       List<Integer>[] stripeColsNotInCache = new List[readState.length];
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
         key.stripeIx = stripeIxFrom + stripeIxMod;
@@ -230,6 +231,8 @@ public class OrcEncodedDataProducer impl
           long[] doneMask = cols[colIxMod];
           boolean areAllRgsInCache = true;
           for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
+            int maskIndex = rgIx >>> 6;
+            long maskBit = 1L << (rgIx & 63); // 1L: doneMask is a long[]
+            if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
             key.rgIx = rgIx;
             ColumnBuffer cached = cache.get(key);
             if (cached == null) {
@@ -240,7 +243,7 @@ public class OrcEncodedDataProducer impl
             EncodedColumn<OrcBatchKey> col = new EncodedColumn<OrcBatchKey>(
                 key.copyToPureBatchKey(), key.colIx, cached);
             consumer.consumeData(col);
-            doneMask[rgIx >>> 6] |= 1 << (rgIx & 63);
+            doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
           }
           boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
           if (areAllRgsInCache) {
@@ -273,10 +276,11 @@ public class OrcEncodedDataProducer impl
     @Override
     public void consumeData(EncodedColumn<OrcBatchKey> data) {
       // Store object in cache; create new key object - cannot be reused.
+      assert cache != null;
       OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
       ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
       if (data.columnData != cached) {
-        // TODO: deallocate columnData
+        lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
         data.columnData = cached;
       }
       consumer.consumeData(data);
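
A note on the doneMask arithmetic above: doneMask is a long[], one 64-bit word per 64 row groups, so the bit must be built as a long. Worked instance:

    int rgIx = 70;
    int maskIndex = rgIx >>> 6;       // word 1 covers row groups 64..127
    long maskBit = 1L << (rgIx & 63); // bit 6 of that word
    // With an int literal, 1 << 63 wraps the shift count mod 32, yielding 1 << 31;
    // sign extension to long then corrupts bits 31..63 of the mask.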

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -98,7 +98,7 @@ public class LLAPRecordReaderImpl extend
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  public void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
 
   }

Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1651559&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Wed Jan 14 02:47:08 2015
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLowLevelLrfuCachePolicy {
+  private static final Log LOG = LogFactory.getLog(TestLowLevelLrfuCachePolicy.class);
+
+  @Test
+  public void testHeapSize2() {
+    testHeapSize(2);
+  }
+
+  @Test
+  public void testHeapSize7() {
+    testHeapSize(7);
+  }
+
+  @Test
+  public void testHeapSize8() {
+    testHeapSize(8);
+  }
+
+  @Test
+  public void testHeapSize30() {
+    testHeapSize(30);
+  }
+
+  private class EvictionTracker implements EvictionListener {
+    public List<LlapCacheableBuffer> evicted = new ArrayList<LlapCacheableBuffer>();
+
+    @Override
+    public void notifyEvicted(LlapCacheableBuffer buffer) {
+      evicted.add(buffer);
+    }
+  }
+
+  @Test
+  public void testLfuExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 0 (LFU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lfu, et, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LFU extreme, order of accesses should be ignored, only frequency matters.
+    // We touch first elements later, but do it less times, so they will be evicted first.
+    for (int i = inserted.size() - 1; i >= 0; --i) {
+      for (int j = 0; j < i + 1; ++j) {
+        lfu.notifyLock(inserted.get(i));
+        lfu.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lfu, et, inserted);
+  }
+
+  @Test
+  public void testLruExtreme() {
+    int heapSize = 4;
+    LOG.info("Testing lambda 1 (LRU)");
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lru, et, buffer));
+      inserted.add(buffer);
+    }
+    Collections.shuffle(inserted, rdm);
+    // LRU extreme, frequency of accesses should be ignored, only order matters.
+    for (int i = 0; i < inserted.size(); ++i) {
+      for (int j = 0; j < (inserted.size() - i); ++j) {
+        lru.notifyLock(inserted.get(i));
+        lru.notifyUnlock(inserted.get(i));
+      }
+    }
+    verifyOrder(lru, et, inserted);
+  }
+
+  @Test
+  public void testDeadlockResolution() {
+    int heapSize = 4;
+    LOG.info("Testing deadlock resolution");
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et);
+    for (int i = 0; i < heapSize; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lrfu, et, buffer));
+      inserted.add(buffer);
+    }
+    // Lock the lowest priority buffer; try to evict - we'll evict some other buffer.
+    LlapCacheableBuffer locked = inserted.get(0);
+    lock(lrfu, locked);
+    lrfu.reserveMemory(1, false);
+    LlapCacheableBuffer evicted = et.evicted.get(0);
+    assertNotNull(evicted);
+    assertTrue(evicted.isInvalid());
+    assertNotSame(locked, evicted);
+    unlock(lrfu, locked);
+  }
+
+  private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelBuddyCache.allocateFake();
+  // Buffers in this test are fakes not linked to the cache, so we notify the cache policy explicitly.
+  public boolean cache(
+      LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
+    if (!lrfu.reserveMemory(1, false)) {
+      return false;
+    }
+    buffer.incRef();
+    lrfu.cache(buffer);
+    buffer.decRef();
+    lrfu.notifyUnlock(buffer);
+    return true;
+  }
+
+  private LlapCacheableBuffer getOneEvictedBuffer(EvictionTracker et) {
+    assertTrue(et.evicted.size() == 0 || et.evicted.size() == 1); // test-specific
+    LlapCacheableBuffer result = et.evicted.isEmpty() ? null : et.evicted.get(0);
+    et.evicted.clear();
+    return result;
+  }
+
+  private static void lock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+    locked.incRef();
+    lrfu.notifyLock(locked);
+  }
+
+  private static void unlock(LowLevelLrfuCachePolicy lrfu, LlapCacheableBuffer locked) {
+    locked.decRef();
+    lrfu.notifyUnlock(locked);
+  }
+
+  private void testHeapSize(int heapSize) {
+    LOG.info("Testing heap size " + heapSize);
+    Random rdm = new Random(1234);
+    HiveConf conf = new HiveConf();
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.05f); // very small heap? TODO#
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
+    // Insert heapSize + toEvict elements, so that exactly toEvict evictions are triggered.
+    int toEvict = 2;
+    ArrayList<LlapCacheableBuffer> inserted = new ArrayList<LlapCacheableBuffer>(heapSize);
+    LlapCacheableBuffer[] evicted = new LlapCacheableBuffer[toEvict];
+    Assume.assumeTrue(toEvict <= heapSize);
+    for (int i = 0; i < heapSize + toEvict; ++i) {
+      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      assertTrue(cache(lrfu, et, buffer));
+      LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
+      if (i < toEvict) {
+        evicted[i] = buffer;
+      } else {
+        if (i >= heapSize) {
+          assertSame(evicted[i - heapSize], evictedBuf);
+          assertTrue(evictedBuf.isInvalid());
+        } else {
+          assertNull(evictedBuf);
+        }
+        inserted.add(buffer);
+      }
+    }
+    LOG.info("Inserted " + dumpInserted(inserted));
+    // We will touch all blocks in random order.
+    Collections.shuffle(inserted, rdm);
+    LOG.info("Touch order " + dumpInserted(inserted));
+    // Lock entire heap; heap is still full; we should not be able to evict or insert.
+    for (LlapCacheableBuffer buf : inserted) {
+      lock(lrfu, buf);
+    }
+    assertFalse(lrfu.reserveMemory(1, false));
+    if (!et.evicted.isEmpty()) {
+      assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
+    }
+    for (LlapCacheableBuffer buf : inserted) {
+      unlock(lrfu, buf);
+    }
+    // To make (almost) sure we get a definite order, touch the blocks in order a large number of times.
+    for (LlapCacheableBuffer buf : inserted) {
+      // TODO: this seems to indicate that priorities change too little...
+      //       perhaps we need to adjust the policy.
+      for (int j = 0; j < 10; ++j) {
+        lrfu.notifyLock(buf);
+        lrfu.notifyUnlock(buf);
+      }
+    }
+    verifyOrder(lrfu, et, inserted);
+  }
+
+  private void verifyOrder(LowLevelLrfuCachePolicy lrfu,
+      EvictionTracker et, ArrayList<LlapCacheableBuffer> inserted) {
+    LlapCacheableBuffer block;
+    // Evict all blocks.
+    et.evicted.clear();
+    for (int i = 0; i < inserted.size(); ++i) {
+      assertTrue(lrfu.reserveMemory(1, false));
+    }
+    // The policy should now be empty; there is nothing left to evict.
+    assertFalse(lrfu.reserveMemory(1, false));
+    for (int i = 0; i < inserted.size(); ++i) {
+      block = et.evicted.get(i);
+      assertTrue(block.isInvalid());
+      assertSame(inserted.get(i), block);
+    }
+  }
+
+  private String dumpInserted(ArrayList<LlapCacheableBuffer> inserted) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < inserted.size(); ++i) {
+      if (i != 0) sb.append(", ");
+      sb.append(inserted.get(i));
+    }
+    return sb.toString();
+  }
+}

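For reference, the LFU/LRU extremes exercised above follow from the standard LRFU weighting, where each buffer carries a combined recency-frequency value updated on access as CRF(now) = F(0) + F(now - lastAccess) * CRF(lastAccess), with F(x) = (1/2)^(lambda * x). The sketch below illustrates that formula only; it is an assumption about the policy's weighting, since the LowLevelLrfuCachePolicy implementation itself is not part of this diff.

    // Minimal sketch of the standard LRFU value function that
    // hive.llap.lrfu.lambda presumably tunes; NOT the actual
    // LowLevelLrfuCachePolicy implementation.
    public final class LrfuValueSketch {
      private final double lambda; // 0.0 => pure LFU, 1.0 => pure LRU

      public LrfuValueSketch(double lambda) { this.lambda = lambda; }

      // F(age) = (1/2)^(lambda * age): how much a past access decays after 'age' ticks.
      private double weigh(long age) {
        return Math.pow(0.5, lambda * age);
      }

      // Incremental update on access: CRF(now) = F(0) + F(now - last) * CRF(last), F(0) = 1.
      public double touch(long now, long lastAccess, double oldCrf) {
        return 1.0 + weigh(now - lastAccess) * oldCrf;
      }

      public static void main(String[] args) {
        // lambda = 0: weigh() is always 1, so touch() just counts accesses (LFU).
        System.out.println(new LrfuValueSketch(0.0).touch(1, 0, 1.0)); // 2.0
        // lambda = 1: the old value is halved per elapsed tick (LRU-like decay).
        System.out.println(new LrfuValueSketch(1.0).touch(1, 0, 1.0)); // 1.5
      }
    }

With lambda = 0 the order of touches is irrelevant and only their count matters, which is exactly what testLfuExtreme verifies; with lambda = 1 the total value is bounded above by 2, so a buffer touched at the current tick always outranks any buffer last touched earlier, which is what testLruExtreme verifies.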
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Wed Jan 14 02:47:08 2015
@@ -96,6 +96,6 @@ public interface RecordReader {
    * @param consumer Consumer to pass the results to.
    * @param cache Low-level cache to store the data that is read.
    */
-  void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
 }

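Dropping the SearchArgument from readEncodedColumns implies that SARG-based row-group elimination now happens before this call, with colRgs already carrying the filtered row-group selection per column. A hypothetical caller could look like the fragment below; the Consumer methods (consumeData/setDone/setError) and the reading of colRgs as per-column row-group masks are assumptions, since neither is spelled out in this diff.

    // Hypothetical wiring of the new signature; the method names on Consumer are assumed.
    Consumer<EncodedColumn<OrcBatchKey>> consumer = new Consumer<EncodedColumn<OrcBatchKey>>() {
      @Override
      public void consumeData(EncodedColumn<OrcBatchKey> data) {
        // Hand one column's encoded data for one row group to the decode stage.
      }
      @Override
      public void setDone() {
        // Everything requested via colRgs has been produced.
      }
      @Override
      public void setError(Throwable t) {
        // Propagate the failure to whoever is waiting on the batches.
      }
    };
    reader.readEncodedColumns(colRgs, rgCount, consumer, cache);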
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1651559&r1=1651558&r2=1651559&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Jan 14 02:47:08 2015
@@ -3302,7 +3302,7 @@ public class RecordReaderImpl implements
   }
 
   @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
+  public void readEncodedColumns(long[][] colRgs, int rgCount,
       Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
     // TODO: HERE read encoded data
   }
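
The implementation is still a stub. Given the arguments (a LowLevelCache plus a Consumer of per-row-group encoded columns), a plausible shape for the eventual body is a cache-first loop; the helpers below (isRequested, tryGetFromCache, readAndDecode, putInCache) are hypothetical placeholders, not anything that exists in this branch.

    // Plausible future shape only; all helper methods here are hypothetical.
    for (int col = 0; col < colRgs.length; ++col) {
      for (int rg = 0; rg < rgCount; ++rg) {
        if (!isRequested(colRgs[col], rg)) continue; // skip row groups eliminated upstream
        EncodedColumn<OrcBatchKey> data = tryGetFromCache(allocator, col, rg);
        if (data == null) {
          data = readAndDecode(col, rg);        // read the missing ranges from disk
          putInCache(allocator, col, rg, data); // make them available to other queries
        }
        consumer.consumeData(data);
      }
    }
    consumer.setDone();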