You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/10 03:38:18 UTC

svn commit: r1650717 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/conf/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/h...

Author: sershe
Date: Sat Jan 10 02:38:17 2015
New Revision: 1650717

URL: http://svn.apache.org/r1650717
Log:
Preliminary patch for low-level cache, needs few more touches and LRFU policy would not be thread-safe

Added:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
Removed:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/Allocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/JavaAllocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/FifoCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/LrfuCachePolicy.java
Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Jan 10 02:38:17 2015
@@ -1969,8 +1969,10 @@ public class HiveConf extends Configurat
         "Updates tez job execution progress in-place in the terminal."),
 
     LLAP_ENABLED("hive.llap.enabled", true, ""),
-    LLAP_CACHE_SIZE("hive.llap.cache.size", 1024L * 1024 * 1024, ""),
-    LLAP_BUFFER_SIZE("hive.llap.buffer.size", 16 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""),
+    LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""),
     LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""),
     LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""),
     LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "")

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Sat Jan 10 02:38:17 2015
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
-
 public class EncodedColumn<BatchKey> {
-  public EncodedColumn(BatchKey batchKey, int columnIndex, LlapBuffer columnData) {
+  // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
+  //       generality, and ability to not copy data from underlying low-level cached buffers.
+  public static class ColumnBuffer {}
+  public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
     this.batchKey = batchKey;
     this.columnIndex = columnIndex;
     this.columnData = columnData;
@@ -29,5 +30,5 @@ public class EncodedColumn<BatchKey> {
 
   public BatchKey batchKey;
   public int columnIndex;
-  public LlapBuffer columnData;
+  public ColumnBuffer columnData;
 }
\ No newline at end of file

Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (added)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.cache;
+
+import java.nio.ByteBuffer;
+
+public abstract class LlapMemoryBuffer {
+  protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
+    initialize(byteBuffer, offset, length);
+  }
+  public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+    this.byteBuffer = byteBuffer;
+    this.offset = offset;
+    this.length = length;
+  }
+  public ByteBuffer byteBuffer;
+  public int offset;
+  public int length;
+
+}
\ No newline at end of file

Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (added)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.cache;
+
+
+public interface LowLevelCache {
+
+  /**
+   * Gets file data for particular offsets. Null entries mean no data.
+   */
+  LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
+
+  /**
+   * Puts file data into cache.
+   * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+   *         the replacement chunks from cache are updated directly in the array.
+   */
+  long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks);
+
+  /**
+   * Releases the buffer returned by getFileData or allocateMultiple.
+   */
+  void releaseBuffer(LlapMemoryBuffer buffer);
+
+  /**
+   * Allocate dest.length new blocks of size into dest.
+   */
+  void allocateMultiple(LlapMemoryBuffer[] dest, int size);
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Sat Jan 10 02:38:17 2015
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 
 /** Dummy interface for now, might be different. */
 public interface Cache<CacheKey> {
-  public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value);
-  public LlapBuffer get(CacheKey key);
+  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value);
+  public ColumnBuffer get(CacheKey key);
 }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+interface EvictionListener {
+  void notifyEvicted(LlapCacheableBuffer buffer);
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public final class LlapCacheableBuffer extends LlapMemoryBuffer {
+  public LlapCacheableBuffer(ByteBuffer byteBuffer, int offset, int length) {
+    super(byteBuffer, offset, length);
+  }
+
+  public String toStringForCache() {
+    return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " "
+        + lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
+  }
+
+  private static final int EVICTED_REFCOUNT = -1;
+  private final AtomicInteger refCount = new AtomicInteger(0);
+
+  // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
+  public double priority;
+  public long lastUpdate = -1;
+  public int indexInHeap = -1;
+  public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe
+
+  @Override
+  public int hashCode() {
+    if (this.byteBuffer == null) return 0;
+    return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!(obj instanceof LlapCacheableBuffer)) return false;
+    LlapCacheableBuffer other = (LlapCacheableBuffer)obj;
+    // We only compare objects, and not contents of the ByteBuffer.
+    return byteBuffer == other.byteBuffer
+        && this.offset == other.offset && this.length == other.length;
+  }
+
+  int lock() {
+    int oldRefCount = -1;
+    while (true) {
+      oldRefCount = refCount.get();
+      if (oldRefCount == EVICTED_REFCOUNT) return -1;
+      assert oldRefCount >= 0;
+      if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break;
+    }
+    return oldRefCount;
+  }
+
+  public boolean isLocked() {
+    // Best-effort check. We cannot do a good check against caller thread, since
+    // refCount could still be > 0 if someone else locked. This is used for asserts.
+    return refCount.get() > 0;
+  }
+
+  public boolean isInvalid() {
+    return refCount.get() == EVICTED_REFCOUNT;
+  }
+
+  int unlock() {
+    int newRefCount = refCount.decrementAndGet();
+    if (newRefCount < 0) {
+      throw new AssertionError("Unexpected refCount " + newRefCount);
+    }
+    return newRefCount;
+  }
+
+  @Override
+  public String toString() {
+    return "0x" + Integer.toHexString(hashCode());
+  }
+
+  /**
+   * @return Whether the we can invalidate; false if locked or already evicted.
+   */
+  boolean invalidate() {
+    while (true) {
+      int value = refCount.get();
+      if (value != 0) return false;
+      if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break;
+    }
+    if (DebugUtils.isTraceLockingEnabled()) {
+      LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction");
+    }
+    return true;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public class LowLevelBuddyCache implements LowLevelCache, EvictionListener {
+  private final ArrayList<arena> arenas;
+  private AtomicInteger newEvictions = new AtomicInteger(0);
+  private final Thread cleanupThread;
+  private final ConcurrentHashMap<String, FileCache> cache =
+      new ConcurrentHashMap<String, FileCache>();
+  private final LowLevelCachePolicy cachePolicy;
+
+  // Config settings
+  private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
+
+  private final int minAllocation, maxAllocation;
+  private final long maxSize, arenaSize;
+  
+  public LowLevelBuddyCache(Configuration conf) {
+    minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+    maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
+    arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+    maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+    if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) {
+      throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
+          + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
+    }
+    if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
+        || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) {
+      // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc
+      throw new AssertionError("Allocation and arena sizes must be powers of two > 1: "
+          + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+    }
+    if ((maxSize % arenaSize) > 0 || (maxSize / arenaSize) > Integer.MAX_VALUE) {
+      throw new AssertionError(
+          "Cache size not consistent with arena size: " + arenaSize + "," + maxSize);
+    }
+    minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+    maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+    arenaSizeLog2 = 31 - Long.numberOfLeadingZeros(arenaSize);
+    maxArenas = (int)(maxSize / arenaSize);
+    arenas = new ArrayList<arena>(maxArenas);
+    for (int i = 0; i < maxArenas; ++i) {
+      arenas.add(new arena());
+    }
+    arenas.get(0).init();
+    cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
+        ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
+        : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
+    cleanupThread = new CleanupThread();
+    cleanupThread.start();
+  }
+
+  // TODO: would it make sense to return buffers asynchronously?
+  @Override
+  public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+    assert size > 0;
+    int freeListIndex = 31 - Integer.numberOfLeadingZeros(size);
+    if (size != (1 << freeListIndex)) ++freeListIndex; // not a power of two, add one more
+    freeListIndex = Math.max(freeListIndex - minAllocLog2, 0);
+    int allocationSize = 1 << (freeListIndex + minAllocLog2);
+    int total = dest.length * allocationSize;
+    cachePolicy.reserveMemory(total);
+
+    int ix = 0;
+    for (int i = 0; i < dest.length; ++i) {
+      if (dest[i] != null) continue;
+      dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects?
+    }
+    // TODO: instead of waiting, loop only ones we haven't tried w/tryLock?
+    for (arena block : arenas) {
+      int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize);
+      if (newIx == -1) break;
+      if (newIx == dest.length) return;
+      ix = newIx;
+    }
+    // Then try to split bigger blocks.
+    for (arena block : arenas) {
+      int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize);
+      if (newIx == -1) break;
+      if (newIx == dest.length) return;
+      ix = newIx;
+    }
+    // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+    for (arena block : arenas) {
+      ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize);
+      if (ix == dest.length) return;
+    }
+  }
+
+  private int allocateFast(arena block,
+      int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+    if (block.data == null) return -1; // not allocated yet
+    FreeList freeList = block.freeLists[freeListIndex];
+    freeList.lock.lock();
+    try {
+      ix = allocateFromFreeListUnderLock(block, freeList, freeListIndex, dest, ix, size);
+    } finally {
+      freeList.lock.unlock();
+    }
+    return ix;
+  }
+
+  private int allocateWithSplit(
+      arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+    if (arena.data == null) return -1; // not allocated yet
+    FreeList freeList = arena.freeLists[freeListIndex];
+    int remaining = -1;
+    freeList.lock.lock();
+    try {
+      ix = allocateFromFreeListUnderLock(arena, freeList, freeListIndex, dest, ix, allocationSize);
+      remaining = dest.length - ix;
+      if (remaining == 0) return ix;
+    } finally {
+      freeList.lock.unlock();
+    }
+    int splitListIndex = freeListIndex;
+    byte headerData = (byte)((freeListIndex << 1) | 1);
+    while (remaining > 0) {
+      ++splitListIndex;
+      int splitWays = 1 << (splitListIndex - freeListIndex);
+      int headerStep = 1 << splitListIndex;
+      int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+      FreeList splitList = arena.freeLists[splitListIndex];
+      splitList.lock.lock();
+      try {
+        int headerIx = splitList.listHead;
+        while (headerIx >= 0 && remaining > 0) {
+          int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
+          int toTake = Math.min(splitWays, remaining);
+          remaining -= toTake;
+          lastSplitBlocksRemaining = splitWays - toTake;
+          for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
+            arena.headers[headerIx] = headerData;
+            dest[ix].initialize(arena.data, offset, allocationSize);
+          }
+          lastSplitNextHeader = headerIx;
+          headerIx = arena.data.getInt(origOffset + 4);
+          arena.data.putLong(origOffset, -1); // overwrite list pointers for safety
+        }
+        splitList.listHead = headerIx;
+      } finally {
+        splitList.lock.unlock();
+      }
+      if (remaining == 0) {
+        // We have just obtained all we needed by splitting at lastSplitBlockOffset; now
+        // we need to put the space remaining from that block into lower free lists.
+        // TODO: if we could return blocks asynchronously, we could do this 
+        int newListIndex = freeListIndex;
+        while (lastSplitBlocksRemaining > 0) {
+          if ((lastSplitBlocksRemaining & 1) == 1) {
+            arena.headers[lastSplitNextHeader] = (byte)((newListIndex << 1) | 1);
+            int offset = offsetFromHeaderIndex(lastSplitNextHeader);
+            FreeList newFreeList = arena.freeLists[newListIndex];
+            newFreeList.lock.lock();
+            try {
+              arena.data.putInt(offset, -1);
+              arena.data.putInt(offset, newFreeList.listHead);
+              newFreeList.listHead = lastSplitNextHeader;
+            } finally {
+              newFreeList.lock.unlock();
+            }
+            lastSplitNextHeader += (1 << newListIndex);
+          }
+          lastSplitBlocksRemaining >>>= 1;
+          ++newListIndex;
+          continue;
+        }
+      }
+    }
+    return ix;
+  }
+
+  public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+    return lastSplitNextHeader << minAllocLog2;
+  }
+
+  public int allocateFromFreeListUnderLock(arena block, FreeList freeList,
+      int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+    int current = freeList.listHead;
+    while (current >= 0 && ix < dest.length) {
+      int offset = offsetFromHeaderIndex(current);
+      block.headers[current] = (byte)((freeListIndex << 1) | 1);
+      current = block.data.getInt(offset + 4);
+      dest[ix].initialize(block.data, offset, size);
+      block.data.putLong(offset, -1); // overwrite list pointers for safety
+      ++ix;
+    }
+    freeList.listHead = current;
+    return ix;
+  }
+
+  private int allocateWithExpand(
+      arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+    if (block.data != null) return ix; // already allocated
+    synchronized (block) {
+      // Never goes from non-null to null, so this is the only place we need sync.
+      if (block.data == null) {
+        block.init();
+      }
+    }
+    return allocateWithSplit(block, freeListIndex, dest, ix, size);
+  }
+
+  @Override
+  public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
+    LlapMemoryBuffer[] result = null;
+    // TODO: string must be internalized
+    FileCache subCache = cache.get(fileName);
+    if (subCache == null || !subCache.incRef()) return result;
+    try {
+      for (int i = 0; i < offsets.length; ++i) {
+        while (true) { // Overwhelmingly only runs once.
+          long offset = offsets[i];
+          LlapCacheableBuffer buffer = subCache.cache.get(offset);
+          if (buffer == null) break;
+          if (lockBuffer(buffer)) {
+            if (result == null) {
+              result = new LlapCacheableBuffer[offsets.length];
+            }
+            result[i] = buffer;
+            break;
+          }
+          if (subCache.cache.remove(offset, buffer)) break;
+        }
+      }
+    } finally {
+      subCache.decRef();
+    }
+    return result;
+  }
+
+  private boolean lockBuffer(LlapCacheableBuffer buffer) {
+    int rc = buffer.lock();
+    if (rc == 0) {
+      cachePolicy.notifyLock(buffer);
+    }
+    return rc >= 0;
+  }
+
+  @Override
+  public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+    long[] result = null;
+    assert buffers.length == offsets.length;
+    // TODO: string must be internalized
+    FileCache subCache = getOrAddFileSubCache(fileName);
+    try {
+      for (int i = 0; i < offsets.length; ++i) {
+        LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+        long offset = offsets[i];
+        assert buffer.isLocked();
+        while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+          LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+          if (oldVal == null) break; // Cached successfully.
+          if (DebugUtils.isTraceCachingEnabled()) {
+            LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
+                + fileName + "@" + offset  + "; old " + oldVal + ", new " + buffer);
+          }
+          if (lockBuffer(oldVal)) {
+            // We found an old, valid block for this key in the cache.
+            releaseBufferInternal(buffer);
+            buffers[i] = oldVal;
+            if (result == null) {
+              result = new long[align64(buffers.length) >>> 6];
+            }
+            result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value
+            break;
+          }
+          // We found some old value but couldn't lock it; remove it.
+          subCache.cache.remove(offset, oldVal);
+        }
+      }
+    } finally {
+      subCache.decRef();
+    }
+    return result;
+  }
+
+  /**
+   * All this mess is necessary because we want to be able to remove sub-caches for fully
+   * evicted files. It may actually be better to have non-nested map with object keys?
+   */
+  public FileCache getOrAddFileSubCache(String fileName) {
+    FileCache newSubCache = null;
+    while (true) { // Overwhelmingly executes once.
+      FileCache subCache = cache.get(fileName);
+      if (subCache != null) {
+        if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+        if (newSubCache == null) {
+          newSubCache = new FileCache();
+          newSubCache.incRef();
+        }
+        // Found a stale value we cannot incRef; try to replace it with new value.
+        if (cache.replace(fileName, subCache, newSubCache)) return newSubCache;
+        continue; // Someone else replaced/removed a stale value, try again.
+      }
+      // No value found.
+      if (newSubCache == null) {
+        newSubCache = new FileCache();
+        newSubCache.incRef();
+      }
+      FileCache oldSubCache = cache.putIfAbsent(fileName, newSubCache);
+      if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+      if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+      // Someone created one in parallel and then it went stale.
+      if (cache.replace(fileName, oldSubCache, newSubCache)) return newSubCache;
+      // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+    }
+  }
+
+  private static int align64(int number) {
+    return ((number + 63) & ~63);
+  }
+
+
+  @Override
+  public void releaseBuffer(LlapMemoryBuffer buffer) {
+    releaseBufferInternal((LlapCacheableBuffer)buffer);
+  }
+
+  public void releaseBufferInternal(LlapCacheableBuffer buffer) {
+    if (buffer.unlock() == 0) {
+      cachePolicy.notifyUnlock(buffer);
+      unblockEviction();
+    }
+  }
+
+  public static LlapCacheableBuffer allocateFake() {
+    return new LlapCacheableBuffer(null, -1, -1);
+  }
+
+  public void unblockEviction() {
+    newEvictions.incrementAndGet();
+  }
+
+  @Override
+  public void notifyEvicted(LlapCacheableBuffer buffer) {
+    
+  }
+
+  private final class CleanupThread extends Thread {
+    private int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+    public CleanupThread() {
+      super("Llap ChunkPool cleanup thread");
+      setDaemon(true);
+      setPriority(1);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doOneCleanupRound();
+        } catch (InterruptedException ex) {
+          LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Throwable t) {
+          LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+          break;
+        }
+      }
+    }
+
+    private void doOneCleanupRound() throws InterruptedException {
+      while (true) {
+        int evictionsSinceLast = newEvictions.getAndSet(0);
+        if (evictionsSinceLast > 0) break;
+        synchronized (newEvictions) {
+          newEvictions.wait(10000);
+        }
+      }
+      // Duration is an estimate; if the size of the map changes, it can be very different.
+      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+      int leftToCheck = 0; // approximate
+      for (FileCache fc : cache.values()) {
+        leftToCheck += fc.cache.size();
+      }
+      // TODO: if these super-long-lived iterator affects the map in some bad way,
+      //       we'd need to sleep once per round instead.
+      // Iterate thru all the filecaches. This is best-effort.
+      Iterator<Map.Entry<String, FileCache>> iter = cache.entrySet().iterator();
+      while (iter.hasNext()) {
+        FileCache fc = iter.next().getValue();
+        if (!fc.incRef()) {
+          throw new AssertionError("Something other than cleanup is removing elements from map");
+        }
+        // Iterate thru the file cache. This is best-effort.
+        Iterator<Map.Entry<Long, LlapCacheableBuffer>> subIter = fc.cache.entrySet().iterator();
+        boolean isEmpty = true;
+        while (subIter.hasNext()) {
+          Thread.sleep((leftToCheck <= 0)
+              ? 1 : (endTime - System.nanoTime()) / (1000000L * leftToCheck));
+          if (subIter.next().getValue().isInvalid()) {
+            subIter.remove();
+          } else {
+            isEmpty = false;
+          }
+          --leftToCheck;
+        }
+        if (!isEmpty) {
+          fc.decRef();
+          continue;
+        }
+        // FileCache might be empty; see if we can remove it. "tryWriteLock"
+        if (!fc.startEvicting()) continue;
+        if (fc.cache.isEmpty()) {
+          fc.commitEvicting();
+          iter.remove();
+        } else {
+          fc.abortEvicting();
+        }
+      }
+    }
+  }
+
+  private class arena {
+    void init() {
+      data = ByteBuffer.allocateDirect(maxAllocation);
+      int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+      headers = new byte[maxMinAllocs];
+      int allocLog2Diff = maxAllocLog2 - minAllocLog2;
+      freeLists = new FreeList[allocLog2Diff];
+      for (int i = 0; i < maxAllocLog2; ++i) {
+        freeLists[i] = new FreeList();
+      }
+      int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
+          headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
+      freeLists[maxAllocLog2 - 1].listHead = 0;
+      for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+        // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
+        headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
+        data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
+        data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerIncrement));
+        headerIndex += headerIncrement;
+      }
+    }
+    ByteBuffer data;
+    // Avoid storing headers with data since we expect binary size allocations.
+    // Each headers[i] is a "virtual" byte at i * minAllocation.
+    byte[] headers;
+    FreeList[] freeLists;
+  }
+
+  private static class FreeList {
+    ReentrantLock lock = new ReentrantLock(false);
+    int listHead = -1; // Index of where the buffer is; in minAllocation units
+    // TODO: One possible improvement - store blocks arriving left over from splits, and
+    //       blocks requested, to be able to wait for pending splits and reduce fragmentation.
+    //       However, we are trying to increase fragmentation now, since we cater to single-size.
+  }
+
+  // TODO##: separate the classes?
+  private static class FileCache {
+    private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+    // TODO: given the specific data, perhaps the nested thing should not be CHM
+    private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
+      = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+    private AtomicInteger refCount = new AtomicInteger(0);
+
+    boolean incRef() {
+      while (true) {
+        int value = refCount.get();
+        if (value == EVICTED_REFCOUNT) return false;
+        if (value == EVICTING_REFCOUNT) continue; // spin until it resolves
+        assert value >= 0;
+        if (refCount.compareAndSet(value, value + 1)) return true;
+      }
+    }
+
+    void decRef() {
+      int value = refCount.decrementAndGet();
+      if (value < 0) {
+        throw new AssertionError("Unexpected refCount " + value);
+      }
+    }
+
+    boolean startEvicting() {
+      while (true) {
+        int value = refCount.get();
+        if (value != 1) return false;
+        if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+      }
+    }
+
+    void commitEvicting() {
+      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+      assert result;
+    }
+
+    void abortEvicting() {
+      boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+      assert result;
+    }
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LowLevelCachePolicy {
+  void cache(LlapCacheableBuffer buffer);
+  void notifyLock(LlapCacheableBuffer buffer);
+  void notifyUnlock(LlapCacheableBuffer buffer);
+  void reserveMemory(long total);
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy {
+  private final AtomicLong usedMemory;
+  private final long maxSize;
+  private EvictionListener evictionListener;
+
+  public LowLevelCachePolicyBase(long maxSize, EvictionListener listener) {
+    this.maxSize = maxSize;
+    this.usedMemory = new AtomicLong(0);
+    this.evictionListener = listener;
+  }
+
+  @Override
+  public void reserveMemory(long memoryToReserve) {
+    // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
+    while (memoryToReserve > 0) {
+      long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
+      if (newUsedMem <= maxSize) {
+        if (usedMemory.compareAndSet(usedMem, newUsedMem)) break;
+        continue;
+      }
+      // TODO: for one-block case, we could move notification for the last block out of the loop.
+      long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+      // Adjust the memory - we have to account for what we have just evicted.
+      while (true) {
+        long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
+        if (usedMemory.compareAndSet(usedMem, usedMem + reserveWithEviction)) {
+          memoryToReserve -= reserveWithEviction;
+          break;
+        }
+        usedMem = usedMemory.get();
+      }
+    }
+  }
+
+  protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase {
+  private final Lock lock = new ReentrantLock();
+  private final LinkedHashSet<LlapCacheableBuffer> buffers;
+
+  public LowLevelFifoCachePolicy(
+      int expectedBufferSize, long maxCacheSize, EvictionListener listener) {
+    super(maxCacheSize, listener);
+    int expectedBuffers = (int)Math.ceil((maxCacheSize * 1.0) / expectedBufferSize);
+    buffers = new LinkedHashSet<LlapCacheableBuffer>((int)(expectedBuffers / 0.75f));
+  }
+
+  @Override
+  public void cache(LlapCacheableBuffer buffer) {
+    lock.lock();
+    try {
+      buffers.add(buffer);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void notifyLock(LlapCacheableBuffer buffer) {
+    // FIFO policy doesn't care.
+  }
+
+  @Override
+  public void notifyUnlock(LlapCacheableBuffer buffer) {
+    // FIFO policy doesn't care.
+  }
+
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long evicted = 0;
+    lock.lock();
+    try {
+      Iterator<LlapCacheableBuffer> iter = buffers.iterator();
+      while (evicted < memoryToReserve && iter.hasNext()) {
+        LlapCacheableBuffer candidate = iter.next();
+        if (candidate.invalidate()) {
+          iter.remove();
+          evicted += candidate.length;
+          listener.notifyEvicted(candidate);
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    return evicted;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
+ * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
+ * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
+ **/
+public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
+  private final double lambda;
+  private final double f(long x) {
+    return Math.pow(0.5, lambda * x);
+  }
+  private static final double F0 = 1; // f(0) is always 1
+  private final double touchPriority(long time, long lastAccess, double previous) {
+    return F0 + f(time - lastAccess) * previous;
+  }
+  private final double expirePriority(long time, long lastAccess, double previous) {
+    return f(time - lastAccess) * previous;
+  }
+
+  private final AtomicLong timer = new AtomicLong(0);
+  /**
+   * The heap. Currently synchronized on itself; there is a number of papers out there
+   * with various lock-free/efficient priority queues which we can use if needed.
+   */
+  private final LlapCacheableBuffer[] heap;
+  /** Number of elements. */
+  private int heapSize = 0;
+
+  public LowLevelLrfuCachePolicy(Configuration conf,
+      long minBufferSize, long maxCacheSize, EvictionListener listener) {
+    super(maxCacheSize, listener);
+    heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
+    lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+  }
+
+  @Override
+  public void cache(LlapCacheableBuffer buffer) {
+    buffer.lastUpdate = timer.incrementAndGet();
+    buffer.priority = F0;
+    assert buffer.isLocked();
+    buffer.isLockedInHeap = true;
+    synchronized (heap) {
+      // Ensured by reserveMemory.
+      assert heapSize < heap.length : heap.length + " >= " + heapSize;
+      buffer.indexInHeap = heapSize;
+      heapifyUpUnderLock(buffer, buffer.lastUpdate);
+      if (DebugUtils.isTraceEnabled()) {
+        LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
+      }
+      ++heapSize;
+    }
+  }
+
+  @Override
+  public void notifyLock(LlapCacheableBuffer buffer) {
+    long time = timer.get();
+    synchronized (heap) {
+      buffer.isLockedInHeap = true;
+      heapifyDownUnderLock(buffer, time);
+    }
+  }
+
+  @Override
+  public void notifyUnlock(LlapCacheableBuffer buffer) {
+    long time = timer.incrementAndGet();
+    synchronized (heap) {
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+      }
+      buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+      buffer.lastUpdate = time;
+      buffer.isLockedInHeap = false;
+      // Buffer's priority just decreased from boosted lock priority, so move up.
+      heapifyUpUnderLock(buffer, time);
+    }
+  }
+
+  private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+    if (heapSize == 0) return null;
+    LlapCacheableBuffer result = heap[0];
+    if (!result.invalidate()) {
+      // We boost the priority of locked buffers to a very large value;
+      // this means entire heap is locked. TODO: need to work around that for small pools?
+      if (DebugUtils.isTraceCachingEnabled()) {
+        LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+      }
+      return null;
+    }
+    if (DebugUtils.isTraceCachingEnabled()) {
+      LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+    }
+    result.indexInHeap = -1;
+    --heapSize;
+    LlapCacheableBuffer newRoot = heap[heapSize];
+    newRoot.indexInHeap = 0;
+    if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
+      newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+      newRoot.lastUpdate = time;
+    }
+    heapifyDownUnderLock(newRoot, time);
+    return result;
+  }
+
+  private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+    // Relative positions of the blocks don't change over time; priorities we expire can only
+    // decrease; we only have one block that could have broken heap rule and we always move it
+    // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+    // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+    // the priority for already has lower priority than that of its children.
+    // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    while (true) {
+      int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
+      if (leftIx >= heapSize) break; // Buffer is at the leaf node.
+      LlapCacheableBuffer left = heap[leftIx], right = null;
+      if (rightIx < heapSize) {
+        right = heap[rightIx];
+      }
+      double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+      if (priority <= leftPri && priority <= rightPri) break;
+      if (leftPri <= rightPri) { // prefer left, cause right might be missing
+        heap[ix] = left;
+        left.indexInHeap = ix;
+        ix = leftIx;
+      } else {
+        heap[ix] = right;
+        right.indexInHeap = ix;
+        ix = rightIx;
+      }
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
+    // See heapifyDown comment.
+    int ix = buffer.indexInHeap;
+    double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+    while (true) {
+      if (ix == 0) break; // Buffer is at the top of the heap.
+      int parentIx = (ix - 1) >>> 1;
+      LlapCacheableBuffer parent = heap[parentIx];
+      double parentPri = getHeapifyPriority(parent, time);
+      if (priority >= parentPri) break;
+      heap[ix] = parent;
+      parent.indexInHeap = ix;
+      ix = parentIx;
+    }
+    buffer.indexInHeap = ix;
+    heap[ix] = buffer;
+  }
+
+  private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
+    if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
+    if (buf.lastUpdate != time) {
+      buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
+      buf.lastUpdate = time;
+    }
+    return buf.priority;
+  }
+
+  public String debugDumpHeap() {
+    if (heapSize == 0) return "<empty>";
+    int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
+    StringBuilder result = new StringBuilder();
+    int ix = 0;
+    int spacesCount = heap[0].toStringForCache().length() + 3;
+    String full = StringUtils.repeat(" ", spacesCount),
+        half = StringUtils.repeat(" ", spacesCount / 2);
+    int maxWidth = 1 << (levels - 1);
+    for (int i = 0; i < levels; ++i) {
+      int width = 1 << i;
+      int middleGap = (maxWidth - width) / width;
+      for (int j = 0; j < (middleGap >>> 1); ++j) {
+        result.append(full);
+      }
+      if ((middleGap & 1) == 1) {
+        result.append(half);
+      }
+      for (int j = 0; j < width && ix < heapSize; ++j, ++ix) {
+        if (j != 0) {
+          for (int k = 0; k < middleGap; ++k) {
+            result.append(full);
+          }
+          if (middleGap == 0) {
+            result.append(" ");
+          }
+        }
+        if ((j & 1) == 0) {
+          result.append("(");
+        }
+        result.append(heap[ix].toStringForCache());
+        if ((j & 1) == 1) {
+          result.append(")");
+        }
+      }
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  @VisibleForTesting
+  public LlapCacheableBuffer evictOneMoreBlock() {
+    synchronized (heap) {
+      return evictFromHeapUnderLock(timer.get());
+    }
+  }
+
+  @Override
+  protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+    long evicted = 0;
+    while (evicted < memoryToReserve) {
+      LlapCacheableBuffer buffer = evictOneMoreBlock();
+      if (buffer == null) return evicted;
+      evicted += buffer.length;
+      listener.notifyEvicted(buffer);
+    }
+    return evicted;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Sat Jan 10 02:38:17 2015
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 
 public class NoopCache<CacheKey> implements Cache<CacheKey> {
   @Override
-  public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value) {
+  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) {
     return value;
   }
 
   @Override
-  public LlapBuffer get(CacheKey key) {
+  public ColumnBuffer get(CacheKey key) {
     return null;  // TODO: ensure real implementation increases refcount
   }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Sat Jan 10 02:38:17 2015
@@ -22,7 +22,7 @@ package org.apache.hadoop.hive.llap.io.a
 import java.util.List;
 import java.io.IOException;
 
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 public interface VectorReader {
@@ -33,7 +33,7 @@ public interface VectorReader {
   public static class ColumnVectorBatch {
     public ColumnVector[] cols;
     public int size;
-    public List<LlapBuffer> lockedBuffers;
+    public List<ColumnBuffer> lockedBuffers;
   }
   public ColumnVectorBatch next() throws InterruptedException, IOException;
   public void close() throws IOException;

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Sat Jan 10 02:38:17 2015
@@ -26,11 +26,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.cache.Cache;
-import org.apache.hadoop.hive.llap.cache.JavaAllocator;
+import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
 import org.apache.hadoop.hive.llap.cache.NoopCache;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.VectorReader;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
@@ -53,11 +52,8 @@ public class LlapIoImpl implements LlapI
 
   private LlapIoImpl(Configuration conf) throws IOException {
     this.conf = conf;
-    // ChunkPool<OrcLoader.ChunkKey> chunkPool = new ChunkPool<OrcLoader.ChunkKey>();
-    // new BufferPool(conf, chunkPool)
-    Allocator allocator = new JavaAllocator();
-    Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>();
-    this.edp = new OrcEncodedDataProducer(allocator, cache, conf);
+    Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet.
+    this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf);
     this.cvp = new OrcColumnVectorProducer(edp, conf);
   }
 
@@ -66,7 +62,7 @@ public class LlapIoImpl implements LlapI
     getOrCreateInstance(conf);
   }
 
-  // TODO#: Add "create" method in a well-defined place when server is started
+  // TODO: Add "create" method in a well-defined place when server is started
   public static LlapIo getOrCreateInstance(Configuration conf) {
     if (ioImpl != null) return ioImpl;
     synchronized (instanceLock) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Sat Jan 10 02:38:17 2015
@@ -26,9 +26,8 @@ import java.util.List;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -38,10 +37,10 @@ import org.apache.hadoop.mapred.InputSpl
 public abstract class ColumnVectorProducer<BatchKey> {
   static class EncodedColumnBatch {
     public EncodedColumnBatch(int colCount) {
-      columnDatas = new LlapBuffer[colCount];
+      columnDatas = new ColumnBuffer[colCount];
       columnsRemaining = colCount;
     }
-    public LlapBuffer[] columnDatas;
+    public ColumnBuffer[] columnDatas;
     public int columnsRemaining;
   }
 
@@ -51,7 +50,7 @@ public abstract class ColumnVectorProduc
     // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
     private final HashMap<BatchKey, EncodedColumnBatch> pendingData =
         new HashMap<BatchKey, EncodedColumnBatch>();
-    private ConsumerFeedback<LlapBuffer> upstreamFeedback;
+    private ConsumerFeedback<ColumnBuffer> upstreamFeedback;
     private final Consumer<ColumnVectorBatch> downstreamConsumer;
     private final int colCount;
 
@@ -60,7 +59,7 @@ public abstract class ColumnVectorProduc
       this.colCount = colCount;
     }
 
-    public void init(ConsumerFeedback<LlapBuffer> upstreamFeedback) {
+    public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) {
       this.upstreamFeedback = upstreamFeedback;
     }
 
@@ -134,13 +133,14 @@ public abstract class ColumnVectorProduc
 
     @Override
     public void returnData(ColumnVectorBatch data) {
-      for (LlapBuffer lockedBuffer : data.lockedBuffers) {
+      // TODO#: this should happen earlier, when data is decoded buffers are not needed
+      for (ColumnBuffer lockedBuffer : data.lockedBuffers) {
         upstreamFeedback.returnData(lockedBuffer);
       }
     }
 
     private void dicardPendingData(boolean isStopped) {
-      List<LlapBuffer> dataToDiscard = new ArrayList<LlapBuffer>(pendingData.size() * colCount);
+      List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount);
       List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
       synchronized (pendingData) {
         if (isStopped) {
@@ -151,13 +151,13 @@ public abstract class ColumnVectorProduc
       }
       for (EncodedColumnBatch batch : batches) {
         synchronized (batch) {
-          for (LlapBuffer b : batch.columnDatas) {
+          for (ColumnBuffer b : batch.columnDatas) {
             dataToDiscard.add(b);
           }
           batch.columnDatas = null;
         }
       }
-      for (LlapBuffer data : dataToDiscard) {
+      for (ColumnBuffer data : dataToDiscard) {
         upstreamFeedback.returnData(data);
       }
     }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Sat Jan 10 02:38:17 2015
@@ -21,8 +21,8 @@ package org.apache.hadoop.hive.llap.io.e
 import java.io.IOException;
 
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
 
-public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<LlapBuffer> {
+public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<ColumnBuffer> {
   public void start() throws IOException;
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Sat Jan 10 02:38:17 2015
@@ -30,8 +30,8 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
@@ -48,9 +48,9 @@ import org.apache.hadoop.mapred.InputSpl
 
 public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
   private FileSystem cachedFs = null;
+  private final LowLevelCache lowLevelCache;
   private Configuration conf;
   private OrcMetadataCache metadataCache;
-  private final Allocator allocator;
   private final Cache<OrcCacheKey> cache;
 
   private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
@@ -140,7 +140,7 @@ public class OrcEncodedDataProducer impl
         }
         RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
         // We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering.
-        stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, allocator);
+        stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache);
         stripeReader.close();
       }
 
@@ -151,7 +151,7 @@ public class OrcEncodedDataProducer impl
     }
 
     @Override
-    public void returnData(LlapBuffer data) {
+    public void returnData(ColumnBuffer data) {
       // TODO#: return the data to cache (unlock)
     }
 
@@ -231,7 +231,7 @@ public class OrcEncodedDataProducer impl
           boolean areAllRgsInCache = true;
           for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
             key.rgIx = rgIx;
-            LlapBuffer cached = cache.get(key);
+            ColumnBuffer cached = cache.get(key);
             if (cached == null) {
               areAllRgsInCache = false;
               continue;
@@ -274,9 +274,9 @@ public class OrcEncodedDataProducer impl
     public void consumeData(EncodedColumn<OrcBatchKey> data) {
       // Store object in cache; create new key object - cannot be reused.
       OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
-      LlapBuffer cached = cache.cacheOrGet(key, data.columnData);
+      ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
       if (data.columnData != cached) {
-        allocator.deallocate(data.columnData);
+        // TODO: deallocate columnData
         data.columnData = cached;
       }
       consumer.consumeData(data);
@@ -301,15 +301,15 @@ public class OrcEncodedDataProducer impl
   }
 
   private static int align64(int number) {
-    int rem = number & 63;
-    return number - rem + (rem == 0 ? 0 : 64);
+    return ((number + 63) & ~63);
   }
 
-  public OrcEncodedDataProducer(Allocator allocator, Cache<OrcCacheKey> cache, Configuration conf) throws IOException {
+  public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
+      Configuration conf) throws IOException {
     // We assume all splits will come from the same FS.
     this.cachedFs = FileSystem.get(conf);
     this.cache = cache;
-    this.allocator = allocator;
+    this.lowLevelCache = lowLevelCache;
     this.conf = conf;
     this.metadataCache = null;
   }
@@ -319,5 +319,4 @@ public class OrcEncodedDataProducer impl
       SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
     return new OrcEncodedDataReader(split, columnIds, sarg, consumer);
   }
-
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Sat Jan 10 02:38:17 2015
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.*;
@@ -99,7 +99,7 @@ public class LLAPRecordReaderImpl extend
 
   @Override
   public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) {
+      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
 
   }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java Sat Jan 10 02:38:17 2015
@@ -39,11 +39,9 @@ public class BufferPool {
 
 
   public BufferPool(Configuration conf) {
-    this.maxCacheSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE);
-    this.bufferSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE);
-    this.cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
-        ? new LrfuCachePolicy(conf, bufferSize, maxCacheSize)
-        : new FifoCachePolicy(bufferSize, maxCacheSize);
+    this.maxCacheSize = 0;// HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE);
+    this.bufferSize = 0; // HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE);
+    this.cachePolicy = null;
   }
 
   /**

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java Sat Jan 10 02:38:17 2015
@@ -54,11 +54,9 @@ public class ChunkPool<K> /*implements E
    * @return Chunk corresponding to k.
    */
   public Chunk getChunk(K key, HashSet<WeakBuffer> lockedBuffers) {
-    Chunk result = chunkCache.get(key);
-    if (result == null) {
-      return null;
-    }
     while (true) {
+      Chunk result = chunkCache.get(key);
+      if (result == null) return null;
       if (lockChunk(result, lockedBuffers)) return result;
       if (chunkCache.remove(key, result)) return null;
     }

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -29,7 +29,7 @@ import org.junit.Assume;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
-public class TestLrfuCachePolicy {
+public class TestLrfuCachePolicy {/* TODO: switch to LowLevel one
   private static final Log LOG = LogFactory.getLog(TestLrfuCachePolicy.class);
 
   @Test
@@ -221,5 +221,5 @@ public class TestLrfuCachePolicy {
       debugStr += inserted.get(i);
     }
     return debugStr;
-  }
+  }*/
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Jan 10 02:38:17 2015
@@ -152,6 +152,7 @@ abstract class InStream extends InputStr
       currentRange = 0;
     }
 
+    // TODO: this should allocate from cache
     private ByteBuffer allocateBuffer(int size) {
       // TODO: use the same pool as the ORC readers
       if(isDirect == true) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Sat Jan 10 02:38:17 2015
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 
 
@@ -97,5 +97,5 @@ public interface RecordReader {
    * @param allocator Allocator to allocate memory.
    */
   void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator);
+      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Jan 10 02:38:17 2015
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.common.typ
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -2676,11 +2676,13 @@ public class RecordReaderImpl implements
                                   ) throws IOException {
     long start = stripe.getIndexLength();
     long end = start + stripe.getDataLength();
+    // TODO: planning should be added here too, to take cache into account
     // explicitly trigger 1 big read
     DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
     bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
     createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+    // TODO: decompressed data from streams should be put in cache
   }
 
   /**
@@ -3050,6 +3052,7 @@ public class RecordReaderImpl implements
   private void readPartialDataStreams(StripeInformation stripe
                                       ) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    // TODO: planning should take cache into account
     List<DiskRange> chunks =
         planReadPartialDataStreams(streamList,
             indexes, included, includedRowGroups, codec != null,
@@ -3062,8 +3065,8 @@ public class RecordReaderImpl implements
       LOG.debug("merge = " + stringifyDiskRanges(chunks));
     }
     bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
-    createStreams(streamList, bufferChunks, included, codec, bufferSize,
-        streams);
+    // TODO: decompressed data from streams should be put in cache
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
   }
 
   @Override
@@ -3300,7 +3303,7 @@ public class RecordReaderImpl implements
 
   @Override
   public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) {
+      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
     // TODO: HERE read encoded data
   }