Posted to commits@hive.apache.org by se...@apache.org on 2018/08/24 18:32:05 UTC

[1/3] hive git commit: HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)

Repository: hive
Updated Branches:
  refs/heads/branch-2 a4b913360 -> bd32deb44


http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 390b34b..364897c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -57,7 +57,7 @@ public class TestBuddyAllocator {
     isMapped = mmap;
   }
 
-  private static class DummyMemoryManager implements MemoryManager {
+  static class DummyMemoryManager implements MemoryManager {
     @Override
     public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
     }
@@ -76,11 +76,6 @@ public class TestBuddyAllocator {
     }
 
     @Override
-    public long forceReservedMemory(int allocationSize, int count) {
-      return allocationSize * count;
-    }
-
-    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }
@@ -99,8 +94,9 @@ public class TestBuddyAllocator {
   @Test
   public void testSameSizes() throws Exception {
     int min = 3, max = 8, maxAlloc = 1 << max;
-    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc,
-        tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
+        maxAlloc, 0, tmpDir, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"), null);
     for (int i = max; i >= min; --i) {
       allocSameSize(a, 1 << (max - i), i);
     }
@@ -109,16 +105,18 @@ public class TestBuddyAllocator {
   @Test
   public void testMultipleArenas() throws Exception {
     int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
-    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount,
-        tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc,
+        maxAlloc * arenaCount, 0, tmpDir, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"), null);
     allocSameSize(a, arenaCount * 2, allocLog2);
   }
 
   @Test
   public void testMTT() {
     final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
-    final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc * 8,
-        maxAlloc * 24, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+    final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc,
+        maxAlloc * 8, maxAlloc * 24, 0, tmpDir, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"), null);
     ExecutorService executor = Executors.newFixedThreadPool(3);
     final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
     FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
@@ -162,8 +160,8 @@ public class TestBuddyAllocator {
   public void testMTTArenas() {
     final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
     final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
-        (1 << min) * minAllocCount, tmpDir, new DummyMemoryManager(),
-        LlapDaemonCacheMetrics.create("test", "1"));
+        (1 << min) * minAllocCount, 0, tmpDir, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"), null);
     ExecutorService executor = Executors.newFixedThreadPool(threadCount);
     final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
     Callable<Void> testCallable = new Callable<Void>() {
@@ -189,7 +187,8 @@ public class TestBuddyAllocator {
       throw new RuntimeException(t);
     }
   }
-  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+
+  static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
     cdlIn.countDown();
     try {
       cdlOut.await();
@@ -202,8 +201,8 @@ public class TestBuddyAllocator {
       int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
     int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
     BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, arenaSize,
-        arenaSize * arenaCount, tmpDir , new DummyMemoryManager(),
-        LlapDaemonCacheMetrics.create("test", "1"));
+        arenaSize * arenaCount, 0, tmpDir, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"), null);
     allocateUp(a, min, max, allocCount, true);
     allocateDown(a, min, max, allocCount, true);
     allocateDown(a, min, max, allocCount, false);
@@ -253,19 +252,34 @@ public class TestBuddyAllocator {
     try {
       a.allocateMultiple(allocs[index], size);
     } catch (AllocatorOutOfMemoryException ex) {
-      LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDumpForOomInternal());
+      LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.testDump());
       throw ex;
     }
     // LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
     for (int j = 0; j < allocCount; ++j) {
       MemoryBuffer mem = allocs[index][j];
       long testValue = testValues[index][j] = rdm.nextLong();
-      int pos = mem.getByteBufferRaw().position();
-      mem.getByteBufferRaw().putLong(pos, testValue);
-      int halfLength = mem.getByteBufferRaw().remaining() >> 1;
-      if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
-        mem.getByteBufferRaw().putLong(pos + halfLength, testValue);
-      }
+      putTestValue(mem, testValue);
+    }
+  }
+
+  public static void putTestValue(MemoryBuffer mem, long testValue) {
+    int pos = mem.getByteBufferRaw().position();
+    mem.getByteBufferRaw().putLong(pos, testValue);
+    int halfLength = mem.getByteBufferRaw().remaining() >> 1;
+    if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
+      mem.getByteBufferRaw().putLong(pos + halfLength, testValue);
+    }
+  }
+
+  public static void checkTestValue(MemoryBuffer mem, long testValue, String str) {
+    int pos = mem.getByteBufferRaw().position();
+    assertEquals("Failed to match (" + pos + ") on " + str,
+        testValue, mem.getByteBufferRaw().getLong(pos));
+    int halfLength = mem.getByteBufferRaw().remaining() >> 1;
+    if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
+      assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + str,
+          testValue, mem.getByteBufferRaw().getLong(pos + halfLength));
     }
   }
 
@@ -286,14 +300,9 @@ public class TestBuddyAllocator {
       BuddyAllocator a, MemoryBuffer[] allocs, long[] testValues) {
     for (int j = 0; j < allocs.length; ++j) {
       LlapDataBuffer mem = (LlapDataBuffer)allocs[j];
-      int pos = mem.getByteBufferRaw().position();
-      assertEquals("Failed to match (" + pos + ") on " + j + "/" + allocs.length,
-          testValues[j], mem.getByteBufferRaw().getLong(pos));
-      int halfLength = mem.getByteBufferRaw().remaining() >> 1;
-      if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
-        assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + j + "/"
-            + allocs.length, testValues[j], mem.getByteBufferRaw().getLong(pos + halfLength));
-      }
+      long testValue = testValues[j];
+      String str = j + "/" + allocs.length;
+      checkTestValue(mem, testValue, str);
       a.deallocate(mem);
     }
   }

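For reference, the test updates above track the new BuddyAllocator test constructor, which
gains a defragmentation-headroom value and a discard-method name (see the BuddyAllocator.java
diff further down). A minimal sketch of constructing an allocator the way these tests now do,
assuming the same test classpath; the values are illustrative:

    int min = 3, max = 8, maxAlloc = 1 << max;
    BuddyAllocator a = new BuddyAllocator(
        false /*isDirectVal*/, false /*isMappedVal*/, 1 << min /*minAllocVal*/,
        maxAlloc /*maxAllocVal*/, maxAlloc /*arenaCount*/, maxAlloc /*maxSizeVal*/,
        0 /*defragHeadroom: tests reserve none*/, null /*mapPath: unused when not mmapped*/,
        new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"),
        null /*discardMethod: null behaves like the default, "both"*/);
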
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
new file mode 100644
index 0000000..79c44a7
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.TestBuddyAllocator.DummyMemoryManager;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The specific paths these tests take rely on allocations being basically sequential,
+ * with no internal reordering; the scenarios themselves should work regardless of
+ * allocation order.
+ */
+public class TestBuddyAllocatorForceEvict {
+  private static final Logger LOG = LoggerFactory.getLogger(TestBuddyAllocatorForceEvict.class);
+  private static final DummyMemoryManager MM = new TestBuddyAllocator.DummyMemoryManager();
+  private static final LlapDaemonCacheMetrics METRICS = LlapDaemonCacheMetrics.create("test", "1");
+
+  @Test(timeout = 6000)
+  public void testSimple() {
+    runSimpleTests(false);
+    runSimpleTests(true);
+  }
+
+  public void runSimpleTests(boolean isBruteOnly) {
+    runSimple1to2Discard(create(1024, 1, 1024, true, isBruteOnly), 256);
+    runSimple1to2Discard(create(1024, 1, 1024, false, isBruteOnly), 256);
+    runSimple1to2Discard(create(512, 2, 1024, false, isBruteOnly), 256);
+  }
+
+  @Test(timeout = 6000)
+  public void testSmallBlocks() {
+    runSmallBlockersTests(false);
+    runSmallBlockersTests(true);
+  }
+
+  public void runSmallBlockersTests(boolean isBruteOnly) {
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, false);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, false);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, true);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, true);
+
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, false);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, false);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, true);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, true);
+  }
+
+  @Test(timeout = 6000)
+  public void testZebra() {
+    runZebraTests(false);
+    runZebraTests(true);
+  }
+
+  public void runZebraTests(boolean isBruteOnly) {
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 1);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 64, 8, 1);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 2);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 4);
+
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 1);
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 64, 8, 1);
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 2);
+
+    runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 2);
+    runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 4);
+  }
+
+  @Test(timeout = 6000)
+  public void testUnevenZebra() {
+    runUnevenZebraTests(false);
+    runUnevenZebraTests(true);
+  }
+
+  public void runUnevenZebraTests(boolean isBruteOnly) {
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512);
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 },
+        new int[] { 0, 2, 4, 6, 8 }, 512);
+
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512);
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 },
+        new int[] { 0, 2, 4, 6, 8 }, 512);
+  }
+
+  @Test(timeout = 6000)
+  public void testComplex1() {
+    runComplexTests(false);
+    runComplexTests(true);
+  }
+
+  public void runComplexTests(boolean isBruteOnly) {
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 128, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0,            3,            6, 7 }, 512 );
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0,                4,           7, 8 }, 512 );
+
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 128, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0,            3,            6, 7 }, 512 );
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0,                4,           7, 8 }, 512 );
+  }
+
+  static class MttTestCallableResult {
+    public int successes, ooms, allocSize;
+    @Override
+    public String toString() {
+      return "allocation size " + allocSize + ": " + successes + " allocations, " + ooms + " OOMs";
+    }
+  }
+
+  static class MttTestCallable implements Callable<MttTestCallableResult> {
+    private final CountDownLatch cdlIn, cdlOut;
+    private final int allocSize, allocCount, iterCount;
+    private final BuddyAllocator a;
+
+    public MttTestCallable(CountDownLatch cdlIn, CountDownLatch cdlOut, BuddyAllocator a,
+        int allocSize, int allocCount, int iterCount) {
+      this.cdlIn = cdlIn;
+      this.cdlOut = cdlOut;
+      this.a = a;
+      this.allocSize = allocSize;
+      this.allocCount = allocCount;
+      this.iterCount = iterCount;
+    }
+
+    public MttTestCallableResult call() throws Exception {
+      LOG.info(Thread.currentThread().getId() + " thread starts");
+      TestBuddyAllocator.syncThreadStart(cdlIn, cdlOut);
+      MttTestCallableResult result = new MttTestCallableResult();
+      result.allocSize = allocSize;
+      List<MemoryBuffer> allocs = new ArrayList<>(allocCount);
+      LlapAllocatorBuffer[] dest = new LlapAllocatorBuffer[1];
+      for (int i = 0; i < iterCount; ++i) {
+        for (int j = 0; j < allocCount; ++j) {
+          try {
+            dest[0] = null;
+            a.allocateMultiple(dest, allocSize);
+            LlapAllocatorBuffer buf = dest[0];
+            assertTrue(buf.incRef() > 0);
+            allocs.add(buf);
+            ++result.successes;
+            buf.decRef();
+          } catch (AllocatorOutOfMemoryException ex) {
+            ++result.ooms;
+          } catch (Throwable ex) {
+            LOG.error("Failed", ex);
+            throw new Exception(ex);
+          }
+        }
+        for (MemoryBuffer buf : allocs) {
+          try {
+            a.deallocate(buf);
+          } catch (Throwable ex) {
+            LOG.error("Failed", ex);
+            throw new Exception(ex);
+          }
+        }
+        allocs.clear();
+      }
+      return result;
+    }
+  }
+
+  @Test(timeout = 200000)
+  public void testMtt() {
+    final int baseAllocSizeLog2 = 3, maxAllocSizeLog2 = 10, totalSize = 8192,
+        baseAllocSize = 1 << baseAllocSizeLog2, maxAllocSize = 1 << maxAllocSizeLog2;
+    final int threadCount = maxAllocSizeLog2 - baseAllocSizeLog2 + 1;
+    final int iterCount = 500;
+    final BuddyAllocator a = create(maxAllocSize, 4, totalSize, true, false);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1);
+    CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<MttTestCallableResult>[] allocTasks = new FutureTask[threadCount];
+    FutureTask<Void> dumpTask = createAllocatorDumpTask(a);
+    for (int allocSize = baseAllocSize, i = 0; allocSize <= maxAllocSize; allocSize <<= 1, ++i) {
+      allocTasks[i] = new FutureTask<>(new MttTestCallable(
+          cdlIn, cdlOut, a, allocSize, totalSize / allocSize, iterCount));
+      executor.execute(allocTasks[i]);
+    }
+    executor.execute(dumpTask);
+
+    runMttTest(a, allocTasks, cdlIn, cdlOut, dumpTask, null, null, totalSize, maxAllocSize);
+  }
+
+  public static void runMttTest(BuddyAllocator a, FutureTask<?>[] allocTasks,
+      CountDownLatch cdlIn, CountDownLatch cdlOut, FutureTask<Void> dumpTask,
+      FutureTask<Void> defragTask, AtomicBoolean defragStopped, int totalSize, int maxAllocSize) {
+    Throwable t = null;
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } 
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < allocTasks.length; ++i) {
+      try {
+        Object result = allocTasks[i].get();
+        LOG.info("" + result);
+      } catch (Throwable tt) {
+        LOG.error("Test callable failed", tt);
+        if (t == null) {
+          a.dumpTestLog();
+          t = tt;
+        }
+      }
+    }
+    dumpTask.cancel(true);
+    if (defragTask != null) {
+      defragStopped.set(true);
+      try {
+        defragTask.get();
+      } catch (Throwable tt) {
+        LOG.error("Defragmentation thread failed", t);
+        if (t == null) {
+          a.dumpTestLog();
+          t = tt;
+        }
+      }
+    }
+    if (t != null) {
+      throw new RuntimeException("One of the errors", t);
+    }
+    // All the tasks should have deallocated their buffers. Make sure we can allocate everything.
+    LOG.info("Allocator state after all the tasks: " + a.testDump());
+    try {
+      allocate(a, totalSize / maxAllocSize, maxAllocSize, 0);
+    } catch (Throwable tt) {
+      a.dumpTestLog();
+      throw tt;
+    }
+  }
+
+  public static FutureTask<Void> createAllocatorDumpTask(final BuddyAllocator a) {
+    return new FutureTask<Void>(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        int logs = 40000; // Prevent excessive logging in case of deadlocks or slowness.
+        while ((--logs) >= 0) {
+          LOG.info("Allocator state (MTT): " + a.testDump());
+          Thread.sleep(10);
+        }
+        return null;
+      }
+    });
+  }
+
+  private static void runCustomDiscard(BuddyAllocator a, int[] sizes, int[] dealloc, int size) {
+    LlapAllocatorBuffer[] initial = prepareCustomFragmentedAllocator(a, sizes, dealloc, true);
+    LlapAllocatorBuffer after = allocate(a, 1, size, initial.length + 1)[0];
+    LOG.info("After: " + a.testDump());
+    for (int i = 0; i < initial.length; ++i) {
+      if (initial[i] == null) continue;
+      checkTestValue(initial[i], i + 1, null, false);
+      a.deallocate(initial[i]);
+    }
+    checkTestValue(after, initial.length + 1, null, true);
+    a.deallocate(after);
+  }
+
+  private static void runZebraDiscard(
+      BuddyAllocator a, int baseSize, int pairCount, int allocs) {
+    LlapAllocatorBuffer[] initial = prepareZebraFragmentedAllocator(a, baseSize, pairCount, true);
+    int allocFraction = allocs * 2;
+    int bigAllocSize = pairCount * 2 * baseSize / allocFraction;
+    LlapAllocatorBuffer[] after = allocate(a, allocs, bigAllocSize, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    for (int i = 0; i < pairCount; ++i) {
+      int ix = (i << 1) + 1;
+      checkTestValue(initial[ix], ix + 1, null, false);
+    }
+    checkTestValue(after[0], 1 + initial.length, null, true);
+  }
+
+  public static LlapAllocatorBuffer[] prepareZebraFragmentedAllocator(
+      BuddyAllocator a, int baseSize, int pairCount, boolean doIncRef) {
+    // Allocate 1-1-... xN; free every other one, allocate N/2 (or N/4).
+    LlapAllocatorBuffer[] initial = allocate(a, pairCount * 2, baseSize, 1, doIncRef);
+    for (int i = 0; i < pairCount; ++i) {
+      a.deallocate(initial[i << 1]);
+      initial[i << 1] = null;
+    }
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  private void runSimple1to2Discard(BuddyAllocator a, int baseSize) {
+    // Allocate 1-1-1-1; free 0&2; allocate 2
+    LlapAllocatorBuffer[] initial = prepareSimpleFragmentedAllocator(a, baseSize, true);
+    LlapAllocatorBuffer[] after = allocate(a, 1, baseSize * 2, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    checkInitialValues(initial, 0, 2);
+    checkTestValue(after[0], 1 + initial.length, null, true);
+    a.deallocate(initial[0]);
+    a.deallocate(initial[2]);
+    a.deallocate(after[0]);
+  }
+
+  public static LlapAllocatorBuffer[] prepareSimpleFragmentedAllocator(
+      BuddyAllocator a, int baseSize, boolean doIncRef) {
+    LlapAllocatorBuffer[] initial = allocate(a, 4, baseSize, 1, doIncRef);
+    checkInitialValues(initial, 0, 2);
+    a.deallocate(initial[1]);
+    a.deallocate(initial[3]);
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  private void runSmallBlockersDiscard(BuddyAllocator a,
+      int baseSize, boolean deallocOneFirst, boolean deallocOneSecond) {
+    LlapAllocatorBuffer[] initial = prepareAllocatorWithSmallFragments(
+        a, baseSize, deallocOneFirst, deallocOneSecond, true);
+    int bigAllocSize = baseSize * 4;
+    LlapAllocatorBuffer[] after = allocate(a, 1, bigAllocSize, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    checkInitialValues(initial, 2, 4);
+    checkTestValue(after[0], 1 + initial.length, null, true);
+  }
+
+  public static LlapAllocatorBuffer[] prepareAllocatorWithSmallFragments(BuddyAllocator a,
+      int baseSize, boolean deallocOneFirst, boolean deallocOneSecond, boolean doIncRef) {
+    // Allocate 2-1-1-2-1-1; free 0,3 and optionally 1 or 5; allocate 4
+    int offset = 0;
+    LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[6];
+    initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0];
+    MemoryBuffer[] tmp = allocate(a, 2, baseSize, offset + 1);
+    System.arraycopy(tmp, 0, initial, offset, 2);
+    offset += 2;
+    initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0];
+    tmp = allocate(a, 2, baseSize, offset + 1);
+    System.arraycopy(tmp, 0, initial, offset, 2);
+    if (deallocOneFirst) {
+      a.deallocate(initial[1]);
+    }
+    if (deallocOneSecond) {
+      a.deallocate(initial[5]);
+    }
+    a.deallocate(initial[0]);
+    a.deallocate(initial[3]);
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  private static void checkInitialValues(LlapAllocatorBuffer[] bufs, int... indexes) {
+    for (int index : indexes) {
+      LlapAllocatorBuffer buf = bufs[index];
+      if (!incRefIfNotEvicted(buf, false)) continue;
+      try {
+        checkTestValue(buf, index + 1, null, false);
+      } finally {
+        buf.decRef();
+      }
+    }
+  }
+
+  private static boolean incRefIfNotEvicted(LlapAllocatorBuffer buf, boolean mustExist) {
+    int rc = buf.tryIncRef();
+    if (rc == LlapAllocatorBuffer.INCREF_FAILED) {
+      fail("Failed to incref (bad state) " + buf);
+    }
+    if (rc <= 0 && mustExist) {
+      fail("Failed to incref (evicted) " + buf);
+    }
+    return rc > 0; // We expect evicted, but not failed.
+  }
+
+  private static void checkTestValue(
+      LlapAllocatorBuffer mem, long testValue, String str, boolean mustExist) {
+    if (!incRefIfNotEvicted(mem, mustExist)) return;
+    try {
+      TestBuddyAllocator.checkTestValue(mem, testValue, str);
+    } finally {
+      mem.decRef();
+    }
+  }
+
+  public static BuddyAllocator create(int max, int arenas, int total, boolean isShortcut,
+      boolean isBruteForceOnly) {
+    BuddyAllocator result = new BuddyAllocator(false, false, 8, max, arenas, total, 0,
+        null, MM, METRICS, isBruteForceOnly ? "brute" : null);
+    if (!isShortcut) {
+      result.disableDefragShortcutForTest();
+    }
+    result.setOomLoggingForTest(false);
+    return result;
+  }
+
+  private static LlapAllocatorBuffer[] allocate(
+      BuddyAllocator a, int count, int size, int baseValue) {
+    return allocate(a, count, size, baseValue, true);
+  }
+
+  public static LlapAllocatorBuffer[] allocate(
+      BuddyAllocator a, int count, int size, int baseValue, boolean doIncRef) {
+    LlapAllocatorBuffer[] allocs = new LlapAllocatorBuffer[count];
+    try {
+      a.allocateMultiple(allocs, size);
+    } catch (AllocatorOutOfMemoryException ex) {
+      LOG.error("Failed to allocate " + allocs.length + " of " + size + "; " + a.testDump());
+      throw ex;
+    }
+    for (int i = 0; i < count; ++i) {
+      // Make sure buffers are eligible for discard.
+      if (doIncRef) {
+        int rc = allocs[i].incRef();
+        assertTrue(rc > 0);
+      }
+      TestBuddyAllocator.putTestValue(allocs[i], baseValue + i);
+      if (doIncRef) {
+        allocs[i].decRef();
+      }
+    }
+    return allocs;
+  }
+
+  public static LlapAllocatorBuffer[] prepareCustomFragmentedAllocator(
+      BuddyAllocator a, int[] sizes, int[] dealloc, boolean doIncRef) {
+    LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[sizes.length];
+    for (int i = 0; i < sizes.length; ++i) {
+      initial[i] = allocate(a, 1, sizes[i], i + 1, doIncRef)[0];
+    }
+    for (int i = 0; i < dealloc.length; ++i) {
+      a.deallocate(initial[dealloc[i]]);
+      initial[dealloc[i]] = null;
+    }
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+}

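The zebra scenarios above are the heart of the force-evict coverage: fill an arena, free
every other buffer so that plenty of memory is free but only in small holes, then request a
block larger than any single hole. A condensed sketch of one such run, using the create and
allocate helpers defined in this test; the sizes mirror runZebraTests:

    // One 1024-byte arena; defrag shortcut disabled, default discard method.
    BuddyAllocator a = create(1024, 1, 1024, false /*isShortcut*/, false /*isBruteOnly*/);
    // Fill the arena with 32 buffers of 32 bytes each (test values 1..32).
    LlapAllocatorBuffer[] initial = allocate(a, 32, 32, 1, true);
    // Free every even-indexed buffer: 512 bytes free, but only in 32-byte holes.
    for (int i = 0; i < 16; ++i) {
      a.deallocate(initial[i << 1]);
    }
    // A single 512-byte request cannot be served from the free lists as-is; the
    // allocator must defragment, force-discarding the interleaved live buffers.
    LlapAllocatorBuffer[] after = allocate(a, 1, 512, 33);
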
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index e95f807..ab10285 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -59,7 +59,7 @@ public class TestLowLevelCacheImpl {
     public void allocateMultiple(MemoryBuffer[] dest, int size) {
       for (int i = 0; i < dest.length; ++i) {
         LlapDataBuffer buf = new LlapDataBuffer();
-        buf.initialize(0, null, -1, size);
+        buf.initialize(null, -1, size);
         dest[i] = buf;
       }
     }
@@ -86,6 +86,12 @@ public class TestLowLevelCacheImpl {
     public MemoryBuffer createUnallocated() {
       return new LlapDataBuffer();
     }
+
+    @Override
+    public void allocateMultiple(MemoryBuffer[] dest, int size,
+        BufferObjectFactory factory) throws AllocatorOutOfMemoryException {
+      allocateMultiple(dest, size);
+    }
   }
 
   private static class DummyCachePolicy implements LowLevelCachePolicy {
@@ -116,11 +122,6 @@ public class TestLowLevelCacheImpl {
     }
 
     @Override
-    public long tryEvictContiguousData(int allocationSize, int count) {
-      return count * allocationSize;
-    }
-
-    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }
@@ -249,7 +250,7 @@ Example code to test specific scenarios:
     evict(cache, fakes[2]);
     verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]);
     verifyCacheGet(cache, fn2, 1, 2, dr(1, 2));
-    verifyRefcount(fakes, -1, 4, -1);
+    verifyRefcount(fakes, 0, 4, 0);
   }
 
   @Test
@@ -372,8 +373,8 @@ Example code to test specific scenarios:
                   continue;
                 }
                 ++gets;
-                LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
-                assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
+                LlapAllocatorBuffer result = (LlapAllocatorBuffer)((CacheChunk)iter).getBuffer();
+                assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.getArenaIndex());
                 cache.decRefBuffer(result);
                 iter = iter.next;
               }
@@ -388,7 +389,7 @@ Example code to test specific scenarios:
               MemoryBuffer[] buffers = new MemoryBuffer[count];
               for (int j = 0; j < offsets.length; ++j) {
                 LlapDataBuffer buf = LowLevelCacheImpl.allocateFake();
-                buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
+                buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0);
                 buffers[j] = buf;
               }
               long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null);
@@ -401,7 +402,7 @@ Example code to test specific scenarios:
               for (int j = 0; j < offsets.length; ++j) {
                 LlapDataBuffer buf = (LlapDataBuffer)(buffers[j]);
                 if ((maskVal & 1) == 1) {
-                  assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
+                  assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.getArenaIndex());
                 }
                 maskVal >>= 1;
                 cache.decRefBuffer(buf);
@@ -415,7 +416,7 @@ Example code to test specific scenarios:
       }
 
       private int makeFakeArenaIndex(int fileIndex, long offset) {
-        return (int)((fileIndex << 16) + offset);
+        return (int)((fileIndex << 12) + offset);
       }
     };
 
@@ -438,7 +439,7 @@ Example code to test specific scenarios:
             if (r instanceof CacheChunk) {
               LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
               cache.decRefBuffer(result);
-              if (victim == null && result.invalidate()) {
+              if (victim == null && result.invalidate() == LlapCacheableBuffer.INVALIDATE_OK) {
                 ++evictions;
                 victim = result;
               }
@@ -491,7 +492,7 @@ Example code to test specific scenarios:
     for (int i = 0; i < refCount; ++i) {
       victimBuffer.decRef();
     }
-    assertTrue(victimBuffer.invalidate());
+    assertTrue(LlapCacheableBuffer.INVALIDATE_OK == victimBuffer.invalidate());
     cache.notifyEvicted(victimBuffer);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 210cbb0..f86a37c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -205,61 +204,6 @@ public class TestLowLevelLrfuCachePolicy {
     assertNotSame(locked, evicted);
     unlock(lrfu, locked);
   }
-  
-
-  @Test
-  public void testForceEvictBySize() {
-    int heapSize = 12;
-    LOG.info("Testing force-eviction out of order");
-    Configuration conf = new Configuration();
-    ArrayList<LlapDataBuffer> sizeTwo = new ArrayList<LlapDataBuffer>(4),
-        sizeOne = new ArrayList<LlapDataBuffer>(4);
-    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.45f); // size of LFU heap is 4
-    EvictionTracker et = new EvictionTracker();
-    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
-    lrfu.setEvictionListener(et);
-    for (int i = 0; i < 2; ++i) {
-      sizeTwo.add(cacheSizeTwoFake(et, lrfu));
-      for (int j = 0; j < 2; ++j) {
-        LlapDataBuffer fake = LowLevelCacheImpl.allocateFake();
-        assertTrue(cache(null, lrfu, et, fake));
-        sizeOne.add(fake);
-      }
-      sizeTwo.add(cacheSizeTwoFake(et, lrfu));
-    }
-    // Now we should have two in the heap and two in the list, which is an implementation detail.
-    // Evict only big blocks.
-    et.evicted.clear();
-    assertEquals(8, lrfu.tryEvictContiguousData(2, 4));
-    for (int i = 0; i < sizeTwo.size(); ++i) {
-      LlapDataBuffer block = et.evicted.get(i);
-      assertTrue(block.isInvalid());
-      assertSame(sizeTwo.get(i), block);
-    }
-    et.evicted.clear();
-    // Evict small blocks when no big ones are available.
-    assertEquals(2, lrfu.tryEvictContiguousData(2, 1));
-    for (int i = 0; i < 2; ++i) {
-	  LlapDataBuffer block = et.evicted.get(i);
-	  assertTrue(block.isInvalid());
-	  assertSame(sizeOne.get(i), block);
-	}
-    et.evicted.clear();
-    // Evict the rest.
-    assertEquals(2, lrfu.evictSomeBlocks(3));
-    for (int i = 2; i < sizeOne.size(); ++i) {
-      LlapDataBuffer block = et.evicted.get(i - 2);
-      assertTrue(block.isInvalid());
-      assertSame(sizeOne.get(i), block);
-    }
-  }
-
-  private LlapDataBuffer cacheSizeTwoFake(EvictionTracker et, LowLevelLrfuCachePolicy lrfu) {
-    LlapDataBuffer fake = new LlapDataBuffer();
-    fake.initialize(-1, ByteBuffer.wrap(new byte[2]), 0, 2);
-    assertTrue(cache(null, lrfu, et, fake));
-    return fake;
-  }
 
   // Buffers in test are fakes not linked to cache; notify cache policy explicitly.
   public boolean cache(LowLevelCacheMemoryManager mm,

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 1d5954e..630f305 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
@@ -65,11 +66,6 @@ public class TestOrcMetadataCache {
     }
 
     @Override
-    public long tryEvictContiguousData(int allocationSize, int count) {
-      return 0;
-    }
-
-    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }
@@ -97,11 +93,6 @@ public class TestOrcMetadataCache {
     }
 
     @Override
-    public long forceReservedMemory(int allocationSize, int count) {
-      return allocationSize * count;
-    }
-
-    @Override
     public void debugDumpShort(StringBuilder sb) {
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5e718c3..7ad457d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -406,7 +406,7 @@ class EncodedReaderImpl implements EncodedReader {
         hasError = false;
       } finally {
         // At this point, everything in the list is going to have a refcount of one. Unless it
-        // failed between the allocation and the incref for a single item, we should be ok. 
+        // failed between the allocation and the incref for a single item, we should be ok.
         if (hasError) {
           releaseInitialRefcounts(toRead.next);
           if (toRelease != null) {
@@ -456,7 +456,7 @@ class EncodedReaderImpl implements EncodedReader {
                   if (sctx.stripeLevelStream == null) {
                     sctx.stripeLevelStream = POOLS.csdPool.take();
                     // We will be using this for each RG while also sending RGs to processing.
-                    // To avoid buffers being unlocked, run refcount one ahead; so each RG 
+                    // To avoid buffers being unlocked, run refcount one ahead; so each RG
                     // processing will decref once, and the last one will unlock the buffers.
                     sctx.stripeLevelStream.incRef();
                     // For stripe-level streams we don't need the extra refcount on the block.
@@ -706,8 +706,8 @@ class EncodedReaderImpl implements EncodedReader {
       assert originalCbIndex >= 0;
       // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
       // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
-      // is not in cache.
-      cacheWrapper.getAllocator().deallocate(getBuffer());
+      // is not in cache. releaseBuffer will decref the buffer, and also deallocate.
+      cacheWrapper.releaseBuffer(this.buffer);
       cacheWrapper.reuseBuffer(replacementBuffer);
       // Replace the buffer in our big range list, as well as in current results.
       this.buffer = replacementBuffer;
@@ -959,7 +959,7 @@ class EncodedReaderImpl implements EncodedReader {
    * to handle just for this case.
    * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
    * allocator. Uncompressed case is not mainline though so let's not complicate it.
-   * @param kind 
+   * @param kind
    */
   private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
       long streamOffset, long streamEnd, Kind kind) throws IOException {
@@ -1137,9 +1137,9 @@ class EncodedReaderImpl implements EncodedReader {
 
   private void allocateMultiple(MemoryBuffer[] dest, int size) {
     if (allocator != null) {
-      allocator.allocateMultiple(dest, size, isStopped);
+      allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped);
     } else {
-      cacheWrapper.getAllocator().allocateMultiple(dest, size);
+      cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory());
     }
   }
 
@@ -1477,7 +1477,7 @@ class EncodedReaderImpl implements EncodedReader {
         ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
             cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true);
         if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
-          releaseBuffer(compressed, true); // We copied the entire buffer. 
+          releaseBuffer(compressed, true); // We copied the entire buffer.
         } // else there's more data to process; will be handled in next call.
         return cc;
       }

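The "run refcount one ahead" comment in the stripe-level stream hunk above describes a small
but load-bearing discipline: take one extra reference before fanning a shared object out to N
consumers, so that no consumer's decref can release it while others are still pending. An
illustrative, self-contained sketch of the pattern; the SharedStream class is hypothetical,
not a Hive API:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical refcounted wrapper; illustrates the discipline only.
    final class SharedStream {
      private final AtomicInteger refCount = new AtomicInteger();

      void incRef() { refCount.incrementAndGet(); }

      void decRef() {
        if (refCount.decrementAndGet() == 0) {
          unlockBuffers(); // only the very last reference releases the data
        }
      }

      private void unlockBuffers() { /* return cache buffers here */ }

      // Producer side: one reference per consumer, plus one "ahead" for the
      // producer itself, so no consumer's decRef() can release the stream early.
      static void fanOut(SharedStream stream, int consumers) {
        stream.incRef(); // run the refcount one ahead
        for (int i = 0; i < consumers; ++i) {
          stream.incRef();
          // ... hand the stream to consumer i; it calls decRef() when done ...
        }
        stream.decRef(); // drop the extra ref; the last consumer unlocks
      }
    }
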
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
index 2172bd2..3aef6f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 
 public interface StoppableAllocator extends Allocator {
   /** Stoppable allocate method specific to branch-2. */
-  void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+  void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
       throws AllocatorOutOfMemoryException;
 }
\ No newline at end of file

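StoppableAllocator is the branch-2 shim that threads a cancellation flag through allocation,
so a reader blocked waiting for memory can be aborted. A hedged sketch of the calling
convention; the stoppableAllocator and cache instances are assumed to exist, and the factory
comes from the DataCache change below:

    AtomicBoolean isStopped = new AtomicBoolean(false);
    MemoryBuffer[] dest = new MemoryBuffer[4];
    try {
      stoppableAllocator.allocateMultiple(
          dest, 256 * 1024, cache.getDataBufferFactory(), isStopped);
    } catch (Allocator.AllocatorOutOfMemoryException ex) {
      // Either memory is genuinely exhausted, or the query was stopped mid-wait.
    }
    // A cancellation path elsewhere flips the flag to unblock a waiting reader:
    isStopped.set(true);
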
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
index 16b9713..775233c 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
@@ -29,6 +29,10 @@ public interface Allocator {
     private static final long serialVersionUID = 268124648177151761L;
   }
 
+  public interface BufferObjectFactory {
+    MemoryBuffer create();
+  }
+
   /**
    * Allocates multiple buffers of a given size.
    * @param dest Array where buffers are placed. Objects are reused if already there
@@ -36,14 +40,27 @@ public interface Allocator {
    * @param size Allocation size.
    * @throws AllocatorOutOfMemoryException Cannot allocate.
    */
+  @Deprecated
   void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException;
 
   /**
+   * Allocates multiple buffers of a given size.
+   * @param dest Array where buffers are placed. Objects are reused if already there
+   *             (see createUnallocated), created otherwise.
+   * @param size Allocation size.
+   * @param factory A factory to create the objects in the dest array, if needed.
+   * @throws AllocatorOutOfMemoryException Cannot allocate.
+   */
+  void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
+      throws AllocatorOutOfMemoryException;
+
+  /**
    * Creates an unallocated memory buffer object. This object can be passed to allocateMultiple
    * to allocate; this is useful if data structures are created for separate buffers that can
    * later be allocated together.
    * @return a new unallocated memory buffer
    */
+  @Deprecated
   MemoryBuffer createUnallocated();
 
   /** Deallocates a memory buffer.

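The factory-taking overload decouples buffer-object creation from allocation: the allocator
can now mint (and, during defragmentation, re-mint) cache-specific MemoryBuffer objects
itself, instead of requiring callers to pre-populate dest. A minimal sketch of the
non-deprecated path, assuming an Allocator instance and the LlapDataBuffer type from the
diffs above:

    Allocator.BufferObjectFactory factory = new Allocator.BufferObjectFactory() {
      @Override
      public MemoryBuffer create() {
        return new LlapDataBuffer(); // the cache's concrete buffer type
      }
    };
    MemoryBuffer[] dest = new MemoryBuffer[8]; // nulls are filled in via the factory
    allocator.allocateMultiple(dest, 128 * 1024, factory);
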
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index e53b737..552f20e 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -100,4 +100,10 @@ public interface DataCache {
    * @return the allocator
    */
   Allocator getAllocator();
+
+  /**
+   * Gets the buffer object factory associated with this DataCache, to use with allocator.
+   * @return the factory
+   */
+  Allocator.BufferObjectFactory getDataBufferFactory();
 }


[3/3] hive git commit: HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)

Posted by se...@apache.org.
HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd32deb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd32deb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd32deb4

Branch: refs/heads/branch-2
Commit: bd32deb44b8b9baf96f2cdbbed8d9a58c3ec73d4
Parents: a4b9133
Author: sergey <se...@apache.org>
Authored: Thu Aug 23 14:23:44 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Aug 24 11:26:04 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   10 +-
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 1499 +++++++++++++++---
 .../hive/llap/cache/LlapAllocatorBuffer.java    |  396 +++++
 .../hive/llap/cache/LlapCacheableBuffer.java    |    6 +-
 .../hadoop/hive/llap/cache/LlapDataBuffer.java  |  119 +-
 .../hive/llap/cache/LowLevelCacheImpl.java      |   26 +-
 .../llap/cache/LowLevelCacheMemoryManager.java  |   54 +-
 .../hive/llap/cache/LowLevelCachePolicy.java    |    2 -
 .../llap/cache/LowLevelFifoCachePolicy.java     |   16 +-
 .../llap/cache/LowLevelLrfuCachePolicy.java     |   95 +-
 .../hadoop/hive/llap/cache/MemoryManager.java   |    2 -
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  115 +-
 .../hadoop/hive/llap/cache/SimpleAllocator.java |   22 +-
 .../hive/llap/cache/SimpleBufferManager.java    |   12 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   11 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |   14 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java |   70 +-
 .../io/encoded/VectorDeserializeOrcWriter.java  |    3 +
 .../llap/io/metadata/OrcFileEstimateErrors.java |    4 +-
 .../hive/llap/io/metadata/OrcFileMetadata.java  |    4 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |    4 +-
 .../hive/llap/cache/TestBuddyAllocator.java     |   73 +-
 .../cache/TestBuddyAllocatorForceEvict.java     |  470 ++++++
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |   29 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java |   56 -
 .../hive/llap/cache/TestOrcMetadataCache.java   |   11 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |   16 +-
 .../ql/io/orc/encoded/StoppableAllocator.java   |    3 +-
 .../apache/hadoop/hive/common/io/Allocator.java |   17 +
 .../apache/hadoop/hive/common/io/DataCache.java |    6 +
 30 files changed, 2405 insertions(+), 760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53dfdd9..f884eda 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2947,7 +2947,7 @@ public class HiveConf extends Configuration {
         "LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
         "custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" +
         "significant performance degradation)"),
-    LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "256Kb", new SizeValidator(),
+    LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(),
         "Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
         "padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
         "compression buffer size, or next lowest power of 2. Must be a power of 2."),
@@ -2974,6 +2974,14 @@ public class HiveConf extends Configuration {
     LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/tmp",
         new WritableDirectoryValidator(),
         "The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."),
+    LLAP_ALLOCATOR_DISCARD_METHOD("hive.llap.io.allocator.discard.method", "both",
+        new StringSet("freelist", "brute", "both"),
+        "Which method to use to force-evict blocks to deal with fragmentation:\n" +
+        "freelist - use half-size free list (discards less, but also less reliable); brute -\n" +
+        "brute force, discard whatever we can; both - first try free list, then brute force."),
+    LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
+        "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
+        "The allocator would further cap it to a fraction of total memory."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
     LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,

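A short example of wiring the two new knobs, assuming a standard HiveConf; the values shown
are illustrative, not tuning advice:

    HiveConf conf = new HiveConf();
    // "both" (the default): try the half-size free list first, then brute force.
    conf.setVar(HiveConf.ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD, "both");
    // Slack for defragmentation; capped internally to a fraction of total memory.
    conf.set(HiveConf.ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM.varname, "1Mb");
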
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index af9243a..abe3fc8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
 import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
+import org.apache.hive.common.util.FixedSizedObjectPool;
 
 public final class BuddyAllocator
   implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
@@ -55,6 +57,8 @@ public final class BuddyAllocator
   private final MemoryManager memoryManager;
   private static final long MAX_DUMP_INTERVAL_NS = 300 * 1000000000L; // 5 minutes.
   private final AtomicLong lastLog = new AtomicLong(-1);
+  private final LlapDaemonCacheMetrics metrics;
+  private static final int MAX_DISCARD_ATTEMPTS = 10, LOG_DISCARD_ATTEMPTS = 5;
 
   // Config settings
   private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
@@ -63,19 +67,24 @@ public final class BuddyAllocator
   private final boolean isDirect;
   private final boolean isMapped;
   private final Path cacheDir;
-  private final LlapDaemonCacheMetrics metrics;
+
+  // These are only used for tests.
+  private boolean enableDefragShortcut = true, oomLogging = true;
 
   // We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
   // That is guaranteed to fit any maximum allocation.
   private static final int MAX_ARENA_SIZE = 1024*1024*1024;
   // Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
   private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+  // Maximum reasonable defragmentation headroom. Mostly kicks in on very small caches.
+  private static final float MAX_DEFRAG_HEADROOM_FRACTION = 0.01f;
 
   private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
       .asFileAttribute(PosixFilePermissions.fromString("rwx------"));
-  private static final FileAttribute<Set<PosixFilePermission>> RW_ = PosixFilePermissions
-      .asFileAttribute(PosixFilePermissions.fromString("rw-------"));
-
+  private final AtomicLong[] defragCounters;
+  private final boolean doUseFreeListDiscard, doUseBruteDiscard;
+  private final FixedSizedObjectPool<DiscardContext> ctxPool;
+  private static final boolean assertsEnabled = areAssertsEnabled();
 
   public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
     this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
@@ -83,9 +92,16 @@ public final class BuddyAllocator
         (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
         (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
         HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
-        getMaxTotalMemorySize(conf), 
+        getMaxTotalMemorySize(conf),
+        HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM),
         HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
-        mm, metrics);
+        mm, metrics, HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD));
+  }
+
+  private static boolean areAssertsEnabled() {
+    boolean assertsEnabled = false;
+    assert assertsEnabled = true;
+    return assertsEnabled;
   }
 
   private static long getMaxTotalMemorySize(Configuration conf) {
@@ -96,29 +112,21 @@ public final class BuddyAllocator
     throw new RuntimeException("Allocator space is too small for reasonable operation; "
         + ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname + "=" + maxSize + ", but at least "
         + MIN_TOTAL_MEMORY_SIZE + " is required. If you cannot spare any memory, you can "
-        + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname + "; or set "
-        + ConfVars.LLAP_IO_MEMORY_MODE.varname + " to 'none'");
+        + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname);
   }
 
   @VisibleForTesting
-  public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount,
-      long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) {
-    this(isDirectVal, false /*isMapped*/,  minAllocVal, maxAllocVal, arenaCount, maxSizeVal, 
-        null /* mapping path */, memoryManager, metrics);
-  }
-
-  @VisibleForTesting
-  public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
-      int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager,
-      LlapDaemonCacheMetrics metrics) {
+  public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
+      int maxAllocVal, int arenaCount, long maxSizeVal, long defragHeadroom, String mapPath,
+      MemoryManager memoryManager, LlapDaemonCacheMetrics metrics, String discardMethod) {
     isDirect = isDirectVal;
     isMapped = isMappedVal;
     minAllocation = minAllocVal;
     maxAllocation = maxAllocVal;
     if (isMapped) {
       try {
-        cacheDir =
-            Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
+        cacheDir = Files.createTempDirectory(
+            FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
       } catch (IOException ioe) {
         // conf validator already checks this, so it will never trigger usually
         throw new AssertionError("Configured mmap directory should be writable", ioe);
@@ -126,6 +134,69 @@ public final class BuddyAllocator
     } else {
       cacheDir = null;
     }
+
+    arenaSize = validateAndDetermineArenaSize(arenaCount, maxSizeVal);
+    maxSize = validateAndDetermineMaxSize(maxSizeVal);
+    memoryManager.updateMaxSize(determineMaxMmSize(defragHeadroom, maxSize));
+
+    minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+    maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+    arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
+    maxArenas = (int)(maxSize / arenaSize);
+    arenas = new Arena[maxArenas];
+    for (int i = 0; i < maxArenas; ++i) {
+      arenas[i] = new Arena();
+    }
+    Arena firstArena = arenas[0];
+    firstArena.init(0);
+    allocatedArenas.set(1);
+    this.memoryManager = memoryManager;
+    defragCounters = new AtomicLong[maxAllocLog2 - minAllocLog2 + 1];
+    for (int i = 0; i < defragCounters.length; ++i) {
+      defragCounters[i] = new AtomicLong(0);
+    }
+    this.metrics = metrics;
+    metrics.incrAllocatedArena();
+    boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
+    doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
+    doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
+    ctxPool = new FixedSizedObjectPool<DiscardContext>(32,
+        new FixedSizedObjectPool.PoolObjectHelper<DiscardContext>() {
+          @Override
+          public DiscardContext create() {
+            return new DiscardContext();
+          }
+          @Override
+          public void resetBeforeOffer(DiscardContext t) {
+          }
+      });
+  }
+
+  public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
+    if (defragHeadroom > 0) {
+      long maxHeadroom = (long) Math.floor(maxSize * MAX_DEFRAG_HEADROOM_FRACTION);
+      defragHeadroom = Math.min(maxHeadroom, defragHeadroom);
+      LlapIoImpl.LOG.info("Leaving " + defragHeadroom + " of defragmentation headroom");
+      maxMmSize -= defragHeadroom;
+    }
+    return maxMmSize;
+  }
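
A hedged worked example of the clamping in determineMaxMmSize, assuming
MAX_DEFRAG_HEADROOM_FRACTION is 0.1 (the constant is defined elsewhere in this file):

    long maxSize = 4L << 30;                               // 4 GiB cache
    long defragHeadroom = 8L << 30;                        // configured headroom, too large
    long maxHeadroom = (long) Math.floor(maxSize * 0.1);   // cap at ~0.4 GiB
    defragHeadroom = Math.min(maxHeadroom, defragHeadroom);
    long maxMmSize = maxSize - defragHeadroom;             // what the memory manager sees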
+
+  public long validateAndDetermineMaxSize(long maxSizeVal) {
+    if ((maxSizeVal % arenaSize) > 0) {
+      long oldMaxSize = maxSizeVal;
+      maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
+      LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
+          + " to be divisible by arena size " + arenaSize);
+    }
+    if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
+      throw new RuntimeException(
+          "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
+    }
+    return maxSizeVal;
+  }
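
A quick worked example of the rounding above, under assumed sizes:

    long arenaSize = 256L << 20;                       // 256 MiB arenas
    long maxSizeVal = (1L << 30) + (100L << 20);       // 1124 MiB requested
    maxSizeVal = (maxSizeVal / arenaSize) * arenaSize; // rounded down to 1024 MiB (4 arenas)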
+
+  public int validateAndDetermineArenaSize(int arenaCount, long maxSizeVal) {
     long arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : maxSizeVal / arenaCount;
     // The math.min, and the fact that maxAllocation is an int, ensures we don't overflow.
     arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
@@ -156,91 +227,53 @@ public final class BuddyAllocator
       LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
           + " to be divisible by allocation size " + maxAllocation);
     }
-    arenaSize = (int)arenaSizeVal;
-    if ((maxSizeVal % arenaSize) > 0) {
-      long oldMaxSize = maxSizeVal;
-      maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
-      LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
-          + " to be divisible by arena size " + arenaSize);
-    }
-    if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
-      throw new RuntimeException(
-          "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
-    }
-    maxSize = maxSizeVal;
-    memoryManager.updateMaxSize(maxSize);
-    minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
-    maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
-    arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
-    maxArenas = (int)(maxSize / arenaSize);
-    arenas = new Arena[maxArenas];
-    for (int i = 0; i < maxArenas; ++i) {
-      arenas[i] = new Arena();
-    }
-    arenas[0].init();
-    allocatedArenas.set(1);
-    this.memoryManager = memoryManager;
-
-    this.metrics = metrics;
-    metrics.incrAllocatedArena();
+    return (int)arenaSizeVal;
   }
 
-
   @Override
   public void allocateMultiple(MemoryBuffer[] dest, int size)
       throws AllocatorOutOfMemoryException {
-    allocateMultiple(dest, size, null);
+    allocateMultiple(dest, size, null, null);
   }
 
   @Override
-  public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+  public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
+      throws AllocatorOutOfMemoryException {
+    allocateMultiple(dest, size, factory, null);
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
       throws AllocatorOutOfMemoryException {
     assert size > 0 : "size is " + size;
     if (size > maxAllocation) {
       throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
     }
-    int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
-    if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
-    freeListIx = Math.max(freeListIx - minAllocLog2, 0);
+    int freeListIx = determineFreeListForAllocation(size);
     int allocLog2 = freeListIx + minAllocLog2;
     int allocationSize = 1 << allocLog2;
     // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
     memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
-    int destAllocIx = 0;
+
     for (int i = 0; i < dest.length; ++i) {
       if (dest[i] != null) continue;
-      dest[i] = createUnallocated(); // TODO: pool of objects?
+      // Note: this is backward compat only. Should be removed with createUnallocated.
+      dest[i] = factory != null ? factory.create() : createUnallocated();
     }
+
     // First try to quickly lock some of the correct-sized free lists and allocate from them.
     int arenaCount = allocatedArenas.get();
     if (arenaCount < 0) {
       arenaCount = -arenaCount - 1; // Next arena is being allocated.
     }
+
+    // Note: we might want to be smarter if threadId-s are low and there are more arenas than threads.
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
-    {
-      int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
-      do {
-        int newDestIx = arenas[index].allocateFast(
-            index, freeListIx, dest, destAllocIx, allocationSize);
-        if (newDestIx == dest.length) return;
-        assert newDestIx != -1;
-        destAllocIx = newDestIx;
-        if ((++index) == arenaCount) {
-          index = 0;
-        }
-      } while (index != startArenaIx);
-    }
-
-    // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
-    // in there, but it might be in separate small blocks. This is a complicated problem, and
-    // several solutions (in order of decreasing ugliness and increasing complexity) are: just
-    // ask to evict the exact-sized block (there may be no such block), evict from a particular
-    // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
-    // evict from specific choice of blocks next to each other or next to already-evicted block,
-    // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
-    // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
-    // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
-    // we called reserveMemory so we know that there's memory waiting for us somewhere.
+    int destAllocIx = allocateFast(dest, null, 0, dest.length,
+        freeListIx, allocationSize, (int)(threadId % arenaCount), arenaCount);
+    if (destAllocIx == dest.length) return;
+
+    // We called reserveMemory so we know that there's memory waiting for us somewhere.
     // However, we have a class of rare race conditions related to the order of locking/checking of
     // different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
     // We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -255,63 +288,361 @@ public final class BuddyAllocator
     // allocator thread (or threads per arena).
     // The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
     // But for now we will just retry. We will evict more each time.
-    long forceReserved = 0;
     int attempt = 0;
+    boolean isFailed = false;
+    int memoryForceReleased = 0;
     try {
+      int discardFailed = 0;
       while (true) {
-        // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
-        {
-          int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
-          do {
-            int newDestIx = arenas[arenaIx].allocateWithSplit(
-                arenaIx, freeListIx, dest, destAllocIx, allocationSize);
-            if (newDestIx == dest.length) return;
-            assert newDestIx != -1;
-            destAllocIx = newDestIx;
-            if ((++arenaIx) == arenaCount) {
-              arenaIx = 0;
-            }
-          } while (arenaIx != startArenaIx);
-        }
+        // Try to split bigger blocks.
+        int startArenaIx = (int)((threadId + attempt) % arenaCount);
+        destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length,
+            freeListIx, allocationSize, startArenaIx, arenaCount, -1);
+        if (destAllocIx == dest.length) return;
 
         if (attempt == 0) {
           // Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
-          for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
-            destAllocIx = arenas[arenaIx].allocateWithExpand(
-                arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+          destAllocIx = allocateWithExpand(
+              dest, destAllocIx, freeListIx, allocationSize, arenaCount);
+          if (destAllocIx == dest.length) return;
+        }
+
+        // Try to force-evict the fragments of the requisite size.
+        boolean hasDiscardedAny = false;
+        DiscardContext ctx = ctxPool.take();
+        try {
+          // Brute force may discard up to twice as many buffers.
+          int maxListSize = 1 << (doUseBruteDiscard ? freeListIx : (freeListIx - 1));
+          int requiredBlocks = dest.length - destAllocIx;
+          ctx.init(maxListSize, requiredBlocks);
+          // First, try to use the blocks of half size in every arena.
+          if (doUseFreeListDiscard && freeListIx > 0) {
+            discardBlocksBasedOnFreeLists(freeListIx, startArenaIx, arenaCount, ctx);
+            memoryForceReleased += ctx.memoryReleased;
+            hasDiscardedAny = ctx.resultCount > 0;
+            destAllocIx = allocateFromDiscardResult(
+                dest, destAllocIx, freeListIx, allocationSize, ctx);
             if (destAllocIx == dest.length) return;
           }
+          // Then, try the brute force search for something to throw away.
+          if (doUseBruteDiscard) {
+            ctx.resetResults();
+            discardBlocksBruteForce(freeListIx, startArenaIx, arenaCount, ctx);
+            memoryForceReleased += ctx.memoryReleased;
+            hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
+            destAllocIx = allocateFromDiscardResult(
+                dest, destAllocIx, freeListIx, allocationSize, ctx);
+
+            if (destAllocIx == dest.length) return;
+          }
+        } finally {
+          ctxPool.offer(ctx);
         }
-        int numberToForce = (dest.length - destAllocIx) * (attempt + 1);
-        long newReserved = memoryManager.forceReservedMemory(allocationSize, numberToForce);
-        forceReserved += newReserved;
-        if (newReserved == 0) {
-          // Cannot force-evict anything, give up.
+        if (hasDiscardedAny) {
+          discardFailed = 0;
+        } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
           String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
               + dest.length + " (entire cache is fragmented and locked, or an internal issue)";
           logOomErrorMessage(msg);
+          isFailed = true;
           throw new AllocatorOutOfMemoryException(msg);
         }
-        if (attempt == 0) {
-          LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry");
-        }
         ++attempt;
       }
     } finally {
-      if (attempt > 4) {
-        LlapIoImpl.LOG.warn("Allocation of " + dest.length + " buffers of size " + size
-            + " took " + attempt + " attempts to evict enough memory");
+      memoryManager.releaseMemory(memoryForceReleased);
+      if (!isFailed && attempt >= LOG_DISCARD_ATTEMPTS) {
+        LlapIoImpl.LOG.info("Allocation of " + dest.length + " buffers of size " + size + " took "
+            + attempt + " attempts to free enough memory; force-released " + memoryForceReleased);
+      }
+    }
+  }
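
A hedged usage sketch of the factory overload above (the allocator and bufferFactory
instances are assumed; the size is rounded up to the next power of two internally):

    MemoryBuffer[] dest = new MemoryBuffer[4];
    // Requests four 192000-byte buffers; each actually occupies 256 KiB (1 << 18).
    allocator.allocateMultiple(dest, 192000, bufferFactory, null);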
+
+  /** The context for the forced eviction of buffers. */
+  private static final class DiscardContext {
+    long[] results;
+    int resultCount;
+    int memoryReleased;
+
+    /**
+     * The headers for blocks we've either locked to move (if allocated), or have taken out
+     * of the free lists (if not) so that nobody allocates them while we are freeing space.
+     * All the headers will be from the arena currently being processed.
+     */
+    int[] victimHeaders;
+    int victimCount; // The count of the elements of the above that are set.
+
+    /**
+     * List-based: the base free buffers that will be paired with the space freed from
+     * victimHeaders to create the buffers of allocation size.
+     * Brute force: the buffers (that do not exist as separate buffers) composed of victimHeaders
+     * buffers; the future result buffers.
+     * All the headers will be from the arena currently being processed.
+     */
+    int[] baseHeaders;
+    int baseCount; // The count of the elements of the above that are set.
+
+    /**
+     * How many more results (or base headers) do we need to find?
+     * This object is reused between arenas; this is the only counter that is preserved.
+     */
+    int remainingToFind;
+
+    /** The headers from abandoned move attempts that cannot yet be returned to the free
+     * lists, or unlocked, because some lock is still held and doing so could deadlock. */
+    int[] abandonedHeaders;
+    int abandonedCount;
+
+    void init(int headersPerOneReq, int reqCount) {
+      resetResults();
+      remainingToFind = reqCount;
+      if (results == null || results.length < reqCount) {
+        results = new long[reqCount];
+        baseHeaders = new int[reqCount];
+      }
+      int maxVictimCount = headersPerOneReq * reqCount;
+      if (victimHeaders == null || victimHeaders.length < maxVictimCount) {
+        victimHeaders = new int[maxVictimCount];
+      }
+    }
+
+    void resetResults() {
+      resetBetweenArenas();
+      resultCount = memoryReleased = 0;
+    }
+
+    void resetBetweenArenas() {
+      // Reset everything for the next arena; assume everything has been cleaned.
+      victimCount = baseCount = abandonedCount = 0;
+    }
+
+    public void addResult(int arenaIx, int freeHeaderIx) {
+      results[resultCount] = makeIntPair(arenaIx, freeHeaderIx);
+      ++resultCount;
+    }
+
+    public void addBaseHeader(int headerIx) {
+      baseHeaders[baseCount] = headerIx;
+      ++baseCount;
+      --remainingToFind;
+    }
+
+    @Override
+    public String toString() {
+      return "[victimHeaders=" + Arrays.toString(victimHeaders) + ", victimCount="
+          + victimCount + ", baseHeaders=" + Arrays.toString(baseHeaders) + ", baseCount="
+          + baseCount + ", remainingToFind=" + remainingToFind  + "]";
+    }
+  }
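
DiscardContext and the methods below pack two ints into one long via makeIntPair,
getFirstInt and getSecondInt, defined elsewhere in this file; a sketch of their assumed
semantics:

    static long makeIntPair(int first, int second) {
      return (((long) first) << 32) | (second & 0xffffffffL);
    }
    static int getFirstInt(long pair) { return (int) (pair >>> 32); }
    static int getSecondInt(long pair) { return (int) pair; }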
+
+  private void discardBlocksBasedOnFreeLists(
+      int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+    defragCounters[freeListIx].incrementAndGet();
+    // The free list level the blocks from which we need to merge.
+    final int mergeListIx = freeListIx - 1;
+
+    // Try to allocate using base-buffer approach from each arena.
+    int arenaIx = startArenaIx;
+    do {
+      Arena arena = arenas[arenaIx];
+      // Reserve blocks in this arena that would empty the sections of requisite size.
+      arena.reserveDiscardBlocksBasedOnFreeList(mergeListIx, ctx);
+      // Discard the blocks.
+      discardFromCtxBasedOnFreeList(arena, ctx, freeListIx);
+
+      if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+      ctx.resetBetweenArenas();
+      arenaIx = getNextIx(arenaIx, arenaCount, 1);
+    } while (arenaIx != startArenaIx);
+  }
+
+  private void discardBlocksBruteForce(
+      int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+    // We are going to use this counter as a pseudo-random number for the start of the search.
+    // This is to avoid churning at the beginning of the arena all the time.
+    long counter = defragCounters[freeListIx].incrementAndGet();
+    // How many blocks of this size comprise an arena.
+    int positionsPerArena = 1 << (arenaSizeLog2 - (minAllocLog2 + freeListIx));
+    // Compute the pseudo-random position from the above, then derive the actual header.
+    int startHeaderIx = ((int) (counter % positionsPerArena)) << freeListIx;
+
+    // Try to allocate using brute force approach from each arena.
+    int arenaIx = startArenaIx;
+    do {
+      Arena arena = arenas[arenaIx];
+      // Reserve blocks in this arena that would empty the sections of requisite size.
+      arena.reserveDiscardBruteForce(freeListIx, ctx, startHeaderIx);
+      // Discard the blocks.
+      discardFromCtxBruteForce(arena, ctx, freeListIx);
+
+      if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+      ctx.resetBetweenArenas();
+      arenaIx = getNextIx(arenaIx, arenaCount, 1);
+    } while (arenaIx != startArenaIx);
+  }
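
A worked example of the pseudo-random start position above, under assumed sizes:

    int arenaSizeLog2 = 23, minAllocLog2 = 12, freeListIx = 3;  // 8 MiB arena, 4 KiB min alloc
    int positionsPerArena = 1 << (arenaSizeLog2 - (minAllocLog2 + freeListIx)); // 256
    long counter = 1000;
    int startHeaderIx = ((int) (counter % positionsPerArena)) << freeListIx;    // 232 << 3 = 1856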
+
+  /**
+   * Frees up memory by discarding the base and victim buffers recorded in the DiscardContext.
+   * @param freeListIx The list for which the blocks are being merged.
+   */
+  private void discardFromCtxBasedOnFreeList(Arena arena, DiscardContext ctx, int freeListIx) {
+    // Discard all the locked blocks.
+    discardAllBuffersFromCtx(arena, ctx);
+    // Finalize the headers.
+    for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+      int baseHeaderIx = ctx.baseHeaders[baseIx];
+      int minHeaderIx = Math.min(baseHeaderIx, getBuddyHeaderIx(freeListIx - 1, baseHeaderIx));
+      finalizeDiscardResult(arena, ctx, freeListIx, minHeaderIx);
+    }
+  }
+
+  /**
+   * Frees up memory by discarding the base and victim buffers recorded in the DiscardContext.
+   * @param freeListIx The list for which the blocks are being merged.
+   */
+  private void discardFromCtxBruteForce(Arena arena, DiscardContext ctx, int freeListIx) {
+    // Discard all the locked blocks.
+    discardAllBuffersFromCtx(arena, ctx);
+    // Finalize the headers.
+    for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+      finalizeDiscardResult(arena, ctx, freeListIx, ctx.baseHeaders[baseIx]);
+    }
+  }
+
+  /**
+   * Sets the headers correctly for a newly-freed buffer after discarding stuff.
+   */
+  private void finalizeDiscardResult(
+      Arena arena, DiscardContext ctx, int freeListIx, int newlyFreeHeaderIx) {
+    int maxHeaderIx = newlyFreeHeaderIx + (1 << freeListIx);
+    if (assertsEnabled) {
+      arena.checkHeader(newlyFreeHeaderIx, -1, true);
+    }
+    arena.unsetHeaders(newlyFreeHeaderIx + 1, maxHeaderIx, CasLog.Src.CLEARED_VICTIM);
+    // Set the leftmost header of the base and its buddy (that are now being merged).
+    arena.setHeaderNoBufAlloc(newlyFreeHeaderIx, freeListIx, CasLog.Src.NEWLY_CLEARED);
+    ctx.addResult(arena.arenaIx, newlyFreeHeaderIx);
+  }
+
+  /**
+   * Discards all the victim buffers in the context.
+   */
+  private void discardAllBuffersFromCtx(Arena arena, DiscardContext ctx) {
+    for (int victimIx = 0; victimIx < ctx.victimCount; ++victimIx) {
+      int victimHeaderIx = ctx.victimHeaders[victimIx];
+      // Note: no location check here; the buffer is always locked for move.
+      LlapAllocatorBuffer buf = arena.buffers[victimHeaderIx];
+      if (buf == null) continue;
+      if (assertsEnabled) {
+        arena.checkHeader(victimHeaderIx, -1, true);
+        byte header = arena.headers[victimHeaderIx];
+        assertBufferLooksValid(freeListFromHeader(header), buf, arena.arenaIx, victimHeaderIx);
       }
-      // After we succeed (or fail), release the force-evicted memory to memory manager. We have
-      // previously reserved enough to allocate all we need, so we don't take our allocation out
-      // of this - as per the comment above, we basically just wasted a bunch of cache (and CPU).
-      if (forceReserved > 0) {
-        memoryManager.releaseMemory(forceReserved);
+      // We do not modify the header here; the caller will use this space.
+      arena.buffers[victimHeaderIx] = null;
+      long memUsage = buf.getMemoryUsage();
+      Boolean result = buf.endDiscard();
+      if (result == null) {
+        ctx.memoryReleased += memUsage; // We have essentially deallocated this.
+      } else if (result) {
+        // There was a parallel deallocate; it didn't account for the memory.
+        memoryManager.releaseMemory(memUsage);
+      } else {
+        // There was a parallel cache eviction - the evictor is accounting for the memory.
       }
     }
   }
 
+  /**
+   * Unlocks the buffer after the discard has been abandoned.
+   */
+  private void cancelDiscard(LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+    Boolean result = buf.cancelDiscard();
+    if (result == null) return;
+    // If the result is not null, the buffer was evicted during the move.
+    if (result) {
+      long memUsage = buf.getMemoryUsage(); // Release memory - simple deallocation.
+      arenas[arenaIx].deallocate(buf, true);
+      memoryManager.releaseMemory(memUsage);
+    } else {
+      arenas[arenaIx].deallocate(buf, true); // No need to release memory - cache eviction.
+    }
+  }
+
+  /**
+   * Tries to allocate destCount - destIx blocks, using best-effort fast allocation.
+   * @param dest Option 1 - memory allocated is stored in these buffers.
+   * @param destHeaders Option 2 - memory allocated is reserved and headers returned via this.
+   * @param destIx The start index in either array where allocations are to be saved.
+   * @param destCount The end index in either array where allocations are to be saved.
+   * @param freeListIx The free list from which to allocate.
+   * @param allocSize Allocation size.
+   * @param startArenaIx From which arena to start allocating.
+   * @param arenaCount The active arena count.
+   * @return The index in the array until which the memory has been allocated.
+   */
+  private int allocateFast(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+      int freeListIx, int allocSize, int startArenaIx, int arenaCount) {
+    int index = startArenaIx;
+    do {
+      int newDestIx = arenas[index].allocateFast(
+          freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+      if (newDestIx == destCount) return newDestIx;
+      assert newDestIx != -1;
+      destIx = newDestIx;
+      index = getNextIx(index, arenaCount, 1);
+    } while (index != startArenaIx);
+    return destIx;
+  }
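
The arena scans here rely on a wrap-around helper getNextIx, defined elsewhere in this
file; a hedged sketch of its assumed behavior:

    // Advances ix by step, wrapping around count; assumes step is 1 or evenly divides count.
    private static int getNextIx(int ix, int count, int step) {
      ix += step;
      return ix >= count ? ix - count : ix;
    }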
+
+  /**
+   * Tries to allocate destCount - destIx blocks by allocating new arenas, if needed. Same args
+   * as allocateFast, except the allocations start at arenaCount (the first unallocated arena).
+   */
+  private int allocateWithExpand(MemoryBuffer[] dest, int destIx,
+      int freeListIx, int allocSize, int arenaCount) {
+    for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+      destIx = arenas[arenaIx].allocateWithExpand(
+          arenaIx, freeListIx, dest, destIx, allocSize);
+      if (destIx == dest.length) return destIx;
+    }
+    return destIx;
+  }
+
+  /**
+   * Tries to allocate destCount - destIx blocks, waiting for locks and splitting the larger
+   * blocks if the correct sized blocks are not available. Args the same as allocateFast.
+   */
+  private int allocateWithSplit(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+      int freeListIx, int allocSize, int startArenaIx, int arenaCount, int maxSplitFreeListIx) {
+    int arenaIx = startArenaIx;
+    do {
+      int newDestIx = arenas[arenaIx].allocateWithSplit(
+          freeListIx, dest, destHeaders, destIx, destCount, allocSize, maxSplitFreeListIx);
+      if (newDestIx == destCount) return newDestIx;
+      assert newDestIx != -1;
+      destIx = newDestIx;
+      arenaIx = getNextIx(arenaIx, arenaCount, 1);
+    } while (arenaIx != startArenaIx);
+    return destIx;
+  }
+
+  /**
+   * Tries to allocate destCount - destIx blocks after the forced eviction of some other buffers.
+   * Args the same as allocateFast.
+   */
+  private int allocateFromDiscardResult(MemoryBuffer[] dest, int destAllocIx,
+      int freeListIx, int allocationSize, DiscardContext discardResult) {
+    for (int i = 0; i < discardResult.resultCount; ++i) {
+      long result = discardResult.results[i];
+      destAllocIx = arenas[getFirstInt(result)].allocateFromDiscard(
+          dest, destAllocIx, getSecondInt(result), freeListIx, allocationSize);
+    }
+    return destAllocIx;
+  }
+
   private void logOomErrorMessage(String msg) {
+    if (!oomLogging) return;
     while (true) {
       long time = System.nanoTime();
       long lastTime = lastLog.get();
@@ -321,7 +652,7 @@ public final class BuddyAllocator
         continue;
       }
       if (shouldLog) {
-        LlapIoImpl.LOG.error(msg + debugDumpForOom());
+        LlapIoImpl.LOG.error(msg + debugDumpForOomInternal());
       } else {
         LlapIoImpl.LOG.error(msg);
       }
@@ -329,7 +660,7 @@ public final class BuddyAllocator
     }
   }
 
-  /**
+  /**
    * Arbitrarily, we start getting the state from Allocator. Allocator calls MM which calls
    * the policies that call the eviction dispatcher that calls the caches. See init - these all
    * are connected in a cycle, so we need to make sure the who-calls-whom order is definite.
@@ -337,6 +668,10 @@ public final class BuddyAllocator
   @Override
   public void debugDumpShort(StringBuilder sb) {
     memoryManager.debugDumpShort(sb);
+    sb.append("\nDefrag counters: ");
+    for (int i = 0; i < defragCounters.length; ++i) {
+      sb.append(defragCounters[i].get()).append(", ");
+    }
     sb.append("\nAllocator state:");
     int unallocCount = 0, fullCount = 0;
     long totalFree = 0;
@@ -358,21 +693,24 @@ public final class BuddyAllocator
 
   @Override
   public void deallocate(MemoryBuffer buffer) {
-    deallocateInternal(buffer, true);
+    LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+    int arenaToRelease = buf.invalidateAndRelease();
+    if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+    long memUsage = buf.getMemoryUsage();
+    arenas[arenaToRelease].deallocate(buf, false);
+    memoryManager.releaseMemory(memUsage);
   }
 
   @Override
   public void deallocateEvicted(MemoryBuffer buffer) {
-    deallocateInternal(buffer, false);
-  }
-
-  private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
-    LlapDataBuffer buf = (LlapDataBuffer)buffer;
-    long memUsage = buf.getMemoryUsage();
-    arenas[buf.arenaIndex].deallocate(buf);
-    if (doReleaseMemory) {
-      memoryManager.releaseMemory(memUsage);
-    }
+    LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+    assert buf.isInvalid();
+    int arenaToRelease = buf.releaseInvalidated();
+    if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+    arenas[arenaToRelease].deallocate(buf, false);
+    // Note: for deallocateEvicted, we do not release the memory to memManager; it may
+    // happen that the evictor tries to use the allowance before the move finishes.
+    // Retrying/more defrag should take care of that.
   }
 
   @Override
@@ -380,42 +718,7 @@ public final class BuddyAllocator
     return isDirect;
   }
 
-  public String debugDumpForOomInternal() {
-    StringBuilder result = new StringBuilder(
-        "NOTE: with multiple threads the dump is not guaranteed to be consistent");
-    for (Arena arena : arenas) {
-      arena.debugDump(result);
-    }
-    return result.toString();
-  }
-
-  // BuddyAllocatorMXBean
-  @Override
-  public boolean getIsDirect() {
-    return isDirect;
-  }
-
-  @Override
-  public int getMinAllocation() {
-    return minAllocation;
-  }
-
-  @Override
-  public int getMaxAllocation() {
-    return maxAllocation;
-  }
-
-  @Override
-  public int getArenaSize() {
-    return arenaSize;
-  }
-
-  @Override
-  public long getMaxCacheSize() {
-    return maxSize;
-  }
-
-  private ByteBuffer preallocate(int arenaSize) {
+  private ByteBuffer preallocateArenaBuffer(int arenaSize) {
     if (isMapped) {
       RandomAccessFile rwf = null;
       File rf = null;
@@ -445,20 +748,28 @@ public final class BuddyAllocator
   }
 
   private class Arena {
+    private static final int FAILED_TO_RESERVE = Integer.MAX_VALUE;
+
+    private int arenaIx;
     private ByteBuffer data;
     // Avoid storing headers with data since we expect binary size allocations.
     // Each headers[i] is a "virtual" byte at i * minAllocation.
-    private byte[] headers;
+    private LlapAllocatorBuffer[] buffers;
+    // The thread-safety rule for headers is: a header and buffer array element for some freeList
+    // can only be modified if the corresponding freeList lock is held.
+    private byte[] headers; // Free list indices of each unallocated block, for quick lookup.
     private FreeList[] freeLists;
 
-    void init() {
+    void init(int arenaIx) {
+      this.arenaIx = arenaIx;
       try {
-        data = preallocate(arenaSize);
+        data = preallocateArenaBuffer(arenaSize);
       } catch (OutOfMemoryError oom) {
         throw new OutOfMemoryError("Cannot allocate " + arenaSize + " bytes: " + oom.getMessage()
             + "; make sure your xmx and process size are set correctly.");
       }
       int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+      buffers = new LlapAllocatorBuffer[maxMinAllocs];
       headers = new byte[maxMinAllocs];
       int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1;
       freeLists = new FreeList[freeListCount];
@@ -469,14 +780,350 @@ public final class BuddyAllocator
           headerIndex = 0, headerStep = 1 << allocLog2Diff;
       freeLists[allocLog2Diff].listHead = 0;
       for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
-        // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
-        headers[headerIndex] = makeHeader(allocLog2Diff, false);
+        setHeaderFree(headerIndex, allocLog2Diff, CasLog.Src.CTOR);
         data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
         data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep));
         headerIndex += headerStep;
       }
     }
 
+    public void checkHeader(int headerIx, int freeListIx, boolean isLocked) {
+      checkHeaderByte(arenaIx, headerIx, freeListIx, isLocked, headers[headerIx]);
+    }
+
+    /**
+     * Reserves the blocks to use to merge into larger blocks.
+     * @param freeListIx The free list to reserve for.
+     * @param startHeaderIx The header index at which to start looking (to avoid churn at 0).
+     */
+    public void reserveDiscardBruteForce(int freeListIx, DiscardContext ctx, int startHeaderIx) {
+      if (data == null) return; // not allocated yet
+      int headerStep = 1 << freeListIx;
+      int headerIx = startHeaderIx;
+      do {
+        long reserveResult = reserveBlockContents(
+            freeListIx, headerIx, ctx.victimHeaders, ctx.victimCount, true);
+        int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+        if (moveSize == FAILED_TO_RESERVE) {
+          for (int i = ctx.victimCount; i < ctx.victimCount + reservedCount; ++i) {
+            abandonOneHeaderBeingMoved(ctx.victimHeaders[i], CasLog.Src.ABANDON_MOVE);
+          }
+        } else {
+          ctx.victimCount += reservedCount;
+          ctx.addBaseHeader(headerIx);
+        }
+        headerIx = getNextIx(headerIx, headers.length, headerStep);
+      } while (ctx.remainingToFind > 0 && headerIx != startHeaderIx);
+    }
+
+    /**
+     * Reserves the blocks to use to merge into larger blocks.
+     * @param mergeListIx The free list to reserve base blocks from.
+     */
+    public void reserveDiscardBlocksBasedOnFreeList(int mergeListIx, DiscardContext ctx) {
+      if (data == null) return; // not allocated yet
+      FreeList freeList = freeLists[mergeListIx];
+      freeList.lock.lock();
+      try {
+        int freeHeaderIx = freeList.listHead;
+        while (freeHeaderIx >= 0) {
+          boolean reserved = false;
+          if (ctx.remainingToFind > 0) {
+            int headerToFreeIx = getBuddyHeaderIx(mergeListIx, freeHeaderIx);
+            long reserveResult = reserveBlockContents(mergeListIx, headerToFreeIx,
+                ctx.victimHeaders, ctx.victimCount, true);
+            int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+            reserved = (moveSize != FAILED_TO_RESERVE);
+            if (!reserved) {
+              // We cannot abandon the attempt here; the concurrent operations might have released
+              // all the buffer comprising our buddy block, necessitating a merge into a higher
+              // list. That may deadlock with another thread locking its own victims (one can only
+              // take list locks separately, or moving DOWN). The alternative would be to release
+              // the free list lock before reserving, however iterating the list that way is
+              // difficult (we'd have to keep track of things on the main path to avoid re-trying
+              // the same headers repeatedly - we'd rather keep track of extra things on failure).
+              prepareAbandonUnfinishedMoveAttempt(ctx, reservedCount);
+            } else {
+              ctx.victimCount += reservedCount;
+              ctx.addBaseHeader(freeHeaderIx);
+            }
+          }
+          int nextFreeHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(freeHeaderIx));
+          if (reserved) {
+            removeBlockFromFreeList(freeList, freeHeaderIx, mergeListIx);
+            if (assertsEnabled) {
+              checkHeader(freeHeaderIx, mergeListIx, false);
+            }
+            setHeaderNoBufAlloc(freeHeaderIx, mergeListIx, CasLog.Src.NEW_BASE);
+          }
+          if (ctx.remainingToFind == 0) break;
+          freeHeaderIx = nextFreeHeaderIx;
+        }
+      } finally {
+        freeList.lock.unlock();
+      }
+      // See the above. Release the headers after unlocking.
+      for (int i = 0; i < ctx.abandonedCount; ++i) {
+        abandonOneHeaderBeingMoved(ctx.abandonedHeaders[i], CasLog.Src.ABANDON_AT_END);
+      }
+      ctx.abandonedCount = 0;
+    }
+
+    /**
+     * Saves the victim headers from a failed reserve attempt into a separate array in the
+     * context. See the comment at the call site; this is done to prevent deadlocks.
+     * @param count the number of victim headers reserved by the failed attempt.
+     */
+    private void prepareAbandonUnfinishedMoveAttempt(DiscardContext ctx, int count) {
+      if (count == 0) return; // Nothing to do.
+      int startIx = ctx.victimCount, start;
+      if (ctx.abandonedHeaders == null) {
+        start = 0;
+        ctx.abandonedHeaders = new int[count];
+      } else {
+        start = ctx.abandonedCount;
+        int newLen = start + count;
+        if (newLen > ctx.abandonedHeaders.length) {
+          ctx.abandonedHeaders = Arrays.copyOf(ctx.abandonedHeaders, newLen);
+        }
+      }
+      System.arraycopy(ctx.victimHeaders, startIx, ctx.abandonedHeaders, start, count);
+      ctx.abandonedCount += count;
+      ctx.victimCount = startIx;
+    }
+
+    /**
+     * Reserve all the contents of a particular block to merge them together.
+     * @param freeListIx The list to which the hypothetical block belongs.
+     * @param headerToFreeIx The header of the block whose contents are to be reserved.
+     */
+    private long reserveBlockContents(int freeListIx, int headerToFreeIx,
+        int[] victimHeaders, int victimsOffset, boolean isDiscard) {
+      // Try opportunistically for the common case - the same-sized, allocated buddy.
+      if (enableDefragShortcut) {
+        LlapAllocatorBuffer buffer = buffers[headerToFreeIx];
+        byte header = headers[headerToFreeIx];
+        if (buffer != null && freeListFromHeader(header) == freeListIx) {
+          // Double-check the header under lock.
+          FreeList freeList = freeLists[freeListIx];
+          freeList.lock.lock();
+          try {
+            // No one can take this buffer out and thus change the level after we lock, and if
+            // they take it out before we lock, then we will fail to lock (same as
+            // prepareOneHeaderForMove).
+            if (headers[headerToFreeIx] == header
+                && buffer.startMoveOrDiscard(arenaIx, headerToFreeIx, isDiscard)) {
+              if (assertsEnabled) {
+                assertBufferLooksValid(freeListIx, buffer, arenaIx, headerToFreeIx);
+                CasLog.logMove(arenaIx, headerToFreeIx, System.identityHashCode(buffer));
+              }
+              victimHeaders[victimsOffset] = headerToFreeIx;
+              return makeIntPair(1, buffer.allocSize);
+            }
+          } finally {
+            freeList.lock.unlock();
+          }
+          // We don't bail on failure - try in detail below.
+        }
+      }
+      // Examine the buddy block and its sub-blocks in detail.
+      long[] stack = new long[freeListIx + 1]; // Can never have more than this in elements.
+      int stackSize = 1;
+      // Seed with the buddy of this block (so the first iteration would target this block).
+      stack[0] = makeIntPair(freeListIx, getBuddyHeaderIx(freeListIx, headerToFreeIx));
+      int victimCount = 0;
+      int totalMoveSize = 0;
+      // We traverse the leaf nodes of the tree. The stack entries indicate the existing leaf
+      // nodes whose siblings we still need to visit, and the levels of those siblings.
+      while (stackSize > 0) {
+        --stackSize;
+        long next = stack[stackSize];
+        int listLevel = getFirstInt(next); // Not necessarily an actual free list; see intermListIx below.
+        int sourceHeaderIx = getSecondInt(next);
+        // Find the buddy of the header at list level. We don't know what list it is actually in.
+        int levelBuddyHeaderIx = getBuddyHeaderIx(listLevel, sourceHeaderIx);
+        // First, handle the actual thing we found.
+        long result = prepareOneHeaderForMove(levelBuddyHeaderIx, isDiscard, freeListIx);
+        if (result == -1) {
+          // We have failed to reserve a single header. Do not undo the previous ones here,
+          // the caller has to handle this to avoid races.
+          return makeIntPair(victimCount, FAILED_TO_RESERVE);
+        }
+        int allocSize = getFirstInt(result);
+        totalMoveSize += allocSize;
+        victimHeaders[victimsOffset + victimCount] = levelBuddyHeaderIx;
+        ++victimCount;
+        // Explaining this would really require a picture. Basically if the level is lower than
+        // our level, that means (imagine a tree) we are on the leftmost leaf node of the sub-tree
+        // under our sibling in the tree. So we'd need to look at the buddies of that leftmost leaf
+        // block on all the intermediate levels (aka all intermediate levels of the tree between
+        // this guy and our sibling). Including its own buddy on its own level. And so on for every
+        // sub-tree where our buddy is not on the same level as us (i.e. does not cover the entire
+        // sub-tree).
+        int actualBuddyListIx = getSecondInt(result);
+        for (int intermListIx = listLevel - 1; intermListIx >= actualBuddyListIx; --intermListIx) {
+          stack[stackSize++] = makeIntPair(intermListIx, levelBuddyHeaderIx);
+        }
+      }
+      return makeIntPair(victimCount, totalMoveSize);
+    }
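
The traversal above repeatedly asks for the buddy of a header at a given level. A hedged
sketch of the standard buddy computation assumed for getBuddyHeaderIx (defined elsewhere
in this file):

    // The buddy of the block starting at headerIx on free-list level L is the adjacent
    // block of the same size, found by flipping bit L of the header index.
    static int getBuddyHeaderIx(int level, int headerIx) {
      return headerIx ^ (1 << level);
    }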
+
+    /**
+     * Abandons the move attempt for a single header that may be free or allocated.
+     */
+    private void abandonOneHeaderBeingMoved(int headerIx, CasLog.Src src) {
+      byte header = headers[headerIx];
+      int freeListIx = freeListFromHeader(header);
+      if ((header & 1) != 1) failWithLog("Victim header not in use");
+      LlapAllocatorBuffer buf = buffers[headerIx];
+      if (buf != null) {
+        // Note: no location check; the buffer is always locked for move here.
+        if (assertsEnabled) {
+          assertBufferLooksValid(freeListIx, buf, arenaIx, headerIx);
+        }
+        cancelDiscard(buf, arenaIx, headerIx);
+      } else {
+        if (assertsEnabled) {
+          checkHeader(headerIx, -1, true);
+        }
+        addToFreeListWithMerge(headerIx, freeListIx, null, src);
+      }
+    }
+
+    /**
+     * Prepares victimHeaderIx header to be moved - locks if it's allocated, takes out of
+     * the free list if not.
+     * @return the list level to which the header belongs if this succeeded, -1 if not.
+     */
+    private long prepareOneHeaderForMove(int victimHeaderIx, boolean isDiscard, int maxListIx) {
+      byte header = headers[victimHeaderIx];
+      if (header == 0) return -1;
+      int freeListIx = freeListFromHeader(header);
+      if (freeListIx > maxListIx) {
+        // This can only come from a brute force discard; for now we don't discard blocks larger
+        // than the target block. We could discard one and add the remainder to the free lists.
+        // By definition if we are fragmented there should be a smaller buffer somewhere.
+        return -1;
+      }
+      if (buffers[victimHeaderIx] == null && (header & 1) == 1) {
+        return -1; // There's no buffer and another move is reserving this.
+      }
+      FreeList freeList = freeLists[freeListIx];
+      freeList.lock.lock();
+      try {
+        if (headers[victimHeaderIx] != header) {
+          // We bail if there are any changes. Note that we don't care about ABA here - all the
+          // stuff on the left has been taken out already so no one can touch it, and all the stuff
+          // on the right is yet to be seen so we don't care if they changed with this - if it's
+          // in the same free list, the processing sequence will remain the same going right.
+          return -1;
+        }
+        LlapAllocatorBuffer buffer = buffers[victimHeaderIx];
+        if (buffer == null && (header & 1) == 1) {
+          return -1; // The only ABA problem we care about. Ok to have another buffer in there;
+                     // not ok to have a location locked by someone else.
+        }
+        int allocSize = 0;
+        if (buffer != null) {
+          // The buffer can only be removed after the removed flag has been set. If we are able to
+          // lock it here, no one can set the removed flag and thus remove it. That would also mean
+          // that the header is not free, and no one will touch the header either.
+          if (!buffer.startMoveOrDiscard(arenaIx, victimHeaderIx, isDiscard)) {
+            return -1;
+          }
+          CasLog.logMove(arenaIx, victimHeaderIx, System.identityHashCode(buffer));
+          allocSize = allocSizeFromFreeList(freeListIx);
+        } else {
+          // Take the empty buffer out of the free list.
+          setHeaderNoBufAlloc(victimHeaderIx, freeListIx, CasLog.Src.EMPTY_V);
+          removeBlockFromFreeList(freeList, victimHeaderIx, freeListIx);
+        }
+        return makeIntPair(allocSize, freeListIx);
+      } finally {
+        freeList.lock.unlock();
+      }
+    }
+
+    /** Allocates into an empty block obtained via a forced eviction. Same args as allocateFast. */
+    public int allocateFromDiscard(MemoryBuffer[] dest, int destIx,
+        int headerIx, int freeListIx, int allocationSize) {
+      LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+      initializeNewlyAllocated(buffer, allocationSize, headerIx, offsetFromHeaderIndex(headerIx));
+      if (assertsEnabled) {
+        checkHeader(headerIx, freeListIx, true);
+      }
+      setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_DEFRAG);
+      return destIx + 1;
+    }
+
+    /** Sets the header at an index to refer to an allocated buffer. */
+    private void setHeaderAlloc(int headerIx, int freeListIx, LlapAllocatorBuffer alloc,
+        CasLog.Src src) {
+      assert alloc != null;
+      headers[headerIx] = makeHeader(freeListIx, true);
+      buffers[headerIx] = alloc;
+      CasLog.logSet(src, arenaIx, headerIx, System.identityHashCode(alloc));
+    }
+
+    /** Sets the header at an index to refer to free space in a certain free list. */
+    private void setHeaderFree(int headerIndex, int freeListIx, CasLog.Src src) {
+      headers[headerIndex] = makeHeader(freeListIx, false);
+      buffers[headerIndex] = null;
+      CasLog.logSetFree(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+    }
+
+    /** Sets the header at an index to refer to some space in use, without an allocation. */
+    private void setHeaderNoBufAlloc(int headerIndex, int freeListIx, CasLog.Src src) {
+      headers[headerIndex] = makeHeader(freeListIx, true);
+      CasLog.logSetNb(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+    }
+
+    /** Unsets the header at an index (meaning this does not refer to a buffer). */
+    private void unsetHeader(int headerIndex, CasLog.Src src) {
+      headers[headerIndex] = 0;
+      CasLog.logUnset(src, arenaIx, headerIndex, headerIndex);
+    }
+
+    /** Unsets the headers (meaning this does not refer to a buffer). */
+    private void unsetHeaders(int fromHeaderIx, int toHeaderIx, CasLog.Src src) {
+      Arrays.fill(headers, fromHeaderIx, toHeaderIx, (byte)0);
+      CasLog.logUnset(src, arenaIx, fromHeaderIx, toHeaderIx - 1);
+    }
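
For reference, the header byte layout these setters write, per makeHeader and
freeListFromHeader elsewhere in this file: the free-list index plus one, shifted left by
one, with the low bit meaning "in use":

    byte inUse = (byte) (((3 + 1) << 1) | 1); // 9: list 3, allocated or locked for defrag
    byte free = (byte) (((3 + 1) << 1) | 0);  // 8: list 3, free
    int listIx = (inUse >> 1) - 1;            // 3, as recovered by freeListFromHeader
    // A header of 0 means the slot is not the start of any block.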
+
+    private void debugDump(StringBuilder result) {
+      result.append("\nArena: ");
+      if (data == null) {
+        result.append(" not allocated");
+        return;
+      }
+      // Try to get as consistent view as we can; make copy of the headers.
+      byte[] headers = new byte[this.headers.length];
+      System.arraycopy(this.headers, 0, headers, 0, headers.length);
+      int allocSize = minAllocation;
+      for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
+        result.append("\n  free list for size " + allocSize + ": ");
+        FreeList freeList = freeLists[i];
+        freeList.lock.lock();
+        try {
+          int nextHeaderIx = freeList.listHead;
+          while (nextHeaderIx >= 0) {
+            result.append(nextHeaderIx + ", ");
+            nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
+          }
+        } finally {
+          freeList.lock.unlock();
+        }
+      }
+      for (int i = 0; i < headers.length; ++i) {
+        byte header = headers[i];
+        if (header == 0) continue;
+        int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
+        boolean isFree = buffers[i] == null;
+        result.append("\n  block " + i + " at " + offset + ": size "
+            + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+      }
+    }
+
     public Integer debugDumpShort(StringBuilder result) {
       if (data == null) {
         return null;
@@ -507,78 +1154,67 @@ public final class BuddyAllocator
       return total;
     }
 
-    public void debugDump(StringBuilder result) {
-      result.append("\nArena: ");
+    private void testDump(StringBuilder result) {
+      result.append("{");
       if (data == null) {
-        result.append(" not allocated");
+        result.append("}, ");
         return;
       }
       // Try to get as consistent view as we can; make copy of the headers.
       byte[] headers = new byte[this.headers.length];
       System.arraycopy(this.headers, 0, headers, 0, headers.length);
-      int allocSize = minAllocation;
-      for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
-        result.append("\n  free list for size " + allocSize + ": ");
-        FreeList freeList = freeLists[i];
-        freeList.lock.lock();
-        try {
-          int nextHeaderIx = freeList.listHead;
-          while (nextHeaderIx >= 0) {
-            result.append(nextHeaderIx + ", ");
-            nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
-          }
-        } finally {
-          freeList.lock.unlock();
-        }
-      }
       for (int i = 0; i < headers.length; ++i) {
         byte header = headers[i];
         if (header == 0) continue;
-        int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
-        boolean isFree = (header & 1) == 0;
-        result.append("\n  block " + i + " at " + offset + ": size "
-            + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+        String allocState = ".";
+        if (buffers[i] != null) {
+          allocState = "*"; // Allocated
+        } else if ((header & 1) == 1) {
+          allocState = "!"; // Locked for defrag
+        }
+        int size = 1 << (freeListFromHeader(header) + minAllocLog2);
+        result.append("[").append(size).append(allocState).append("@").append(i).append("]");
       }
+      result.append("}, ");
     }
 
-    private int freeListFromHeader(byte header) {
-      return (header >> 1) - 1;
-    }
-
-    private int allocateFast(
-        int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+    private int allocateFast(int freeListIx, MemoryBuffer[] dest, long[] destHeaders,
+        int destIx, int destCount, int allocSize) {
       if (data == null) return -1; // not allocated yet
       FreeList freeList = freeLists[freeListIx];
-      if (!freeList.lock.tryLock()) return ix;
+      if (!freeList.lock.tryLock()) return destIx;
       try {
-        return allocateFromFreeListUnderLock(arenaIx, freeList, freeListIx, dest, ix, size);
+        return allocateFromFreeListUnderLock(
+            freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
       } finally {
         freeList.lock.unlock();
       }
     }
 
-    private int allocateWithSplit(int arenaIx, int freeListIx,
-        MemoryBuffer[] dest, int ix, int allocationSize) {
+    private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
+        long[] destHeaders, int destIx, int destCount, int allocSize, int maxSplitFreeListIx) {
       if (data == null) return -1; // not allocated yet
       FreeList freeList = freeLists[freeListIx];
       int remaining = -1;
       freeList.lock.lock();
       try {
         // Try to allocate from target-sized free list, maybe we'll get lucky.
-        ix = allocateFromFreeListUnderLock(
-            arenaIx, freeList, freeListIx, dest, ix, allocationSize);
-        remaining = dest.length - ix;
-        if (remaining == 0) return ix;
+        destIx = allocateFromFreeListUnderLock(
+            freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+        remaining = destCount - destIx;
+        if (remaining == 0) return destIx;
       } finally {
         freeList.lock.unlock();
       }
-      byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
       int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
       int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
       // Each iteration of this loop tries to split blocks from one level of the free list into
       // target size blocks; if we cannot satisfy the allocation from the free list containing the
       // blocks of a particular size, we'll try to split yet larger blocks, until we run out.
-      while (remaining > 0 && splitListIx < freeLists.length) {
+      if (maxSplitFreeListIx == -1) {
+        maxSplitFreeListIx = freeLists.length - 1;
+      }
+      while (remaining > 0 && splitListIx <= maxSplitFreeListIx) {
         int splitWaysLog2 = (splitListIx - freeListIx);
         assert splitWaysLog2 > 0;
         int splitWays = 1 << splitWaysLog2; // How many ways each block splits into target size.
@@ -596,35 +1232,39 @@ public final class BuddyAllocator
             remaining -= toTake;
             lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
             // Take toTake blocks by splitting the block at offset.
-            for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
-              headers[headerIx] = headerData;
-              // TODO: this could be done out of the lock, we only need to take the blocks out.
-              ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
+            for (; toTake > 0; ++destIx, --toTake, headerIx += headerStep, offset += allocSize) {
+              if (assertsEnabled) {
+                checkHeader(headerIx, -1, false); // Cannot validate the list, it may be unset
+              }
+              if (dest != null) {
+                LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+                initializeNewlyAllocated(buffer, allocSize, headerIx, offset);
+                setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_SPLIT_BUF);
+              } else {
+                destHeaders[destIx] = makeIntPair(arenaIx, headerIx);
+                setHeaderNoBufAlloc(headerIx, freeListIx, CasLog.Src.ALLOC_SPLIT_DEFRAG);
+              }
             }
             lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
             headerIx = getNextFreeListItem(origOffset);
           }
-          replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
+        // In the end, update the free list head.
+        replaceListHeadUnderLock(splitList, headerIx, splitListIx);
         } finally {
           splitList.lock.unlock();
         }
+        CasLog.Src src = (dest != null) ? CasLog.Src.SPLIT_AFTER_BUF : CasLog.Src.SPLIT_AFTER_DEFRAG;
         if (remaining == 0) {
           // We have just obtained all we needed by splitting some block; now we need
           // to put the space remaining from that block into lower free lists.
           // We'll put at most one block into each list, since 2 blocks can always be combined
           // to make a larger-level block. Each bit in the remaining target-sized blocks count
           // is one block in a list offset from target-sized list by bit index.
+          // We do the merges here too, since the block we just allocated could immediately be
+          // moved out, then the resulting free space abandoned.
           int newListIndex = freeListIx;
           while (lastSplitBlocksRemaining > 0) {
             if ((lastSplitBlocksRemaining & 1) == 1) {
-              FreeList newFreeList = freeLists[newListIndex];
-              newFreeList.lock.lock();
-              headers[lastSplitNextHeader] = makeHeader(newListIndex, false);
-              try {
-                addBlockToFreeListUnderLock(newFreeList, lastSplitNextHeader);
-              } finally {
-                newFreeList.lock.unlock();
-              }
+              addToFreeListWithMerge(lastSplitNextHeader, newListIndex, null, src);
               lastSplitNextHeader += (1 << newListIndex);
             }
             lastSplitBlocksRemaining >>>= 1;
@@ -634,10 +1274,16 @@ public final class BuddyAllocator
         }
         ++splitListIx;
       }
-      return ix;
+      return destIx;
+    }
+
+    private void initializeNewlyAllocated(
+        LlapAllocatorBuffer buffer, int allocSize, int headerIx, int offset) {
+      buffer.initialize(data, offset, allocSize);
+      buffer.setNewAllocLocation(arenaIx, headerIx);
     }
 
-    private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
+    private void replaceListHeadUnderLock(FreeList freeList, int headerIx, int ix) {
       if (headerIx == freeList.listHead) return;
       if (headerIx >= 0) {
         int newHeadOffset = offsetFromHeaderIndex(headerIx);
@@ -655,7 +1301,7 @@ public final class BuddyAllocator
         }
         if (allocArenaCount > arenaIx) {
           // Someone already allocated this arena; just do the usual thing.
-          return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+          return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
         }
         if ((arenaIx + 1) == -arenaCount) {
           // Someone is allocating this arena. Wait a bit and recheck.
@@ -676,34 +1322,39 @@ public final class BuddyAllocator
           continue; // CAS race, look again.
         }
         assert data == null;
-        init();
+        init(arenaIx);
         boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
         assert isCommited;
         synchronized (this) {
           this.notifyAll();
         }
         metrics.incrAllocatedArena();
-        return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+        return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
       }
     }
 
-    public int offsetFromHeaderIndex(int lastSplitNextHeader) {
-      return lastSplitNextHeader << minAllocLog2;
-    }
-
-    public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
-        int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+    public int allocateFromFreeListUnderLock(FreeList freeList, int freeListIx,
+        MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount, int allocSize) {
       int current = freeList.listHead;
-      while (current >= 0 && ix < dest.length) {
-        int offset = offsetFromHeaderIndex(current);
-        // Noone else has this either allocated or in a different free list; no sync needed.
-        headers[current] = makeHeader(freeListIx, true);
+      assert (dest == null) != (destHeaders == null);
+      while (current >= 0 && destIx < destCount) {
+        int offset = offsetFromHeaderIndex(current), allocHeaderIx = current;
         current = getNextFreeListItem(offset);
-        ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
-        ++ix;
+        if (assertsEnabled) {
+          checkHeader(allocHeaderIx, freeListIx, false);
+        }
+        if (dest != null) {
+          LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+          initializeNewlyAllocated(buffer, allocSize, allocHeaderIx, offset);
+          setHeaderAlloc(allocHeaderIx, freeListIx, buffer, CasLog.Src.ALLOC_FREE_BUF);
+        } else {
+          destHeaders[destIx] = makeIntPair(arenaIx, allocHeaderIx);
+          setHeaderNoBufAlloc(allocHeaderIx, freeListIx, CasLog.Src.ALLOC_FREE_DEFRAG);
+        }
+        ++destIx;
       }
-      replaceListHeadUnderLock(freeList, current);
-      return ix;
+      replaceListHeadUnderLock(freeList, current, freeListIx);
+      return destIx;
     }
 
     private int getPrevFreeListItem(int offset) {
@@ -714,32 +1365,44 @@ public final class BuddyAllocator
       return data.getInt(offset + 4);
     }
 
-    private byte makeHeader(int freeListIx, boolean isInUse) {
-      return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+    public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
+      assert data != null;
+      int pos = buffer.byteBuffer.position();
+      // Note: this is called by someone who has ensured the buffer is not going to be moved.
+      int headerIx = pos >>> minAllocLog2;
+      int freeListIx = freeListFromAllocSize(buffer.allocSize);
+      if (assertsEnabled && !isAfterMove) {
+        LlapAllocatorBuffer buf = buffers[headerIx];
+        if (buf != buffer) {
+          failWithLog(arenaIx + ":" + headerIx + " => "
+              + toDebugString(buffer) + ", " + toDebugString(buf));
+        }
+        assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
+        checkHeader(headerIx, freeListIx, true);
+      }
+      buffers[headerIx] = null;
+      addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
     }
 
-    public void deallocate(LlapDataBuffer buffer) {
-      assert data != null;
-      int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
-          freeListIx = freeListFromHeader(headers[headerIx]);
-      assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
-          : buffer.allocSize + " " + freeListIx;
+    private void addToFreeListWithMerge(int headerIx, int freeListIx,
+        LlapAllocatorBuffer buffer, CasLog.Src src) {
       while (true) {
         FreeList freeList = freeLists[freeListIx];
-        int bHeaderIx = headerIx ^ (1 << freeListIx);
+        int bHeaderIx = getBuddyHeaderIx(freeListIx, headerIx);
         freeList.lock.lock();
         try {
           if ((freeListIx == freeLists.length - 1)
             || headers[bHeaderIx] != makeHeader(freeListIx, false)) {
             // Buddy block is allocated, or it is on higher level of allocation than we are, or we
             // have reached the top level. Add whatever we have got to the current free list.
-            addBlockToFreeListUnderLock(freeList, headerIx);
-            headers[headerIx] = makeHeader(freeListIx, false);
+            addBlockToFreeListUnderLock(freeList, headerIx, freeListIx);
+            setHeaderFree(headerIx, freeListIx, src);
             break;
           }
           // Buddy block is free and in the same free list we have locked. Take it out for merge.
-          removeBlockFromFreeList(freeList, bHeaderIx);
-          headers[bHeaderIx] = headers[headerIx] = 0; // Erase both headers of the blocks to merge.
+          removeBlockFromFreeList(freeList, bHeaderIx, freeListIx);
+          unsetHeader(bHeaderIx, src); // Erase both headers of the blocks to merge.
+          unsetHeader(headerIx, src);
         } finally {
           freeList.lock.unlock();
         }
@@ -748,7 +1411,8 @@ public final class BuddyAllocator
       }
     }
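
addToFreeListWithMerge above climbs levels by repeatedly inspecting the buddy block: at level L, the buddy of header h is h with bit L flipped, and a successful merge produces the lower-indexed header at level L + 1. A minimal sketch of just the index math (not the allocator's code; the header values are hypothetical):

public class BuddyMergeDemo {
  static int buddyOf(int headerIx, int level) {
    return headerIx ^ (1 << level); // flip bit 'level'
  }
  public static void main(String[] args) {
    int headerIx = 5, level = 0;            // hypothetical freed block
    int buddy = buddyOf(headerIx, level);   // 5 ^ 1 == 4
    int merged = Math.min(headerIx, buddy); // merged block keeps header 4
    System.out.println("buddy=" + buddy + ", merged header=" + merged
        + " at level " + (level + 1));
  }
}
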
 
-    private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
+    private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx, int ix) {
+      CasLog.logAddToList(arenaIx, headerIx, ix, freeList.listHead);
       if (freeList.listHead >= 0) {
         int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
         assert getPrevFreeListItem(oldHeadOffset) == -1;
@@ -760,13 +1424,15 @@ public final class BuddyAllocator
       freeList.listHead = headerIx;
     }
 
-    private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
+    private void removeBlockFromFreeList(FreeList freeList, int headerIx, int ix) {
       int bOffset = offsetFromHeaderIndex(headerIx),
           bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
+      CasLog.logRemoveFromList(arenaIx, headerIx, ix, freeList.listHead);
       if (freeList.listHead == headerIx) {
         assert bpHeaderIx == -1;
         freeList.listHead = bnHeaderIx;
       }
+      // Unnecessary: data.putInt(bOffset, -1); data.putInt(bOffset + 4, -1);
       if (bpHeaderIx != -1) {
         data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
       }
@@ -778,20 +1444,341 @@ public final class BuddyAllocator
 
   private static class FreeList {
     ReentrantLock lock = new ReentrantLock(false);
-    int listHead = -1; // Index of where the buffer is; in minAllocation units
-    // TODO: One possible improvement - store blocks arriving left over from splits, and
-    //       blocks requested, to be able to wait for pending splits and reduce fragmentation.
-    //       However, we are trying to increase fragmentation now, since we cater to single-size.
+    int listHead = -1; // Index of where the buffer is; in minAllocation units (headers array).
   }
 
   @Override
+  @Deprecated
   public MemoryBuffer createUnallocated() {
     return new LlapDataBuffer();
   }
 
+  // BuddyAllocatorMXBean
+  @Override
+  public boolean getIsDirect() {
+    return isDirect;
+  }
+
+  @Override
+  public int getMinAllocation() {
+    return minAllocation;
+  }
+
+  @Override
+  public int getMaxAllocation() {
+    return maxAllocation;
+  }
+
+  @Override
+  public int getArenaSize() {
+    return arenaSize;
+  }
+
+  @Override
+  public long getMaxCacheSize() {
+    return maxSize;
+  }
+
+  // Various helper methods.
+  private static int getBuddyHeaderIx(int freeListIx, int headerIx) {
+    return headerIx ^ (1 << freeListIx);
+  }
+
+  private static int getNextIx(int ix, int count, int step) {
+    ix += step;
+    assert ix <= count; // We expect the start to be 0 and count to be divisible by step.
+    return ix == count ? 0 : ix;
+  }
+
+  private static int freeListFromHeader(byte header) {
+    return (header >> 1) - 1;
+  }
+
+  private int freeListFromAllocSize(int allocSize) {
+    return (31 - Integer.numberOfLeadingZeros(allocSize)) - minAllocLog2;
+  }
+
+  private int allocSizeFromFreeList(int freeListIx) {
+    return 1 << (freeListIx + minAllocLog2);
+  }
+
+  public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+    return lastSplitNextHeader << minAllocLog2;
+  }
+
+  private static byte makeHeader(int freeListIx, boolean isInUse) {
+    return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+  }
+
+  private int determineFreeListForAllocation(int size) {
+    int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
+    if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
+    return Math.max(freeListIx - minAllocLog2, 0);
+  }
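
To make the encoding above concrete: makeHeader stores (freeListIx + 1) shifted left once, with the low bit as the in-use flag, so a zero byte means no block starts at that header; determineFreeListForAllocation rounds a request up to the next power of two at or above the minimum allocation. A worked sketch, assuming a hypothetical minAllocLog2 of 3 (8-byte minimum):

public class HeaderMathDemo {
  static final int MIN_ALLOC_LOG2 = 3;
  static byte makeHeader(int freeListIx, boolean isInUse) {
    return (byte) (((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
  }
  static int freeListFromHeader(byte header) {
    return (header >> 1) - 1;
  }
  static int freeListForAllocation(int size) {
    int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
    if (size != (1 << freeListIx)) ++freeListIx; // round up non-powers of two
    return Math.max(freeListIx - MIN_ALLOC_LOG2, 0);
  }
  public static void main(String[] args) {
    byte h = makeHeader(2, true); // list 2, in use => 0b0111 == 7
    System.out.println("header=" + h + ", list=" + freeListFromHeader(h)
        + ", inUse=" + ((h & 1) == 1));
    // 100 rounds up to 128 == 1 << 7; list index 7 - 3 = 4.
    System.out.println("100 bytes -> list " + freeListForAllocation(100));
  }
}
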
+
+  // Utility methods used to store pairs of ints as long.
+  private static long makeIntPair(int first, int second) {
+    return ((long)first) << 32 | second;
+  }
+  private static int getFirstInt(long result) {
+    return (int) (result >>> 32);
+  }
+  private static int getSecondInt(long result) {
+    return (int) (result & ((1L << 32) - 1));
+  }
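
A quick round-trip of the int-pair packing above; note that `| second` would sign-extend a negative second int into the high word, so callers (as here, with arena and header indices) must pass non-negative values:

public class IntPairDemo {
  static long makeIntPair(int first, int second) {
    return ((long) first) << 32 | second;
  }
  public static void main(String[] args) {
    long pair = makeIntPair(3, 1024);
    int first = (int) (pair >>> 32);
    int second = (int) (pair & ((1L << 32) - 1));
    System.out.println(first + ":" + second); // prints 3:1024
  }
}
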
+
+  // Debug/test related methods.
+  private void assertBufferLooksValid(
+      int freeListIx, LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+    if (buf.allocSize == allocSizeFromFreeList(freeListIx)) return;
+    failWithLog("Race; allocation size " + buf.allocSize + ", not "
+        + allocSizeFromFreeList(freeListIx) + " for free list "
+        + freeListIx + " at " + arenaIx + ":" + headerIx);
+  }
+
+  private static String toDebugString(LlapAllocatorBuffer buffer) {
+    return buffer == null ? "null" : buffer.toDebugString();
+  }
+
+  private void checkHeaderByte(
+      int arenaIx, int headerIx, int freeListIx, boolean isLocked, byte header) {
+    if (isLocked != ((header & 1) == 1)) {
+      failWithLog("Expected " + arenaIx + ":" + headerIx + " "
+          + (isLocked ? "" : "not ") + "locked: " + header);
+    }
+    if (freeListIx < 0) return;
+    if (freeListFromHeader(header) != freeListIx) {
+      failWithLog("Expected " + arenaIx + ":" + headerIx + " in list " + freeListIx
+          + ": " + freeListFromHeader(header));
+    }
+  }
+
+  @VisibleForTesting
+  void disableDefragShortcutForTest() {
+    this.enableDefragShortcut = false;
+  }
+
+  @VisibleForTesting
+  void setOomLoggingForTest(boolean b) {
+    this.oomLogging = b;
+  }
+
+  @VisibleForTesting
+  String testDump() {
+    StringBuilder sb = new StringBuilder();
+    for (Arena a : arenas) {
+      a.testDump(sb);
+    }
+    return sb.toString();
+  }
+
   @Override
   public String debugDumpForOom() {
     return "\nALLOCATOR STATE:\n" + debugDumpForOomInternal()
         + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom();
   }
+
+  private String debugDumpForOomInternal() {
+    StringBuilder sb = new StringBuilder();
+    for (Arena a : arenas) {
+      a.debugDump(sb);
+    }
+    return sb.toString();
+  }
+
+  private void failWithLog(String string) {
+    CasLog.logError();
+    throw new AssertionError(string);
+  }
+
+  @VisibleForTesting
+  public void dumpTestLog() {
+    if (CasLog.casLog != null) {
+      CasLog.casLog.dumpLog(true);
+    }
+  }
+
+  private final static class CasLog {
+    // TODO: enable this for production debug, switching between two small buffers?
+    private final static CasLog casLog = null; //new CasLog();
+    public enum Src {
+      NEWLY_CLEARED,
+      SPLIT_AFTER_BUF,
+      SPLIT_AFTER_DEFRAG,
+      ALLOC_SPLIT_DEFRAG,
+      ALLOC_SPLIT_BUF,
+      ALLOC_DEFRAG,
+      EMPTY_V,
+      NEW_BASE,
+      CTOR,
+      MOVE_TO_NESTED,
+      MOVE_TO_ALLOC,
+      ABANDON_MOVE,
+      ABANDON_AT_END,
+      ABANDON_BASE,
+      CLEARED_BASE,
+      CLEARED_VICTIM,
+      UNUSABLE_NESTED,
+      ABANDON_NESTED,
+      DEALLOC,
+      ALLOC_FREE_DEFRAG,
+      ALLOC_FREE_BUF
+    }
+    private final int size;
+    private final long[] log;
+    private final AtomicInteger offset = new AtomicInteger(0);
+
+
+    public CasLog() {
+      size = 50000000;
+      log = new long[size];
+    }
+
+    public static final int START_MOVE = 0, SET_NB = 1, SET_BUF = 2, SET_FREE = 3,
+        ADD_TO_LIST = 4, REMOVE_FROM_LIST = 5, ERROR = 6, UNSET = 7;
+
+    public static void logMove(int arenaIx, int buddyHeaderIx, int identityHashCode) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(3) - 3;
+      casLog.log[ix] = makeIntPair(START_MOVE, identityHashCode);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, buddyHeaderIx);
+    }
+
+    public static void logSetNb(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_NB, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = size;
+    }
+
+    public static void logSetFree(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_FREE, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = size;
+    }
+
+    public static void logUnset(CasLog.Src src, int arenaIx, int from, int to) {
+      if (casLog == null) return;
+      if (from > to) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(UNSET, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, from);
+      casLog.log[ix + 3] = makeIntPair(arenaIx, to);
+    }
+
+    public static void logSet(CasLog.Src src, int arenaIx, int headerIndex, int identityHashCode) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_BUF, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = identityHashCode;
+    }
+
+    public static void logRemoveFromList(int arenaIx, int headerIx, int listIx, int listHead) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(REMOVE_FROM_LIST, listIx);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+      casLog.log[ix + 3] = listHead;
+    }
+
+    public static void logAddToList(int arenaIx, int headerIx, int listIx, int listHead) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(ADD_TO_LIST, listIx);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+      casLog.log[ix + 3] = listHead;
+    }
+
+    public static void logError() {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(2) - 2;
+      casLog.log[ix] = makeIntPair(ERROR, 0);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+    }
+
+    private int dumpOneLine(int ix) {
+      int event = getFirstInt(log[ix]);
+      switch (event) {
+      case START_MOVE: {
+        LlapIoImpl.LOG.info(prefix(ix) + " started to move "
+            + header(log[ix + 2]) + " " + Integer.toHexString(getSecondInt(log[ix])));
+        return ix + 3;
+      }
+      case SET_NB: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to taken of size " + log[ix + 3]);
+        return ix + 4;
+      }
+      case SET_FREE: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to free of size " + log[ix + 3]);
+        return ix + 4;
+      }
+      case UNSET: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " unset ["
+            + header(log[ix + 2]) + ", " + header(log[ix + 3]) + "]");
+        return ix + 4;
+      }
+      case SET_BUF: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to " + Integer.toHexString((int)log[ix + 3]));
+        return ix + 4;
+      }
+      case ADD_TO_LIST: {
+        //LlapIoImpl.LOG.info(prefix(ix) + " adding " + header(log[ix + 2]) + " to "
+        //    + getSecondInt(log[ix]) + " before " + log[ix + 3]);
+        return ix + 4;
+      }
+      case REMOVE_FROM_LIST: {
+        // LlapIoImpl.LOG.info(prefix(ix) + " removing " + header(log[ix + 2]) + " from "
+        //   + getSecondInt(log[ix]) + " head " + log[ix + 3]);
+        return ix + 4;
+      }
+      case ERROR: {
+        LlapIoImpl.LOG.error(prefix(ix) + " failed");
+        return ix + 2;
+      }
+      default: throw new AssertionError("Unknown " + event);
+      }
+    }
+
+    private String prefix(int ix) {
+      return ix + " thread-" + log[ix + 1];
+    }
+
+    private String src(int val) {
+      return Src.values()[val].name();
+    }
+
+    private String header(long l) {
+      return getFirstInt(l) + ":" + getSecondInt(l);
+    }
+
+
+    public synchronized void dumpLog(boolean doSleep) {
+      if (doSleep) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+        }
+      }
+      int logSize = (int)offset.get();
+      int ix = 0;
+      while (ix < logSize) {
+        ix = dumpOneLine(ix);
+      }
+      offset.set(0);
+    }
+  }
 }


[2/3] hive git commit: HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
new file mode 100644
index 0000000..1490241
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
@@ -0,0 +1,396 @@
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * We want to have cacheable and non-allocator buffers, as well as allocator buffers with no
+ * cache dependency, and also ones that are both. Alas, we could only achieve this if we were
+ * using a real programming language.
+ */
+public abstract class LlapAllocatorBuffer extends LlapCacheableBuffer implements MemoryBuffer {
+  private final AtomicLong state = new AtomicLong(0);
+
+  public ByteBuffer byteBuffer;
+  /** Allocator uses this to remember the allocation size. Somewhat redundant with header. */
+  public int allocSize;
+
+  public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+    this.byteBuffer = byteBuffer.slice();
+    this.byteBuffer.position(offset);
+    this.byteBuffer.limit(offset + length);
+    this.allocSize = length;
+  }
+
+  public void initializeWithExistingSlice(ByteBuffer byteBuffer, int allocSize) {
+    this.byteBuffer = byteBuffer;
+    this.allocSize = allocSize;
+  }
+
+  public void setNewAllocLocation(int arenaIx, int headerIx) {
+    assert state.get() == 0 : "New buffer state is not 0 " + this;
+    long newState = State.setFlag(State.setLocation(0, arenaIx, headerIx), State.FLAG_NEW_ALLOC);
+    if (!state.compareAndSet(0, newState)) {
+      throw new AssertionError("Contention on the new buffer " + this);
+    }
+  }
+
+  @Override
+  public ByteBuffer getByteBufferDup() {
+    return byteBuffer.duplicate();
+  }
+
+  @Override
+  public ByteBuffer getByteBufferRaw() {
+    return byteBuffer;
+  }
+
+  @Override
+  public long getMemoryUsage() {
+    return allocSize;
+  }
+
+  public int incRef() {
+    return incRefInternal(true);
+  }
+
+  int tryIncRef() {
+    return incRefInternal(false);
+  }
+
+  static final int INCREF_EVICTED = -1, INCREF_FAILED = -2;
+  private int incRefInternal(boolean doWait) {
+    long newValue = -1;
+    while (true) {
+      long oldValue = state.get();
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) return INCREF_EVICTED;
+      if (State.hasFlags(oldValue, State.FLAG_MOVING)) {
+        if (!doWait || !waitForState()) return INCREF_FAILED; // Moving; cannot wait, or interrupted.
+        continue;
+      }
+      int oldRefCount = State.getRefCount(oldValue);
+      assert oldRefCount >= 0 : "oldValue is " + oldValue + " " + this;
+      if (oldRefCount == State.MAX_REFCOUNT) throw new AssertionError(this);
+      newValue = State.incRefCount(oldValue);
+      if (State.hasFlags(oldValue, State.FLAG_NEW_ALLOC)) {
+        // Remove new-alloc flag on first use. Full unlock after that would imply force-discarding
+        // this buffer is acceptable. This is kind of an ugly compact between the cache and us.
+        newValue = State.switchFlag(newValue, State.FLAG_NEW_ALLOC);
+      }
+      if (state.compareAndSet(oldValue, newValue)) break;
+    }
+    int newRefCount = State.getRefCount(newValue);
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locked {}; new ref count {}", this, newRefCount);
+    }
+    return newRefCount;
+  }
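
incRefInternal follows the usual lock-free read-modify-CAS loop: read the packed state, bail out on the evicted flag, wait out a concurrent move, otherwise install an incremented refcount (clearing FLAG_NEW_ALLOC on first use) and retry on contention. A stripped-down sketch of just the retry skeleton, ignoring the flag bits:

import java.util.concurrent.atomic.AtomicLong;

public class CasRetryDemo {
  private static final AtomicLong state = new AtomicLong(0);
  static long incRef() {
    while (true) {
      long oldValue = state.get();
      long newValue = oldValue + 1; // the real code shifts past the flag bits
      if (state.compareAndSet(oldValue, newValue)) {
        return newValue;
      }
      // CAS lost a race with another thread; reread and retry.
    }
  }
  public static void main(String[] args) {
    System.out.println("refcount after incRef: " + incRef());
  }
}
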
+
+  @VisibleForTesting
+  int getRefCount() {
+    return State.getRefCount(state.get());
+  }
+
+  @VisibleForTesting
+  @Override
+  public boolean isLocked() {
+    // Best-effort check. We cannot do a good check against caller thread, since
+    // refCount could still be > 0 if someone else locked. This is used for asserts and logs.
+    return State.getRefCount(state.get()) > 0;
+  }
+
+  @VisibleForTesting
+  public boolean isInvalid() {
+    return State.hasFlags(state.get(), State.FLAG_EVICTED);
+  }
+
+
+  public int decRef() {
+    long newState, oldState;
+    do {
+      oldState = state.get();
+      newState = State.decRefCount(oldState);
+    } while (!state.compareAndSet(oldState, newState));
+    int newRefCount = State.getRefCount(newState);
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
+    }
+    if (newRefCount < 0) {
+      throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
+    }
+    return newRefCount;
+  }
+
+
+  /**
+   * Invalidates the cached buffer. The memory allocation in memory manager is managed by the
+   * caller, and therefore we mark the buffer as having released the memory too.
+   * @return INVALIDATE_OK if invalidated; INVALIDATE_FAILED if locked, or
+   *         INVALIDATE_ALREADY_INVALID if already evicted.
+   */
+  @Override
+  public int invalidate() {
+    while (true) {
+      long oldValue = state.get();
+      if (State.getRefCount(oldValue) != 0) return INVALIDATE_FAILED;
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) return INVALIDATE_ALREADY_INVALID;
+      long newValue = State.setFlag(oldValue, State.FLAG_EVICTED | State.FLAG_MEM_RELEASED);
+      if (state.compareAndSet(oldValue, newValue)) break;
+    }
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to eviction", this);
+    }
+    return INVALIDATE_OK;
+  }
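
Callers such as the eviction policies (see the LowLevelFifoCachePolicy and LowLevelLrfuCachePolicy hunks below) now branch on the tri-state result instead of a boolean. A hypothetical caller sketch, with the constants from LlapCacheableBuffer inlined:

public class InvalidateDemo {
  static final int INVALIDATE_OK = 0, INVALIDATE_FAILED = 1,
      INVALIDATE_ALREADY_INVALID = 2;
  static boolean shouldEvict(int invalidateResult) {
    switch (invalidateResult) {
      case INVALIDATE_OK: return true;                // evict, release memory
      case INVALIDATE_FAILED: return false;           // locked; skip for now
      case INVALIDATE_ALREADY_INVALID: return false;  // someone else evicted it
      default: throw new AssertionError(invalidateResult);
    }
  }
  public static void main(String[] args) {
    System.out.println(shouldEvict(INVALIDATE_OK)); // true
  }
}
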
+
+  /**
+   * Invalidates the uncached buffer. The memory allocation in memory manager is managed by the
+   * allocator; we will only mark it as released if there are no concurrent moves. In that case,
+   * the caller of this will release the memory; otherwise, the end of move will release.
+   * @return Arena index, if the buffer memory can be released; -1 otherwise.
+   */
+  public int invalidateAndRelease() {
+    boolean result;
+    long oldValue, newValue;
+    do {
+      result = false;
+      oldValue = state.get();
+      if (State.getRefCount(oldValue) != 0) {
+        throw new AssertionError("Refcount is " + State.getRefCount(oldValue));
+      }
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        return -1; // Concurrent force-eviction - ignore.
+      }
+      newValue = State.setFlag(oldValue, State.FLAG_EVICTED);
+      if (!State.hasFlags(oldValue, State.FLAG_MOVING)) {
+        // No move pending, the allocator can release.
+        newValue = State.setFlag(newValue, State.FLAG_REMOVED | State.FLAG_MEM_RELEASED);
+        result = true;
+      }
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to direct deallocation", this);
+    }
+    // Arena cannot change after we have marked it as released.
+    return result ? State.getArena(oldValue) : -1;
+  }
+
+  /**
+   * Marks a previously invalidated buffer as released.
+   * @return Arena index, if the buffer memory can be released; -1 otherwise.
+   */
+  public int releaseInvalidated() {
+    long oldValue, newValue;
+    do {
+      oldValue = state.get();
+      if (!State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        throw new AssertionError("Not invalidated");
+      }
+      if (State.hasFlags(oldValue, State.FLAG_MOVING | State.FLAG_REMOVED)) return -1;
+      // No move pending and no intervening discard, the allocator can release.
+      newValue = State.setFlag(oldValue, State.FLAG_REMOVED);
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Removed {}", this);
+    }
+    // Arena cannot change after we have marked it as released.
+    return State.getArena(oldValue);
+  }
+
+  private boolean waitForState() {
+    synchronized (state) {
+      try {
+        state.wait(10);
+        return true;
+      } catch (InterruptedException e) {
+        LlapIoImpl.LOG.debug("Buffer incRef is deffering an interrupt");
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    long state = this.state.get();
+    int flags = State.getAllFlags(state);
+    return "0x" + Integer.toHexString(System.identityHashCode(this)) + "("
+        + State.getRefCount(state) + (flags == 0 ? "" : (", "
+            + State.toFlagString(flags))) + ")";
+  }
+
+
+  public String toDebugString() {
+    return toDebugString(state.get());
+  }
+
+  private String toDebugString(long state) {
+    return "0x" + Integer.toHexString(System.identityHashCode(this)) + "("
+        + State.getArena(state) + ":" + State.getHeader(state) + "; " + allocSize + "; "
+        + State.toFlagString(State.getAllFlags(state)) + ")";
+  }
+
+  public boolean startMoveOrDiscard(int arenaIx, int headerIx, boolean isForceDiscard) {
+    long oldValue, newValue;
+    do {
+      oldValue = state.get();
+      if (State.getRefCount(oldValue) != 0) {
+        return false;
+      }
+      int flags = State.getAllFlags(oldValue);
+      // The only allowed flag is new-alloc, and that only if we are not discarding.
+      if (flags != 0 && (isForceDiscard || flags != State.FLAG_NEW_ALLOC)) {
+        return false; // We could start a move if it's being evicted, but let's not do it for now.
+      }
+      if (State.getArena(oldValue) != arenaIx || State.getHeader(oldValue) != headerIx) {
+        return false; // The caller could re-check the location, but would probably find it locked.
+      }
+      newValue = State.setFlag(oldValue, State.FLAG_MOVING);
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locked {} in preparation for a move", this);
+    }
+    return true;
+  }
+
+  /**
+   * @return null if no action is required; otherwise, the buffer should be deallocated and,
+   *         if the value is true, its memory should be released to the memory manager.
+   */
+  public Boolean cancelDiscard() {
+    long oldValue, newValue;
+    Boolean result;
+    do {
+      oldValue = state.get();
+      assert State.hasFlags(oldValue, State.FLAG_MOVING) : this.toDebugString();
+      newValue = State.switchFlag(oldValue, State.FLAG_MOVING);
+      result = null;
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        if (State.hasFlags(oldValue, State.FLAG_REMOVED)) {
+          throw new AssertionError("Removed during the move " + this);
+        }
+        result = !State.hasFlags(oldValue, State.FLAG_MEM_RELEASED);
+        // Not necessary here because no one will be looking at these after us; set them for clarity.
+        newValue = State.setFlag(newValue, State.FLAG_MEM_RELEASED | State.FLAG_REMOVED);
+      }
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Move ended for {}", this);
+    }
+    synchronized (state) {
+      state.notifyAll();
+    }
+    return result;
+  }
+
+  /**
+   * @return null if no action is required; otherwise, the buffer should be deallocated and,
+   *         if the value is true, its memory should be released to the memory manager.
+   */
+  public Boolean endDiscard() {
+    long oldValue, newValue;
+    Boolean result;
+    do {
+      oldValue = state.get();
+      assert State.hasFlags(oldValue, State.FLAG_MOVING);
+      newValue = State.switchFlag(oldValue, State.FLAG_MOVING);
+      newValue = State.setFlag(newValue,
+          State.FLAG_EVICTED | State.FLAG_MEM_RELEASED | State.FLAG_REMOVED);
+      result = null;
+      // See if someone else evicted this in parallel.
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        if (State.hasFlags(oldValue, State.FLAG_REMOVED)) {
+          throw new AssertionError("Removed during the move " + this);
+        }
+        result = !State.hasFlags(oldValue, State.FLAG_MEM_RELEASED);
+      }
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Discared {}", this);
+    }
+    synchronized (state) {
+      state.notifyAll();
+    }
+    return result;
+  }
+
+  private static final class State {
+    public static final int
+        FLAG_MOVING =       0b00001, // Locked by someone to move or force-evict.
+        FLAG_EVICTED =      0b00010, // Evicted. This is cache-specific.
+        FLAG_REMOVED =      0b00100, // Removed from allocator structures. The final state.
+        FLAG_MEM_RELEASED = 0b01000, // The memory was released to memory manager.
+        FLAG_NEW_ALLOC =    0b10000; // New allocation before the first use; cannot force-evict.
+    private static final int FLAGS_WIDTH = 5,
+        REFCOUNT_WIDTH = 19, ARENA_WIDTH = 16, HEADER_WIDTH = 24;
+
+    public static final long MAX_REFCOUNT = (1 << REFCOUNT_WIDTH) - 1;
+
+    private static final int REFCOUNT_SHIFT = FLAGS_WIDTH,
+        ARENA_SHIFT = REFCOUNT_SHIFT + REFCOUNT_WIDTH, HEADER_SHIFT = ARENA_SHIFT + ARENA_WIDTH;
+
+    private static final long FLAGS_MASK = (1L << FLAGS_WIDTH) - 1,
+      REFCOUNT_MASK = ((1L << REFCOUNT_WIDTH) - 1) << REFCOUNT_SHIFT,
+      ARENA_MASK = ((1L << ARENA_WIDTH) - 1) << ARENA_SHIFT,
+      HEADER_MASK = ((1L << HEADER_WIDTH) - 1) << HEADER_SHIFT;
+
+    public static boolean hasFlags(long value, int flags) {
+      return (value & flags) != 0;
+    }
+
+    public static int getAllFlags(long state) {
+      return (int) (state & FLAGS_MASK);
+    }
+    public static final int getRefCount(long state) {
+      return (int)((state & REFCOUNT_MASK) >>> REFCOUNT_SHIFT);
+    }
+    public static final int getArena(long state) {
+      return (int)((state & ARENA_MASK) >>> ARENA_SHIFT);
+    }
+    public static final int getHeader(long state) {
+      return (int)((state & HEADER_MASK) >>> HEADER_SHIFT);
+    }
+
+    public static final long incRefCount(long state) {
+      // Note: doesn't check for overflow. Could AND with max refcount mask but the caller checks.
+      return state + (1 << REFCOUNT_SHIFT);
+    }
+
+    public static final long decRefCount(long state) {
+      // Note: doesn't check for overflow. Could AND with max refcount mask but the caller checks.
+      return state - (1 << REFCOUNT_SHIFT);
+    }
+
+    public static final long setLocation(long state, int arenaIx, int headerIx) {
+      long arenaVal = ((long)arenaIx) << ARENA_SHIFT, arenaWMask = arenaVal & ARENA_MASK;
+      long headerVal = ((long)headerIx) << HEADER_SHIFT, headerWMask = headerVal & HEADER_MASK;
+      assert arenaVal == arenaWMask : "Arena " + arenaIx + " is wider than " + ARENA_WIDTH;
+      assert headerVal == headerWMask : "Header " + headerIx + " is wider than " + HEADER_WIDTH;
+      return (state & ~(ARENA_MASK | HEADER_MASK)) | arenaWMask | headerWMask;
+    }
+
+    public static final long setFlag(long state, int flags) {
+      assert flags <= FLAGS_MASK;
+      return state | flags;
+    }
+
+    public static final long switchFlag(long state, int flags) {
+      assert flags <= FLAGS_MASK;
+      return state ^ flags;
+    }
+
+    public static String toFlagString(int state) {
+      return StringUtils.leftPad(Integer.toBinaryString(state), REFCOUNT_SHIFT, '0');
+    }
+  }
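
The widths above partition the 64-bit state exactly: 5 flag bits, a 19-bit refcount, a 16-bit arena index, and a 24-bit header index (5 + 19 + 16 + 24 = 64). A standalone sketch of the same packing, with made-up field values:

public class StateLayoutDemo {
  // Bits [0,5) flags, [5,24) refcount, [24,40) arena, [40,64) header.
  static final int FLAGS_WIDTH = 5, REFCOUNT_WIDTH = 19,
      ARENA_WIDTH = 16, HEADER_WIDTH = 24;
  static final int REFCOUNT_SHIFT = FLAGS_WIDTH,
      ARENA_SHIFT = REFCOUNT_SHIFT + REFCOUNT_WIDTH,
      HEADER_SHIFT = ARENA_SHIFT + ARENA_WIDTH;
  public static void main(String[] args) {
    long state = 0;
    state |= 0b10000L;             // FLAG_NEW_ALLOC
    state |= 2L << REFCOUNT_SHIFT; // refcount 2
    state |= 3L << ARENA_SHIFT;    // arena 3
    state |= 77L << HEADER_SHIFT;  // header 77
    System.out.println("refcount="
        + ((state >>> REFCOUNT_SHIFT) & ((1L << REFCOUNT_WIDTH) - 1))
        + " arena=" + ((state >>> ARENA_SHIFT) & ((1L << ARENA_WIDTH) - 1))
        + " header=" + (state >>> HEADER_SHIFT));
  }
}
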
+
+  @VisibleForTesting
+  int getArenaIndex() {
+    return State.getArena(state.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index 5c0b6f3..b53d4df 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.cache;
 
 /**
  * Buffer that can be managed by LowLevelEvictionPolicy.
+ * We want to have cacheable and non-allocator buffers, as well as allocator buffers with no
+ * cache dependency, and also ones that are both. Alas, we could only achieve this if we were
+ * using a real programming language.
  */
 public abstract class LlapCacheableBuffer {
   protected static final int IN_LIST = -2, NOT_IN_CACHE = -1;
@@ -38,7 +41,8 @@ public abstract class LlapCacheableBuffer {
   /** Index in heap for LRFU/LFU cache policies. */
   public int indexInHeap = NOT_IN_CACHE;
 
-  protected abstract boolean invalidate();
+  public static final int INVALIDATE_OK = 0, INVALIDATE_FAILED = 1, INVALIDATE_ALREADY_INVALID = 2;
+  protected abstract int invalidate();
   public abstract long getMemoryUsage();
   public abstract void notifyEvicted(EvictionDispatcher evictionDispatcher);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 7d5c101..0c232c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -18,132 +18,15 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryBuffer {
-
-  // For now, we don't track refcount for metadata blocks, don't clear them, don't reuse them and
-  // basically rely on GC to remove them. So, refcount only applies to data blocks. If that
-  // changes, refcount management should be move to LlapCacheableBuffer to be shared.
-  private static final int EVICTED_REFCOUNT = -1;
+public final class LlapDataBuffer extends LlapAllocatorBuffer {
   public static final int UNKNOWN_CACHED_LENGTH = -1;
-  protected final AtomicInteger refCount = new AtomicInteger(0);
 
-  public ByteBuffer byteBuffer;
-  /** Allocator uses this to remember which arena to alloc from. */
-  public int arenaIndex = -1;
-  /** Allocator uses this to remember the allocation size. */
-  public int allocSize;
   /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
    * the lookup is on compressed ranges, so we need to know this. */
   public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
 
-  public void initialize(
-      int arenaIndex, ByteBuffer byteBuffer, int offset, int length) {
-    this.byteBuffer = byteBuffer.slice();
-    this.byteBuffer.position(offset);
-    this.byteBuffer.limit(offset + length);
-    this.arenaIndex = arenaIndex;
-    this.allocSize = length;
-  }
-
-  @Override
-  public ByteBuffer getByteBufferDup() {
-    return byteBuffer.duplicate();
-  }
-
-  @Override
-  public ByteBuffer getByteBufferRaw() {
-    return byteBuffer;
-  }
-
-  @Override
-  public long getMemoryUsage() {
-    return allocSize;
-  }
-
   @Override
   public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
     evictionDispatcher.notifyEvicted(this);
   }
-
-  int incRef() {
-    int newRefCount = -1;
-    while (true) {
-      int oldRefCount = refCount.get();
-      if (oldRefCount == EVICTED_REFCOUNT) return -1;
-      assert oldRefCount >= 0 : "oldRefCount is " + oldRefCount + " " + this;
-      newRefCount = oldRefCount + 1;
-      if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
-    }
-    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.LOCKING_LOGGER.trace("Locked {}; new ref count {}", this, newRefCount);
-    }
-    return newRefCount;
-  }
-
-  @VisibleForTesting
-  int getRefCount() {
-    return refCount.get();
-  }
-
-  @VisibleForTesting
-  @Override
-  public boolean isLocked() {
-    // Best-effort check. We cannot do a good check against caller thread, since
-    // refCount could still be > 0 if someone else locked. This is used for asserts and logs.
-    return refCount.get() > 0;
-  }
-
-  @VisibleForTesting
-  public boolean isInvalid() {
-    return refCount.get() == EVICTED_REFCOUNT;
-  }
-
-  int decRef() {
-    int newRefCount = refCount.decrementAndGet();
-    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
-    }
-    if (newRefCount < 0) {
-      throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
-    }
-    return newRefCount;
-  }
-
-  /**
-   * @return Whether the we can invalidate; false if locked or already evicted.
-   */
-  @Override
-  public boolean invalidate() {
-    while (true) {
-      int value = refCount.get();
-      if (value != 0) return false;
-      if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break;
-    }
-    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to eviction", this);
-    }
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    int refCount = this.refCount.get();
-    return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
-  }
-
-  public static String toDataString(MemoryBuffer s) {
-    if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
-    byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
-    int i = (b < 0) ? -b : b;
-    return s + " (0x" + Integer.toHexString(i) + ")";
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 7673b5a..cc96d17 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -390,7 +390,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
   public static LlapDataBuffer allocateFake() {
     LlapDataBuffer fake = new LlapDataBuffer();
-    fake.initialize(-1, fakeBuf, 0, 1);
+    fake.initialize(fakeBuf, 0, 1);
     return fake;
   }
 
@@ -420,7 +420,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         throws InterruptedException {
       // Iterate thru the file cache. This is best-effort.
       Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
-      boolean isEmpty = true;
       while (subIter.hasNext()) {
         long time = -1;
         isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
@@ -428,8 +427,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
             ? 1 : (endTime - time) / (1000000L * leftToCheck));
         if (subIter.next().getValue().isInvalid()) {
           subIter.remove();
-        } else {
-          isEmpty = false;
         }
         --leftToCheck;
       }
@@ -478,17 +475,21 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   @Override
   public void debugDumpShort(StringBuilder sb) {
     sb.append("\nORC cache state ");
-    int allLocked = 0, allUnlocked = 0, allEvicted = 0;
+    int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
     for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
       cache.entrySet()) {
       if (!e.getValue().incRef()) continue;
       try {
-        int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0;
+        int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
         if (e.getValue().getCache().isEmpty()) continue;
         for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
-          int newRc = e2.getValue().incRef();
+          int newRc = e2.getValue().tryIncRef();
           if (newRc < 0) {
-            ++fileEvicted;
+            if (newRc == LlapAllocatorBuffer.INCREF_EVICTED) {
+              ++fileEvicted;
+            } else if (newRc == LlapAllocatorBuffer.INCREF_FAILED) {
+              ++fileMoving;
+            }
             continue;
           }
           try {
@@ -504,13 +505,14 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         allLocked += fileLocked;
         allUnlocked += fileUnlocked;
         allEvicted += fileEvicted;
-        sb.append("\n  file " + e.getKey() + ": " + fileLocked + " locked, "
-            + fileUnlocked + " unlocked, " + fileEvicted + " evicted");
+        allMoving += fileMoving;
+        sb.append("\n  file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
+            + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
       } finally {
         e.getValue().decRef();
       }
     }
-    sb.append("\nORC cache summary: " + allLocked + " locked, "
-        + allUnlocked + " unlocked, " + allEvicted + " evicted");
+    sb.append("\nORC cache summary: " + allLocked + " locked, " + allUnlocked + " unlocked, "
+        + allEvicted + " evicted, " + allMoving + " being moved");
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index e30acb0..2c1086b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -99,17 +99,15 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
           result = false;
           break; // Test code path where we don't do more than one attempt.
         }
-        didDumpIoState = logEvictionIssue(++badCallCount, didDumpIoState);
-        waitTimeMs = Math.min(1000, waitTimeMs << 1);
-        assert waitTimeMs > 0;
-        try {
-          Thread.sleep(waitTimeMs);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
+
+        if (isStopped != null && isStopped.get()) {
           result = false;
           break;
         }
-        if (isStopped != null && isStopped.get()) {
+        try {
+          Thread.sleep(badCallCount > 9 ? 1000 : (1 << badCallCount));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
           result = false;
           break;
         }
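
A plain reading of the new sleep expression: the wait doubles per failed reservation attempt, 2 ms, 4 ms, ... 512 ms, then caps at one second from the tenth attempt on. A tiny sketch of the schedule:

public class BackoffDemo {
  public static void main(String[] args) {
    for (int badCallCount = 1; badCallCount <= 12; ++badCallCount) {
      long sleepMs = badCallCount > 9 ? 1000 : (1 << badCallCount);
      System.out.println("attempt " + badCallCount + ": sleep " + sleepMs + " ms");
    }
    // Prints 2, 4, 8, ..., 512, then 1000 from attempt 10 onward.
  }
}
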
@@ -139,46 +137,6 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     return result;
   }
 
-
-  private boolean logEvictionIssue(int badCallCount, boolean didDumpIoState) {
-    if (badCallCount <= LOCKING_DEBUG_DUMP_THRESHOLD) return didDumpIoState;
-    String ioStateDump = maybeDumpIoState(didDumpIoState);
-    if (ioStateDump == null) {
-      LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
-      return didDumpIoState;
-    } else {
-      LlapIoImpl.LOG.warn("Cannot evict blocks; IO state:\n " + ioStateDump);
-      return true;
-    }
-  }
-
-  private String maybeDumpIoState(boolean didDumpIoState) {
-    if (didDumpIoState) return null; // No more than once per reader.
-    long now = System.nanoTime(), last = lastCacheDumpNs.get();
-    while (true) {
-      if (last != 0 && (now - last) < LOCKING_DEBUG_DUMP_PERIOD_NS) {
-        return null; // We have recently dumped IO state into log.
-      }
-      if (lastCacheDumpNs.compareAndSet(last, now)) break;
-      now = System.nanoTime();
-      last = lastCacheDumpNs.get();
-    }
-    try {
-      StringBuilder sb = new StringBuilder();
-      memoryDumpRoot.debugDumpShort(sb);
-      return sb.toString();
-    } catch (Throwable t) {
-      return "Failed to dump cache state: " + t.getClass() + " " + t.getMessage();
-    }
-  }
-
-
-  @Override
-  public long forceReservedMemory(int allocationSize, int count) {
-    if (evictor == null) return 0;
-    return evictor.tryEvictContiguousData(allocationSize, count);
-  }
-
   @Override
   public void releaseMemory(final long memoryToRelease) {
     long oldV;

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index fd9d942..acbaf85 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -27,6 +27,4 @@ public interface LowLevelCachePolicy extends LlapOomDebugDump {
   long evictSomeBlocks(long memoryToReserve);
   void setEvictionListener(EvictionListener listener);
   void setParentDebugDumper(LlapOomDebugDump dumper);
-  /** TODO: temporary method until we have a better allocator */
-  long tryEvictContiguousData(int allocationSize, int count);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 761fd00..e777dc7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -24,7 +24,6 @@ import java.util.LinkedList;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
@@ -83,8 +82,9 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
       while (evicted < memoryToReserve && iter.hasNext()) {
         LlapCacheableBuffer buffer = iter.next();
         long memUsage = buffer.getMemoryUsage();
-        if (memUsage < minSize || (minSize > 0  && !(buffer instanceof LlapDataBuffer))) continue;
-        if (buffer.invalidate()) {
+        if (memUsage < minSize || (minSize > 0
+            && !(buffer instanceof LlapAllocatorBuffer))) continue;
+        if (LlapCacheableBuffer.INVALIDATE_OK == buffer.invalidate()) {
           iter.remove();
           evicted += memUsage;
           evictionListener.notifyEvicted(buffer);
@@ -128,14 +128,4 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
       parentDebugDump.debugDumpShort(sb);
     }
   }
-
-  @Override
-  public long tryEvictContiguousData(int allocationSize, int count) {
-    long evicted = evictInternal(allocationSize * count, allocationSize);
-    int remainingCount = count - (int)(evicted / allocationSize);
-    if (remainingCount > 0) {
-      evicted += evictInternal(allocationSize * remainingCount, -1);
-    }
-    return evicted;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 3973c8a..158347a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -18,17 +18,11 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 
@@ -203,17 +197,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     return evicted;
   }
 
-  @Override
-  public long tryEvictContiguousData(int allocationSize, int count) {
-    int evicted = evictDataFromList(allocationSize, count);
-    if (count <= evicted) return evicted * allocationSize;
-    evicted += evictDataFromHeap(timer.get(), count - evicted, allocationSize);
-    long evictedBytes = evicted * allocationSize;
-    if (count <= evicted) return evictedBytes;
-    evictedBytes += evictSomeBlocks(allocationSize * (count - evicted));
-    return evictedBytes;
-  }
-
   private long evictFromList(long memoryToReserve) {
     long evicted = 0;
     LlapCacheableBuffer nextCandidate = null, firstCandidate = null;
@@ -224,8 +207,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     try {
       nextCandidate = firstCandidate = listTail;
       while (evicted < memoryToReserve && nextCandidate != null) {
-        if (!nextCandidate.invalidate()) {
-          // Locked buffer was in the list - just drop it; will be re-added on unlock.
+        if (LlapCacheableBuffer.INVALIDATE_OK != nextCandidate.invalidate()) {
+          // Locked, or invalidated, buffer was in the list - just drop it;
+          // will be re-added on unlock.
           LlapCacheableBuffer lockedBuffer = nextCandidate;
           if (firstCandidate == nextCandidate) {
             firstCandidate = nextCandidate.prev;
@@ -258,42 +242,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     return evicted;
   }
 
-
-  private int evictDataFromList(int minSize, int count) {
-    int evictedCount = 0;
-    // Unlike the normal evictFromList, we don't necessarily evict a sequence of blocks. We won't
-    // bother with optimization here and will just evict blocks one by one.
-    List<LlapCacheableBuffer> evictedBuffers = new ArrayList<>(count);
-    listLock.lock();
-    try {
-      LlapCacheableBuffer candidate = listTail;
-      while (evictedCount < count && candidate != null) {
-        LlapCacheableBuffer current = candidate;
-        candidate = candidate.prev;
-        long memUsage = current.getMemoryUsage();
-        if (memUsage < minSize || !(current instanceof LlapDataBuffer)) continue;
-        if (!current.invalidate()) {
-          // Locked buffer was in the list - just drop it; will be re-added on unlock.
-          removeFromListUnderLock(current);
-          continue;
-        }
-        // Remove the buffer from the list.
-        removeFromListUnderLock(current);
-        // This makes granularity assumptions.
-        assert memUsage % minSize == 0;
-        evictedCount += (memUsage / minSize);
-        evictedBuffers.add(current);
-      }
-    } finally {
-      listLock.unlock();
-    }
-    for (LlapCacheableBuffer buffer : evictedBuffers) {
-      evictionListener.notifyEvicted(buffer);
-    }
-    return evictedCount;
-  }
-
-
   // Note: rarely called (unless buffers are very large or we evict a lot, or in LFU case).
   private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
     while (true) {
@@ -303,38 +251,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     }
   }
 
-  // Note: almost never called (unless buffers are very large or we evict a lot, or LFU).
-  private int evictDataFromHeap(long time, int count, int minSize) {
-    LlapCacheableBuffer evicted = null;
-    int evictedCount = 0;
-    synchronized (heap) {
-      // Priorities go out of the window here.
-      int index = 0;
-      while (index < heapSize && evictedCount < count) {
-        LlapCacheableBuffer buffer = heap[index];
-        long memUsage = buffer.getMemoryUsage();
-        if (memUsage < minSize || !(buffer instanceof LlapDataBuffer)) {
-          ++index;
-          continue;
-        }
-        LlapCacheableBuffer result = evictHeapElementUnderLock(time, index);
-        // Don't advance the index - the buffer has been removed either way.
-        if (result != null) {
-          assert memUsage % minSize == 0;
-          evictedCount += (memUsage / minSize);
-          if (evicted != null) {
-            evictionListener.notifyEvicted(evicted);
-          }
-          evicted = result;
-        }
-      }
-    }
-    if (evicted != null) {
-      evictionListener.notifyEvicted(evicted);
-    }
-    return evictedCount;
-  }
-  
   private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
     // See heapifyDown comment.
     int ix = buffer.indexInHeap;
@@ -360,7 +276,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     }
     result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
     --heapSize;
-    boolean canEvict = result.invalidate();
+    int invalidateResult = result.invalidate();
+    boolean canEvict = invalidateResult == LlapCacheableBuffer.INVALIDATE_OK;
     if (heapSize > 0) {
       LlapCacheableBuffer newRoot = heap[heapSize];
       newRoot.indexInHeap = ix;
@@ -370,7 +287,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
       }
       heapifyDownUnderLock(newRoot, time);
     }
-    // Otherwise we just removed a locked item from heap; unlock will re-add it, we continue.
+    // Otherwise we just removed a locked/invalid item from heap; we continue.
     return canEvict ? result : null;
   }
 

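Aside: invalidate() now returns an int instead of a boolean, so callers such as the eviction loop above compare against LlapCacheableBuffer.INVALIDATE_OK. A minimal sketch of the new contract follows; only INVALIDATE_OK is visible in this patch, so the FAILED code's value and the buffer type here are illustrative stand-ins, not the real LLAP types.

// Sketch only - illustrates the tri-state invalidate() convention used by
// evictFromList and evictHeapElementUnderLock; constants other than
// INVALIDATE_OK are assumptions.
class InvalidateSketch {
  static final int INVALIDATE_OK = 0;      // evictor now owns the buffer
  static final int INVALIDATE_FAILED = 1;  // assumed: locked or already gone

  interface Buffer {
    int invalidate();
    long getMemoryUsage();
  }

  // Returns the bytes reclaimed from one candidate: only INVALIDATE_OK
  // permits eviction; any other code means the buffer is simply dropped
  // from the eviction list (a locked buffer is re-added when unlocked).
  static long tryEvict(Buffer candidate) {
    return candidate.invalidate() == INVALIDATE_OK
        ? candidate.getMemoryUsage() : 0;
  }
}
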
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index e1133cd..ff1f269 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -23,7 +23,5 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public interface MemoryManager extends LlapOomDebugDump {
   void releaseMemory(long memUsage);
   void updateMaxSize(long maxSize);
-  /** TODO: temporary method until we get a better allocator. */
-  long forceReservedMemory(int allocationSize, int count);
   void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index cd5bc9b..7751845 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -18,11 +18,11 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -43,7 +43,7 @@ import org.apache.orc.OrcProto.ColumnEncoding;
 
 import com.google.common.base.Function;
 
-public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
   private final Allocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -53,6 +53,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
   private final long cleanupInterval;
   private final LlapDaemonCacheMetrics metrics;
 
+  public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
+    public boolean isCached = false;
+    @Override
+    public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+      evictionDispatcher.notifyEvicted(this);
+    }
+  }
+
   private static final class StripeInfoComparator implements
       Comparator<StripeData> {
     @Override
@@ -129,7 +137,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
 
     private final long rowCount;
     private final OrcProto.ColumnEncoding[] encodings;
-    private LlapDataBuffer[][][] data; // column index, stream type, buffers
+    private LlapSerDeDataBuffer[][][] data; // column index, stream type, buffers
 
     public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
         long rowCount, ColumnEncoding[] encodings) {
@@ -139,7 +147,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
       this.lastEnd = lastEnd;
       this.encodings = encodings;
       this.rowCount = rowCount;
-      this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+      this.data = encodings == null ? null : new LlapSerDeDataBuffer[encodings.length][][];
     }
  
     @Override
@@ -172,7 +180,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
       return encodings;
     }
 
-    public LlapDataBuffer[][][] getData() {
+    public LlapSerDeDataBuffer[][][] getData() {
       return data;
     }
 
@@ -191,18 +199,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
     }
   }
 
-  public static String toString(LlapDataBuffer[][][] data) {
+  public static String toString(LlapSerDeDataBuffer[][][] data) {
     if (data == null) return "null";
     StringBuilder sb = new StringBuilder("[");
     for (int i = 0; i < data.length; ++i) {
-      LlapDataBuffer[][] colData = data[i];
+      LlapSerDeDataBuffer[][] colData = data[i];
       if (colData == null) {
         sb.append("null, ");
         continue;
       }
       sb.append("colData [");
       for (int j = 0; j < colData.length; ++j) {
-        LlapDataBuffer[] streamData = colData[j];
+        LlapSerDeDataBuffer[] streamData = colData[j];
         if (streamData == null) {
           sb.append("null, ");
           continue;
@@ -220,19 +228,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
   }
   
 
-  public static String toString(LlapDataBuffer[][] data) {
+  public static String toString(LlapSerDeDataBuffer[][] data) {
     if (data == null) return "null";
     StringBuilder sb = new StringBuilder("[");
     for (int j = 0; j < data.length; ++j) {
-      LlapDataBuffer[] streamData = data[j];
+      LlapSerDeDataBuffer[] streamData = data[j];
       if (streamData == null) {
         sb.append("null, ");
         continue;
       }
       sb.append("[");
       for (int k = 0; k < streamData.length; ++k) {
-        LlapDataBuffer s = streamData[k];
-        sb.append(LlapDataBuffer.toDataString(s));
+        sb.append(streamData[k]);
       }
       sb.append("], ");
     }
@@ -405,11 +412,11 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
         continue;
       }
       stripe.encodings[colIx] = cStripe.encodings[colIx];
-      LlapDataBuffer[][] cColData = cStripe.data[colIx];
+      LlapSerDeDataBuffer[][] cColData = cStripe.data[colIx];
       assert cColData != null;
       for (int streamIx = 0;
           cColData != null && streamIx < cColData.length; ++streamIx) {
-        LlapDataBuffer[] streamData = cColData[streamIx];
+        LlapSerDeDataBuffer[] streamData = cColData[streamIx];
         // Note: this relies on the fact that we always evict the entire column, so if
         //       we have the column data, we assume we have all the streams we need.
         if (streamData == null) continue;
@@ -471,7 +478,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
     }
   }
 
-  private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+  private boolean lockBuffer(LlapSerDeDataBuffer buffer, boolean doNotifyPolicy) {
     int rc = buffer.incRef();
     if (rc > 0) {
       metrics.incrCacheNumLockedBuffers();
@@ -561,28 +568,28 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
 
   private void lockAllBuffersForPut(StripeData si, Priority priority) {
     for (int i = 0; i < si.data.length; ++i) {
-      LlapDataBuffer[][] colData = si.data[i];
+      LlapSerDeDataBuffer[][] colData = si.data[i];
       if (colData == null) continue;
       for (int j = 0; j < colData.length; ++j) {
-        LlapDataBuffer[] streamData = colData[j];
+        LlapSerDeDataBuffer[] streamData = colData[j];
         if (streamData == null) continue;
         for (int k = 0; k < streamData.length; ++k) {
           boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
           assert canLock;
           cachePolicy.cache(streamData[k], priority);
-          streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+          streamData[k].isCached = true;
         }
       }
     }
   }
 
   private void handleRemovedStripeInfo(StripeData removed) {
-    for (LlapDataBuffer[][] colData : removed.data) {
+    for (LlapSerDeDataBuffer[][] colData : removed.data) {
       handleRemovedColumnData(colData);
     }
   }
 
-  private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+  private void handleRemovedColumnData(LlapSerDeDataBuffer[][] removed) {
     // TODO: could we tell the policy that we don't care about these and have them evicted? or we
     //       could just deallocate them when unlocked, and free memory + handle that in eviction.
     //       For now, just abandon the blocks - eventually, they'll get evicted.
@@ -603,7 +610,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
           && !to.encodings[colIx].equals(from.encodings[colIx])) {
         throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
       }
-      LlapDataBuffer[][] fromColData = from.data[colIx];
+      LlapSerDeDataBuffer[][] fromColData = from.data[colIx];
       if (fromColData != null) {
         if (to.data[colIx] != null) {
           // Note: we assume here that the data that was returned to the caller from cache will not
@@ -616,12 +623,22 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
     } 
   }
 
-  private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapSerDeDataBuffer)buffer, true);
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) {
+      unlockBuffer((LlapSerDeDataBuffer)b, true);
+    }
+  }
+
+  private void unlockBuffer(LlapSerDeDataBuffer buffer, boolean handleLastDecRef) {
     boolean isLastDecref = (buffer.decRef() == 0);
     if (handleLastDecRef && isLastDecref) {
-      // This is kind of not pretty, but this is how we detect whether buffer was cached.
-      // We would always set this for lookups at put time.
-      if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+      if (buffer.isCached) {
         cachePolicy.notifyUnlock(buffer);
       } else {
         if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -633,13 +650,6 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
     metrics.decrCacheNumLockedBuffers();
   }
 
-  private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
-  public static LlapDataBuffer allocateFake() {
-    LlapDataBuffer fake = new LlapDataBuffer();
-    fake.initialize(-1, fakeBuf, 0, 1);
-    return fake;
-  }
-
   public final void notifyEvicted(MemoryBuffer buffer) {
     newEvictions.incrementAndGet();
   }
@@ -664,14 +674,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
       try {
         for (StripeData sd : fd.stripes) {
           for (int colIx = 0; colIx < sd.data.length; ++colIx) {
-            LlapDataBuffer[][] colData = sd.data[colIx];
+            LlapSerDeDataBuffer[][] colData = sd.data[colIx];
             if (colData == null) continue;
             boolean hasAllData = true;
             for (int j = 0; (j < colData.length) && hasAllData; ++j) {
-              LlapDataBuffer[] streamData = colData[j];
+              LlapSerDeDataBuffer[] streamData = colData[j];
               if (streamData == null) continue;
               for (int k = 0; k < streamData.length; ++k) {
-                LlapDataBuffer buf = streamData[k];
+                LlapSerDeDataBuffer buf = streamData[k];
                 hasAllData = hasAllData && lockBuffer(buf, false);
                 if (!hasAllData) break;
                 unlockBuffer(buf, true);
@@ -691,6 +701,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
   }
 
   @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    // notifyReused implies that the buffer is already locked; it's also called once
+    // for new buffers that are not cached yet. Don't notify the cache policy.
+    return lockBuffer(((LlapSerDeDataBuffer)buffer), false);
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+
+  @Override
   public String debugDumpForOom() {
     StringBuilder sb = new StringBuilder("File cache state ");
     for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
@@ -711,25 +733,29 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
   @Override
   public void debugDumpShort(StringBuilder sb) {
     sb.append("\nSerDe cache state ");
-    int allLocked = 0, allUnlocked = 0, allEvicted = 0;
+    int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
     for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
       if (!e.getValue().incRef()) continue;
       try {
         FileData fd = e.getValue().getCache();
-        int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0;
+        int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
         sb.append(fd.colCount).append(" columns, ").append(fd.stripes.size()).append(" stripes; ");
         for (StripeData stripe : fd.stripes) {
           if (stripe.data == null) continue;
           for (int i = 0; i < stripe.data.length; ++i) {
-            LlapDataBuffer[][] colData = stripe.data[i];
+            LlapSerDeDataBuffer[][] colData = stripe.data[i];
             if (colData == null) continue;
             for (int j = 0; j < colData.length; ++j) {
-              LlapDataBuffer[] streamData = colData[j];
+              LlapSerDeDataBuffer[] streamData = colData[j];
               if (streamData == null) continue;
               for (int k = 0; k < streamData.length; ++k) {
                 int newRc = streamData[k].incRef();
                 if (newRc < 0) {
-                  ++fileEvicted;
+                  if (newRc == LlapAllocatorBuffer.INCREF_EVICTED) {
+                    ++fileEvicted;
+                  } else if (newRc == LlapAllocatorBuffer.INCREF_FAILED) {
+                    ++fileMoving;
+                  }
                   continue;
                 }
                 try {
@@ -748,13 +774,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
         allLocked += fileLocked;
         allUnlocked += fileUnlocked;
         allEvicted += fileEvicted;
-        sb.append("\n  file " + e.getKey() + ": " + fileLocked + " locked, "
-            + fileUnlocked + " unlocked, " + fileEvicted + " evicted");
+        allMoving += fileMoving;
+        sb.append("\n  file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
+            + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
       } finally {
         e.getValue().decRef();
       }
     }
-    sb.append("\nSerDe cache summary: " + allLocked + " locked, "
-        + allUnlocked + " unlocked, " + allEvicted + " evicted");
+    sb.append("\nSerDe cache summary: " + allLocked + " locked, " + allUnlocked + " unlocked, "
+        + allEvicted + " evicted, " + allMoving + " being moved");
   }
 }

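Aside: the SerDe cache now marks cached buffers with an explicit isCached flag instead of the old declaredCachedLength sentinel, and unlockBuffer branches on it directly. A minimal sketch of that decision, with the policy and allocator reduced to stand-in interfaces (not the real LLAP types):

// Sketch only - simplified stand-ins for the cache policy and allocator.
class UnlockSketch {
  interface Policy { void notifyUnlock(Object buffer); }
  interface Alloc { void deallocate(Object buffer); }

  // Mirrors unlockBuffer(): on the last decRef, a cached buffer goes back
  // to the eviction policy, while an uncached one is freed immediately.
  static void onLastDecRef(boolean isCached, Object buffer,
      Policy policy, Alloc allocator) {
    if (isCached) {
      policy.notifyUnlock(buffer);
    } else {
      allocator.deallocate(buffer);
    }
  }
}
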
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index 51eb34e..f5a5f53 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -50,23 +50,32 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
     }
   }
 
+
   @Override
+  @Deprecated
   public void allocateMultiple(MemoryBuffer[] dest, int size) {
+    allocateMultiple(dest, size, null);
+  }
+
+  @Override
+  public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory) {
     for (int i = 0; i < dest.length; ++i) {
-      LlapDataBuffer buf = null;
+      LlapAllocatorBuffer buf = null;
       if (dest[i] == null) {
-        dest[i] = buf = createUnallocated();
+        // Note: this is backward compat only. Should be removed with createUnallocated.
+        dest[i] = buf = (factory != null)
+            ? (LlapAllocatorBuffer)factory.create() : createUnallocated();
       } else {
-        buf = (LlapDataBuffer)dest[i];
+        buf = (LlapAllocatorBuffer)dest[i];
       }
       ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
-      buf.initialize(0, bb, 0, size);
+      buf.initialize(bb, 0, size);
     }
   }
 
   @Override
   public void deallocate(MemoryBuffer buffer) {
-    LlapDataBuffer buf = (LlapDataBuffer)buffer;
+    LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
     ByteBuffer bb = buf.byteBuffer;
     buf.byteBuffer = null;
     if (!bb.isDirect()) return;
@@ -86,7 +95,8 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
   }
 
   @Override
-  public LlapDataBuffer createUnallocated() {
+  @Deprecated
+  public LlapAllocatorBuffer createUnallocated() {
     return new LlapDataBuffer();
   }
 

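Aside: allocateMultiple now threads a BufferObjectFactory through, so each cache supplies its own buffer subclass; the two-argument overload and createUnallocated() remain only for backward compatibility. A hedged usage sketch, assuming the factory body mirrors the one added to SerDeEncodedDataReader below (the buffer count and 4096-byte size are illustrative, not from the patch):

import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.LlapSerDeDataBuffer;

// Sketch only: drives the factory-based overload so the allocator never
// needs to know the concrete buffer type it is filling.
class AllocateSketch {
  static void allocateForSerDe(Allocator allocator) {
    BufferObjectFactory factory = new BufferObjectFactory() {
      @Override
      public MemoryBuffer create() {
        return new LlapSerDeDataBuffer();
      }
    };
    MemoryBuffer[] dest = new MemoryBuffer[4]; // count/size illustrative
    allocator.allocateMultiple(dest, 4096, factory);
  }
}
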
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index af7cf3d..076c80f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -38,14 +38,14 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
     this.metrics = metrics;
   }
 
-  private boolean lockBuffer(LlapDataBuffer buffer) {
+  private boolean lockBuffer(LlapAllocatorBuffer buffer) {
     int rc = buffer.incRef();
     if (rc <= 0) return false;
     metrics.incrCacheNumLockedBuffers();
     return true;
   }
 
-  private void unlockBuffer(LlapDataBuffer buffer) {
+  private void unlockBuffer(LlapAllocatorBuffer buffer) {
     if (buffer.decRef() == 0) {
       if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
         LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
@@ -57,19 +57,19 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
 
   @Override
   public void decRefBuffer(MemoryBuffer buffer) {
-    unlockBuffer((LlapDataBuffer)buffer);
+    unlockBuffer((LlapAllocatorBuffer)buffer);
   }
 
   @Override
   public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
     for (MemoryBuffer b : cacheBuffers) {
-      unlockBuffer((LlapDataBuffer)b);
+      unlockBuffer((LlapAllocatorBuffer)b);
     }
   }
 
   @Override
   public boolean incRefBuffer(MemoryBuffer buffer) {
-    return lockBuffer((LlapDataBuffer)buffer);
+    return lockBuffer((LlapAllocatorBuffer)buffer);
   }
 
   @Override
@@ -88,7 +88,7 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
       MemoryBuffer[] chunks, long baseOffset, Priority priority,
       LowLevelCacheCounters qfCounters) {
     for (int i = 0; i < chunks.length; ++i) {
-      LlapDataBuffer buffer = (LlapDataBuffer)chunks[i];
+      LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
       if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
         LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time (no cache)", buffer);
       }

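Aside: lockBuffer() relies on incRef()'s return-code convention: a positive refcount means the pin succeeded, while negative codes distinguish why it failed. Only the INCREF_EVICTED and INCREF_FAILED names appear in this patch (in the SerDe cache dump above); their values and the buffer interface below are assumptions:

// Sketch only - constant values and the Buffer interface are assumed.
class IncRefSketch {
  static final int INCREF_EVICTED = -1; // contents already evicted
  static final int INCREF_FAILED = -2;  // buffer busy, e.g. being moved

  interface Buffer { int incRef(); }

  // Mirrors lockBuffer(): true means the caller holds a reference and must
  // eventually decRef(); false means the buffer cannot be pinned right now.
  static boolean tryLock(Buffer buffer) {
    return buffer.incRef() > 0;
  }
}
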
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 253532a..d107e67 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -116,7 +116,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     OrcMetadataCache metadataCache = null;
     LowLevelCache cache = null;
     SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
-    BufferUsageManager bufferManager = null;
+    BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
     boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
     if (useLowLevelCache) {
       // Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -172,12 +172,13 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       cachePolicy.setParentDebugDumper(e);
 
       cacheImpl.startThreads(); // Start the cache threads.
-      bufferManager = cacheImpl; // Cache also serves as buffer manager.
+      bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
+      bufferManagerGeneric = serdeCache;
     } else {
       this.allocator = new SimpleAllocator(conf);
       memoryDumpRoot = null;
       SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
-      bufferManager = sbm;
+      bufferManagerOrc = bufferManagerGeneric = sbm;
       cache = sbm;
     }
     // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
@@ -189,9 +190,9 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     // TODO: this should depend on the input format and be in a map, or something.
     FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);
     this.orcCvp = new OrcColumnVectorProducer(
-        metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics, tracePool);
+        metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
     this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer(
-        serdeCache, bufferManager, conf, cacheMetrics, ioMetrics, tracePool) : null;
+        serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null;
     LOG.info("LLAP IO initialized");
 
     registerMXBeans();

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 655ce83..edd498e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -831,7 +833,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     stripeRgs = new boolean[stripeIxTo - stripeIxFrom][];
   }
 
-  private class DataWrapperForOrc implements DataReader, DataCache {
+  private class DataWrapperForOrc implements DataReader, DataCache, BufferObjectFactory {
     private final DataReader orcDataReader;
 
     private DataWrapperForOrc(DataWrapperForOrc other) {
@@ -951,6 +953,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
       return orcDataReader.readStripeFooter(stripe);
     }
+
+    @Override
+    public BufferObjectFactory getDataBufferFactory() {
+      return this;
+    }
+
+    @Override
+    public MemoryBuffer create() {
+      return new LlapDataBuffer();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 35d6178..187fa7d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -46,10 +47,12 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.LlapSerDeDataBuffer;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -94,7 +97,6 @@ import org.apache.orc.OrcFile.Version;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.ColumnEncoding;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OutStream;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.PhysicalWriter.OutputReceiver;
 import org.apache.orc.impl.SchemaEvolution;
@@ -151,6 +153,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private final SerDeLowLevelCacheImpl cache;
   private final BufferUsageManager bufferManager;
+  private final BufferObjectFactory bufferFactory;
   private final Configuration daemonConf;
   private final FileSplit split;
   private List<Integer> columnIds;
@@ -191,6 +194,12 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
           throws IOException {
     this.cache = cache;
     this.bufferManager = bufferManager;
+    this.bufferFactory = new BufferObjectFactory() {
+      @Override
+      public MemoryBuffer create() {
+        return new SerDeLowLevelCacheImpl.LlapSerDeDataBuffer();
+      }
+    };
     this.parts = parts;
     this.daemonConf = new Configuration(daemonConf);
     // Disable dictionary encoding for the writer.
@@ -303,7 +312,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       private static String toString(List<MemoryBuffer> data) {
         String s = "";
         for (MemoryBuffer buffer : data) {
-          s += LlapDataBuffer.toDataString(buffer) + ", ";
+          s += buffer + ", ";
         }
         return s;
       }
@@ -331,6 +340,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private CacheStripeData currentStripe;
     private final List<CacheStripeData> stripes = new ArrayList<>();
     private final BufferUsageManager bufferManager;
+    private final Allocator.BufferObjectFactory bufferFactory;
     /**
      * For !doesSourceHaveIncludes case, stores global column IDs to verify writer columns.
      * For doesSourceHaveIncludes case, stores source column IDs used to map things.
@@ -344,13 +354,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final AtomicBoolean isStopped;
 
     public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
-        boolean[] writerIncludes, boolean doesSourceHaveIncludes, AtomicBoolean isStopped) {
+        boolean[] writerIncludes, boolean doesSourceHaveIncludes,
+        Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
       assert writerIncludes != null; // Taken care of on higher level.
       this.writerIncludes = writerIncludes;
       this.doesSourceHaveIncludes = doesSourceHaveIncludes;
       this.columnIds = columnIds;
       this.isStopped = isStopped;
+      this.bufferFactory = bufferFactory;
       startStripe();
     }
 
@@ -438,7 +450,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         if (LlapIoImpl.LOG.isTraceEnabled()) {
           LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
         }
-        CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, isStopped);
+        CacheOutputReceiver cor = new CacheOutputReceiver(
+            bufferManager, bufferFactory, name, isStopped);
         or = cor;
         List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
         if (list == null) {
@@ -568,6 +580,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
     private final BufferUsageManager bufferManager;
+    private final BufferObjectFactory bufferFactory;
     private final StreamName name;
     private List<MemoryBuffer> buffers = null;
     private int lastBufferPos = -1;
@@ -576,8 +589,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     private final StoppableAllocator allocator;
 
     public CacheOutputReceiver(
-        BufferUsageManager bufferManager, StreamName name, AtomicBoolean isStopped) {
+        BufferUsageManager bufferManager,
+        BufferObjectFactory bufferFactory, StreamName name, AtomicBoolean isStopped) {
       this.bufferManager = bufferManager;
+      this.bufferFactory = bufferFactory;
       Allocator alloc = bufferManager.getAllocator();
       this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
       this.name = name;
@@ -598,13 +613,12 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
     private void allocateMultiple(MemoryBuffer[] dest, int size) {
       if (allocator != null) {
-        allocator.allocateMultiple(dest, size, isStopped);
+        allocator.allocateMultiple(dest, size, bufferFactory, isStopped);
       } else {
-        bufferManager.getAllocator().allocateMultiple(dest, size);
+        bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
       }
     }
 
-
     @Override
     public void output(ByteBuffer buffer) throws IOException {
       // TODO: avoid put() by working directly in OutStream?
@@ -629,7 +643,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       if (isNewBuffer) {
         MemoryBuffer[] dest = new MemoryBuffer[1];
         allocateMultiple(dest, size);
-        LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+        LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0];
         bb = newBuffer.getByteBufferRaw();
         lastBufferPos = bb.position();
         buffers.add(newBuffer);
@@ -720,10 +734,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private void unlockAllBuffers(StripeData si) {
     for (int i = 0; i < si.getData().length; ++i) {
-      LlapDataBuffer[][] colData = si.getData()[i];
+      LlapSerDeDataBuffer[][] colData = si.getData()[i];
       if (colData == null) continue;
       for (int j = 0; j < colData.length; ++j) {
-        LlapDataBuffer[] streamData = colData[j];
+        LlapSerDeDataBuffer[] streamData = colData[j];
         if (streamData == null) continue;
         for (int k = 0; k < streamData.length; ++k) {
           bufferManager.decRefBuffer(streamData[k]);
@@ -760,10 +774,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private void lockAllBuffers(StripeData sd) {
     for (int i = 0; i < sd.getData().length; ++i) {
-      LlapDataBuffer[][] colData = sd.getData()[i];
+      LlapSerDeDataBuffer[][] colData = sd.getData()[i];
       if (colData == null) continue;
       for (int j = 0; j < colData.length; ++j) {
-        LlapDataBuffer[] streamData = colData[j];
+        LlapSerDeDataBuffer[] streamData = colData[j];
         if (streamData == null) continue;
         for (int k = 0; k < streamData.length; ++k) {
           boolean canLock = bufferManager.incRefBuffer(streamData[k]);
@@ -937,7 +951,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     logProcessOneSlice(stripeIx, diskData, cacheData);
 
     ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
-    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    LlapSerDeDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
     long cacheRowCount = cacheData == null ? -1L : cacheData.getRowCount();
     SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
     StripeData sliceToCache = null;
@@ -963,7 +977,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       if (!hasAllData && splitIncludes[colIx]) {
         // The column has been read from disk.
         List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
-        LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+        LlapSerDeDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
         if (streams == null) continue; // Struct column, such as root?
         Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
         while (iter.hasNext()) {
@@ -1026,7 +1040,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       return true; // Nothing to process.
     }
     ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
-    LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+    LlapSerDeDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
     if (cacheData != null) {
       // Don't validate column count - no encodings for vectors.
       validateCacheAndDisk(cacheData, diskData.getRowCount(), -1, diskData);
@@ -1078,7 +1092,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       if (!splitIncludes[colIx]) continue;
       // The column has been read from disk.
       List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
-      LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+      LlapSerDeDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
       if (streams == null) continue; // Struct column, such as root?
       Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
       while (iter.hasNext()) {
@@ -1132,28 +1146,28 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
 
-  private static LlapDataBuffer[][] createArrayToCache(
+  private static LlapSerDeDataBuffer[][] createArrayToCache(
       StripeData sliceToCache, int colIx, List<CacheWriter.CacheStreamData> streams) {
     if (LlapIoImpl.LOG.isTraceEnabled()) {
       LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
     }
-    LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
-        = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+    LlapSerDeDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+        = new LlapSerDeDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
     return newCacheDataForCol;
   }
 
   private static int setStreamDataToCache(
-      LlapDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
+      LlapSerDeDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
     int streamIx = stream.name.getKind().getNumber();
-    // This is kinda hacky - we "know" these are LlapDataBuffer-s.
-    newCacheDataForCol[streamIx] = stream.data.toArray(new LlapDataBuffer[stream.data.size()]);
+    // This is kinda hacky - we "know" these are LlapSerDeDataBuffer-s.
+    newCacheDataForCol[streamIx] = stream.data.toArray(new LlapSerDeDataBuffer[stream.data.size()]);
     return streamIx;
   }
 
-  private void processColumnCacheData(LlapDataBuffer[][][] cacheBuffers,
+  private void processColumnCacheData(LlapSerDeDataBuffer[][][] cacheBuffers,
       OrcEncodedColumnBatch ecb, int colIx) {
     // The column has been obtained from cache.
-    LlapDataBuffer[][] colData = cacheBuffers[colIx];
+    LlapSerDeDataBuffer[][] colData = cacheBuffers[colIx];
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
           + SerDeLowLevelCacheImpl.toString(colData));
@@ -1178,8 +1192,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private void discardUncachedBuffers(List<MemoryBuffer> list) {
     for (MemoryBuffer buffer : list) {
-      boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
-      assert isInvalidated;
       bufferManager.getAllocator().deallocate(buffer);
     }
   }
@@ -1404,7 +1416,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       // TODO: move this into ctor? EW would need to create CacheWriter then
       List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
       writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
-          writer.isOnlyWritingIncludedColumns(), isStopped), daemonConf, split.getPath());
+          writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath());
       if (writer instanceof VectorDeserializeOrcWriter) {
         VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
         asyncWriter.startAsync(new AsyncCacheDataCallback());

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
index c9df7d9..981e52f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -228,6 +228,9 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
       WriteOperation op = null;
       int fallbackMs = 8;
       while (true) {
+        // The reason we poll here is that a blocking queue causes the query thread to
+        // spend a non-trivial amount of time signaling when an element is added; we'd
+        // rather waste that time on this background thread.
         op = queue.poll();
         if (op != null) break;
         if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.

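Aside: the new comment above documents why this loop polls instead of blocking: poll() costs the producer nothing, while a blocking queue would make the query thread pay for wakeup signaling on every add. A self-contained sketch of the same poll-with-exponential-backoff pattern; the 8ms start and 262144ms cap mirror the patch, but the queue element type and the timeout behavior are assumptions:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch only: consumer-side backoff polling. The producer just offer()s
// with no wakeup signaling; the consumer pays with short, doubling sleeps.
class BackoffPollSketch {
  static Runnable poll(Queue<Runnable> queue) throws InterruptedException {
    int fallbackMs = 8;
    while (true) {
      Runnable op = queue.poll(); // non-blocking
      if (op != null) return op;
      if (fallbackMs > 262144) {  // ~7 minutes: assume the producer is gone
        throw new IllegalStateException("No work arrived in time");
      }
      Thread.sleep(fallbackMs);
      fallbackMs <<= 1;           // 8, 16, 32, ... ms
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Queue<Runnable> q = new ConcurrentLinkedQueue<>();
    q.offer(() -> System.out.println("ran"));
    poll(q).run();
  }
}
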
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 20af0d0..dc053ee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -104,8 +104,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
   }
 
   @Override
-  protected boolean invalidate() {
-    return true;
+  protected int invalidate() {
+    return INVALIDATE_OK;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 2c7a234..b9d7a77 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -131,8 +131,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer
   }
 
   @Override
-  protected boolean invalidate() {
-    return true; // relies on GC, so it can always be evicted now.
+  protected int invalidate() {
+    return INVALIDATE_OK; // relies on GC, so it can always be evicted now.
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 1f3f7ea..17266bc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -168,8 +168,8 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt
   }
 
   @Override
-  protected boolean invalidate() {
-    return true;
+  protected int invalidate() {
+    return INVALIDATE_OK;
   }
 
   @Override