Posted to commits@hive.apache.org by se...@apache.org on 2015/01/16 04:30:47 UTC

svn commit: r1652337 - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ llap-server/src/test/org...

Author: sershe
Date: Fri Jan 16 03:30:46 2015
New Revision: 1652337

URL: http://svn.apache.org/r1652337
Log:
Separated allocator and cache; added unit tests for the allocator; fixed a bunch of bugs

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
Removed:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Fri Jan 16 03:30:46 2015
@@ -24,7 +24,9 @@ public abstract class LlapMemoryBuffer {
   protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
     initialize(byteBuffer, offset, length);
   }
-  public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+  protected LlapMemoryBuffer() {
+  }
+  protected void initialize(ByteBuffer byteBuffer, int offset, int length) {
     this.byteBuffer = byteBuffer;
     this.offset = offset;
     this.length = length;
@@ -32,5 +34,4 @@ public abstract class LlapMemoryBuffer {
   public ByteBuffer byteBuffer;
   public int offset;
   public int length;
-
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Fri Jan 16 03:30:46 2015
@@ -23,11 +23,13 @@ public interface LowLevelCache {
 
   /**
    * Gets file data for particular offsets. Null entries mean no data.
+   * @param fileName File name; MUST be interned.
    */
   LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
 
   /**
    * Puts file data into cache.
+   * @param fileName File name; MUST be interned.
    * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
    *         the replacement chunks from cache are updated directly in the array.
    */

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
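+/**
+ * Buddy allocator over direct ByteBuffer arenas. Each arena tracks its blocks with a
+ * byte-per-minAllocation header array plus one free list per power-of-two size; on
+ * deallocation, free "buddy" blocks of the same size are merged back into larger ones.
+ */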
+final class BuddyAllocator {
+  private static final Log LOG = LogFactory.getLog(BuddyAllocator.class);
+
+  private final Arena[] arenas;
+  private AtomicInteger allocatedArenas = new AtomicInteger(0);
+
+  private final MemoryManager memoryManager;
+
+  // Config settings
+  private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
+  private final int minAllocation, maxAllocation, arenaSize;
+  private final long maxSize;
+
+  public BuddyAllocator(Configuration conf, MemoryManager memoryManager) {
+    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
+    arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    if (minAllocation < 8) {
+      throw new AssertionError("Min allocation must be at least 8: " + minAllocation);
+    }
+    if (maxSize < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
+      throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
+          + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
+    }
+    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
+        || (Long.bitCount(arenaSize) != 1)) {
+      // Technically, arena size only needs to be divisible by maxAlloc
+      throw new AssertionError("Allocation and arena sizes must be powers of two: "
+          + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+    }
+    if ((maxSize % arenaSize) > 0 || (maxSize / arenaSize) > Integer.MAX_VALUE) {
+      throw new AssertionError(
+          "Cache size not consistent with arena size: " + arenaSize + "," + maxSize);
+    }
+    minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+    maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+    arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
+    maxArenas = (int)(maxSize / arenaSize);
+    arenas = new Arena[maxArenas];
+    for (int i = 0; i < maxArenas; ++i) {
+      arenas[i] = new Arena();
+    }
+    arenas[0].init();
+    allocatedArenas.set(1);
+    this.memoryManager = memoryManager;
+  }
+
+  // TODO: would it make sense to return buffers asynchronously?
+  public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+    assert size > 0 : "size is " + size;
+    if (size > maxAllocation) {
+      throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
+    }
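+    // Pick the free list: round size up to a power of two and express it relative to
+    // minAllocation. E.g. with minAllocation = 8, size = 9 rounds up to 16, which is
+    // free list index 1 (16 == 8 << 1).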
+    int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
+    if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
+    freeListIx = Math.max(freeListIx - minAllocLog2, 0);
+    int allocLog2 = freeListIx + minAllocLog2;
+    int allocationSize = 1 << allocLog2;
+    // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
+    memoryManager.reserveMemory(dest.length << allocLog2, true);
+
+    int ix = 0;
+    for (int i = 0; i < dest.length; ++i) {
+      if (dest[i] != null) continue;
+      dest[i] = new LlapCacheableBuffer(); // TODO: pool of objects?
+    }
+    // First try to quickly lock some of the correct-sized free lists and allocate from them.
+    int arenaCount = allocatedArenas.get();
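+    // Use a thread-dependent starting arena to spread lock contention across arenas.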
+    long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
+    {
+      int startIndex = (int)(threadId % arenaCount), index = startIndex;
+      do {
+        int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
+        if (newIx == dest.length) return true;
+        if (newIx != -1) {  // -1 (unallocated arena) shouldn't happen here.
+          ix = newIx;
+        }
+        if ((++index) == arenaCount) {
+          index = 0;
+        }
+      } while (index != startIndex);
+    }
+
+    // Then try to split bigger blocks. TODO: again, ideally we would tryLock at least once
+    for (int i = 0; i < arenaCount; ++i) {
+      int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
+      if (newIx == -1) break; // Shouldn't happen.
+      if (newIx == dest.length) return true;
+      ix = newIx;
+    }
+    // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+    for (int i = arenaCount; i < arenas.length; ++i) {
+      ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
+      if (ix == dest.length) return true;
+    }
+    return false;
+  }
+
+  public static LlapCacheableBuffer allocateFake() {
+    LlapCacheableBuffer fake = new LlapCacheableBuffer();
+    fake.initialize(-1, null, -1, 1);
+    return fake;
+  }
+
+  public void deallocate(LlapCacheableBuffer buffer) {
+    arenas[buffer.arenaIndex].deallocate(buffer);
+  }
+
+  public String debugDump() {
+    StringBuilder result = new StringBuilder();
+    for (Arena arena : arenas) {
+      arena.debugDump(result);
+    }
+    result.append("\n");
+    return result.toString();
+  }
+
+  private class Arena {
+    private ByteBuffer data;
+    // Avoid storing headers inside the data since we expect power-of-two-sized
+    // allocations; headers[i] describes the block starting at i * minAllocation.
+    private byte[] headers;
+    private FreeList[] freeLists;
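+    // Free blocks double as linked-list nodes: for a free block, the int at its data
+    // offset holds the previous header index and the int at offset + 4 the next one
+    // (-1 terminates the list), so free lists need no storage beyond the head index.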
+
+    void init() {
+      data = ByteBuffer.allocateDirect(arenaSize);
+      int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+      headers = new byte[maxMinAllocs];
+      int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1;
+      freeLists = new FreeList[freeListCount];
+      for (int i = 0; i < freeListCount; ++i) {
+        freeLists[i] = new FreeList();
+      }
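+      // Initially the entire arena is max-size free blocks, all chained into the top
+      // free list; headerStep is the distance between their headers in minAllocation units.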
+      int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
+          headerIndex = 0, headerStep = 1 << allocLog2Diff;
+      freeLists[allocLog2Diff].listHead = 0;
+      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+        // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
+        headers[headerIndex] = makeHeader(allocLog2Diff, false);
+        LOG.info("TODO# 1 mucking with " + System.identityHashCode(data) + ":" + offset);
+        data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
+        data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep));
+        headerIndex += headerStep;
+      }
+    }
+
+    public void debugDump(StringBuilder result) {
+      result.append("\nArena: ");
+      if (data == null) {
+        result.append(" not allocated");
+        return;
+      }
+      for (int i = 0; i < headers.length; ++i) {
+        byte header = headers[i];
+        if (header == 0) continue;
+        int freeListIx = (header >> 1) - 1, offset = offsetFromHeaderIndex(i);
+        boolean isFree = (header & 1) == 0;
+        result.append("\n  block " + i + " at " + offset + ": size " + (1 << (freeListIx + minAllocLog2))
+            + ", " + (isFree ? "free" : "allocated"));
+      }
+      int allocSize = minAllocation;
+      for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
+        result.append("\n  free list for size " + allocSize + ": ");
+        int nextItem = freeLists[i].listHead;
+        while (nextItem >= 0) {
+          result.append(nextItem + ", ");
+          nextItem = data.getInt(offsetFromHeaderIndex(nextItem));
+        }
+      }
+    }
+
+    private int allocateFast(int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+      if (data == null) return -1; // not allocated yet
+      FreeList freeList = freeLists[freeListIx];
+      if (!freeList.lock.tryLock()) return ix;
+      try {
+        return allocateFromFreeListUnderLock(arenaIx, freeList, freeListIx, dest, ix, size);
+      } finally {
+        freeList.lock.unlock();
+      }
+    }
+
+    private int allocateWithSplit(int arenaIx, int freeListIx,
+        LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+      if (data == null) return -1; // not allocated yet
+      FreeList freeList = freeLists[freeListIx];
+      int remaining = -1;
+      freeList.lock.lock();
+      try {
+        ix = allocateFromFreeListUnderLock(
+            arenaIx, freeList, freeListIx, dest, ix, allocationSize);
+        remaining = dest.length - ix;
+        if (remaining == 0) return ix;
+      } finally {
+        freeList.lock.unlock();
+      }
+      byte headerData = makeHeader(freeListIx, true);
+      int headerStep = 1 << freeListIx;
+      int splitListIx = freeListIx + 1;
+      while (remaining > 0 && splitListIx < freeLists.length) {
+        int splitWays = 1 << (splitListIx - freeListIx);
+        int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+        FreeList splitList = freeLists[splitListIx];
+        splitList.lock.lock();
+        try {
+          int headerIx = splitList.listHead;
+          while (headerIx >= 0 && remaining > 0) {
+            int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
+            int toTake = Math.min(splitWays, remaining);
+            remaining -= toTake;
+            lastSplitBlocksRemaining = splitWays - toTake;
+            for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
+              headers[headerIx] = headerData;
+              ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
+            }
+            lastSplitNextHeader = headerIx;
+            headerIx = data.getInt(origOffset + 4);
+          }
+          replaceListHeadUnderLock(splitList, headerIx);
+        } finally {
+          splitList.lock.unlock();
+        }
+        if (remaining == 0) {
+          // We have just obtained all we needed by splitting the last block; now
+          // we need to put the space remaining from that block into lower free lists.
+          int newListIndex = freeListIx;
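+          // Free the leftover blocks by binary decomposition: each set bit of the
+          // remaining count yields one free block of the corresponding size.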
+          while (lastSplitBlocksRemaining > 0) {
+            if ((lastSplitBlocksRemaining & 1) == 1) {
+              FreeList newFreeList = freeLists[newListIndex];
+              newFreeList.lock.lock();
+              try {
+                headers[lastSplitNextHeader] = makeHeader(newListIndex, false);
+                addBlockToFreeListUnderLock(newFreeList, lastSplitNextHeader);
+              } finally {
+                newFreeList.lock.unlock();
+              }
+              lastSplitNextHeader += (1 << newListIndex);
+            }
+            lastSplitBlocksRemaining >>>= 1;
+            ++newListIndex;
+          }
+        }
+        ++splitListIx;
+      }
+      return ix;
+    }
+
+    private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
+      if (headerIx == freeList.listHead) return;
+      if (headerIx >= 0) {
+        int newHeadOffset = offsetFromHeaderIndex(headerIx);
+        LOG.info("TODO# 3 mucking with " + System.identityHashCode(data) + ":" + newHeadOffset);
+        data.putInt(newHeadOffset, -1); // Remove backlink.
+      }
+      freeList.listHead = headerIx;
+    }
+
+    private int allocateWithExpand(
+        int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+      if (data == null) {
+        synchronized (this) {
+          // Never goes from non-null to null, so this is the only place we need sync.
+          if (data == null) {
+            init();
+            allocatedArenas.incrementAndGet();
+          }
+        }
+      }
+      return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+    }
+
+    public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+      return lastSplitNextHeader << minAllocLog2;
+    }
+
+    public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
+        int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+      int current = freeList.listHead;
+      while (current >= 0 && ix < dest.length) {
+        int offset = offsetFromHeaderIndex(current);
+        // No one else can have this block allocated or in another free list; no sync needed.
+        headers[current] = makeHeader(freeListIx, true);
+        current = data.getInt(offset + 4);
+        ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
+        ++ix;
+      }
+      replaceListHeadUnderLock(freeList, current);
+      return ix;
+    }
+
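+    // Header byte layout: bits 1-7 hold (freeListIx + 1), so 0 means "not a block
+    // start"; bit 0 is the in-use flag.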
+    private byte makeHeader(int freeListIx, boolean isInUse) {
+      return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+    }
+
+    public void deallocate(LlapCacheableBuffer buffer) {
+      assert data != null;
+      int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.length) - minAllocLog2,
+          headerIx = buffer.offset >>> minAllocLog2;
+      while (true) {
+        FreeList freeList = freeLists[freeListIx];
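+        // Buddies of size 2^freeListIx (in minAllocation units) differ only in bit
+        // freeListIx of the header index, so XOR finds the buddy directly.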
+        int bHeaderIx = headerIx ^ (1 << freeListIx);
+        freeList.lock.lock();
+        try {
+          if ((freeListIx == freeLists.length - 1)
+            || headers[bHeaderIx] != makeHeader(freeListIx, false)) {
+            // Buddy block is allocated, or it is on higher level of allocation than we are, or we
+            // have reached the top level. Add whatever we have got to the current free list.
+            addBlockToFreeListUnderLock(freeList, headerIx);
+            headers[headerIx] = makeHeader(freeListIx, false);
+            break;
+          }
+          // Buddy block is free and in the same free list we have locked. Take it out for merge.
+          removeBlockFromFreeList(freeList, bHeaderIx);
+          headers[bHeaderIx] = headers[headerIx] = 0; // Erase both headers of the blocks to merge.
+        } finally {
+          freeList.lock.unlock();
+        }
+        ++freeListIx;
+        headerIx = Math.min(headerIx, bHeaderIx);
+      }
+    }
+
+    private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
+      if (freeList.listHead >= 0) {
+        int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
+        assert data.getInt(oldHeadOffset) == -1;
+        LOG.info("TODO# 4 mucking with " + System.identityHashCode(data) + ":" + oldHeadOffset);
+        data.putInt(oldHeadOffset, headerIx);
+      }
+      int offset = offsetFromHeaderIndex(headerIx);
+      LOG.info("TODO# 5 mucking with " + System.identityHashCode(data) + ":" + offset);
+      data.putInt(offset, -1);
+      data.putInt(offset + 4, freeList.listHead);
+      freeList.listHead = headerIx;
+    }
+
+    private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
+      int bOffset = offsetFromHeaderIndex(headerIx),
+          bpHeaderIx = data.getInt(bOffset), bnHeaderIx = data.getInt(bOffset + 4);
+      if (freeList.listHead == headerIx) {
+        assert bpHeaderIx == -1;
+        freeList.listHead = bnHeaderIx;
+      }
+      if (bpHeaderIx != -1) {
+        data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
+        LOG.info("TODO# 6 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bpHeaderIx) + " + 4");
+      }
+      if (bnHeaderIx != -1) {
+        data.putInt(offsetFromHeaderIndex(bnHeaderIx), bpHeaderIx);
+        LOG.info("TODO# 7 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bnHeaderIx));
+      }
+    }
+  }
+
+  private static class FreeList {
+    ReentrantLock lock = new ReentrantLock(false);
+    int listHead = -1; // Index of where the buffer is; in minAllocation units
+    // TODO: One possible improvement - store blocks arriving left over from splits, and
+    //       blocks requested, to be able to wait for pending splits and reduce fragmentation.
+    //       However, we are trying to increase fragmentation now, since we cater to single-size.
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Fri Jan 16 03:30:46 2015
@@ -26,8 +26,12 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
 public final class LlapCacheableBuffer extends LlapMemoryBuffer {
-  public LlapCacheableBuffer(ByteBuffer byteBuffer, int offset, int length) {
-    super(byteBuffer, offset, length);
+  private static final int EVICTED_REFCOUNT = -1;
+  static final int IN_LIST = -2, NOT_IN_CACHE = -1;
+
+  public void initialize(int arenaIndex, ByteBuffer byteBuffer, int offset, int length) {
+    super.initialize(byteBuffer, offset, length);
+    this.arenaIndex = arenaIndex;
   }
 
   public String toStringForCache() {
@@ -35,15 +39,13 @@ public final class LlapCacheableBuffer e
         + lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
   }
 
-  private static final int EVICTED_REFCOUNT = -1;
   private final AtomicInteger refCount = new AtomicInteger(0);
 
-  // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
+  public int arenaIndex = -1;
   public double priority;
   public long lastUpdate = -1;
   public LlapCacheableBuffer prev = null, next = null;
   public int indexInHeap = NOT_IN_CACHE;
-  public static final int IN_LIST = -2, NOT_IN_CACHE = -1;
 
   @Override
   public int hashCode() {

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public class LowLevelCacheImpl implements LowLevelCache, EvictionListener {
+  private final BuddyAllocator allocator;
+
+  private AtomicInteger newEvictions = new AtomicInteger(0);
+  private final Thread cleanupThread;
+  private final ConcurrentHashMap<String, FileCache> cache =
+      new ConcurrentHashMap<String, FileCache>();
+  private final LowLevelCachePolicyBase cachePolicy;
+
+  public LowLevelCacheImpl(Configuration conf) {
+    int minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
+        ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
+        : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
+    allocator = new BuddyAllocator(conf, cachePolicy);
+    cleanupThread = new CleanupThread();
+    cleanupThread.start();
+  }
+
+  @Override
+  public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+    allocator.allocateMultiple(dest, size);
+  }
+
+  @Override
+  public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
+    LlapMemoryBuffer[] result = null;
+    FileCache subCache = cache.get(fileName);
+    if (subCache == null || !subCache.incRef()) return result;
+    try {
+      for (int i = 0; i < offsets.length; ++i) {
+        while (true) { // Overwhelmingly only runs once.
+          long offset = offsets[i];
+          LlapCacheableBuffer buffer = subCache.cache.get(offset);
+          if (buffer == null) break;
+          if (lockBuffer(buffer)) {
+            if (result == null) {
+              result = new LlapCacheableBuffer[offsets.length];
+            }
+            result[i] = buffer;
+            break;
+          }
+          if (subCache.cache.remove(offset, buffer)) break;
+        }
+      }
+    } finally {
+      subCache.decRef();
+    }
+    return result;
+  }
+
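+  // Note: incRef() presumably returns the new count, or a negative value for an
+  // already-evicted buffer; the policy is notified only on the 0 -> 1 transition.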
+  private boolean lockBuffer(LlapCacheableBuffer buffer) {
+    int rc = buffer.incRef();
+    if (rc == 1) {
+      cachePolicy.notifyLock(buffer);
+    }
+    return rc >= 0;
+  }
+
+  @Override
+  public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+    long[] result = null;
+    assert buffers.length == offsets.length;
+    FileCache subCache = getOrAddFileSubCache(fileName);
+    try {
+      for (int i = 0; i < offsets.length; ++i) {
+        LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+        long offset = offsets[i];
+        assert buffer.isLocked();
+        while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+          LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+          if (oldVal == null) {
+            // Cached successfully, add to policy.
+            cachePolicy.cache(buffer);
+            break;
+          }
+          if (DebugUtils.isTraceCachingEnabled()) {
+            LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
+                + fileName + "@" + offset  + "; old " + oldVal + ", new " + buffer);
+          }
+          if (lockBuffer(oldVal)) {
+            // We found an old, valid block for this key in the cache.
+            releaseBufferInternal(buffer);
+            buffers[i] = oldVal;
+            if (result == null) {
+              result = new long[align64(buffers.length) >>> 6];
+            }
+            result[i >>> 6] |= (1L << (i & 63)); // indicate that we've replaced the value
+            break;
+          }
+          // We found some old value but couldn't incRef it; remove it.
+          subCache.cache.remove(offset, oldVal);
+        }
+      }
+    } finally {
+      subCache.decRef();
+    }
+    return result;
+  }
+
+  /**
+   * All this mess is necessary because we want to be able to remove sub-caches for fully
+   * evicted files. It may actually be better to have non-nested map with object keys?
+   */
+  public FileCache getOrAddFileSubCache(String fileName) {
+    FileCache newSubCache = null;
+    while (true) { // Overwhelmingly executes once.
+      FileCache subCache = cache.get(fileName);
+      if (subCache != null) {
+        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+        if (newSubCache == null) {
+          newSubCache = new FileCache();
+          newSubCache.incRef();
+        }
+        // Found a stale value we cannot incRef; try to replace it with new value.
+        if (cache.replace(fileName, subCache, newSubCache)) return newSubCache;
+        continue; // Someone else replaced/removed a stale value, try again.
+      }
+      // No value found.
+      if (newSubCache == null) {
+        newSubCache = new FileCache();
+        newSubCache.incRef();
+      }
+      FileCache oldSubCache = cache.putIfAbsent(fileName, newSubCache);
+      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+      // Someone created one in parallel and then it went stale.
+      if (cache.replace(fileName, oldSubCache, newSubCache)) return newSubCache;
+      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+    }
+  }
+
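+  /** Rounds up to the next multiple of 64, the number of bits in one bitmask long. */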
+  private static int align64(int number) {
+    return ((number + 63) & ~63);
+  }
+
+
+  @Override
+  public void releaseBuffer(LlapMemoryBuffer buffer) {
+    releaseBufferInternal((LlapCacheableBuffer)buffer);
+  }
+
+  @Override
+  public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
+    for (int i = 0; i < cacheBuffers.length; ++i) {
+      releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+    }
+  }
+
+  public void releaseBufferInternal(LlapCacheableBuffer buffer) {
+    if (buffer.decRef() == 0) {
+      cachePolicy.notifyUnlock(buffer);
+    }
+  }
+
+  public static LlapCacheableBuffer allocateFake() {
+    LlapCacheableBuffer fake = new LlapCacheableBuffer();
+    fake.initialize(-1, null, -1, 1);
+    return fake;
+  }
+
+  @Override
+  public void notifyEvicted(LlapCacheableBuffer buffer) {
+    allocator.deallocate(buffer);
+    newEvictions.incrementAndGet();
+  }
+
+  private static class FileCache {
+    private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
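+    // refCount doubles as a lifecycle state: >= 0 is the number of users of a live
+    // entry, EVICTING_REFCOUNT is a transient "write-locked" state, and
+    // EVICTED_REFCOUNT is terminal.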
+    // TODO: given the specific data, perhaps the nested thing should not be CHM
+    private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
+      = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+    private AtomicInteger refCount = new AtomicInteger(0);
+
+    boolean incRef() {
+      while (true) {
+        int value = refCount.get();
+        if (value == EVICTED_REFCOUNT) return false;
+        if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+        assert value >= 0;
+        if (refCount.compareAndSet(value, value + 1)) return true;
+      }
+    }
+
+    void decRef() {
+      int value = refCount.decrementAndGet();
+      if (value < 0) {
+        throw new AssertionError("Unexpected refCount " + value);
+      }
+    }
+
+    boolean startEvicting() {
+      while (true) {
+        int value = refCount.get();
+        if (value != 1) return false;
+        if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+      }
+    }
+
+    void commitEvicting() {
+      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+      assert result;
+    }
+
+    void abortEvicting() {
+      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+      assert result;
+    }
+  }
+
+  private final class CleanupThread extends Thread {
+    private static final int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+    public CleanupThread() {
+      super("Llap low level cache cleanup thread");
+      setDaemon(true);
+      setPriority(1);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doOneCleanupRound();
+        } catch (InterruptedException ex) {
+          LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Throwable t) {
+          LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+          break;
+        }
+      }
+    }
+
+    private void doOneCleanupRound() throws InterruptedException {
+      while (true) {
+        int evictionsSinceLast = newEvictions.getAndSet(0);
+        if (evictionsSinceLast > 0) break;
+        synchronized (newEvictions) {
+          newEvictions.wait(10000);
+        }
+      }
+      // Duration is an estimate; if the size of the map changes, it can be very different.
+      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+      int leftToCheck = 0; // approximate
+      for (FileCache fc : cache.values()) {
+        leftToCheck += fc.cache.size();
+      }
+      // Iterate through all the file caches. This is best-effort.
+      // If this super-long-lived iterator affects the map in some bad way,
+      // we'd need to sleep once per round instead.
+      Iterator<Map.Entry<String, FileCache>> iter = cache.entrySet().iterator();
+      while (iter.hasNext()) {
+        FileCache fc = iter.next().getValue();
+        if (!fc.incRef()) {
+          throw new AssertionError("Something other than cleanup is removing elements from map");
+        }
+        // Iterate through the file cache. This is best-effort.
+        Iterator<Map.Entry<Long, LlapCacheableBuffer>> subIter = fc.cache.entrySet().iterator();
+        boolean isEmpty = true;
+        while (subIter.hasNext()) {
+          // Clamp to at least 1ms: if the round overruns endTime, a negative
+          // argument would make sleep() throw and kill the thread.
+          Thread.sleep((leftToCheck <= 0)
+              ? 1 : Math.max(1, (endTime - System.nanoTime()) / (1000000L * leftToCheck)));
+          if (subIter.next().getValue().isInvalid()) {
+            subIter.remove();
+          } else {
+            isEmpty = false;
+          }
+          --leftToCheck;
+        }
+        if (!isEmpty) {
+          fc.decRef();
+          continue;
+        }
+        // FileCache might be empty; see if we can remove it. startEvicting() acts as a "tryWriteLock".
+        if (!fc.startEvicting()) continue;
+        if (fc.cache.isEmpty()) {
+          fc.commitEvicting();
+          iter.remove();
+        } else {
+          fc.abortEvicting();
+        }
+      }
+    }
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Fri Jan 16 03:30:46 2015
@@ -22,5 +22,4 @@ public interface LowLevelCachePolicy {
   void cache(LlapCacheableBuffer buffer);
   void notifyLock(LlapCacheableBuffer buffer);
   void notifyUnlock(LlapCacheableBuffer buffer);
-  boolean reserveMemory(long memoryToReserve, boolean oneEviction);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Fri Jan 16 03:30:46 2015
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.llap.cach
 
 import java.util.concurrent.atomic.AtomicLong;
 
-public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy {
+public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy, MemoryManager {
   private final AtomicLong usedMemory;
   private final long maxSize;
   private EvictionListener evictionListener;

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
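+/**
+ * Decouples the allocator from whatever can free memory; implemented here by the
+ * cache policy (see LowLevelCachePolicyBase), which evicts cached buffers as needed
+ * to satisfy a reservation.
+ */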
+public interface MemoryManager {
+  boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Fri Jan 16 03:30:46 2015
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.Cache;
-import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.NoopCache;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.VectorReader;
@@ -56,7 +56,7 @@ public class LlapIoImpl implements LlapI
     boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
     // High-level cache not supported yet.
     Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
-    LowLevelBuddyCache orcCache = useLowLevelCache ? new LowLevelBuddyCache(conf) : null;
+    LowLevelCacheImpl orcCache = useLowLevelCache ? new LowLevelCacheImpl(conf) : null;
     this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
     this.cvp = new OrcColumnVectorProducer(edp, conf);
   }

Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBuddyAllocator {
+  private static final Log LOG = LogFactory.getLog(TestBuddyAllocator.class);
+  private final Random rdm = new Random(2284);
+
+  private static class DummyMemoryManager implements MemoryManager {
+    @Override
+    public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
+      return true;
+    }
+  }
+
+  @Test
+  public void testVariableSizeAllocs() {
+    testVariableSizeInternal(1, 2, 1);
+  }
+
+  @Test
+  public void testVariableSizeMultiAllocs() {
+    testVariableSizeInternal(3, 2, 3);
+    testVariableSizeInternal(5, 2, 5);
+  }
+
+  @Test
+  public void testSameSizes() {
+    int min = 3, max = 8, maxAlloc = 1 << max;
+    Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc);
+    BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+    for (int i = min; i <= max; i <<= 1) {
+      allocSameSize(a, 1 << (max - i), i);
+    }
+  }
+
+  @Test
+  public void testMultipleArenas() {
+    int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
+    Configuration conf = createConf(1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount);
+    BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+    allocSameSize(a, arenaCount * 2, allocLog2);
+  }
+
+  @Test
+  public void testMTT() {
+    final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
+    Configuration conf = createConf(1 << min, maxAlloc, maxAlloc * 8, maxAlloc * 24);
+    final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() {
+      public void run() {
+        allocateUp(a, min, max, allocsPerSize, false);
+        allocateUp(a, min, max, allocsPerSize, true);
+      }
+    }, null), downTask = new FutureTask<Object>(new Runnable() {
+      public void run() {
+        allocateDown(a, min, max, allocsPerSize, false);
+        allocateDown(a, min, max, allocsPerSize, true);
+      }
+    }, null), sameTask = new FutureTask<Object>(new Runnable() {
+      public void run() {
+        for (int i = min; i <= max; i <<= 1) {
+          allocSameSize(a, (1 << (max - i)) * allocsPerSize, i);
+        }
+      }
+    }, null);
+    executor.execute(sameTask);
+    executor.execute(upTask);
+    executor.execute(downTask);
+    try {
+      upTask.get();
+      downTask.get();
+      sameTask.get();
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+
+  private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) {
+    int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
+    Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount);
+    BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+    allocateUp(a, min, max, allocCount, true);
+    allocateDown(a, min, max, allocCount, true);
+    allocateDown(a, min, max, allocCount, false);
+    allocateUp(a, min, max, allocCount, true);
+    allocateUp(a, min, max, allocCount, false);
+    allocateDown(a, min, max, allocCount, true);
+  }
+
+  private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) {
+    LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[allocCount][];
+    long[][] testValues = new long[allocCount][];
+    for (int j = 0; j < allocCount; ++j) {
+      allocateAndUseBuffer(a, allocs, testValues, 1, j, sizeLog2);
+    }
+    deallocUpOrDown(a, false, allocs, testValues);
+  }
+
+  private void allocateUp(
+      BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+    int sizes = max - min + 1;
+    LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+    // Put in the beginning; relies on the knowledge of internal implementation. Pave?
+    long[][] testValues = new long[sizes][];
+    for (int i = min; i <= max; ++i) {
+      allocateAndUseBuffer(a, allocs, testValues, allocPerSize, i - min, i);
+    }
+    deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
+  }
+
+  private void allocateDown(
+      BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+    int sizes = max - min + 1;
+    LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+    // Put in the beginning; relies on the knowledge of internal implementation. Pave?
+    long[][] testValues = new long[sizes][];
+    for (int i = max; i >= min; --i) {
+      allocateAndUseBuffer(a, allocs, testValues, allocPerSize, i - min, i);
+    }
+    deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
+  }
+
+  private void allocateAndUseBuffer(BuddyAllocator a, LlapMemoryBuffer[][] allocs,
+      long[][] testValues, int allocCount, int index, int sizeLog2) {
+    allocs[index] = new LlapMemoryBuffer[allocCount];
+    testValues[index] = new long[allocCount];
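+    // Request one byte less than a power of two to exercise rounding up to the
+    // nearest power-of-two allocation size.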
+    int size = (1 << sizeLog2) - 1;
+    if (!a.allocateMultiple(allocs[index], size)) {
+      LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDump());
+      fail();
+    }
+    // LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
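+    // Write a test value at the start and middle of each buffer; deallocBuffers
+    // verifies them later, catching overlap between allocations.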
+    for (int j = 0; j < allocCount; ++j) {
+      LlapMemoryBuffer mem = allocs[index][j];
+      long testValue = testValues[index][j] = rdm.nextLong();
+      mem.byteBuffer.putLong(mem.offset, testValue);
+      int halfLength = mem.length >> 1;
+      if (halfLength + 8 <= mem.length) {
+        mem.byteBuffer.putLong(mem.offset + halfLength, testValue);
+      }
+    }
+  }
+
+  private void deallocUpOrDown(BuddyAllocator a, boolean isSameOrderDealloc,
+      LlapMemoryBuffer[][] allocs, long[][] testValues) {
+    if (isSameOrderDealloc) {
+      for (int i = 0; i < allocs.length; ++i) {
+        deallocBuffers(a, allocs[i], testValues[i]);
+      }
+    } else {
+      for (int i = allocs.length - 1; i >= 0; --i) {
+        deallocBuffers(a, allocs[i], testValues[i]);
+      }
+    }
+  }
+
+  private void deallocBuffers(
+      BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) {
+    for (int j = 0; j < allocs.length; ++j) {
+      LlapCacheableBuffer mem = (LlapCacheableBuffer)allocs[j];
+      assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset));
+      int halfLength = mem.length >> 1;
+      if (halfLength + 8 <= mem.length) {
+        assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset + halfLength));
+      }
+      a.deallocate(mem);
+    }
+  }
+
+  private Configuration createConf(int min, int max, int arena, int total) {
+    Configuration conf = new Configuration();
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
+    conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, arena);
+    conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+    return conf;
+  }
+}

Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestLowLevelCacheImpl {
+  private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class);
+}

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Fri Jan 16 03:30:46 2015
@@ -38,11 +38,6 @@ public class TestLowLevelLrfuCachePolicy
   }
 
   @Test
-  public void testHeapSize7() {
-    testHeapSize(7);
-  }
-
-  @Test
   public void testHeapSize8() {
     testHeapSize(8);
   }
@@ -52,6 +47,11 @@ public class TestLowLevelLrfuCachePolicy
     testHeapSize(30);
   }
 
+  @Test
+  public void testHeapSize64() {
+    testHeapSize(64);
+  }
+
   private class EvictionTracker implements EvictionListener {
     public List<LlapCacheableBuffer> evicted = new ArrayList<LlapCacheableBuffer>();
 
@@ -72,7 +72,7 @@ public class TestLowLevelLrfuCachePolicy
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
     for (int i = 0; i < heapSize; ++i) {
-      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lfu, et, buffer));
       inserted.add(buffer);
     }
@@ -99,7 +99,7 @@ public class TestLowLevelLrfuCachePolicy
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
     for (int i = 0; i < heapSize; ++i) {
-      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lru, et, buffer));
       inserted.add(buffer);
     }
@@ -122,7 +122,7 @@ public class TestLowLevelLrfuCachePolicy
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et);
     for (int i = 0; i < heapSize; ++i) {
-      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lrfu, et, buffer));
       inserted.add(buffer);
     }
@@ -137,7 +137,7 @@ public class TestLowLevelLrfuCachePolicy
     unlock(lrfu, locked);
   }
 
-  private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelBuddyCache.allocateFake();
+  private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelCacheImpl.allocateFake();
   // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
   public boolean cache(
       LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
@@ -172,7 +172,7 @@ public class TestLowLevelLrfuCachePolicy
     LOG.info("Testing heap size " + heapSize);
     Random rdm = new Random(1234);
     HiveConf conf = new HiveConf();
-    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.05f); // very small heap? TODO#
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
     // Insert the number of elements plus 2, to trigger 2 evictions.
@@ -181,7 +181,7 @@ public class TestLowLevelLrfuCachePolicy
     LlapCacheableBuffer[] evicted = new LlapCacheableBuffer[toEvict];
     Assume.assumeTrue(toEvict <= heapSize);
     for (int i = 0; i < heapSize + toEvict; ++i) {
-      LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+      LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(lrfu, et, buffer));
       LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
       if (i < toEvict) {