You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/08/24 18:32:05 UTC
[1/3] hive git commit: HIVE-16233 : llap: Query failed with
AllocatorOutOfMemoryException (Sergey Shelukhin,
reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
Repository: hive
Updated Branches:
refs/heads/branch-2 a4b913360 -> bd32deb44
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 390b34b..364897c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -57,7 +57,7 @@ public class TestBuddyAllocator {
isMapped = mmap;
}
- private static class DummyMemoryManager implements MemoryManager {
+ static class DummyMemoryManager implements MemoryManager {
@Override
public void reserveMemory(long memoryToReserve, AtomicBoolean isStopped) {
}
@@ -76,11 +76,6 @@ public class TestBuddyAllocator {
}
@Override
- public long forceReservedMemory(int allocationSize, int count) {
- return allocationSize * count;
- }
-
- @Override
public void debugDumpShort(StringBuilder sb) {
}
}
@@ -99,8 +94,9 @@ public class TestBuddyAllocator {
@Test
public void testSameSizes() throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max;
- BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc,
- tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
+ maxAlloc, 0, tmpDir, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"), null);
for (int i = max; i >= min; --i) {
allocSameSize(a, 1 << (max - i), i);
}
@@ -109,16 +105,18 @@ public class TestBuddyAllocator {
@Test
public void testMultipleArenas() throws Exception {
int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
- BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount,
- tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << 3, maxAlloc, maxAlloc,
+ maxAlloc * arenaCount, 0, tmpDir, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"), null);
allocSameSize(a, arenaCount * 2, allocLog2);
}
@Test
public void testMTT() {
final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
- final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc * 8,
- maxAlloc * 24, tmpDir, new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"));
+ final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc,
+ maxAlloc * 8, maxAlloc * 24, 0, tmpDir, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"), null);
ExecutorService executor = Executors.newFixedThreadPool(3);
final CountDownLatch cdlIn = new CountDownLatch(3), cdlOut = new CountDownLatch(1);
FutureTask<Void> upTask = new FutureTask<Void>(new Callable<Void>() {
@@ -162,8 +160,8 @@ public class TestBuddyAllocator {
public void testMTTArenas() {
final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
final BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc,
- (1 << min) * minAllocCount, tmpDir, new DummyMemoryManager(),
- LlapDaemonCacheMetrics.create("test", "1"));
+ (1 << min) * minAllocCount, 0, tmpDir, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"), null);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
Callable<Void> testCallable = new Callable<Void>() {
@@ -189,7 +187,8 @@ public class TestBuddyAllocator {
throw new RuntimeException(t);
}
}
- private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+
+ static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
cdlIn.countDown();
try {
cdlOut.await();
@@ -202,8 +201,8 @@ public class TestBuddyAllocator {
int allocCount, int arenaSizeMult, int arenaCount) throws Exception {
int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
BuddyAllocator a = new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, arenaSize,
- arenaSize * arenaCount, tmpDir , new DummyMemoryManager(),
- LlapDaemonCacheMetrics.create("test", "1"));
+ arenaSize * arenaCount, 0, tmpDir, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"), null);
allocateUp(a, min, max, allocCount, true);
allocateDown(a, min, max, allocCount, true);
allocateDown(a, min, max, allocCount, false);
@@ -253,19 +252,34 @@ public class TestBuddyAllocator {
try {
a.allocateMultiple(allocs[index], size);
} catch (AllocatorOutOfMemoryException ex) {
- LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDumpForOomInternal());
+ LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.testDump());
throw ex;
}
// LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
for (int j = 0; j < allocCount; ++j) {
MemoryBuffer mem = allocs[index][j];
long testValue = testValues[index][j] = rdm.nextLong();
- int pos = mem.getByteBufferRaw().position();
- mem.getByteBufferRaw().putLong(pos, testValue);
- int halfLength = mem.getByteBufferRaw().remaining() >> 1;
- if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
- mem.getByteBufferRaw().putLong(pos + halfLength, testValue);
- }
+ putTestValue(mem, testValue);
+ }
+ }
+
+ public static void putTestValue(MemoryBuffer mem, long testValue) {
+ int pos = mem.getByteBufferRaw().position();
+ mem.getByteBufferRaw().putLong(pos, testValue);
+ int halfLength = mem.getByteBufferRaw().remaining() >> 1;
+ if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
+ mem.getByteBufferRaw().putLong(pos + halfLength, testValue);
+ }
+ }
+
+ public static void checkTestValue(MemoryBuffer mem, long testValue, String str) {
+ int pos = mem.getByteBufferRaw().position();
+ assertEquals("Failed to match (" + pos + ") on " + str,
+ testValue, mem.getByteBufferRaw().getLong(pos));
+ int halfLength = mem.getByteBufferRaw().remaining() >> 1;
+ if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
+ assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + str,
+ testValue, mem.getByteBufferRaw().getLong(pos + halfLength));
}
}
@@ -286,14 +300,9 @@ public class TestBuddyAllocator {
BuddyAllocator a, MemoryBuffer[] allocs, long[] testValues) {
for (int j = 0; j < allocs.length; ++j) {
LlapDataBuffer mem = (LlapDataBuffer)allocs[j];
- int pos = mem.getByteBufferRaw().position();
- assertEquals("Failed to match (" + pos + ") on " + j + "/" + allocs.length,
- testValues[j], mem.getByteBufferRaw().getLong(pos));
- int halfLength = mem.getByteBufferRaw().remaining() >> 1;
- if (halfLength + 8 <= mem.getByteBufferRaw().remaining()) {
- assertEquals("Failed to match half (" + (pos + halfLength) + ") on " + j + "/"
- + allocs.length, testValues[j], mem.getByteBufferRaw().getLong(pos + halfLength));
- }
+ long testValue = testValues[j];
+ String str = j + "/" + allocs.length;
+ checkTestValue(mem, testValue, str);
a.deallocate(mem);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
new file mode 100644
index 0000000..79c44a7
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocatorForceEvict.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.common.io.Allocator.AllocatorOutOfMemoryException;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.TestBuddyAllocator.DummyMemoryManager;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests forced eviction (discarding cached buffers) in BuddyAllocator when memory is too
+ * fragmented to satisfy an allocation directly. The fragmentation setups assume allocations are
+ * laid out basically sequentially, with no internal reordering; the scenarios themselves (the
+ * large allocation succeeding, surviving buffers keeping their values) should hold regardless.
+ */
+public class TestBuddyAllocatorForceEvict {
+  private static final Logger LOG = LoggerFactory.getLogger(TestBuddyAllocatorForceEvict.class);
+  private static final DummyMemoryManager MM = new TestBuddyAllocator.DummyMemoryManager();
+  private static final LlapDaemonCacheMetrics METRICS = LlapDaemonCacheMetrics.create("test", "1");
+
+  @Test(timeout = 6000)
+  public void testSimple() {
+    runSimpleTests(false);
+    runSimpleTests(true);
+  }
+
+  public void runSimpleTests(boolean isBruteOnly) {
+    runSimple1to2Discard(create(1024, 1, 1024, true, isBruteOnly), 256);
+    runSimple1to2Discard(create(1024, 1, 1024, false, isBruteOnly), 256);
+    runSimple1to2Discard(create(512, 2, 1024, false, isBruteOnly), 256);
+  }
+
+  @Test(timeout = 6000)
+  public void testSmallBlocks() {
+    runSmallBlockersTests(false);
+    runSmallBlockersTests(true);
+  }
+
+  public void runSmallBlockersTests(boolean isBruteOnly) {
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, false);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, false);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, false, true);
+    runSmallBlockersDiscard(create(1024, 1, 1024, false, isBruteOnly), 128, true, true);
+
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, false);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, false);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, false, true);
+    runSmallBlockersDiscard(create(512, 2, 1024, false, isBruteOnly), 128, true, true);
+  }
+
+  @Test(timeout = 6000)
+  public void testZebra() {
+    runZebraTests(false);
+    runZebraTests(true);
+  }
+
+  public void runZebraTests(boolean isBruteOnly) {
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 1);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 64, 8, 1);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 2);
+    runZebraDiscard(create(1024, 1, 1024, false, isBruteOnly), 32, 16, 4);
+
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 1);
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 64, 8, 1);
+    runZebraDiscard(create(512, 2, 1024, false, isBruteOnly), 32, 16, 2);
+
+    runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 2);
+    runZebraDiscard(create(256, 4, 1024, false, isBruteOnly), 32, 16, 4);
+  }
+
+  @Test(timeout = 6000)
+  public void testUnevenZebra() {
+    runUnevenZebraTests(false);
+    runUnevenZebraTests(true);
+  }
+
+  public void runUnevenZebraTests(boolean isBruteOnly) {
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512);
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 },
+        new int[] { 0, 2, 4, 6, 8 }, 512);
+
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 128, 128, 128, 128 }, new int[] { 0, 2, 4 }, 512);
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 256, 64, 64, 64, 64, 64, 64, 64, 64 },
+        new int[] { 0, 2, 4, 6, 8 }, 512);
+  }
+
+  @Test(timeout = 6000)
+  public void testComplex1() {
+    runComplexTests(false);
+    runComplexTests(true);
+  }
+
+  public void runComplexTests(boolean isBruteOnly) {
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 128, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0, 3, 6, 7 }, 512);
+    runCustomDiscard(create(1024, 1, 1024, false, isBruteOnly),
+        new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0, 4, 7, 8 }, 512);
+
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 128, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0, 3, 6, 7 }, 512);
+    runCustomDiscard(create(512, 2, 1024, false, isBruteOnly),
+        new int[] { 256, 64, 64, 64, 64, 256, 64, 64, 128 },
+        new int[] { 0, 4, 7, 8 }, 512);
+  }
+
+  /** Outcome of one {@link MttTestCallable}: its allocation size and success/OOM counts. */
+  static class MttTestCallableResult {
+    public int successes, ooms, allocSize;
+    @Override
+    public String toString() {
+      return "allocation size " + allocSize + ": " + successes + " allocations, " + ooms + " OOMs";
+    }
+  }
+
+  /**
+   * After a synchronized start, allocates up to allocCount buffers of allocSize, then frees them
+   * all; repeats iterCount times. OOMs are counted rather than treated as failures.
+   */
+  static class MttTestCallable implements Callable<MttTestCallableResult> {
+    private final CountDownLatch cdlIn, cdlOut;
+    private final int allocSize, allocCount, iterCount;
+    private final BuddyAllocator a;
+
+    public MttTestCallable(CountDownLatch cdlIn, CountDownLatch cdlOut, BuddyAllocator a,
+        int allocSize, int allocCount, int iterCount) {
+      this.cdlIn = cdlIn;
+      this.cdlOut = cdlOut;
+      this.a = a;
+      this.allocSize = allocSize;
+      this.allocCount = allocCount;
+      this.iterCount = iterCount;
+    }
+
+    @Override
+    public MttTestCallableResult call() throws Exception {
+      LOG.info(Thread.currentThread().getId() + " thread starts");
+      TestBuddyAllocator.syncThreadStart(cdlIn, cdlOut);
+      MttTestCallableResult result = new MttTestCallableResult();
+      result.allocSize = allocSize;
+      List<MemoryBuffer> allocs = new ArrayList<>(allocCount);
+      LlapAllocatorBuffer[] dest = new LlapAllocatorBuffer[1];
+      for (int i = 0; i < iterCount; ++i) {
+        for (int j = 0; j < allocCount; ++j) {
+          try {
+            dest[0] = null;
+            a.allocateMultiple(dest, allocSize);
+            LlapAllocatorBuffer buf = dest[0];
+            assertTrue(buf.incRef() > 0);
+            allocs.add(buf);
+            ++result.successes;
+            buf.decRef();
+          } catch (AllocatorOutOfMemoryException ex) {
+            ++result.ooms;
+          } catch (Throwable ex) {
+            LOG.error("Failed", ex);
+            throw new Exception(ex);
+          }
+        }
+        for (MemoryBuffer buf : allocs) {
+          try {
+            a.deallocate(buf);
+          } catch (Throwable ex) {
+            LOG.error("Failed", ex);
+            throw new Exception(ex);
+          }
+        }
+        allocs.clear();
+      }
+      return result;
+    }
+  }
+
+  @Test(timeout = 200000)
+  public void testMtt() {
+    final int baseAllocSizeLog2 = 3, maxAllocSizeLog2 = 10, totalSize = 8192,
+        baseAllocSize = 1 << baseAllocSizeLog2, maxAllocSize = 1 << maxAllocSizeLog2;
+    // One allocating thread per power-of-two size from baseAllocSize to maxAllocSize.
+    final int threadCount = maxAllocSizeLog2 - baseAllocSizeLog2 + 1;
+    final int iterCount = 500;
+    final BuddyAllocator a = create(maxAllocSize, 4, totalSize, true, false);
+    // The extra thread runs the periodic allocator state dump.
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1);
+    try {
+      CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
+      @SuppressWarnings("unchecked")
+      FutureTask<MttTestCallableResult>[] allocTasks = new FutureTask[threadCount];
+      FutureTask<Void> dumpTask = createAllocatorDumpTask(a);
+      for (int allocSize = baseAllocSize, i = 0; allocSize <= maxAllocSize; allocSize <<= 1, ++i) {
+        allocTasks[i] = new FutureTask<>(new MttTestCallable(
+            cdlIn, cdlOut, a, allocSize, totalSize / allocSize, iterCount));
+        executor.execute(allocTasks[i]);
+      }
+      executor.execute(dumpTask);
+
+      runMttTest(a, allocTasks, cdlIn, cdlOut, dumpTask, null, null, totalSize, maxAllocSize);
+    } finally {
+      // Stop any leftover task threads so they don't outlive this test.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Waits until every allocation task has counted down cdlIn, releases them all at once via
+   * cdlOut and waits for each to finish; then cancels dumpTask and, if defragTask is non-null,
+   * sets defragStopped and waits for it too. Throws a RuntimeException wrapping the first
+   * failure. Finally checks that the allocator can again be filled with maxAllocSize buffers,
+   * i.e. that the tasks released everything they had allocated.
+   */
+  public static void runMttTest(BuddyAllocator a, FutureTask<?>[] allocTasks,
+      CountDownLatch cdlIn, CountDownLatch cdlOut, FutureTask<Void> dumpTask,
+      FutureTask<Void> defragTask, AtomicBoolean defragStopped, int totalSize, int maxAllocSize) {
+    Throwable t = null;
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // Preserve the interrupt status for the caller.
+      throw new RuntimeException(e);
+    }
+    cdlOut.countDown(); // Release them at the same time.
+    for (int i = 0; i < allocTasks.length; ++i) {
+      try {
+        Object result = allocTasks[i].get();
+        LOG.info("" + result);
+      } catch (Throwable tt) {
+        LOG.error("Test callable failed", tt);
+        if (t == null) {
+          a.dumpTestLog();
+          t = tt;
+        }
+      }
+    }
+    dumpTask.cancel(true);
+    if (defragTask != null) {
+      defragStopped.set(true);
+      try {
+        defragTask.get();
+      } catch (Throwable tt) {
+        // Log this failure itself; t is only the earlier (possibly null) error.
+        LOG.error("Defragmentation thread failed", tt);
+        if (t == null) {
+          a.dumpTestLog();
+          t = tt;
+        }
+      }
+    }
+    if (t != null) {
+      throw new RuntimeException("One of the errors", t);
+    }
+    // All the tasks should have deallocated their stuff. Make sure we can allocate everything.
+    LOG.info("Allocator state after all the tasks: " + a.testDump());
+    try {
+      allocate(a, totalSize / maxAllocSize, maxAllocSize, 0);
+    } catch (Throwable tt) {
+      a.dumpTestLog();
+      throw tt;
+    }
+  }
+
+  /**
+   * Creates (without starting) a task that logs a.testDump() every 10 ms, at most 40000 times;
+   * cancelling it with interruption ends the loop early.
+   */
+  public static FutureTask<Void> createAllocatorDumpTask(final BuddyAllocator a) {
+    return new FutureTask<Void>(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        int logs = 40000; // Prevent excessive logging in case of deadlocks or slowness.
+        while ((--logs) >= 0) {
+          LOG.info("Allocator state (MTT): " + a.testDump());
+          Thread.sleep(10);
+        }
+        return null;
+      }
+    });
+  }
+
+  /** Fragments the allocator per sizes/dealloc, then allocates one buffer of the given size. */
+  private static void runCustomDiscard(BuddyAllocator a, int[] sizes, int[] dealloc, int size) {
+    LlapAllocatorBuffer[] initial = prepareCustomFragmentedAllocator(a, sizes, dealloc, true);
+    LlapAllocatorBuffer after = allocate(a, 1, size, initial.length + 1)[0];
+    LOG.info("After: " + a.testDump());
+    for (int i = 0; i < initial.length; ++i) {
+      if (initial[i] == null) continue;
+      checkTestValue(initial[i], i + 1, null, false);
+      a.deallocate(initial[i]);
+    }
+    checkTestValue(after, initial.length + 1, null, true);
+    a.deallocate(after);
+  }
+
+  private static void runZebraDiscard(
+      BuddyAllocator a, int baseSize, int pairCount, int allocs) {
+    LlapAllocatorBuffer[] initial = prepareZebraFragmentedAllocator(a, baseSize, pairCount, true);
+    int allocFraction = allocs * 2;
+    int bigAllocSize = pairCount * 2 * baseSize / allocFraction;
+    LlapAllocatorBuffer[] after = allocate(a, allocs, bigAllocSize, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    for (int i = 0; i < pairCount; ++i) {
+      int ix = (i << 1) + 1;
+      checkTestValue(initial[ix], ix + 1, null, false);
+    }
+    checkTestValue(after[0], 1 + initial.length, null, true);
+  }
+
+  /**
+   * Allocates 2 * pairCount buffers of baseSize (test values 1..2 * pairCount), then frees every
+   * even-indexed one, leaving alternating used/free blocks; freed slots in the result are null.
+   */
+  public static LlapAllocatorBuffer[] prepareZebraFragmentedAllocator(
+      BuddyAllocator a, int baseSize, int pairCount, boolean doIncRef) {
+    LlapAllocatorBuffer[] initial = allocate(a, pairCount * 2, baseSize, 1, doIncRef);
+    for (int i = 0; i < pairCount; ++i) {
+      a.deallocate(initial[i << 1]);
+      initial[i << 1] = null;
+    }
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  private void runSimple1to2Discard(BuddyAllocator a, int baseSize) {
+    // Allocate 1-1-1-1; free 1&3; allocate one block of twice the size.
+    LlapAllocatorBuffer[] initial = prepareSimpleFragmentedAllocator(a, baseSize, true);
+    LlapAllocatorBuffer[] after = allocate(a, 1, baseSize * 2, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    checkInitialValues(initial, 0, 2);
+    checkTestValue(after[0], 1 + initial.length, null, true);
+    a.deallocate(initial[0]);
+    a.deallocate(initial[2]);
+    a.deallocate(after[0]);
+  }
+
+  /**
+   * Allocates 4 buffers of baseSize (test values 1..4) and frees buffers 1 and 3, leaving 0 and 2
+   * allocated. The freed buffers are still present in the returned array.
+   */
+  public static LlapAllocatorBuffer[] prepareSimpleFragmentedAllocator(
+      BuddyAllocator a, int baseSize, boolean doIncRef) {
+    LlapAllocatorBuffer[] initial = allocate(a, 4, baseSize, 1, doIncRef);
+    checkInitialValues(initial, 0, 2);
+    a.deallocate(initial[1]);
+    a.deallocate(initial[3]);
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  private void runSmallBlockersDiscard(BuddyAllocator a,
+      int baseSize, boolean deallocOneFirst, boolean deallocOneSecond) {
+    LlapAllocatorBuffer[] initial = prepareAllocatorWithSmallFragments(
+        a, baseSize, deallocOneFirst, deallocOneSecond, true);
+    int bigAllocSize = baseSize * 4;
+    LlapAllocatorBuffer[] after = allocate(a, 1, bigAllocSize, 1 + initial.length);
+    LOG.info("After: " + a.testDump());
+    checkInitialValues(initial, 2, 4);
+    checkTestValue(after[0], 1 + initial.length, null, true);
+  }
+
+  public static LlapAllocatorBuffer[] prepareAllocatorWithSmallFragments(BuddyAllocator a,
+      int baseSize, boolean deallocOneFirst, boolean deallocOneSecond, boolean doIncRef) {
+    // Allocate 2-1-1-2-1-1 (in units of baseSize); free 0 and 3, and optionally 1 and/or 5.
+    // Note: in initial[offset++] = ..., the index is evaluated before the right-hand side, so
+    // the big blocks get test value index + 2; they are freed below and never checked here.
+    int offset = 0;
+    LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[6];
+    initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0];
+    MemoryBuffer[] tmp = allocate(a, 2, baseSize, offset + 1, doIncRef);
+    System.arraycopy(tmp, 0, initial, offset, 2);
+    offset += 2;
+    initial[offset++] = allocate(a, 1, baseSize * 2, offset + 1, doIncRef)[0];
+    tmp = allocate(a, 2, baseSize, offset + 1, doIncRef);
+    System.arraycopy(tmp, 0, initial, offset, 2);
+    if (deallocOneFirst) {
+      a.deallocate(initial[1]);
+    }
+    if (deallocOneSecond) {
+      a.deallocate(initial[5]);
+    }
+    a.deallocate(initial[0]);
+    a.deallocate(initial[3]);
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+
+  /** Checks test value index + 1 in each of the given buffers, skipping evicted ones. */
+  private static void checkInitialValues(LlapAllocatorBuffer[] bufs, int... indexes) {
+    for (int index : indexes) {
+      LlapAllocatorBuffer buf = bufs[index];
+      if (!incRefIfNotEvicted(buf, false)) continue;
+      try {
+        checkTestValue(buf, index + 1, null, false);
+      } finally {
+        buf.decRef();
+      }
+    }
+  }
+
+  /**
+   * Tries to incRef buf, failing the test if it is in a bad state, or if it was evicted while
+   * mustExist is set. Returns true if the incRef succeeded; the caller must then decRef it.
+   */
+  private static boolean incRefIfNotEvicted(LlapAllocatorBuffer buf, boolean mustExist) {
+    int rc = buf.tryIncRef();
+    if (rc == LlapAllocatorBuffer.INCREF_FAILED) {
+      fail("Failed to incref (bad state) " + buf);
+    }
+    if (rc <= 0 && mustExist) {
+      fail("Failed to incref (evicted) " + buf);
+    }
+    return rc > 0; // We expect evicted, but not failed.
+  }
+
+  private static void checkTestValue(
+      LlapAllocatorBuffer mem, long testValue, String str, boolean mustExist) {
+    if (!incRefIfNotEvicted(mem, mustExist)) return;
+    try {
+      TestBuddyAllocator.checkTestValue(mem, testValue, str);
+    } finally {
+      mem.decRef();
+    }
+  }
+
+  /**
+   * Creates a non-direct, non-mapped allocator with an 8-byte minimum allocation and calls
+   * setOomLoggingForTest(false); the prepare* helpers set it back to true once set up.
+   *
+   * @param max maximum allocation size.
+   * @param arenas passed as the 5th constructor argument (presumably the arena count; confirm).
+   * @param total total allocator size.
+   * @param isShortcut if false, the defrag shortcut is disabled via disableDefragShortcutForTest.
+   * @param isBruteForceOnly if true, passes "brute" as the last constructor argument.
+   */
+  public static BuddyAllocator create(int max, int arenas, int total, boolean isShortcut,
+      boolean isBruteForceOnly) {
+    BuddyAllocator result = new BuddyAllocator(false, false, 8, max, arenas, total, 0,
+        null, MM, METRICS, isBruteForceOnly ? "brute" : null);
+    if (!isShortcut) {
+      result.disableDefragShortcutForTest();
+    }
+    result.setOomLoggingForTest(false);
+    return result;
+  }
+
+  /** Same as the 5-argument overload with doIncRef = true. */
+  private static LlapAllocatorBuffer[] allocate(
+      BuddyAllocator a, int count, int size, int baseValue) {
+    return allocate(a, count, size, baseValue, true);
+  }
+
+  /**
+   * Allocates count buffers of the given size and writes test value baseValue + i into buffer i.
+   * If doIncRef is set, each buffer is incRef-ed for the write and decRef-ed afterwards.
+   * On OOM, logs the allocator state and rethrows.
+   */
+  public static LlapAllocatorBuffer[] allocate(
+      BuddyAllocator a, int count, int size, int baseValue, boolean doIncRef) {
+    LlapAllocatorBuffer[] allocs = new LlapAllocatorBuffer[count];
+    try {
+      a.allocateMultiple(allocs, size);
+    } catch (AllocatorOutOfMemoryException ex) {
+      LOG.error("Failed to allocate " + allocs.length + " of " + size + "; " + a.testDump());
+      throw ex;
+    }
+    for (int i = 0; i < count; ++i) {
+      // Make sure buffers are eligible for discard.
+      if (doIncRef) {
+        int rc = allocs[i].incRef();
+        assertTrue(rc > 0);
+      }
+      TestBuddyAllocator.putTestValue(allocs[i], baseValue + i);
+      if (doIncRef) {
+        allocs[i].decRef();
+      }
+    }
+    return allocs;
+  }
+
+  /**
+   * Allocates one buffer per entry of sizes (test value i + 1 for buffer i), then frees the
+   * buffers at the dealloc indexes; freed slots in the returned array are set to null.
+   */
+  public static LlapAllocatorBuffer[] prepareCustomFragmentedAllocator(
+      BuddyAllocator a, int[] sizes, int[] dealloc, boolean doIncRef) {
+    LlapAllocatorBuffer[] initial = new LlapAllocatorBuffer[sizes.length];
+    for (int i = 0; i < sizes.length; ++i) {
+      initial[i] = allocate(a, 1, sizes[i], i + 1, doIncRef)[0];
+    }
+    for (int i = 0; i < dealloc.length; ++i) {
+      a.deallocate(initial[dealloc[i]]);
+      initial[dealloc[i]] = null;
+    }
+    LOG.info("Before: " + a.testDump());
+    a.setOomLoggingForTest(true);
+    return initial;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index e95f807..ab10285 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -59,7 +59,7 @@ public class TestLowLevelCacheImpl {
public void allocateMultiple(MemoryBuffer[] dest, int size) {
for (int i = 0; i < dest.length; ++i) {
LlapDataBuffer buf = new LlapDataBuffer();
- buf.initialize(0, null, -1, size);
+ buf.initialize(null, -1, size);
dest[i] = buf;
}
}
@@ -86,6 +86,12 @@ public class TestLowLevelCacheImpl {
public MemoryBuffer createUnallocated() {
return new LlapDataBuffer();
}
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size,
+ BufferObjectFactory factory) throws AllocatorOutOfMemoryException {
+ allocateMultiple(dest, size);
+ }
}
private static class DummyCachePolicy implements LowLevelCachePolicy {
@@ -116,11 +122,6 @@ public class TestLowLevelCacheImpl {
}
@Override
- public long tryEvictContiguousData(int allocationSize, int count) {
- return count * allocationSize;
- }
-
- @Override
public void debugDumpShort(StringBuilder sb) {
}
}
@@ -249,7 +250,7 @@ Example code to test specific scenarios:
evict(cache, fakes[2]);
verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]);
verifyCacheGet(cache, fn2, 1, 2, dr(1, 2));
- verifyRefcount(fakes, -1, 4, -1);
+ verifyRefcount(fakes, 0, 4, 0);
}
@Test
@@ -372,8 +373,8 @@ Example code to test specific scenarios:
continue;
}
++gets;
- LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
- assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
+ LlapAllocatorBuffer result = (LlapAllocatorBuffer)((CacheChunk)iter).getBuffer();
+ assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.getArenaIndex());
cache.decRefBuffer(result);
iter = iter.next;
}
@@ -388,7 +389,7 @@ Example code to test specific scenarios:
MemoryBuffer[] buffers = new MemoryBuffer[count];
for (int j = 0; j < offsets.length; ++j) {
LlapDataBuffer buf = LowLevelCacheImpl.allocateFake();
- buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
+ buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0);
buffers[j] = buf;
}
long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null);
@@ -401,7 +402,7 @@ Example code to test specific scenarios:
for (int j = 0; j < offsets.length; ++j) {
LlapDataBuffer buf = (LlapDataBuffer)(buffers[j]);
if ((maskVal & 1) == 1) {
- assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
+ assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.getArenaIndex());
}
maskVal >>= 1;
cache.decRefBuffer(buf);
@@ -415,7 +416,7 @@ Example code to test specific scenarios:
}
private int makeFakeArenaIndex(int fileIndex, long offset) {
- return (int)((fileIndex << 16) + offset);
+ return (int)((fileIndex << 12) + offset);
}
};
@@ -438,7 +439,7 @@ Example code to test specific scenarios:
if (r instanceof CacheChunk) {
LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
cache.decRefBuffer(result);
- if (victim == null && result.invalidate()) {
+ if (victim == null && result.invalidate() == LlapCacheableBuffer.INVALIDATE_OK) {
++evictions;
victim = result;
}
@@ -491,7 +492,7 @@ Example code to test specific scenarios:
for (int i = 0; i < refCount; ++i) {
victimBuffer.decRef();
}
- assertTrue(victimBuffer.invalidate());
+ assertTrue(LlapCacheableBuffer.INVALIDATE_OK == victimBuffer.invalidate());
cache.notifyEvicted(victimBuffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 210cbb0..f86a37c 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -205,61 +204,6 @@ public class TestLowLevelLrfuCachePolicy {
assertNotSame(locked, evicted);
unlock(lrfu, locked);
}
-
-
- @Test
- public void testForceEvictBySize() {
- int heapSize = 12;
- LOG.info("Testing force-eviction out of order");
- Configuration conf = new Configuration();
- ArrayList<LlapDataBuffer> sizeTwo = new ArrayList<LlapDataBuffer>(4),
- sizeOne = new ArrayList<LlapDataBuffer>(4);
- conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.45f); // size of LFU heap is 4
- EvictionTracker et = new EvictionTracker();
- LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
- lrfu.setEvictionListener(et);
- for (int i = 0; i < 2; ++i) {
- sizeTwo.add(cacheSizeTwoFake(et, lrfu));
- for (int j = 0; j < 2; ++j) {
- LlapDataBuffer fake = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(null, lrfu, et, fake));
- sizeOne.add(fake);
- }
- sizeTwo.add(cacheSizeTwoFake(et, lrfu));
- }
- // Now we should have two in the heap and two in the list, which is an implementation detail.
- // Evict only big blocks.
- et.evicted.clear();
- assertEquals(8, lrfu.tryEvictContiguousData(2, 4));
- for (int i = 0; i < sizeTwo.size(); ++i) {
- LlapDataBuffer block = et.evicted.get(i);
- assertTrue(block.isInvalid());
- assertSame(sizeTwo.get(i), block);
- }
- et.evicted.clear();
- // Evict small blocks when no big ones are available.
- assertEquals(2, lrfu.tryEvictContiguousData(2, 1));
- for (int i = 0; i < 2; ++i) {
- LlapDataBuffer block = et.evicted.get(i);
- assertTrue(block.isInvalid());
- assertSame(sizeOne.get(i), block);
- }
- et.evicted.clear();
- // Evict the rest.
- assertEquals(2, lrfu.evictSomeBlocks(3));
- for (int i = 2; i < sizeOne.size(); ++i) {
- LlapDataBuffer block = et.evicted.get(i - 2);
- assertTrue(block.isInvalid());
- assertSame(sizeOne.get(i), block);
- }
- }
-
- private LlapDataBuffer cacheSizeTwoFake(EvictionTracker et, LowLevelLrfuCachePolicy lrfu) {
- LlapDataBuffer fake = new LlapDataBuffer();
- fake.initialize(-1, ByteBuffer.wrap(new byte[2]), 0, 2);
- assertTrue(cache(null, lrfu, et, fake));
- return fake;
- }
// Buffers in test are fakes not linked to cache; notify cache policy explicitly.
public boolean cache(LowLevelCacheMemoryManager mm,
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 1d5954e..630f305 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
@@ -65,11 +66,6 @@ public class TestOrcMetadataCache {
}
@Override
- public long tryEvictContiguousData(int allocationSize, int count) {
- return 0;
- }
-
- @Override
public void debugDumpShort(StringBuilder sb) {
}
}
@@ -97,11 +93,6 @@ public class TestOrcMetadataCache {
}
@Override
- public long forceReservedMemory(int allocationSize, int count) {
- return allocationSize * count;
- }
-
- @Override
public void debugDumpShort(StringBuilder sb) {
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5e718c3..7ad457d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -406,7 +406,7 @@ class EncodedReaderImpl implements EncodedReader {
hasError = false;
} finally {
// At this point, everything in the list is going to have a refcount of one. Unless it
- // failed between the allocation and the incref for a single item, we should be ok.
+ // failed between the allocation and the incref for a single item, we should be ok.
if (hasError) {
releaseInitialRefcounts(toRead.next);
if (toRelease != null) {
@@ -456,7 +456,7 @@ class EncodedReaderImpl implements EncodedReader {
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
- // To avoid buffers being unlocked, run refcount one ahead; so each RG
+ // To avoid buffers being unlocked, run refcount one ahead; so each RG
// processing will decref once, and the last one will unlock the buffers.
sctx.stripeLevelStream.incRef();
// For stripe-level streams we don't need the extra refcount on the block.
@@ -706,8 +706,8 @@ class EncodedReaderImpl implements EncodedReader {
assert originalCbIndex >= 0;
// Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
// and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
- // is not in cache.
- cacheWrapper.getAllocator().deallocate(getBuffer());
+ // is not in cache. releaseBuffer will decref the buffer, and also deallocate.
+ cacheWrapper.releaseBuffer(this.buffer);
cacheWrapper.reuseBuffer(replacementBuffer);
// Replace the buffer in our big range list, as well as in current results.
this.buffer = replacementBuffer;
@@ -959,7 +959,7 @@ class EncodedReaderImpl implements EncodedReader {
* to handle just for this case.
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
- * @param kind
+ * @param kind
*/
private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start,
long streamOffset, long streamEnd, Kind kind) throws IOException {
@@ -1137,9 +1137,9 @@ class EncodedReaderImpl implements EncodedReader {
private void allocateMultiple(MemoryBuffer[] dest, int size) {
if (allocator != null) {
- allocator.allocateMultiple(dest, size, isStopped);
+ allocator.allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory(), isStopped);
} else {
- cacheWrapper.getAllocator().allocateMultiple(dest, size);
+ cacheWrapper.getAllocator().allocateMultiple(dest, size, cacheWrapper.getDataBufferFactory());
}
}
@@ -1477,7 +1477,7 @@ class EncodedReaderImpl implements EncodedReader {
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers, true);
if (compressed.remaining() <= 0 && toRelease.remove(compressed)) {
- releaseBuffer(compressed, true); // We copied the entire buffer.
+ releaseBuffer(compressed, true); // We copied the entire buffer.
} // else there's more data to process; will be handled in next call.
return cc;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
index 2172bd2..3aef6f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/StoppableAllocator.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
public interface StoppableAllocator extends Allocator {
/** Stoppable allocate method specific to branch-2. */
- void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+ void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
throws AllocatorOutOfMemoryException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
index 16b9713..775233c 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/Allocator.java
@@ -29,6 +29,10 @@ public interface Allocator {
private static final long serialVersionUID = 268124648177151761L;
}
+ /**
+ * Creates the MemoryBuffer objects that allocateMultiple places into the dest array
+ * when a slot does not already hold a reusable object.
+ */
+ public interface BufferObjectFactory {
+ MemoryBuffer create();
+ }
+
/**
* Allocates multiple buffers of a given size.
* @param dest Array where buffers are placed. Objects are reused if already there
@@ -36,14 +40,27 @@ public interface Allocator {
* @param size Allocation size.
* @throws AllocatorOutOfMemoryException Cannot allocate.
+ * @deprecated Use {@link #allocateMultiple(MemoryBuffer[], int, BufferObjectFactory)}, which
+ * lets the caller control how missing buffer objects are created.
*/
+ @Deprecated
void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException;
/**
+ * Allocates multiple buffers of a given size.
+ * @param dest Array where buffers are placed. Objects are reused if already there
+ * (see createUnallocated), created otherwise.
+ * @param size Allocation size.
+ * @param factory A factory to create the objects in the dest array, if needed.
+ * @throws AllocatorOutOfMemoryException Cannot allocate.
+ */
+ void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
+ throws AllocatorOutOfMemoryException;
+
+ /**
* Creates an unallocated memory buffer object. This object can be passed to allocateMultiple
* to allocate; this is useful if data structures are created for separate buffers that can
* later be allocated together.
* @return a new unallocated memory buffer
+ * @deprecated Supply a {@link BufferObjectFactory} to
+ * {@link #allocateMultiple(MemoryBuffer[], int, BufferObjectFactory)} instead.
*/
+ @Deprecated
MemoryBuffer createUnallocated();
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index e53b737..552f20e 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -100,4 +100,10 @@ public interface DataCache {
* @return the allocator
*/
Allocator getAllocator();
+
+ /**
+ * Gets the buffer object factory associated with this DataCache, to use with allocator.
+ * @return the factory
+ */
+ Allocator.BufferObjectFactory getDataBufferFactory();
}
[3/3] hive git commit: HIVE-16233 : llap: Query failed with
AllocatorOutOfMemoryException (Sergey Shelukhin,
reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
Posted by se...@apache.org.
HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd32deb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd32deb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd32deb4
Branch: refs/heads/branch-2
Commit: bd32deb44b8b9baf96f2cdbbed8d9a58c3ec73d4
Parents: a4b9133
Author: sergey <se...@apache.org>
Authored: Thu Aug 23 14:23:44 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Aug 24 11:26:04 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 1499 +++++++++++++++---
.../hive/llap/cache/LlapAllocatorBuffer.java | 396 +++++
.../hive/llap/cache/LlapCacheableBuffer.java | 6 +-
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 119 +-
.../hive/llap/cache/LowLevelCacheImpl.java | 26 +-
.../llap/cache/LowLevelCacheMemoryManager.java | 54 +-
.../hive/llap/cache/LowLevelCachePolicy.java | 2 -
.../llap/cache/LowLevelFifoCachePolicy.java | 16 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 95 +-
.../hadoop/hive/llap/cache/MemoryManager.java | 2 -
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 115 +-
.../hadoop/hive/llap/cache/SimpleAllocator.java | 22 +-
.../hive/llap/cache/SimpleBufferManager.java | 12 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 11 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 14 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 70 +-
.../io/encoded/VectorDeserializeOrcWriter.java | 3 +
.../llap/io/metadata/OrcFileEstimateErrors.java | 4 +-
.../hive/llap/io/metadata/OrcFileMetadata.java | 4 +-
.../llap/io/metadata/OrcStripeMetadata.java | 4 +-
.../hive/llap/cache/TestBuddyAllocator.java | 73 +-
.../cache/TestBuddyAllocatorForceEvict.java | 470 ++++++
.../hive/llap/cache/TestLowLevelCacheImpl.java | 29 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 56 -
.../hive/llap/cache/TestOrcMetadataCache.java | 11 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 16 +-
.../ql/io/orc/encoded/StoppableAllocator.java | 3 +-
.../apache/hadoop/hive/common/io/Allocator.java | 17 +
.../apache/hadoop/hive/common/io/DataCache.java | 6 +
30 files changed, 2405 insertions(+), 760 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53dfdd9..f884eda 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2947,7 +2947,7 @@ public class HiveConf extends Configuration {
"LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
"custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" +
"significant performance degradation)"),
- LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "256Kb", new SizeValidator(),
+ LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(),
"Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
"padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
"compression buffer size, or next lowest power of 2. Must be a power of 2."),
@@ -2974,6 +2974,14 @@ public class HiveConf extends Configuration {
LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/tmp",
new WritableDirectoryValidator(),
"The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."),
+ LLAP_ALLOCATOR_DISCARD_METHOD("hive.llap.io.allocator.discard.method", "both",
+ new StringSet("freelist", "brute", "both"),
+ "Which method to use to force-evict blocks to deal with fragmentation:\n" +
+ "freelist - use half-size free list (discards less, but also less reliable); brute -\n" +
+ "brute force, discard whatever we can; both - first try free list, then brute force."),
+ LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
+ "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
+ "The allocator would further cap it to a fraction of total memory."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index af9243a..abe3fc8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
+import org.apache.hive.common.util.FixedSizedObjectPool;
public final class BuddyAllocator
implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
@@ -55,6 +57,8 @@ public final class BuddyAllocator
private final MemoryManager memoryManager;
private static final long MAX_DUMP_INTERVAL_NS = 300 * 1000000000L; // 5 minutes.
private final AtomicLong lastLog = new AtomicLong(-1);
+ private final LlapDaemonCacheMetrics metrics;
+ private static final int MAX_DISCARD_ATTEMPTS = 10, LOG_DISCARD_ATTEMPTS = 5;
// Config settings
private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
@@ -63,19 +67,24 @@ public final class BuddyAllocator
private final boolean isDirect;
private final boolean isMapped;
private final Path cacheDir;
- private final LlapDaemonCacheMetrics metrics;
+
+ // These are only used for tests.
+ private boolean enableDefragShortcut = true, oomLogging = true;
// We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
// That is guaranteed to fit any maximum allocation.
private static final int MAX_ARENA_SIZE = 1024*1024*1024;
// Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+ // Maximum reasonable defragmentation headroom. Mostly kicks in on very small caches.
+ private static final float MAX_DEFRAG_HEADROOM_FRACTION = 0.01f;
private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
.asFileAttribute(PosixFilePermissions.fromString("rwx------"));
- private static final FileAttribute<Set<PosixFilePermission>> RW_ = PosixFilePermissions
- .asFileAttribute(PosixFilePermissions.fromString("rw-------"));
-
+ private final AtomicLong[] defragCounters;
+ private final boolean doUseFreeListDiscard, doUseBruteDiscard;
+ private final FixedSizedObjectPool<DiscardContext> ctxPool;
+ private final static boolean assertsEnabled = areAssertsEnabled();
public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
@@ -83,9 +92,16 @@ public final class BuddyAllocator
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
- getMaxTotalMemorySize(conf),
+ getMaxTotalMemorySize(conf),
+ HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM),
HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
- mm, metrics);
+ mm, metrics, HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD));
+ }
+
+ /**
+ * Reports whether Java assertions are enabled for this class (e.g. the JVM runs with -ea).
+ * @return true if assertions are enabled, false otherwise.
+ */
+ private static boolean areAssertsEnabled() {
+ boolean assertsEnabled = false;
+ // Intentional assignment inside assert: the expression is only evaluated when assertions
+ // are enabled, so the flag flips to true only in that case.
+ assert assertsEnabled = true;
+ return assertsEnabled;
}
private static long getMaxTotalMemorySize(Configuration conf) {
@@ -96,29 +112,21 @@ public final class BuddyAllocator
throw new RuntimeException("Allocator space is too small for reasonable operation; "
+ ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname + "=" + maxSize + ", but at least "
+ MIN_TOTAL_MEMORY_SIZE + " is required. If you cannot spare any memory, you can "
- + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname + "; or set "
- + ConfVars.LLAP_IO_MEMORY_MODE.varname + " to 'none'");
+ + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname);
}
@VisibleForTesting
- public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount,
- long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) {
- this(isDirectVal, false /*isMapped*/, minAllocVal, maxAllocVal, arenaCount, maxSizeVal,
- null /* mapping path */, memoryManager, metrics);
- }
-
- @VisibleForTesting
- public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
- int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager,
- LlapDaemonCacheMetrics metrics) {
+ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
+ int maxAllocVal, int arenaCount, long maxSizeVal, long defragHeadroom, String mapPath,
+ MemoryManager memoryManager, LlapDaemonCacheMetrics metrics, String discardMethod) {
isDirect = isDirectVal;
isMapped = isMappedVal;
minAllocation = minAllocVal;
maxAllocation = maxAllocVal;
if (isMapped) {
try {
- cacheDir =
- Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
+ cacheDir = Files.createTempDirectory(
+ FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
} catch (IOException ioe) {
// conf validator already checks this, so it will never trigger usually
throw new AssertionError("Configured mmap directory should be writable", ioe);
@@ -126,6 +134,69 @@ public final class BuddyAllocator
} else {
cacheDir = null;
}
+
+ arenaSize = validateAndDetermineArenaSize(arenaCount, maxSizeVal);
+ maxSize = validateAndDetermineMaxSize(maxSizeVal);
+ memoryManager.updateMaxSize(determineMaxMmSize(defragHeadroom, maxSize));
+
+ minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+ maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+ arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
+ maxArenas = (int)(maxSize / arenaSize);
+ arenas = new Arena[maxArenas];
+ for (int i = 0; i < maxArenas; ++i) {
+ arenas[i] = new Arena();
+ }
+ Arena firstArena = arenas[0];
+ firstArena.init(0);
+ allocatedArenas.set(1);
+ this.memoryManager = memoryManager;
+ defragCounters = new AtomicLong[maxAllocLog2 - minAllocLog2 + 1];
+ for (int i = 0; i < defragCounters.length; ++i) {
+ defragCounters[i] = new AtomicLong(0);
+ }
+ this.metrics = metrics;
+ metrics.incrAllocatedArena();
+ boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
+ doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
+ doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
+ ctxPool = new FixedSizedObjectPool<DiscardContext>(32,
+ new FixedSizedObjectPool.PoolObjectHelper<DiscardContext>() {
+ @Override
+ public DiscardContext create() {
+ return new DiscardContext();
+ }
+ @Override
+ public void resetBeforeOffer(DiscardContext t) {
+ }
+ });
+ }
+
+  /**
+   * Computes the memory limit to hand to the memory manager, keeping some defragmentation
+   * headroom out of the given total size.
+   * @param defragHeadroom Requested headroom in bytes; a non-positive value disables headroom.
+   * @param maxMmSize Total size in bytes to carve the headroom out of (the constructor passes
+   *                  the arena-rounded maxSize).
+   * @return maxMmSize minus the headroom; the headroom is capped at
+   *         MAX_DEFRAG_HEADROOM_FRACTION of maxMmSize.
+   */
+  public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
+    if (defragHeadroom > 0) {
+      // Cap relative to the size passed in rather than the maxSize field, so the result
+      // depends only on the arguments and not on field initialization order.
+      long maxHeadroom = (long) Math.floor(maxMmSize * MAX_DEFRAG_HEADROOM_FRACTION);
+      defragHeadroom = Math.min(maxHeadroom, defragHeadroom);
+      LlapIoImpl.LOG.info("Leaving " + defragHeadroom + " of defragmentation headroom");
+      maxMmSize -= defragHeadroom;
+    }
+    return maxMmSize;
+  }
+
+ /**
+ * Rounds the requested cache size down to a whole number of arenas.
+ * Reads the arenaSize field, so it must be called after arenaSize has been initialized.
+ * @param maxSizeVal Requested total cache size in bytes.
+ * @return maxSizeVal rounded down to a multiple of arenaSize (0 if it is smaller than one arena).
+ * @throws RuntimeException If the resulting number of arenas does not fit in an int.
+ */
+ public long validateAndDetermineMaxSize(long maxSizeVal) {
+ if ((maxSizeVal % arenaSize) > 0) {
+ long oldMaxSize = maxSizeVal;
+ maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
+ LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
+ + " to be divisible by arena size " + arenaSize);
+ }
+ if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
+ throw new RuntimeException(
+ "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
+ }
+ return maxSizeVal;
+ }
+
+ public int validateAndDetermineArenaSize(int arenaCount, long maxSizeVal) {
long arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : maxSizeVal / arenaCount;
// The math.min, and the fact that maxAllocation is an int, ensures we don't overflow.
arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
@@ -156,91 +227,53 @@ public final class BuddyAllocator
LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
+ " to be divisible by allocation size " + maxAllocation);
}
- arenaSize = (int)arenaSizeVal;
- if ((maxSizeVal % arenaSize) > 0) {
- long oldMaxSize = maxSizeVal;
- maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
- LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
- + " to be divisible by arena size " + arenaSize);
- }
- if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
- throw new RuntimeException(
- "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
- }
- maxSize = maxSizeVal;
- memoryManager.updateMaxSize(maxSize);
- minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
- maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
- arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
- maxArenas = (int)(maxSize / arenaSize);
- arenas = new Arena[maxArenas];
- for (int i = 0; i < maxArenas; ++i) {
- arenas[i] = new Arena();
- }
- arenas[0].init();
- allocatedArenas.set(1);
- this.memoryManager = memoryManager;
-
- this.metrics = metrics;
- metrics.incrAllocatedArena();
+ return (int)arenaSizeVal;
}
-
@Override
public void allocateMultiple(MemoryBuffer[] dest, int size)
throws AllocatorOutOfMemoryException {
- allocateMultiple(dest, size, null);
+ allocateMultiple(dest, size, null, null);
}
@Override
- public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
+ throws AllocatorOutOfMemoryException {
+ allocateMultiple(dest, size, factory, null);
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
throws AllocatorOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
}
- int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
- if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
- freeListIx = Math.max(freeListIx - minAllocLog2, 0);
+ int freeListIx = determineFreeListForAllocation(size);
int allocLog2 = freeListIx + minAllocLog2;
int allocationSize = 1 << allocLog2;
// TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
- int destAllocIx = 0;
+
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
- dest[i] = createUnallocated(); // TODO: pool of objects?
+ // Note: this is backward compat only. Should be removed with createUnallocated.
+ dest[i] = factory != null ? factory.create() : createUnallocated();
}
+
// First try to quickly lock some of the correct-sized free lists and allocate from them.
int arenaCount = allocatedArenas.get();
if (arenaCount < 0) {
arenaCount = -arenaCount - 1; // Next arena is being allocated.
}
+
+ // Note: we might want to be smarter if threadId-s are low and there more arenas than threads.
long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
- {
- int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
- do {
- int newDestIx = arenas[index].allocateFast(
- index, freeListIx, dest, destAllocIx, allocationSize);
- if (newDestIx == dest.length) return;
- assert newDestIx != -1;
- destAllocIx = newDestIx;
- if ((++index) == arenaCount) {
- index = 0;
- }
- } while (index != startArenaIx);
- }
-
- // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
- // in there, but it might be in separate small blocks. This is a complicated problem, and
- // several solutions (in order of decreasing ugliness and increasing complexity) are: just
- // ask to evict the exact-sized block (there may be no such block), evict from a particular
- // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
- // evict from specific choice of blocks next to each other or next to already-evicted block,
- // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
- // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
- // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
- // we called reserveMemory so we know that there's memory waiting for us somewhere.
+ int destAllocIx = allocateFast(dest, null, 0, dest.length,
+ freeListIx, allocationSize, (int)(threadId % arenaCount), arenaCount);
+ if (destAllocIx == dest.length) return;
+
+ // We called reserveMemory so we know that there's memory waiting for us somewhere.
// However, we have a class of rare race conditions related to the order of locking/checking of
// different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
// We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -255,63 +288,361 @@ public final class BuddyAllocator
// allocator thread (or threads per arena).
// The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
// But for now we will just retry. We will evict more each time.
- long forceReserved = 0;
int attempt = 0;
+ boolean isFailed = false;
+ int memoryForceReleased = 0;
try {
+ int discardFailed = 0;
while (true) {
- // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
- {
- int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
- do {
- int newDestIx = arenas[arenaIx].allocateWithSplit(
- arenaIx, freeListIx, dest, destAllocIx, allocationSize);
- if (newDestIx == dest.length) return;
- assert newDestIx != -1;
- destAllocIx = newDestIx;
- if ((++arenaIx) == arenaCount) {
- arenaIx = 0;
- }
- } while (arenaIx != startArenaIx);
- }
+ // Try to split bigger blocks.
+ int startArenaIx = (int)((threadId + attempt) % arenaCount);
+ destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length,
+ freeListIx, allocationSize, startArenaIx, arenaCount, -1);
+ if (destAllocIx == dest.length) return;
if (attempt == 0) {
// Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
- destAllocIx = arenas[arenaIx].allocateWithExpand(
- arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+ destAllocIx = allocateWithExpand(
+ dest, destAllocIx, freeListIx, allocationSize, arenaCount);
+ if (destAllocIx == dest.length) return;
+ }
+
+ // Try to force-evict the fragments of the requisite size.
+ boolean hasDiscardedAny = false;
+ DiscardContext ctx = ctxPool.take();
+ try {
+ // Brute force may discard up to twice as many buffers.
+ int maxListSize = 1 << (doUseBruteDiscard ? freeListIx : (freeListIx - 1));
+ int requiredBlocks = dest.length - destAllocIx;
+ ctx.init(maxListSize, requiredBlocks);
+ // First, try to use the blocks of half size in every arena.
+ if (doUseFreeListDiscard && freeListIx > 0) {
+ discardBlocksBasedOnFreeLists(freeListIx, startArenaIx, arenaCount, ctx);
+ memoryForceReleased += ctx.memoryReleased;
+ hasDiscardedAny = ctx.resultCount > 0;
+ destAllocIx = allocateFromDiscardResult(
+ dest, destAllocIx, freeListIx, allocationSize, ctx);
if (destAllocIx == dest.length) return;
}
+ // Then, try the brute force search for something to throw away.
+ if (doUseBruteDiscard) {
+ ctx.resetResults();
+ discardBlocksBruteForce(freeListIx, startArenaIx, arenaCount, ctx);
+ memoryForceReleased += ctx.memoryReleased;
+ hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
+ destAllocIx = allocateFromDiscardResult(
+ dest, destAllocIx, freeListIx, allocationSize, ctx);
+
+ if (destAllocIx == dest.length) return;
+ }
+ } finally {
+ ctxPool.offer(ctx);
}
- int numberToForce = (dest.length - destAllocIx) * (attempt + 1);
- long newReserved = memoryManager.forceReservedMemory(allocationSize, numberToForce);
- forceReserved += newReserved;
- if (newReserved == 0) {
- // Cannot force-evict anything, give up.
+ if (hasDiscardedAny) {
+ discardFailed = 0;
+ } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
+ dest.length + " (entire cache is fragmented and locked, or an internal issue)";
logOomErrorMessage(msg);
+ isFailed = true;
throw new AllocatorOutOfMemoryException(msg);
}
- if (attempt == 0) {
- LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry");
- }
++attempt;
}
} finally {
- if (attempt > 4) {
- LlapIoImpl.LOG.warn("Allocation of " + dest.length + " buffers of size " + size
- + " took " + attempt + " attempts to evict enough memory");
+ memoryManager.releaseMemory(memoryForceReleased);
+ if (!isFailed && attempt >= LOG_DISCARD_ATTEMPTS) {
+ LlapIoImpl.LOG.info("Allocation of " + dest.length + " buffers of size " + size + " took "
+ + attempt + " attempts to free enough memory; force-released " + memoryForceReleased);
+ }
+ }
+ }
+
+  /** The context for the forced eviction of buffers. */
+  private static final class DiscardContext {
+    long[] results; // (arenaIx, headerIx) pairs of the freed blocks; see addResult.
+    int resultCount; // The count of the elements of the above that are set.
+    int memoryReleased; // Discarded memory the caller must release to the memory manager.
+
+    /**
+     * The headers for blocks we've either locked to move (if allocated), or have taken out
+     * of the free lists (if not) so that nobody allocates them while we are freeing space.
+     * All the headers will be from the arena currently being processed.
+     */
+    int[] victimHeaders;
+    int victimCount; // The count of the elements of the above that are set.
+
+    /**
+     * List-based: the base free buffers that will be paired with the space freed from
+     * victimHeaders to create the buffers of allocation size.
+     * Brute force: the buffers (that do not exist as separate buffers) composed of victimHeaders
+     * buffers; the future result buffers.
+     * All the headers will be from the arena currently being processed.
+     */
+    int[] baseHeaders;
+    int baseCount; // The count of the elements of the above that are set.
+
+    /**
+     * How many more results (or base headers) do we need to find?
+     * This object is reused between arenas; resetBetweenArenas/resetResults never reset this.
+     */
+    int remainingToFind;
+
+    /** The headers from abandoned move attempts that cannot yet be returned to the
+     * free lists, or unlocked due to some lock being held and deadlock potential. */
+    int[] abandonedHeaders;
+    int abandonedCount; // The count of the elements of the above that are set.
+
+    void init(int headersPerOneReq, int reqCount) {
+      resetResults();
+      remainingToFind = reqCount;
+      if (results == null || results.length < reqCount) {
+        results = new long[reqCount];
+        baseHeaders = new int[reqCount];
+      }
+      int maxVictimCount = headersPerOneReq * reqCount;
+      if (victimHeaders == null || victimHeaders.length < maxVictimCount) {
+        victimHeaders = new int[maxVictimCount];
+      }
+    }
+
+    void resetResults() {
+      resetBetweenArenas();
+      resultCount = memoryReleased = 0;
+    }
+
+    void resetBetweenArenas() {
+      // Reset everything for the next arena; assume everything has been cleaned.
+      victimCount = baseCount = abandonedCount = 0;
+    }
+
+    public void addResult(int arenaIx, int freeHeaderIx) {
+      results[resultCount] = makeIntPair(arenaIx, freeHeaderIx);
+      ++resultCount;
+    }
+
+    public void addBaseHeader(int headerIx) {
+      baseHeaders[baseCount] = headerIx;
+      ++baseCount;
+      --remainingToFind;
+    }
+
+    @Override
+    public String toString() {
+      return "[victimHeaders=" + Arrays.toString(victimHeaders) + ", victimCount="
+          + victimCount + ", baseHeaders=" + Arrays.toString(baseHeaders) + ", baseCount="
+          + baseCount + ", remainingToFind=" + remainingToFind + "]";
+    }
+  }
+
+  private void discardBlocksBasedOnFreeLists(
+      int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+    defragCounters[freeListIx].incrementAndGet();
+    // The free list whose blocks (each with its discarded buddy) merge into freeListIx blocks.
+    final int mergeListIx = freeListIx - 1;
+
+    // Try to free space using the base-buffer approach, one arena at a time.
+    int arenaIx = startArenaIx;
+    do {
+      Arena arena = arenas[arenaIx];
+      // Reserve blocks in this arena that would empty the sections of requisite size.
+      arena.reserveDiscardBlocksBasedOnFreeList(mergeListIx, ctx);
+      // Discard the blocks.
+      discardFromCtxBasedOnFreeList(arena, ctx, freeListIx);
+
+      if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+      ctx.resetBetweenArenas();
+      arenaIx = getNextIx(arenaIx, arenaCount, 1);
+    } while (arenaIx != startArenaIx);
+  }
+
+  private void discardBlocksBruteForce(
+      int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+    // We are going to use this counter as a pseudo-random number for the start of the search.
+    // This is to avoid churning at the beginning of the arena all the time.
+    long counter = defragCounters[freeListIx].incrementAndGet();
+    // How many blocks of this size comprise an arena.
+    int positionsPerArena = 1 << (arenaSizeLog2 - (minAllocLog2 + freeListIx));
+    // Pick a pseudo-random block position, then convert it to a header index.
+    int startHeaderIx = ((int) (counter % positionsPerArena)) << freeListIx;
+
+    // Try to free space using the brute force approach, one arena at a time.
+    int arenaIx = startArenaIx;
+    do {
+      Arena arena = arenas[arenaIx];
+      // Reserve blocks in this arena that would empty the sections of requisite size.
+      arena.reserveDiscardBruteForce(freeListIx, ctx, startHeaderIx);
+      // Discard the blocks.
+      discardFromCtxBruteForce(arena, ctx, freeListIx);
+
+      if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+      ctx.resetBetweenArenas();
+      arenaIx = getNextIx(arenaIx, arenaCount, 1);
+    } while (arenaIx != startArenaIx);
+  }
+
+  /**
+   * Frees up memory by discarding the victims in DiscardContext; merges bases with buddies.
+   * @param freeListIx The list for which the blocks are being merged.
+   */
+  private void discardFromCtxBasedOnFreeList(Arena arena, DiscardContext ctx, int freeListIx) {
+    // Discard all the locked blocks.
+    discardAllBuffersFromCtx(arena, ctx);
+    // Each merged block starts at the lower header of the base and its (now freed) buddy.
+    for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+      int baseHeaderIx = ctx.baseHeaders[baseIx];
+      int minHeaderIx = Math.min(baseHeaderIx, getBuddyHeaderIx(freeListIx - 1, baseHeaderIx));
+      finalizeDiscardResult(arena, ctx, freeListIx, minHeaderIx);
+    }
+  }
+
+  /**
+   * Frees up memory by discarding the victims in DiscardContext; each base becomes a result.
+   * @param freeListIx The list for which the blocks are being merged.
+   */
+  private void discardFromCtxBruteForce(Arena arena, DiscardContext ctx, int freeListIx) {
+    // Discard all the locked blocks.
+    discardAllBuffersFromCtx(arena, ctx);
+    // Each base header already starts a fully-discarded block of the freeListIx size.
+    for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+      finalizeDiscardResult(arena, ctx, freeListIx, ctx.baseHeaders[baseIx]);
+    }
+  }
+
+  /**
+   * Clears a newly-freed block's inner headers, marks it in use (no buffer), adds it to results.
+   */
+  private void finalizeDiscardResult(
+      Arena arena, DiscardContext ctx, int freeListIx, int newlyFreeHeaderIx) {
+    int maxHeaderIx = newlyFreeHeaderIx + (1 << freeListIx);
+    if (assertsEnabled) {
+      arena.checkHeader(newlyFreeHeaderIx, -1, true);
+    }
+    arena.unsetHeaders(newlyFreeHeaderIx + 1, maxHeaderIx, CasLog.Src.CLEARED_VICTIM);
+    // Set the leftmost header of the freed block (list-based: the base merged with its buddy).
+    arena.setHeaderNoBufAlloc(newlyFreeHeaderIx, freeListIx, CasLog.Src.NEWLY_CLEARED);
+    ctx.addResult(arena.arenaIx, newlyFreeHeaderIx);
+  }
+
+ /**
+ * Discards all the victim buffers in the context.
+ */
+ private void discardAllBuffersFromCtx(Arena arena, DiscardContext ctx) {
+ for (int victimIx = 0; victimIx < ctx.victimCount; ++victimIx) {
+ int victimHeaderIx = ctx.victimHeaders[victimIx];
+ // Note: no location check here; the buffer is always locked for move.
+ LlapAllocatorBuffer buf = arena.buffers[victimHeaderIx];
+ if (buf == null) continue;
+ if (assertsEnabled) {
+ arena.checkHeader(victimHeaderIx, -1, true);
+ byte header = arena.headers[victimHeaderIx];
+ assertBufferLooksValid(freeListFromHeader(header), buf, arena.arenaIx, victimHeaderIx);
}
- // After we succeed (or fail), release the force-evicted memory to memory manager. We have
- // previously reserved enough to allocate all we need, so we don't take our allocation out
- // of this - as per the comment above, we basically just wasted a bunch of cache (and CPU).
- if (forceReserved > 0) {
- memoryManager.releaseMemory(forceReserved);
+ // We do not modify the header here; the caller will use this space.
+ arena.buffers[victimHeaderIx] = null;
+ long memUsage = buf.getMemoryUsage();
+ Boolean result = buf.endDiscard();
+ if (result == null) {
+ ctx.memoryReleased += memUsage; // We have essentially deallocated this.
+ } else if (result) {
+ // There was a parallel deallocate; it didn't account for the memory.
+ memoryManager.releaseMemory(memUsage);
+ } else {
+ // There was a parallel cache eviction - the evictor is accounting for the memory.
}
}
}
+  /**
+   * Cancels an abandoned discard; frees the buffer if it was deallocated/evicted meanwhile.
+   */
+  private void cancelDiscard(LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+    Boolean result = buf.cancelDiscard();
+    if (result == null) return;
+    // Non-null: the buffer was deallocated (true) or evicted (false) during the discard.
+    if (result) {
+      long memUsage = buf.getMemoryUsage(); // Release memory - simple deallocation.
+      arenas[arenaIx].deallocate(buf, true);
+      memoryManager.releaseMemory(memUsage);
+    } else {
+      arenas[arenaIx].deallocate(buf, true); // No need to release memory - cache eviction.
+    }
+  }
+
+ /**
+ * Tries to allocate destCount - destIx blocks, using best-effort fast allocation.
+ * @param dest Option 1 - memory allocated is stored in these buffers.
+ * @param destHeaders Option 2 - memory allocated is reserved and headers returned via this.
+ * @param destIx The start index in either array where allocations are to be saved.
+ * @param destCount The end index in either array where allocations are to be saved.
+ * @param freeListIx The free list from which to allocate.
+ * @param allocSize Allocation size.
+ * @param startArenaIx From which arena to start allocating.
+ * @param arenaCount The active arena count.
+ * @return The index in the array until which the memory has been allocated.
+ */
+ private int allocateFast(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+ int freeListIx, int allocSize, int startArenaIx, int arenaCount) {
+ int index = startArenaIx;
+ do {
+ int newDestIx = arenas[index].allocateFast(
+ freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+ if (newDestIx == destCount) return newDestIx;
+ assert newDestIx != -1;
+ destIx = newDestIx;
+ index = getNextIx(index, arenaCount, 1);
+ } while (index != startArenaIx);
+ return destIx;
+ }
+
+  /**
+   * Tries to allocate dest.length - destIx blocks by allocating new arenas, if needed. Args as
+   * allocateFast, dest only; allocations start at arenaCount (the first unallocated arena).
+   */
+  private int allocateWithExpand(MemoryBuffer[] dest, int destIx,
+      int freeListIx, int allocSize, int arenaCount) {
+    for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+      destIx = arenas[arenaIx].allocateWithExpand(
+          arenaIx, freeListIx, dest, destIx, allocSize);
+      if (destIx == dest.length) return destIx;
+    }
+    return destIx;
+  }
+
+ /**
+ * Tries to allocate destCount - destIx blocks, waiting for locks and splitting the larger
+ * blocks if the correct sized blocks are not available. Args the same as allocateFast.
+ */
+ private int allocateWithSplit(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+ int freeListIx, int allocSize, int startArenaIx, int arenaCount, int maxSplitFreeListIx) {
+ int arenaIx = startArenaIx;
+ do {
+ int newDestIx = arenas[arenaIx].allocateWithSplit(
+ freeListIx, dest, destHeaders, destIx, destCount, allocSize, maxSplitFreeListIx);
+ if (newDestIx == destCount) return newDestIx;
+ assert newDestIx != -1;
+ destIx = newDestIx;
+ arenaIx = getNextIx(arenaIx, arenaCount, 1);
+ } while (arenaIx != startArenaIx);
+ return destIx;
+ }
+
+  /**
+   * Allocates dest buffers, starting at destAllocIx, into the blocks freed in discardResult.
+   * @return The index in dest until which the memory has been allocated.
+   */
+  private int allocateFromDiscardResult(MemoryBuffer[] dest, int destAllocIx,
+      int freeListIx, int allocationSize, DiscardContext discardResult) {
+    for (int i = 0; i < discardResult.resultCount; ++i) {
+      long result = discardResult.results[i];
+      destAllocIx = arenas[getFirstInt(result)].allocateFromDiscard(
+          dest, destAllocIx, getSecondInt(result), freeListIx, allocationSize);
+    }
+    return destAllocIx;
+  }
+
private void logOomErrorMessage(String msg) {
+ if (!oomLogging) return;
while (true) {
long time = System.nanoTime();
long lastTime = lastLog.get();
@@ -321,7 +652,7 @@ public final class BuddyAllocator
continue;
}
if (shouldLog) {
- LlapIoImpl.LOG.error(msg + debugDumpForOom());
+ LlapIoImpl.LOG.error(msg + debugDumpForOomInternal());
} else {
LlapIoImpl.LOG.error(msg);
}
@@ -329,7 +660,7 @@ public final class BuddyAllocator
}
}
- /**
+ /**
* Arbitrarily, we start getting the state from Allocator. Allocator calls MM which calls
* the policies that call the eviction dispatcher that calls the caches. See init - these all
* are connected in a cycle, so we need to make sure the who-calls-whom order is definite.
@@ -337,6 +668,10 @@ public final class BuddyAllocator
@Override
public void debugDumpShort(StringBuilder sb) {
memoryManager.debugDumpShort(sb);
+ sb.append("\nDefrag counters: ");
+ for (int i = 0; i < defragCounters.length; ++i) {
+ sb.append(defragCounters[i].get()).append(", ");
+ }
sb.append("\nAllocator state:");
int unallocCount = 0, fullCount = 0;
long totalFree = 0;
@@ -358,21 +693,24 @@ public final class BuddyAllocator
@Override
public void deallocate(MemoryBuffer buffer) {
- deallocateInternal(buffer, true);
+ LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+ int arenaToRelease = buf.invalidateAndRelease();
+ if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+ long memUsage = buf.getMemoryUsage();
+ arenas[arenaToRelease].deallocate(buf, false);
+ memoryManager.releaseMemory(memUsage);
}
@Override
public void deallocateEvicted(MemoryBuffer buffer) {
- deallocateInternal(buffer, false);
- }
-
- private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
- LlapDataBuffer buf = (LlapDataBuffer)buffer;
- long memUsage = buf.getMemoryUsage();
- arenas[buf.arenaIndex].deallocate(buf);
- if (doReleaseMemory) {
- memoryManager.releaseMemory(memUsage);
- }
+ LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+ assert buf.isInvalid();
+ int arenaToRelease = buf.releaseInvalidated();
+ if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+ arenas[arenaToRelease].deallocate(buf, false);
+ // Note: for deallocateEvicted, we do not release the memory to memManager; it may
+ // happen that the evictor tries to use the allowance before the move finishes.
+ // Retrying/more defrag should take care of that.
}
@Override
@@ -380,42 +718,7 @@ public final class BuddyAllocator
return isDirect;
}
- public String debugDumpForOomInternal() {
- StringBuilder result = new StringBuilder(
- "NOTE: with multiple threads the dump is not guaranteed to be consistent");
- for (Arena arena : arenas) {
- arena.debugDump(result);
- }
- return result.toString();
- }
-
- // BuddyAllocatorMXBean
- @Override
- public boolean getIsDirect() {
- return isDirect;
- }
-
- @Override
- public int getMinAllocation() {
- return minAllocation;
- }
-
- @Override
- public int getMaxAllocation() {
- return maxAllocation;
- }
-
- @Override
- public int getArenaSize() {
- return arenaSize;
- }
-
- @Override
- public long getMaxCacheSize() {
- return maxSize;
- }
-
- private ByteBuffer preallocate(int arenaSize) {
+ private ByteBuffer preallocateArenaBuffer(int arenaSize) {
if (isMapped) {
RandomAccessFile rwf = null;
File rf = null;
@@ -445,20 +748,28 @@ public final class BuddyAllocator
}
private class Arena {
+ private static final int FAILED_TO_RESERVE = Integer.MAX_VALUE;
+
+ private int arenaIx;
private ByteBuffer data;
// Avoid storing headers with data since we expect binary size allocations.
// Each headers[i] is a "virtual" byte at i * minAllocation.
- private byte[] headers;
+ private LlapAllocatorBuffer[] buffers;
+ // The TS rule for headers is - a header and buffer array element for some freeList
+ // can only be modified if the corresponding freeList lock is held.
+ private byte[] headers; // Free list indices of each unallocated block, for quick lookup.
private FreeList[] freeLists;
- void init() {
+ void init(int arenaIx) {
+ this.arenaIx = arenaIx;
try {
- data = preallocate(arenaSize);
+ data = preallocateArenaBuffer(arenaSize);
} catch (OutOfMemoryError oom) {
throw new OutOfMemoryError("Cannot allocate " + arenaSize + " bytes: " + oom.getMessage()
+ "; make sure your xmx and process size are set correctly.");
}
int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+ buffers = new LlapAllocatorBuffer[maxMinAllocs];
headers = new byte[maxMinAllocs];
int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1;
freeLists = new FreeList[freeListCount];
@@ -469,14 +780,350 @@ public final class BuddyAllocator
headerIndex = 0, headerStep = 1 << allocLog2Diff;
freeLists[allocLog2Diff].listHead = 0;
for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
- // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
- headers[headerIndex] = makeHeader(allocLog2Diff, false);
+ setHeaderFree(headerIndex, allocLog2Diff, CasLog.Src.CTOR);
data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep));
headerIndex += headerStep;
}
}
+ public void checkHeader(int headerIx, int freeListIx, boolean isLocked) {
+ checkHeaderByte(arenaIx, headerIx, freeListIx, isLocked, headers[headerIx]);
+ }
+
+ /**
+ * Reserves the blocks to use to merge into larger blocks.
+ * @param freeListIx The free list to reserve for.
+ * @param startHeaderIx The header index at which to start looking (to avoid churn at 0).
+ */
+ public void reserveDiscardBruteForce(int freeListIx, DiscardContext ctx, int startHeaderIx) {
+ if (data == null) return; // not allocated yet
+ int headerStep = 1 << freeListIx;
+ int headerIx = startHeaderIx;
+ do {
+ long reserveResult = reserveBlockContents(
+ freeListIx, headerIx, ctx.victimHeaders, ctx.victimCount, true);
+ int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+ if (moveSize == FAILED_TO_RESERVE) {
+ for (int i = ctx.victimCount; i < ctx.victimCount + reservedCount; ++i) {
+ abandonOneHeaderBeingMoved(ctx.victimHeaders[i], CasLog.Src.ABANDON_MOVE);
+ }
+ } else {
+ ctx.victimCount += reservedCount;
+ ctx.addBaseHeader(headerIx);
+ }
+ headerIx = getNextIx(headerIx, headers.length, headerStep);
+ } while (ctx.remainingToFind > 0 && headerIx != startHeaderIx);
+ }
+
+    /**
+     * Reserves free blocks and their buddies' contents, to be merged into larger blocks.
+     * @param mergeListIx The free list to reserve base blocks from.
+     */
+    public void reserveDiscardBlocksBasedOnFreeList(int mergeListIx, DiscardContext ctx) {
+      if (data == null) return; // not allocated yet
+      FreeList freeList = freeLists[mergeListIx];
+      freeList.lock.lock();
+      try {
+        int freeHeaderIx = freeList.listHead;
+        while (freeHeaderIx >= 0) {
+          boolean reserved = false;
+          if (ctx.remainingToFind > 0) {
+            int headerToFreeIx = getBuddyHeaderIx(mergeListIx, freeHeaderIx);
+            long reserveResult = reserveBlockContents(mergeListIx, headerToFreeIx,
+                ctx.victimHeaders, ctx.victimCount, true);
+            int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+            reserved = (moveSize != FAILED_TO_RESERVE);
+            if (!reserved) {
+              // We cannot abandon the attempt here; the concurrent operations might have released
+              // all the buffers comprising our buddy block, necessitating a merge into a higher
+              // list. That may deadlock with another thread locking its own victims (one can only
+              // take list locks separately, or moving DOWN). The alternative would be to release
+              // the free list lock before reserving, however iterating the list that way is
+              // difficult (we'd have to keep track of things on the main path to avoid re-trying
+              // the same headers repeatedly - we'd rather keep track of extra things on failure).
+              prepareAbandonUnfinishedMoveAttempt(ctx, reservedCount);
+            } else {
+              ctx.victimCount += reservedCount;
+              ctx.addBaseHeader(freeHeaderIx);
+            }
+          }
+          int nextFreeHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(freeHeaderIx));
+          if (reserved) {
+            removeBlockFromFreeList(freeList, freeHeaderIx, mergeListIx);
+            if (assertsEnabled) {
+              checkHeader(freeHeaderIx, mergeListIx, false);
+            }
+            setHeaderNoBufAlloc(freeHeaderIx, mergeListIx, CasLog.Src.NEW_BASE);
+          }
+          if (ctx.remainingToFind == 0) break;
+          freeHeaderIx = nextFreeHeaderIx;
+        }
+      } finally {
+        freeList.lock.unlock();
+      }
+      // See the comment above: release the abandoned headers only after unlocking.
+      for (int i = 0; i < ctx.abandonedCount; ++i) {
+        abandonOneHeaderBeingMoved(ctx.abandonedHeaders[i], CasLog.Src.ABANDON_AT_END);
+      }
+      ctx.abandonedCount = 0;
+    }
+
+    /**
+     * Saves the victim headers from a failed reserve into a separate array in the context.
+     * See the comment at the call site; this is to prevent deadlocks.
+     * @param count the number of victim headers the failed reserve wrote after victimCount.
+     */
+    private void prepareAbandonUnfinishedMoveAttempt(DiscardContext ctx, int count) {
+      if (count == 0) return; // Nothing to do.
+      int startIx = ctx.victimCount, start;
+      if (ctx.abandonedHeaders == null) {
+        start = 0;
+        ctx.abandonedHeaders = new int[count];
+      } else {
+        start = ctx.abandonedCount;
+        int newLen = start + count;
+        if (newLen > ctx.abandonedHeaders.length) {
+          ctx.abandonedHeaders = Arrays.copyOf(ctx.abandonedHeaders, newLen);
+        }
+      }
+      System.arraycopy(ctx.victimHeaders, startIx, ctx.abandonedHeaders, start, count);
+      ctx.abandonedCount += count;
+      ctx.victimCount = startIx;
+    }
+
+    /**
+     * Reserves a block's contents. Returns makeIntPair(victimCount, size or FAILED_TO_RESERVE).
+     * @param freeListIx The list to which the hypothetical block belongs.
+     * @param headerToFreeIx The header of the block whose contents are to be reserved.
+     */
+    private long reserveBlockContents(int freeListIx, int headerToFreeIx,
+        int[] victimHeaders, int victimsOffset, boolean isDiscard) {
+      // Try opportunistically for the common case - the same-sized, allocated buddy.
+      if (enableDefragShortcut) {
+        LlapAllocatorBuffer buffer = buffers[headerToFreeIx];
+        byte header = headers[headerToFreeIx];
+        if (buffer != null && freeListFromHeader(header) == freeListIx) {
+          // Double-check the header under lock.
+          FreeList freeList = freeLists[freeListIx];
+          freeList.lock.lock();
+          try {
+            // No one can take this buffer out and thus change the level after we lock, and if
+            // they take it out before we lock, then we will fail to lock (same as
+            // prepareOneHeaderForMove).
+            if (headers[headerToFreeIx] == header
+                && buffer.startMoveOrDiscard(arenaIx, headerToFreeIx, isDiscard)) {
+              if (assertsEnabled) {
+                assertBufferLooksValid(freeListIx, buffer, arenaIx, headerToFreeIx);
+                CasLog.logMove(arenaIx, headerToFreeIx, System.identityHashCode(buffer));
+              }
+              victimHeaders[victimsOffset] = headerToFreeIx;
+              return makeIntPair(1, buffer.allocSize);
+            }
+          } finally {
+            freeList.lock.unlock();
+          }
+          // We don't bail on failure - try in detail below.
+        }
+      }
+      // Examine the buddy block and its sub-blocks in detail.
+      long[] stack = new long[freeListIx + 1]; // Can never have more than this in elements.
+      int stackSize = 1;
+      // Seed with the buddy of this block (so the first iteration would target this block).
+      stack[0] = makeIntPair(freeListIx, getBuddyHeaderIx(freeListIx, headerToFreeIx));
+      int victimCount = 0;
+      int totalMoveSize = 0;
+      // We traverse the leaf nodes of the tree. The stack entries indicate the existing leaf
+      // nodes that we need to see siblings for, and sibling levels.
+      while (stackSize > 0) {
+        --stackSize;
+        long next = stack[stackSize];
+        int listLevel = getFirstInt(next); // This is not an actual list; see intermList.
+        int sourceHeaderIx = getSecondInt(next);
+        // Find the buddy of the header at list level. We don't know what list it is actually in.
+        int levelBuddyHeaderIx = getBuddyHeaderIx(listLevel, sourceHeaderIx);
+        // First, handle the actual thing we found.
+        long result = prepareOneHeaderForMove(levelBuddyHeaderIx, isDiscard, freeListIx);
+        if (result == -1) {
+          // We have failed to reserve a single header. Do not undo the previous ones here,
+          // the caller has to handle this to avoid races.
+          return makeIntPair(victimCount, FAILED_TO_RESERVE);
+        }
+        int allocSize = getFirstInt(result);
+        totalMoveSize += allocSize;
+        victimHeaders[victimsOffset + victimCount] = levelBuddyHeaderIx;
+        ++victimCount;
+        // Explaining this would really require a picture. Basically if the level is lower than
+        // our level, that means (imagine a tree) we are on the leftmost leaf node of the sub-tree
+        // under our sibling in the tree. So we'd need to look at the buddies of that leftmost leaf
+        // block on all the intermediate levels (aka all intermediate levels of the tree between
+        // this guy and our sibling). Including its own buddy on its own level. And so on for every
+        // sub-tree where our buddy is not on the same level as us (i.e. does not cover the entire
+        // sub-tree).
+        int actualBuddyListIx = getSecondInt(result);
+        for (int intermListIx = listLevel - 1; intermListIx >= actualBuddyListIx; --intermListIx) {
+          stack[stackSize++] = makeIntPair(intermListIx, levelBuddyHeaderIx);
+        }
+      }
+      return makeIntPair(victimCount, totalMoveSize);
+    }
+
+ /**
+ * Abandons the move attempt for a single header that may be free or allocated.
+ */
+ private void abandonOneHeaderBeingMoved(int headerIx, CasLog.Src src) {
+ byte header = headers[headerIx];
+ int freeListIx = freeListFromHeader(header);
+ if ((header & 1) != 1) failWithLog("Victim header not in use");
+ LlapAllocatorBuffer buf = buffers[headerIx];
+ if (buf != null) {
+ // Note: no location check; the buffer is always locked for move here.
+ if (assertsEnabled) {
+ assertBufferLooksValid(freeListIx, buf, arenaIx, headerIx);
+ }
+ cancelDiscard(buf, arenaIx, headerIx);
+ } else {
+ if (assertsEnabled) {
+ checkHeader(headerIx, -1, true);
+ }
+ addToFreeListWithMerge(headerIx, freeListIx, null, src);
+ }
+ }
+
+    /**
+     * Prepares victimHeaderIx header to be moved - locks if it's allocated, takes out of
+     * the free list if not.
+     * @return makeIntPair(allocSize, freeListIx) on success (allocSize 0 if free); -1 if not.
+     */
+    private long prepareOneHeaderForMove(int victimHeaderIx, boolean isDiscard, int maxListIx) {
+      byte header = headers[victimHeaderIx];
+      if (header == 0) return -1;
+      int freeListIx = freeListFromHeader(header);
+      if (freeListIx > maxListIx) {
+        // This can only come from a brute force discard; for now we don't discard blocks larger
+        // than the target block. We could discard it and add remainder to free lists.
+        // By definition if we are fragmented there should be a smaller buffer somewhere.
+        return -1;
+      }
+      if (buffers[victimHeaderIx] == null && (header & 1) == 1) {
+        return -1; // There's no buffer and another move is reserving this.
+      }
+      FreeList freeList = freeLists[freeListIx];
+      freeList.lock.lock();
+      try {
+        if (headers[victimHeaderIx] != header) {
+          // We bail if there are any changes. Note that we don't care about ABA here - all the
+          // stuff on the left has been taken out already so no one can touch it, and all the stuff
+          // on the right is yet to be seen so we don't care if they changed with this - if it's
+          // in the same free list, the processing sequence will remain the same going right.
+          return -1;
+        }
+        LlapAllocatorBuffer buffer = buffers[victimHeaderIx];
+        if (buffer == null && (header & 1) == 1) {
+          return -1; // The only ABA problem we care about. Ok to have another buffer in there;
+                     // not ok to have a location locked by someone else.
+        }
+        int allocSize = 0;
+        if (buffer != null) {
+          // The buffer can only be removed after the removed flag has been set. If we are able to
+          // lock it here, no one can set the removed flag and thus remove it. That would also mean
+          // that the header is not free, and no one will touch the header either.
+          if (!buffer.startMoveOrDiscard(arenaIx, victimHeaderIx, isDiscard)) {
+            return -1;
+          }
+          CasLog.logMove(arenaIx, victimHeaderIx, System.identityHashCode(buffer));
+          allocSize = allocSizeFromFreeList(freeListIx);
+        } else {
+          // Take the empty buffer out of the free list.
+          setHeaderNoBufAlloc(victimHeaderIx, freeListIx, CasLog.Src.EMPTY_V);
+          removeBlockFromFreeList(freeList, victimHeaderIx, freeListIx);
+        }
+        return makeIntPair(allocSize, freeListIx);
+      } finally {
+        freeList.lock.unlock();
+      }
+    }
+
+    /** Allocates dest[destIx] into the discarded block at headerIx; returns destIx + 1. */
+    public int allocateFromDiscard(MemoryBuffer[] dest, int destIx,
+        int headerIx, int freeListIx, int allocationSize) {
+      LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+      initializeNewlyAllocated(buffer, allocationSize, headerIx, offsetFromHeaderIndex(headerIx));
+      if (assertsEnabled) {
+        checkHeader(headerIx, freeListIx, true);
+      }
+      setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_DEFRAG);
+      return destIx + 1;
+    }
+
+ /** Sets the header at an index to refer to an allocated buffer. */
+ private void setHeaderAlloc(int headerIx, int freeListIx, LlapAllocatorBuffer alloc,
+ CasLog.Src src) {
+ assert alloc != null;
+ headers[headerIx] = makeHeader(freeListIx, true);
+ buffers[headerIx] = alloc;
+ CasLog.logSet(src, arenaIx, headerIx, System.identityHashCode(alloc));
+ }
+
+ /** Sets the header at an index to refer to free space in a certain free list. */
+ private void setHeaderFree(int headerIndex, int freeListIx, CasLog.Src src) {
+ headers[headerIndex] = makeHeader(freeListIx, false);
+ buffers[headerIndex] = null;
+ CasLog.logSetFree(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+ }
+
+ /** Sets the header at an index to refer to some space in use, without an allocation. */
+ private void setHeaderNoBufAlloc(int headerIndex, int freeListIx, CasLog.Src src) {
+ headers[headerIndex] = makeHeader(freeListIx, true);
+ CasLog.logSetNb(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+ }
+
+ /** Unsets the header at an index (meaning this does not refer to a buffer). */
+ private void unsetHeader(int headerIndex, CasLog.Src src) {
+ headers[headerIndex] = 0;
+ CasLog.logUnset(src, arenaIx, headerIndex, headerIndex);
+ }
+
+ /** Unsets the headers (meaning this does not refer to a buffer). */
+ private void unsetHeaders(int fromHeaderIx, int toHeaderIx, CasLog.Src src) {
+ Arrays.fill(headers, fromHeaderIx, toHeaderIx, (byte)0);
+ CasLog.logUnset(src, arenaIx, fromHeaderIx, toHeaderIx - 1);
+ }
+
+ private void debugDump(StringBuilder result) {
+ result.append("\nArena: ");
+ if (data == null) {
+ result.append(" not allocated");
+ return;
+ }
+ // Try to get as consistent view as we can; make copy of the headers.
+ byte[] headers = new byte[this.headers.length];
+ System.arraycopy(this.headers, 0, headers, 0, headers.length);
+ int allocSize = minAllocation;
+ for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
+ result.append("\n free list for size " + allocSize + ": ");
+ FreeList freeList = freeLists[i];
+ freeList.lock.lock();
+ try {
+ int nextHeaderIx = freeList.listHead;
+ while (nextHeaderIx >= 0) {
+ result.append(nextHeaderIx + ", ");
+ nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
+ }
+ } finally {
+ freeList.lock.unlock();
+ }
+ }
+ for (int i = 0; i < headers.length; ++i) {
+ byte header = headers[i];
+ if (header == 0) continue;
+ int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
+ boolean isFree = buffers[i] == null;
+ result.append("\n block " + i + " at " + offset + ": size "
+ + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+ }
+ }
+
public Integer debugDumpShort(StringBuilder result) {
if (data == null) {
return null;
@@ -507,78 +1154,67 @@ public final class BuddyAllocator
return total;
}
-  public void debugDump(StringBuilder result) {
-    result.append("\nArena: ");
+  /**
+   * Appends a compact view of this arena's non-empty headers, used by tests. Each header is
+   * written as "[<size><state>@<headerIx>]", where state is "*" (has a buffer), "!" (in use
+   * with no buffer, i.e. locked for defrag) or "." (free). The arena is wrapped in "{...}, ",
+   * and an unallocated arena is written as "{}, ".
+   */
+  private void testDump(StringBuilder result) {
+    result.append("{");
     if (data == null) {
-      result.append(" not allocated");
+      result.append("}, ");
       return;
     }
     // Try to get as consistent view as we can; make copy of the headers.
     byte[] headers = new byte[this.headers.length];
     System.arraycopy(this.headers, 0, headers, 0, headers.length);
-    int allocSize = minAllocation;
-    for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
-      result.append("\n free list for size " + allocSize + ": ");
-      FreeList freeList = freeLists[i];
-      freeList.lock.lock();
-      try {
-        int nextHeaderIx = freeList.listHead;
-        while (nextHeaderIx >= 0) {
-          result.append(nextHeaderIx + ", ");
-          nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
-        }
-      } finally {
-        freeList.lock.unlock();
-      }
-    }
     for (int i = 0; i < headers.length; ++i) {
       byte header = headers[i];
       if (header == 0) continue;
-      int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
-      boolean isFree = (header & 1) == 0;
-      result.append("\n block " + i + " at " + offset + ": size "
-          + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+      String allocState = ".";
+      if (buffers[i] != null) {
+        allocState = "*"; // Allocated
+      } else if ((header & 1) == 1) {
+        allocState = "!"; // Locked for defrag
+      }
+      int size = 1 << (freeListFromHeader(header) + minAllocLog2);
+      result.append("[").append(size).append(allocState).append("@").append(i).append("]");
     }
+    result.append("}, ");
   }
-  private int freeListFromHeader(byte header) {
-    return (header >> 1) - 1;
-  }
-
-  private int allocateFast(
-      int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+  /**
+   * Allocates from the free list for freeListIx only, without blocking on its lock.
+   * @return -1 if this arena is not allocated yet; destIx unchanged if the free list lock is
+   *     held by someone else; otherwise the index after the last slot filled.
+   */
+  private int allocateFast(int freeListIx, MemoryBuffer[] dest, long[] destHeaders,
+      int destIx, int destCount, int allocSize) {
     if (data == null) return -1; // not allocated yet
     FreeList freeList = freeLists[freeListIx];
-    if (!freeList.lock.tryLock()) return ix;
+    if (!freeList.lock.tryLock()) return destIx;
     try {
-      return allocateFromFreeListUnderLock(arenaIx, freeList, freeListIx, dest, ix, size);
+      return allocateFromFreeListUnderLock(
+          freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
     } finally {
       freeList.lock.unlock();
     }
   }
- private int allocateWithSplit(int arenaIx, int freeListIx,
- MemoryBuffer[] dest, int ix, int allocationSize) {
+ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
+ long[] destHeaders, int destIx, int destCount, int allocSize, int maxSplitFreeListIx) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
int remaining = -1;
freeList.lock.lock();
try {
// Try to allocate from target-sized free list, maybe we'll get lucky.
- ix = allocateFromFreeListUnderLock(
- arenaIx, freeList, freeListIx, dest, ix, allocationSize);
- remaining = dest.length - ix;
- if (remaining == 0) return ix;
+ destIx = allocateFromFreeListUnderLock(
+ freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+ remaining = destCount - destIx;
+ if (remaining == 0) return destIx;
} finally {
freeList.lock.unlock();
}
- byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
// Each iteration of this loop tries to split blocks from one level of the free list into
// target size blocks; if we cannot satisfy the allocation from the free list containing the
// blocks of a particular size, we'll try to split yet larger blocks, until we run out.
- while (remaining > 0 && splitListIx < freeLists.length) {
+ if (maxSplitFreeListIx == -1) {
+ maxSplitFreeListIx = freeLists.length - 1;
+ }
+ while (remaining > 0 && splitListIx <= maxSplitFreeListIx) {
int splitWaysLog2 = (splitListIx - freeListIx);
assert splitWaysLog2 > 0;
int splitWays = 1 << splitWaysLog2; // How many ways each block splits into target size.
@@ -596,35 +1232,39 @@ public final class BuddyAllocator
remaining -= toTake;
lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
// Take toTake blocks by splitting the block at offset.
- for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
- headers[headerIx] = headerData;
- // TODO: this could be done out of the lock, we only need to take the blocks out.
- ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
+ for (; toTake > 0; ++destIx, --toTake, headerIx += headerStep, offset += allocSize) {
+ if (assertsEnabled) {
+ checkHeader(headerIx, -1, false); // Cannot validate the list, it may be unset
+ }
+ if (dest != null) {
+ LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+ initializeNewlyAllocated(buffer, allocSize, headerIx, offset);
+ setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_SPLIT_BUF);
+ } else {
+ destHeaders[destIx] = makeIntPair(arenaIx, headerIx);
+ setHeaderNoBufAlloc(headerIx, freeListIx, CasLog.Src.ALLOC_SPLIT_DEFRAG);
+ }
}
lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
headerIx = getNextFreeListItem(origOffset);
}
- replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
+ replaceListHeadUnderLock(splitList, headerIx, splitListIx); // In the end, update free list head.
} finally {
splitList.lock.unlock();
}
+ CasLog.Src src = (dest != null) ? CasLog.Src.SPLIT_AFTER_BUF : CasLog.Src.SPLIT_AFTER_DEFRAG;
if (remaining == 0) {
// We have just obtained all we needed by splitting some block; now we need
// to put the space remaining from that block into lower free lists.
// We'll put at most one block into each list, since 2 blocks can always be combined
// to make a larger-level block. Each bit in the remaining target-sized blocks count
// is one block in a list offset from target-sized list by bit index.
+ // We do the merges here too, since the block we just allocated could immediately be
+ // moved out, then the resulting free space abandoned.
int newListIndex = freeListIx;
while (lastSplitBlocksRemaining > 0) {
if ((lastSplitBlocksRemaining & 1) == 1) {
- FreeList newFreeList = freeLists[newListIndex];
- newFreeList.lock.lock();
- headers[lastSplitNextHeader] = makeHeader(newListIndex, false);
- try {
- addBlockToFreeListUnderLock(newFreeList, lastSplitNextHeader);
- } finally {
- newFreeList.lock.unlock();
- }
+ addToFreeListWithMerge(lastSplitNextHeader, newListIndex, null, src);
lastSplitNextHeader += (1 << newListIndex);
}
lastSplitBlocksRemaining >>>= 1;
@@ -634,10 +1274,16 @@ public final class BuddyAllocator
}
++splitListIx;
}
- return ix;
+ return destIx;
+ }
+
+  /** Points a newly allocated buffer at its block in this arena and records its location. */
+  private void initializeNewlyAllocated(
+      LlapAllocatorBuffer buffer, int allocSize, int headerIx, int offset) {
+    buffer.initialize(data, offset, allocSize);
+    buffer.setNewAllocLocation(arenaIx, headerIx);
   }
- private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
+ private void replaceListHeadUnderLock(FreeList freeList, int headerIx, int ix) {
if (headerIx == freeList.listHead) return;
if (headerIx >= 0) {
int newHeadOffset = offsetFromHeaderIndex(headerIx);
@@ -655,7 +1301,7 @@ public final class BuddyAllocator
}
if (allocArenaCount > arenaIx) {
// Someone already allocated this arena; just do the usual thing.
- return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
}
if ((arenaIx + 1) == -arenaCount) {
// Someone is allocating this arena. Wait a bit and recheck.
@@ -676,34 +1322,39 @@ public final class BuddyAllocator
continue; // CAS race, look again.
}
assert data == null;
- init();
+ init(arenaIx);
boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
assert isCommited;
synchronized (this) {
this.notifyAll();
}
metrics.incrAllocatedArena();
- return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
}
}
-  public int offsetFromHeaderIndex(int lastSplitNextHeader) {
-    return lastSplitNextHeader << minAllocLog2;
-  }
-
-  public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
-      int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+  /**
+   * Takes blocks off the head of freeList until it is empty or destCount slots are filled.
+   * Exactly one of dest (buffers to initialize) and destHeaders (arena:header pairs reserved
+   * with no buffer) must be non-null. The caller must hold freeList.lock.
+   * @return The index after the last filled slot.
+   */
+  public int allocateFromFreeListUnderLock(FreeList freeList, int freeListIx,
+      MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount, int allocSize) {
     int current = freeList.listHead;
-    while (current >= 0 && ix < dest.length) {
-      int offset = offsetFromHeaderIndex(current);
-      // Noone else has this either allocated or in a different free list; no sync needed.
-      headers[current] = makeHeader(freeListIx, true);
+    assert (dest == null) != (destHeaders == null);
+    while (current >= 0 && destIx < destCount) {
+      int offset = offsetFromHeaderIndex(current), allocHeaderIx = current;
       current = getNextFreeListItem(offset);
-      ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
-      ++ix;
+      if (assertsEnabled) {
+        checkHeader(allocHeaderIx, freeListIx, false);
+      }
+      if (dest != null) {
+        LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+        initializeNewlyAllocated(buffer, allocSize, allocHeaderIx, offset);
+        setHeaderAlloc(allocHeaderIx, freeListIx, buffer, CasLog.Src.ALLOC_FREE_BUF);
+      } else {
+        destHeaders[destIx] = makeIntPair(arenaIx, allocHeaderIx);
+        setHeaderNoBufAlloc(allocHeaderIx, freeListIx, CasLog.Src.ALLOC_FREE_DEFRAG);
+      }
+      ++destIx;
     }
-    replaceListHeadUnderLock(freeList, current);
-    return ix;
+    // The list head is updated once, after all taken blocks have been unlinked.
+    replaceListHeadUnderLock(freeList, current, freeListIx);
+    return destIx;
   }
private int getPrevFreeListItem(int offset) {
@@ -714,32 +1365,44 @@ public final class BuddyAllocator
return data.getInt(offset + 4);
}
-  private byte makeHeader(int freeListIx, boolean isInUse) {
-    return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+  /**
+   * Returns the buffer's block to the free lists, merging it with free buddies where possible.
+   * @param isAfterMove When true, skips the assertion-mode check that the buffers slot for this
+   *     position still refers to this buffer.
+   */
+  public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
+    assert data != null;
+    int pos = buffer.byteBuffer.position();
+    // Note: this is called by someone who has ensured the buffer is not going to be moved.
+    int headerIx = pos >>> minAllocLog2;
+    int freeListIx = freeListFromAllocSize(buffer.allocSize);
+    if (assertsEnabled && !isAfterMove) {
+      LlapAllocatorBuffer buf = buffers[headerIx];
+      if (buf != buffer) {
+        failWithLog(arenaIx + ":" + headerIx + " => "
+            + toDebugString(buffer) + ", " + toDebugString(buf));
+      }
+      assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
+      checkHeader(headerIx, freeListIx, true);
+    }
+    buffers[headerIx] = null;
+    addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
   }
- public void deallocate(LlapDataBuffer buffer) {
- assert data != null;
- int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
- freeListIx = freeListFromHeader(headers[headerIx]);
- assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
- : buffer.allocSize + " " + freeListIx;
+ private void addToFreeListWithMerge(int headerIx, int freeListIx,
+ LlapAllocatorBuffer buffer, CasLog.Src src) {
while (true) {
FreeList freeList = freeLists[freeListIx];
- int bHeaderIx = headerIx ^ (1 << freeListIx);
+ int bHeaderIx = getBuddyHeaderIx(freeListIx, headerIx);
freeList.lock.lock();
try {
if ((freeListIx == freeLists.length - 1)
|| headers[bHeaderIx] != makeHeader(freeListIx, false)) {
// Buddy block is allocated, or it is on higher level of allocation than we are, or we
// have reached the top level. Add whatever we have got to the current free list.
- addBlockToFreeListUnderLock(freeList, headerIx);
- headers[headerIx] = makeHeader(freeListIx, false);
+ addBlockToFreeListUnderLock(freeList, headerIx, freeListIx);
+ setHeaderFree(headerIx, freeListIx, src);
break;
}
// Buddy block is free and in the same free list we have locked. Take it out for merge.
- removeBlockFromFreeList(freeList, bHeaderIx);
- headers[bHeaderIx] = headers[headerIx] = 0; // Erase both headers of the blocks to merge.
+ removeBlockFromFreeList(freeList, bHeaderIx, freeListIx);
+ unsetHeader(bHeaderIx, src); // Erase both headers of the blocks to merge.
+ unsetHeader(headerIx, src);
} finally {
freeList.lock.unlock();
}
@@ -748,7 +1411,8 @@ public final class BuddyAllocator
}
}
- private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
+ private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx, int ix) {
+ CasLog.logAddToList(arenaIx, headerIx, ix, freeList.listHead);
if (freeList.listHead >= 0) {
int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
assert getPrevFreeListItem(oldHeadOffset) == -1;
@@ -760,13 +1424,15 @@ public final class BuddyAllocator
freeList.listHead = headerIx;
}
- private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
+ private void removeBlockFromFreeList(FreeList freeList, int headerIx, int ix) {
int bOffset = offsetFromHeaderIndex(headerIx),
bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
+ CasLog.logRemoveFromList(arenaIx, headerIx, ix, freeList.listHead);
if (freeList.listHead == headerIx) {
assert bpHeaderIx == -1;
freeList.listHead = bnHeaderIx;
}
+ // Unnecessary: data.putInt(bOffset, -1); data.putInt(bOffset + 4, -1);
if (bpHeaderIx != -1) {
data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
}
@@ -778,20 +1444,341 @@ public final class BuddyAllocator
+  /**
+   * Lock and head index of one block-size free list. listHead is -1 when the list is empty;
+   * the prev/next links of list members are stored in the arena data itself.
+   */
   private static class FreeList {
     ReentrantLock lock = new ReentrantLock(false);
-    int listHead = -1; // Index of where the buffer is; in minAllocation units
-    // TODO: One possible improvement - store blocks arriving left over from splits, and
-    // blocks requested, to be able to wait for pending splits and reduce fragmentation.
-    // However, we are trying to increase fragmentation now, since we cater to single-size.
+    int listHead = -1; // Index of where the buffer is; in minAllocation units (headers array).
   }
+  /**
+   * Creates a new {@link LlapDataBuffer} that has not been initialized by the allocator.
+   * @deprecated NOTE(review): the replacement API is not visible in this diff; name it here.
+   */
   @Override
+  @Deprecated
   public MemoryBuffer createUnallocated() {
     return new LlapDataBuffer();
   }
+  // BuddyAllocatorMXBean: read-only views of the allocator configuration.
+  /** Whether arena memory is allocated as direct buffers. */
+  @Override
+  public boolean getIsDirect() {
+    return this.isDirect;
+  }
+
+  /** Smallest block size handed out, in bytes. */
+  @Override
+  public int getMinAllocation() {
+    return this.minAllocation;
+  }
+
+  /** Largest single allocation, in bytes. */
+  @Override
+  public int getMaxAllocation() {
+    return this.maxAllocation;
+  }
+
+  /** Size of one arena, in bytes. */
+  @Override
+  public int getArenaSize() {
+    return this.arenaSize;
+  }
+
+  /** Configured maximum total cache size. */
+  @Override
+  public long getMaxCacheSize() {
+    return this.maxSize;
+  }
+
+ // Various helper methods.
+ private static int getBuddyHeaderIx(int freeListIx, int headerIx) {
+ return headerIx ^ (1 << freeListIx);
+ }
+
+ private static int getNextIx(int ix, int count, int step) {
+ ix += step;
+ assert ix <= count; // We expect the start at 0 and count divisible by step.
+ return ix == count ? 0 : ix;
+ }
+
+ private static int freeListFromHeader(byte header) {
+ return (header >> 1) - 1;
+ }
+
+ private int freeListFromAllocSize(int allocSize) {
+ return (31 - Integer.numberOfLeadingZeros(allocSize)) - minAllocLog2;
+ }
+
+ private int allocSizeFromFreeList(int freeListIx) {
+ return 1 << (freeListIx + minAllocLog2);
+ }
+
+ public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+ return lastSplitNextHeader << minAllocLog2;
+ }
+
+ private static byte makeHeader(int freeListIx, boolean isInUse) {
+ return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+ }
+
+ private int determineFreeListForAllocation(int size) {
+ int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
+ if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
+ return Math.max(freeListIx - minAllocLog2, 0);
+ }
+
+  // Utility methods used to store pairs of ints as long.
+  /**
+   * Packs two ints into a long, with first in the high 32 bits and second in the low 32 bits.
+   * The second value is masked so a negative value (e.g. an identity hash code) does not
+   * sign-extend over the high word and corrupt first.
+   */
+  private static long makeIntPair(int first, int second) {
+    return (((long) first) << 32) | (second & 0xFFFFFFFFL);
+  }
+  /** Returns the high 32 bits of a value built by makeIntPair. */
+  private static int getFirstInt(long result) {
+    return (int) (result >>> 32);
+  }
+  /** Returns the low 32 bits of a value built by makeIntPair. */
+  private static int getSecondInt(long result) {
+    return (int) (result & ((1L << 32) - 1));
+  }
+
+  // Debug/test related methods.
+  /** Fails (via failWithLog) if buf's size does not match the block size of freeListIx. */
+  private void assertBufferLooksValid(
+      int freeListIx, LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+    int expectedSize = allocSizeFromFreeList(freeListIx);
+    if (buf.allocSize != expectedSize) {
+      failWithLog("Race; allocation size " + buf.allocSize + ", not "
+          + expectedSize + " for free list "
+          + freeListIx + " at " + arenaIx + ":" + headerIx);
+    }
+  }
+
+  /** Null-safe wrapper around LlapAllocatorBuffer.toDebugString. */
+  private static String toDebugString(LlapAllocatorBuffer buffer) {
+    if (buffer == null) {
+      return "null";
+    }
+    return buffer.toDebugString();
+  }
+
+  /**
+   * Fails (via failWithLog) if the header's in-use bit does not equal isLocked or, when
+   * freeListIx is non-negative, if the header encodes a different free list.
+   */
+  private void checkHeaderByte(
+      int arenaIx, int headerIx, int freeListIx, boolean isLocked, byte header) {
+    boolean headerLocked = (header & 1) == 1;
+    if (headerLocked != isLocked) {
+      failWithLog("Expected " + arenaIx + ":" + headerIx + " "
+          + (isLocked ? "" : "not ") + "locked: " + header);
+    }
+    if (freeListIx >= 0 && freeListFromHeader(header) != freeListIx) {
+      failWithLog("Expected " + arenaIx + ":" + headerIx + " in list " + freeListIx
+          + ": " + freeListFromHeader(header));
+    }
+  }
+
+  /** Test hook: turns off the defrag shortcut. */
+  @VisibleForTesting
+  void disableDefragShortcutForTest() {
+    enableDefragShortcut = false;
+  }
+
+  /** Test hook: enables or disables OOM logging. */
+  @VisibleForTesting
+  void setOomLoggingForTest(boolean b) {
+    oomLogging = b;
+  }
+
+  /** Test hook: concatenates the compact dump of every arena. */
+  @VisibleForTesting
+  String testDump() {
+    StringBuilder dump = new StringBuilder();
+    for (Arena arena : arenas) {
+      arena.testDump(dump);
+    }
+    return dump.toString();
+  }
+
+  /** Returns a dump of every arena's state, followed by the memory manager's own dump. */
   @Override
   public String debugDumpForOom() {
     return "\nALLOCATOR STATE:\n" + debugDumpForOomInternal()
         + "\nPARENT STATE:\n" + memoryManager.debugDumpForOom();
   }
+
+ private String debugDumpForOomInternal() {
+ StringBuilder sb = new StringBuilder();
+ for (Arena a : arenas) {
+ a.debugDump(sb);
+ }
+ return sb.toString();
+ }
+
+ private void failWithLog(String string) {
+ CasLog.logError();
+ throw new AssertionError(string);
+ }
+
+ @VisibleForTesting
+ public void dumpTestLog() {
+ if (CasLog.casLog != null) {
+ CasLog.casLog.dumpLog(true);
+ }
+ }
+
+  /**
+   * Optional in-memory event log of header and free-list changes, for debugging races.
+   * Disabled unless casLog is non-null; every log* method is a no-op while it is null.
+   * Each event occupies 2-4 longs: an event code paired with an argument, the thread id,
+   * and event-specific data. dumpOneLine decodes them back into log messages.
+   */
+  private final static class CasLog {
+    // TODO: enable this for production debug, switching between two small buffers?
+    private final static CasLog casLog = null; //new CasLog();
+    // Logged by ordinal; do not reorder.
+    public enum Src {
+      NEWLY_CLEARED,
+      SPLIT_AFTER_BUF,
+      SPLIT_AFTER_DEFRAG,
+      ALLOC_SPLIT_DEFRAG,
+      ALLOC_SPLIT_BUF,
+      ALLOC_DEFRAG,
+      EMPTY_V,
+      NEW_BASE,
+      CTOR,
+      MOVE_TO_NESTED,
+      MOVE_TO_ALLOC,
+      ABANDON_MOVE,
+      ABANDON_AT_END,
+      ABANDON_BASE,
+      CLEARED_BASE,
+      CLEARED_VICTIM,
+      UNUSABLE_NESTED,
+      ABANDON_NESTED,
+      DEALLOC,
+      ALLOC_FREE_DEFRAG,
+      ALLOC_FREE_BUF
+    }
+    private final int size;
+    private final long[] log;
+    private final AtomicInteger offset = new AtomicInteger(0);
+
+    public CasLog() {
+      size = 50000000; // 50M longs, i.e. ~400MB; there is no wraparound when it fills up.
+      log = new long[size];
+    }
+
+    public static final int START_MOVE = 0, SET_NB = 1, SET_BUF = 2, SET_FREE = 3,
+        ADD_TO_LIST = 4, REMOVE_FROM_LIST = 5, ERROR = 6, UNSET = 7;
+
+    public static void logMove(int arenaIx, int buddyHeaderIx, int identityHashCode) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(3) - 3;
+      casLog.log[ix] = makeIntPair(START_MOVE, identityHashCode);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, buddyHeaderIx);
+    }
+
+    public static void logSetNb(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_NB, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = size;
+    }
+
+    public static void logSetFree(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_FREE, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = size;
+    }
+
+    /** Logs that headers [from, to] (inclusive) were unset; empty ranges are not logged. */
+    public static void logUnset(CasLog.Src src, int arenaIx, int from, int to) {
+      if (casLog == null) return;
+      if (from > to) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(UNSET, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, from);
+      casLog.log[ix + 3] = makeIntPair(arenaIx, to);
+    }
+
+    public static void logSet(CasLog.Src src, int arenaIx, int headerIndex, int identityHashCode) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(SET_BUF, src.ordinal());
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+      casLog.log[ix + 3] = identityHashCode;
+    }
+
+    public static void logRemoveFromList(int arenaIx, int headerIx, int listIx, int listHead) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(REMOVE_FROM_LIST, listIx);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+      casLog.log[ix + 3] = listHead;
+    }
+
+    public static void logAddToList(int arenaIx, int headerIx, int listIx, int listHead) {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(4) - 4;
+      casLog.log[ix] = makeIntPair(ADD_TO_LIST, listIx);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+      casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+      casLog.log[ix + 3] = listHead;
+    }
+
+    public static void logError() {
+      if (casLog == null) return;
+      int ix = casLog.offset.addAndGet(2) - 2;
+      casLog.log[ix] = makeIntPair(ERROR, 0);
+      casLog.log[ix + 1] = Thread.currentThread().getId();
+    }
+
+    /** Logs the event starting at ix and returns the index of the next event. */
+    private int dumpOneLine(int ix) {
+      int event = getFirstInt(log[ix]);
+      switch (event) {
+      case START_MOVE: {
+        LlapIoImpl.LOG.info(prefix(ix) + " started to move "
+            + header(log[ix + 2]) + " " + Integer.toHexString(getSecondInt(log[ix])));
+        return ix + 3;
+      }
+      case SET_NB: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to taken of size " + log[ix + 3]);
+        return ix + 4;
+      }
+      case SET_FREE: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to free of size " + log[ix + 3]);
+        return ix + 4;
+      }
+      case UNSET: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " unset ["
+            + header(log[ix + 2]) + ", " + header(log[ix + 3]) + "]");
+        return ix + 4;
+      }
+      case SET_BUF: {
+        LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+            + header(log[ix + 2]) + " to " + Integer.toHexString((int)log[ix + 3]));
+        return ix + 4;
+      }
+      case ADD_TO_LIST: {
+        //LlapIoImpl.LOG.info(prefix(ix) + " adding " + header(log[ix + 2]) + " to "
+        //    + getSecondInt(log[ix]) + " before " + log[ix + 3]);
+        return ix + 4;
+      }
+      case REMOVE_FROM_LIST: {
+        // LlapIoImpl.LOG.info(prefix(ix) + " removing " + header(log[ix + 2]) + " from "
+        //   + getSecondInt(log[ix]) + " head " + log[ix + 3]);
+        return ix + 4;
+      }
+      case ERROR: {
+        LlapIoImpl.LOG.error(prefix(ix) + " failed");
+        return ix + 2;
+      }
+      default: throw new AssertionError("Unknown " + event);
+      }
+    }
+
+    private String prefix(int ix) {
+      return ix + " thread-" + log[ix + 1];
+    }
+
+    private String src(int val) {
+      return Src.values()[val].name();
+    }
+
+    private String header(long l) {
+      return getFirstInt(l) + ":" + getSecondInt(l);
+    }
+
+    /**
+     * Logs every recorded event, then resets the log.
+     * @param doSleep Whether to pause briefly before reading the log.
+     */
+    public synchronized void dumpLog(boolean doSleep) {
+      if (doSleep) {
+        try {
+          // NOTE(review): presumably gives in-flight writers time to finish their entries.
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // Stop waiting, but preserve the interrupt status for the caller.
+          Thread.currentThread().interrupt();
+        }
+      }
+      int logSize = offset.get();
+      int ix = 0;
+      while (ix < logSize) {
+        ix = dumpOneLine(ix);
+      }
+      offset.set(0);
+    }
+  }
}
[2/3] hive git commit: HIVE-16233 : llap: Query failed with
AllocatorOutOfMemoryException (Sergey Shelukhin,
reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
new file mode 100644
index 0000000..1490241
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
@@ -0,0 +1,396 @@
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * We want to have cacheable and non-allocator buffers, as well as allocator buffers with no
+ * cache dependency, and also ones that are both. Alas, we could only achieve this if we were
+ * using a real programming language.
+ */
+public abstract class LlapAllocatorBuffer extends LlapCacheableBuffer implements MemoryBuffer {
+  /** Packed refcount, flags and arena:header location (decoded via State); updated by CAS. */
+  private final AtomicLong state = new AtomicLong(0);
+
+  /** The ByteBuffer holding this buffer's data; set by initialize/initializeWithExistingSlice. */
+  public ByteBuffer byteBuffer;
+  /** Allocator uses this to remember the allocation size. Somewhat redundant with header. */
+  public int allocSize;
+
+ public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+ this.byteBuffer = byteBuffer.slice();
+ this.byteBuffer.position(offset);
+ this.byteBuffer.limit(offset + length);
+ this.allocSize = length;
+ }
+
+ public void initializeWithExistingSlice(ByteBuffer byteBuffer, int allocSize) {
+ this.byteBuffer = byteBuffer;
+ this.allocSize = allocSize;
+ }
+
+  /**
+   * Records the arena:header location of a freshly allocated buffer and sets the new-alloc
+   * flag. The state must still be 0; contention is treated as a fatal error.
+   */
+  public void setNewAllocLocation(int arenaIx, int headerIx) {
+    assert state.get() == 0 : "New buffer state is not 0 " + this;
+    long located = State.setLocation(0, arenaIx, headerIx);
+    long newState = State.setFlag(located, State.FLAG_NEW_ALLOC);
+    if (state.compareAndSet(0, newState)) {
+      return;
+    }
+    throw new AssertionError("Contention on the new buffer " + this);
+  }
+
+ @Override
+ public ByteBuffer getByteBufferDup() {
+ return byteBuffer.duplicate();
+ }
+
+ @Override
+ public ByteBuffer getByteBufferRaw() {
+ return byteBuffer;
+ }
+
+ @Override
+ public long getMemoryUsage() {
+ return allocSize;
+ }
+
+  /** Increments the refcount; if the buffer is being moved, waits and retries. */
+  public int incRef() {
+    final boolean waitIfMoving = true;
+    return incRefInternal(waitIfMoving);
+  }
+
+  /** Increments the refcount; fails instead of waiting if the buffer is being moved. */
+  int tryIncRef() {
+    final boolean waitIfMoving = false;
+    return incRefInternal(waitIfMoving);
+  }
+
+  /** incRef result codes: the buffer is evicted / the refcount could not be incremented. */
+  static final int INCREF_EVICTED = -1, INCREF_FAILED = -2;
+  /**
+   * Increments the refcount with a CAS loop; the first increment also clears FLAG_NEW_ALLOC.
+   * @param doWait If the buffer is being moved: when true, waits (waitForState) and retries;
+   *     when false, gives up immediately.
+   * @return The new refcount; INCREF_EVICTED if evicted; INCREF_FAILED if moving and we did
+   *     not wait, or were interrupted while waiting.
+   */
+  private int incRefInternal(boolean doWait) {
+    long newValue = -1;
+    while (true) {
+      long oldValue = state.get();
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) return INCREF_EVICTED;
+      if (State.hasFlags(oldValue, State.FLAG_MOVING)) {
+        if (!doWait || !waitForState()) return INCREF_FAILED; // Not waiting, or interrupted.
+        continue;
+      }
+      int oldRefCount = State.getRefCount(oldValue);
+      assert oldRefCount >= 0 : "oldValue is " + oldValue + " " + this;
+      if (oldRefCount == State.MAX_REFCOUNT) throw new AssertionError(this);
+      newValue = State.incRefCount(oldValue);
+      if (State.hasFlags(oldValue, State.FLAG_NEW_ALLOC)) {
+        // Remove new-alloc flag on first use. Full unlock after that would imply force-discarding
+        // this buffer is acceptable. This is kind of an ugly compact between the cache and us.
+        newValue = State.switchFlag(newValue, State.FLAG_NEW_ALLOC);
+      }
+      if (state.compareAndSet(oldValue, newValue)) break;
+    }
+    int newRefCount = State.getRefCount(newValue);
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locked {}; new ref count {}", this, newRefCount);
+    }
+    return newRefCount;
+  }
+
+ @VisibleForTesting
+ int getRefCount() {
+ return State.getRefCount(state.get());
+ }
+
+ @VisibleForTesting
+ @Override
+ public boolean isLocked() {
+ // Best-effort check. We cannot do a good check against caller thread, since
+ // refCount could still be > 0 if someone else locked. This is used for asserts and logs.
+ return State.getRefCount(state.get()) > 0;
+ }
+
+ @VisibleForTesting
+ public boolean isInvalid() {
+ return State.hasFlags(state.get(), State.FLAG_EVICTED);
+ }
+
+
+  /**
+   * Decrements the refcount.
+   * @return The new refcount.
+   * @throws AssertionError If the refcount is not positive; the state is left unchanged.
+   */
+  public int decRef() {
+    long newState, oldState;
+    do {
+      oldState = state.get();
+      int oldRefCount = State.getRefCount(oldState);
+      if (oldRefCount <= 0) {
+        // Check before the CAS so an unbalanced decRef cannot publish a corrupted state.
+        throw new AssertionError("Unexpected refCount " + (oldRefCount - 1) + ": " + this);
+      }
+      newState = State.decRefCount(oldState);
+    } while (!state.compareAndSet(oldState, newState));
+    int newRefCount = State.getRefCount(newState);
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
+    }
+    return newRefCount;
+  }
+
+
+  /**
+   * Invalidates the cached buffer. The memory allocation in memory manager is managed by the
+   * caller, and therefore we mark the buffer as having released the memory too.
+   * @return INVALIDATE_OK if the buffer was invalidated; INVALIDATE_FAILED if it is locked
+   *     (non-zero refcount); INVALIDATE_ALREADY_INVALID if it was already evicted.
+   */
+  @Override
+  public int invalidate() {
+    while (true) {
+      long oldValue = state.get();
+      if (State.getRefCount(oldValue) != 0) return INVALIDATE_FAILED;
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) return INVALIDATE_ALREADY_INVALID;
+      long newValue = State.setFlag(oldValue, State.FLAG_EVICTED | State.FLAG_MEM_RELEASED);
+      if (state.compareAndSet(oldValue, newValue)) break;
+    }
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to eviction", this);
+    }
+    return INVALIDATE_OK;
+  }
+
+  /**
+   * Invalidates the uncached buffer. The memory allocation in memory manager is managed by the
+   * allocator; we will only mark it as released if there are no concurrent moves. In that case,
+   * the caller of this will release the memory; otherwise, the end of move will release.
+   * @return Arena index, if the buffer memory can be released; -1 otherwise.
+   * @throws AssertionError If the buffer is still locked (non-zero refcount).
+   */
+  public int invalidateAndRelease() {
+    boolean result;
+    long oldValue, newValue;
+    do {
+      result = false; // Reset on every CAS retry.
+      oldValue = state.get();
+      if (State.getRefCount(oldValue) != 0) {
+        throw new AssertionError("Refcount is " + State.getRefCount(oldValue));
+      }
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        return -1; // Concurrent force-eviction - ignore.
+      }
+      newValue = State.setFlag(oldValue, State.FLAG_EVICTED);
+      if (!State.hasFlags(oldValue, State.FLAG_MOVING)) {
+        // No move pending, the allocator can release.
+        newValue = State.setFlag(newValue, State.FLAG_REMOVED | State.FLAG_MEM_RELEASED);
+        result = true;
+      }
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to direct deallocation", this);
+    }
+    // Arena cannot change after we have marked it as released.
+    return result ? State.getArena(oldValue) : -1;
+  }
+
+  /**
+   * Marks a previously invalidated buffer as removed, unless a move is pending or it was
+   * already removed.
+   * @return The buffer's arena index if the caller should release its memory; -1 otherwise.
+   * @throws AssertionError If the buffer has not been invalidated.
+   */
+  public int releaseInvalidated() {
+    long oldValue, newValue;
+    do {
+      oldValue = state.get();
+      if (!State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        throw new AssertionError("Not invalidated");
+      }
+      if (State.hasFlags(oldValue, State.FLAG_MOVING | State.FLAG_REMOVED)) return -1;
+      // No move pending and no intervening discard, the allocator can release.
+      newValue = State.setFlag(oldValue, State.FLAG_REMOVED);
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Removed {}", this);
+    }
+    // Arena cannot change after we have marked it as released.
+    return State.getArena(oldValue);
+  }
+
+  /**
+   * Waits up to 10ms on the state monitor.
+   * @return false if interrupted (the interrupt status is restored); true otherwise.
+   */
+  private boolean waitForState() {
+    synchronized (state) {
+      try {
+        state.wait(10);
+      } catch (InterruptedException e) {
+        LlapIoImpl.LOG.debug("Buffer incRef is deffering an interrupt");
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return true;
+    }
+  }
+
+ @Override
+ public String toString() {
+ long state = this.state.get();
+ int flags = State.getAllFlags(state);
+ return "0x" + Integer.toHexString(System.identityHashCode(this)) + "("
+ + State.getRefCount(state) + (flags == 0 ? "" : (", "
+ + State.toFlagString(flags))) + ")";
+ }
+
+
+ public String toDebugString() {
+ return toDebugString(state.get());
+ }
+
+  /** Formats identity hash, arena:header location, allocation size and flags of the state. */
+  private String toDebugString(long stateValue) {
+    StringBuilder sb = new StringBuilder("0x");
+    sb.append(Integer.toHexString(System.identityHashCode(this)));
+    sb.append('(').append(State.getArena(stateValue));
+    sb.append(':').append(State.getHeader(stateValue));
+    sb.append("; ").append(allocSize).append("; ");
+    sb.append(State.toFlagString(State.getAllFlags(stateValue))).append(')');
+    return sb.toString();
+  }
+
+  /**
+   * Tries to lock this buffer for a move or a forced discard by setting FLAG_MOVING.
+   * Fails if the buffer is referenced, is not at the expected location, or has any flag set;
+   * the only exception is that FLAG_NEW_ALLOC alone is tolerated when not force-discarding.
+   * @param arenaIx Expected arena index of the buffer.
+   * @param headerIx Expected header index of the buffer.
+   * @param isForceDiscard Whether the caller intends to discard rather than move the buffer.
+   * @return true if this call set FLAG_MOVING; false otherwise.
+   */
+  public boolean startMoveOrDiscard(int arenaIx, int headerIx, boolean isForceDiscard) {
+    while (true) {
+      long expected = state.get();
+      if (State.getRefCount(expected) != 0) {
+        return false;
+      }
+      int flags = State.getAllFlags(expected);
+      // The only allowed flag is new-alloc, and that only if we are not discarding.
+      boolean onlyNewAllocForMove = !isForceDiscard && flags == State.FLAG_NEW_ALLOC;
+      if (flags != 0 && !onlyNewAllocForMove) {
+        return false; // We could start a move if it's being evicted, but let's not do it for now.
+      }
+      boolean atExpectedLocation = State.getArena(expected) == arenaIx
+          && State.getHeader(expected) == headerIx;
+      if (!atExpectedLocation) {
+        return false; // The caller could re-check the location, but would probably find it locked.
+      }
+      if (state.compareAndSet(expected, State.setFlag(expected, State.FLAG_MOVING))) {
+        break;
+      }
+    }
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locked {} in preparation for a move", this);
+    }
+    return true;
+  }
+
+ /**
+ * Ends a move/discard that was started by setting FLAG_MOVING (see startMoveOrDiscard)
+ * without discarding the buffer: clears FLAG_MOVING and wakes up threads waiting on the
+ * state monitor (see waitForState).
+ * @return null if no action is required; otherwise, the buffer should be deallocated and,
+ * if the value is true, its memory should be released to the memory manager.
+ */
+ public Boolean cancelDiscard() {
+ long oldValue, newValue;
+ Boolean result;
+ do {
+ oldValue = state.get();
+ assert State.hasFlags(oldValue, State.FLAG_MOVING) : this.toDebugString();
+ // FLAG_MOVING is expected to be set, so toggling it clears it.
+ newValue = State.switchFlag(oldValue, State.FLAG_MOVING);
+ result = null;
+ // The buffer may have been evicted while FLAG_MOVING was held (invalidateAndRelease only
+ // sets FLAG_EVICTED when a move is pending); then the caller must deallocate it.
+ if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+ if (State.hasFlags(oldValue, State.FLAG_REMOVED)) {
+ throw new AssertionError("Removed during the move " + this);
+ }
+ result = !State.hasFlags(oldValue, State.FLAG_MEM_RELEASED);
+ // Not necessary here cause noone will be looking at these after us; set them for clarity.
+ newValue = State.setFlag(newValue, State.FLAG_MEM_RELEASED | State.FLAG_REMOVED);
+ }
+ } while (!state.compareAndSet(oldValue, newValue));
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Move ended for {}", this);
+ }
+ // Wake up threads blocked in waitForState.
+ synchronized (state) {
+ state.notifyAll();
+ }
+ return result;
+ }
+
+  /**
+   * Completes a discard that was started by setting FLAG_MOVING (see startMoveOrDiscard):
+   * clears FLAG_MOVING, marks the buffer evicted, removed and memory-released, and wakes up
+   * threads waiting on the state monitor (see waitForState).
+   * @return null if no action is required; otherwise, the buffer should be deallocated and,
+   * if the value is true, its memory should be released to the memory manager.
+   */
+  public Boolean endDiscard() {
+    long oldValue, newValue;
+    Boolean result;
+    do {
+      oldValue = state.get();
+      // Same diagnostic as cancelDiscard, so a failed assert shows the offending state.
+      assert State.hasFlags(oldValue, State.FLAG_MOVING) : this.toDebugString();
+      // FLAG_MOVING is expected to be set, so toggling it clears it.
+      newValue = State.switchFlag(oldValue, State.FLAG_MOVING);
+      newValue = State.setFlag(newValue,
+          State.FLAG_EVICTED | State.FLAG_MEM_RELEASED | State.FLAG_REMOVED);
+      result = null;
+      // See if someone else evicted this in parallel.
+      if (State.hasFlags(oldValue, State.FLAG_EVICTED)) {
+        if (State.hasFlags(oldValue, State.FLAG_REMOVED)) {
+          throw new AssertionError("Removed during the move " + this);
+        }
+        result = !State.hasFlags(oldValue, State.FLAG_MEM_RELEASED);
+      }
+    } while (!state.compareAndSet(oldValue, newValue));
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Discared {}", this);
+    }
+    // Wake up threads blocked in waitForState.
+    synchronized (state) {
+      state.notifyAll();
+    }
+    return result;
+  }
+
+  /**
+   * Static helpers that pack and unpack the buffer's 64-bit state word. Layout, starting from
+   * the lowest bit: flags (5 bits, 0-4), refcount (19 bits, 5-23), arena index (16 bits, 24-39),
+   * header index (24 bits, 40-63).
+   */
+  private static final class State {
+    public static final int
+        FLAG_MOVING =       0b00001, // Locked by someone to move or force-evict.
+        FLAG_EVICTED =      0b00010, // Evicted. This is cache-specific.
+        FLAG_REMOVED =      0b00100, // Removed from allocator structures. The final state.
+        FLAG_MEM_RELEASED = 0b01000, // The memory was released to memory manager.
+        FLAG_NEW_ALLOC =    0b10000; // New allocation before the first use; cannot force-evict.
+    private static final int FLAGS_WIDTH = 5,
+        REFCOUNT_WIDTH = 19, ARENA_WIDTH = 16, HEADER_WIDTH = 24;
+
+    public static final long MAX_REFCOUNT = (1L << REFCOUNT_WIDTH) - 1;
+
+    private static final int REFCOUNT_SHIFT = FLAGS_WIDTH,
+        ARENA_SHIFT = REFCOUNT_SHIFT + REFCOUNT_WIDTH, HEADER_SHIFT = ARENA_SHIFT + ARENA_WIDTH;
+
+    private static final long FLAGS_MASK = (1L << FLAGS_WIDTH) - 1,
+        REFCOUNT_MASK = ((1L << REFCOUNT_WIDTH) - 1) << REFCOUNT_SHIFT,
+        ARENA_MASK = ((1L << ARENA_WIDTH) - 1) << ARENA_SHIFT,
+        HEADER_MASK = ((1L << HEADER_WIDTH) - 1) << HEADER_SHIFT;
+
+    /** @return Whether ANY of the given flag bits is set in the state (not necessarily all). */
+    public static boolean hasFlags(long value, int flags) {
+      return (value & flags) != 0;
+    }
+
+    /** @return All flag bits of the state. */
+    public static int getAllFlags(long state) {
+      return (int) (state & FLAGS_MASK);
+    }
+
+    public static int getRefCount(long state) {
+      return (int) ((state & REFCOUNT_MASK) >>> REFCOUNT_SHIFT);
+    }
+
+    public static int getArena(long state) {
+      return (int) ((state & ARENA_MASK) >>> ARENA_SHIFT);
+    }
+
+    public static int getHeader(long state) {
+      return (int) ((state & HEADER_MASK) >>> HEADER_SHIFT);
+    }
+
+    public static long incRefCount(long state) {
+      // Note: doesn't check for overflow. Could AND with max refcount mask but the caller checks.
+      return state + (1L << REFCOUNT_SHIFT);
+    }
+
+    public static long decRefCount(long state) {
+      // Note: doesn't check for underflow. Could AND with max refcount mask but the caller checks.
+      return state - (1L << REFCOUNT_SHIFT);
+    }
+
+    /** @return The state with its arena and header fields replaced; other bits unchanged. */
+    public static long setLocation(long state, int arenaIx, int headerIx) {
+      long arenaVal = ((long) arenaIx) << ARENA_SHIFT, arenaWMask = arenaVal & ARENA_MASK;
+      long headerVal = ((long) headerIx) << HEADER_SHIFT, headerWMask = headerVal & HEADER_MASK;
+      assert arenaVal == arenaWMask : "Arena " + arenaIx + " is wider than " + ARENA_WIDTH;
+      assert headerVal == headerWMask : "Header " + headerIx + " is wider than " + HEADER_WIDTH;
+      return (state & ~(ARENA_MASK | HEADER_MASK)) | arenaWMask | headerWMask;
+    }
+
+    public static long setFlag(long state, int flags) {
+      // Rejects negative values too, which a plain "flags <= FLAGS_MASK" check would accept.
+      assert (flags & ~FLAGS_MASK) == 0 : "Invalid flags " + Integer.toBinaryString(flags);
+      return state | flags;
+    }
+
+    /** @return The state with the given flag bits toggled. */
+    public static long switchFlag(long state, int flags) {
+      assert (flags & ~FLAGS_MASK) == 0 : "Invalid flags " + Integer.toBinaryString(flags);
+      return state ^ flags;
+    }
+
+    /** @return The flag bits as a binary string, zero-padded to the flags field width. */
+    public static String toFlagString(int flags) {
+      return StringUtils.leftPad(Integer.toBinaryString(flags), FLAGS_WIDTH, '0');
+    }
+  }
+
+ @VisibleForTesting
+ int getArenaIndex() {
+ return State.getArena(state.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index 5c0b6f3..b53d4df 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.cache;
/**
* Buffer that can be managed by LowLevelEvictionPolicy.
+ * We want to have cacheable non-allocator buffers, allocator buffers with no cache
+ * dependency, and buffers that are both. Java's single inheritance cannot express all three
+ * cleanly, so this hierarchy is a compromise.
*/
public abstract class LlapCacheableBuffer {
protected static final int IN_LIST = -2, NOT_IN_CACHE = -1;
@@ -38,7 +41,8 @@ public abstract class LlapCacheableBuffer {
/** Index in heap for LRFU/LFU cache policies. */
public int indexInHeap = NOT_IN_CACHE;
- protected abstract boolean invalidate();
+ public static final int INVALIDATE_OK = 0, INVALIDATE_FAILED = 1, INVALIDATE_ALREADY_INVALID = 2;
+ protected abstract int invalidate();
public abstract long getMemoryUsage();
public abstract void notifyEvicted(EvictionDispatcher evictionDispatcher);
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 7d5c101..0c232c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -18,132 +18,15 @@
package org.apache.hadoop.hive.llap.cache;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryBuffer {
-
- // For now, we don't track refcount for metadata blocks, don't clear them, don't reuse them and
- // basically rely on GC to remove them. So, refcount only applies to data blocks. If that
- // changes, refcount management should be move to LlapCacheableBuffer to be shared.
- private static final int EVICTED_REFCOUNT = -1;
+public final class LlapDataBuffer extends LlapAllocatorBuffer {
public static final int UNKNOWN_CACHED_LENGTH = -1;
- protected final AtomicInteger refCount = new AtomicInteger(0);
- public ByteBuffer byteBuffer;
- /** Allocator uses this to remember which arena to alloc from. */
- public int arenaIndex = -1;
- /** Allocator uses this to remember the allocation size. */
- public int allocSize;
/** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
* the lookup is on compressed ranges, so we need to know this. */
public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
- public void initialize(
- int arenaIndex, ByteBuffer byteBuffer, int offset, int length) {
- this.byteBuffer = byteBuffer.slice();
- this.byteBuffer.position(offset);
- this.byteBuffer.limit(offset + length);
- this.arenaIndex = arenaIndex;
- this.allocSize = length;
- }
-
- @Override
- public ByteBuffer getByteBufferDup() {
- return byteBuffer.duplicate();
- }
-
- @Override
- public ByteBuffer getByteBufferRaw() {
- return byteBuffer;
- }
-
- @Override
- public long getMemoryUsage() {
- return allocSize;
- }
-
@Override
public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
evictionDispatcher.notifyEvicted(this);
}
-
- int incRef() {
- int newRefCount = -1;
- while (true) {
- int oldRefCount = refCount.get();
- if (oldRefCount == EVICTED_REFCOUNT) return -1;
- assert oldRefCount >= 0 : "oldRefCount is " + oldRefCount + " " + this;
- newRefCount = oldRefCount + 1;
- if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
- }
- if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
- LlapIoImpl.LOCKING_LOGGER.trace("Locked {}; new ref count {}", this, newRefCount);
- }
- return newRefCount;
- }
-
- @VisibleForTesting
- int getRefCount() {
- return refCount.get();
- }
-
- @VisibleForTesting
- @Override
- public boolean isLocked() {
- // Best-effort check. We cannot do a good check against caller thread, since
- // refCount could still be > 0 if someone else locked. This is used for asserts and logs.
- return refCount.get() > 0;
- }
-
- @VisibleForTesting
- public boolean isInvalid() {
- return refCount.get() == EVICTED_REFCOUNT;
- }
-
- int decRef() {
- int newRefCount = refCount.decrementAndGet();
- if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
- LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
- }
- if (newRefCount < 0) {
- throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
- }
- return newRefCount;
- }
-
- /**
- * @return Whether the we can invalidate; false if locked or already evicted.
- */
- @Override
- public boolean invalidate() {
- while (true) {
- int value = refCount.get();
- if (value != 0) return false;
- if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break;
- }
- if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
- LlapIoImpl.LOCKING_LOGGER.trace("Invalidated {} due to eviction", this);
- }
- return true;
- }
-
- @Override
- public String toString() {
- int refCount = this.refCount.get();
- return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
- }
-
- public static String toDataString(MemoryBuffer s) {
- if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
- byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
- int i = (b < 0) ? -b : b;
- return s + " (0x" + Integer.toHexString(i) + ")";
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 7673b5a..cc96d17 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -390,7 +390,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
public static LlapDataBuffer allocateFake() {
LlapDataBuffer fake = new LlapDataBuffer();
- fake.initialize(-1, fakeBuf, 0, 1);
+ fake.initialize(fakeBuf, 0, 1);
return fake;
}
@@ -420,7 +420,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
throws InterruptedException {
// Iterate thru the file cache. This is best-effort.
Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
- boolean isEmpty = true;
while (subIter.hasNext()) {
long time = -1;
isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
@@ -428,8 +427,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
? 1 : (endTime - time) / (1000000L * leftToCheck));
if (subIter.next().getValue().isInvalid()) {
subIter.remove();
- } else {
- isEmpty = false;
}
--leftToCheck;
}
@@ -478,17 +475,21 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public void debugDumpShort(StringBuilder sb) {
sb.append("\nORC cache state ");
- int allLocked = 0, allUnlocked = 0, allEvicted = 0;
+ int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
- int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0;
+ int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
if (e.getValue().getCache().isEmpty()) continue;
for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
- int newRc = e2.getValue().incRef();
+ int newRc = e2.getValue().tryIncRef();
if (newRc < 0) {
- ++fileEvicted;
+ if (newRc == LlapAllocatorBuffer.INCREF_EVICTED) {
+ ++fileEvicted;
+ } else if (newRc == LlapAllocatorBuffer.INCREF_FAILED) {
+ ++fileMoving;
+ }
continue;
}
try {
@@ -504,13 +505,14 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
allLocked += fileLocked;
allUnlocked += fileUnlocked;
allEvicted += fileEvicted;
- sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, "
- + fileUnlocked + " unlocked, " + fileEvicted + " evicted");
+ allMoving += fileMoving;
+ sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
+ + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
} finally {
e.getValue().decRef();
}
}
- sb.append("\nORC cache summary: " + allLocked + " locked, "
- + allUnlocked + " unlocked, " + allEvicted + " evicted");
+ sb.append("\nORC cache summary: " + allLocked + " locked, " + allUnlocked + " unlocked, "
+ + allEvicted + " evicted, " + allMoving + " being moved");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index e30acb0..2c1086b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -99,17 +99,15 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
result = false;
break; // Test code path where we don't do more than one attempt.
}
- didDumpIoState = logEvictionIssue(++badCallCount, didDumpIoState);
- waitTimeMs = Math.min(1000, waitTimeMs << 1);
- assert waitTimeMs > 0;
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+
+ if (isStopped != null && isStopped.get()) {
result = false;
break;
}
- if (isStopped != null && isStopped.get()) {
+ try {
+ Thread.sleep(badCallCount > 9 ? 1000 : (1 << badCallCount));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
result = false;
break;
}
@@ -139,46 +137,6 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
return result;
}
-
- private boolean logEvictionIssue(int badCallCount, boolean didDumpIoState) {
- if (badCallCount <= LOCKING_DEBUG_DUMP_THRESHOLD) return didDumpIoState;
- String ioStateDump = maybeDumpIoState(didDumpIoState);
- if (ioStateDump == null) {
- LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
- return didDumpIoState;
- } else {
- LlapIoImpl.LOG.warn("Cannot evict blocks; IO state:\n " + ioStateDump);
- return true;
- }
- }
-
- private String maybeDumpIoState(boolean didDumpIoState) {
- if (didDumpIoState) return null; // No more than once per reader.
- long now = System.nanoTime(), last = lastCacheDumpNs.get();
- while (true) {
- if (last != 0 && (now - last) < LOCKING_DEBUG_DUMP_PERIOD_NS) {
- return null; // We have recently dumped IO state into log.
- }
- if (lastCacheDumpNs.compareAndSet(last, now)) break;
- now = System.nanoTime();
- last = lastCacheDumpNs.get();
- }
- try {
- StringBuilder sb = new StringBuilder();
- memoryDumpRoot.debugDumpShort(sb);
- return sb.toString();
- } catch (Throwable t) {
- return "Failed to dump cache state: " + t.getClass() + " " + t.getMessage();
- }
- }
-
-
- @Override
- public long forceReservedMemory(int allocationSize, int count) {
- if (evictor == null) return 0;
- return evictor.tryEvictContiguousData(allocationSize, count);
- }
-
@Override
public void releaseMemory(final long memoryToRelease) {
long oldV;
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index fd9d942..acbaf85 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -27,6 +27,4 @@ public interface LowLevelCachePolicy extends LlapOomDebugDump {
long evictSomeBlocks(long memoryToReserve);
void setEvictionListener(EvictionListener listener);
void setParentDebugDumper(LlapOomDebugDump dumper);
- /** TODO: temporary method until we have a better allocator */
- long tryEvictContiguousData(int allocationSize, int count);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index 761fd00..e777dc7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -24,7 +24,6 @@ import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -83,8 +82,9 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
while (evicted < memoryToReserve && iter.hasNext()) {
LlapCacheableBuffer buffer = iter.next();
long memUsage = buffer.getMemoryUsage();
- if (memUsage < minSize || (minSize > 0 && !(buffer instanceof LlapDataBuffer))) continue;
- if (buffer.invalidate()) {
+ if (memUsage < minSize || (minSize > 0
+ && !(buffer instanceof LlapAllocatorBuffer))) continue;
+ if (LlapCacheableBuffer.INVALIDATE_OK == buffer.invalidate()) {
iter.remove();
evicted += memUsage;
evictionListener.notifyEvicted(buffer);
@@ -128,14 +128,4 @@ public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
parentDebugDump.debugDumpShort(sb);
}
}
-
- @Override
- public long tryEvictContiguousData(int allocationSize, int count) {
- long evicted = evictInternal(allocationSize * count, allocationSize);
- int remainingCount = count - (int)(evicted / allocationSize);
- if (remainingCount > 0) {
- evicted += evictInternal(allocationSize * remainingCount, -1);
- }
- return evicted;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 3973c8a..158347a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -18,17 +18,11 @@
package org.apache.hadoop.hive.llap.cache;
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -203,17 +197,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
return evicted;
}
- @Override
- public long tryEvictContiguousData(int allocationSize, int count) {
- int evicted = evictDataFromList(allocationSize, count);
- if (count <= evicted) return evicted * allocationSize;
- evicted += evictDataFromHeap(timer.get(), count - evicted, allocationSize);
- long evictedBytes = evicted * allocationSize;
- if (count <= evicted) return evictedBytes;
- evictedBytes += evictSomeBlocks(allocationSize * (count - evicted));
- return evictedBytes;
- }
-
private long evictFromList(long memoryToReserve) {
long evicted = 0;
LlapCacheableBuffer nextCandidate = null, firstCandidate = null;
@@ -224,8 +207,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
try {
nextCandidate = firstCandidate = listTail;
while (evicted < memoryToReserve && nextCandidate != null) {
- if (!nextCandidate.invalidate()) {
- // Locked buffer was in the list - just drop it; will be re-added on unlock.
+ if (LlapCacheableBuffer.INVALIDATE_OK != nextCandidate.invalidate()) {
+ // Locked, or invalidated, buffer was in the list - just drop it;
+ // will be re-added on unlock.
LlapCacheableBuffer lockedBuffer = nextCandidate;
if (firstCandidate == nextCandidate) {
firstCandidate = nextCandidate.prev;
@@ -258,42 +242,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
return evicted;
}
-
- private int evictDataFromList(int minSize, int count) {
- int evictedCount = 0;
- // Unlike the normal evictFromList, we don't necessarily evict a sequence of blocks. We won't
- // bother with optimization here and will just evict blocks one by one.
- List<LlapCacheableBuffer> evictedBuffers = new ArrayList<>(count);
- listLock.lock();
- try {
- LlapCacheableBuffer candidate = listTail;
- while (evictedCount < count && candidate != null) {
- LlapCacheableBuffer current = candidate;
- candidate = candidate.prev;
- long memUsage = current.getMemoryUsage();
- if (memUsage < minSize || !(current instanceof LlapDataBuffer)) continue;
- if (!current.invalidate()) {
- // Locked buffer was in the list - just drop it; will be re-added on unlock.
- removeFromListUnderLock(current);
- continue;
- }
- // Remove the buffer from the list.
- removeFromListUnderLock(current);
- // This makes granularity assumptions.
- assert memUsage % minSize == 0;
- evictedCount += (memUsage / minSize);
- evictedBuffers.add(current);
- }
- } finally {
- listLock.unlock();
- }
- for (LlapCacheableBuffer buffer : evictedBuffers) {
- evictionListener.notifyEvicted(buffer);
- }
- return evictedCount;
- }
-
-
// Note: rarely called (unless buffers are very large or we evict a lot, or in LFU case).
private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
while (true) {
@@ -303,38 +251,6 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
}
- // Note: almost never called (unless buffers are very large or we evict a lot, or LFU).
- private int evictDataFromHeap(long time, int count, int minSize) {
- LlapCacheableBuffer evicted = null;
- int evictedCount = 0;
- synchronized (heap) {
- // Priorities go out of the window here.
- int index = 0;
- while (index < heapSize && evictedCount < count) {
- LlapCacheableBuffer buffer = heap[index];
- long memUsage = buffer.getMemoryUsage();
- if (memUsage < minSize || !(buffer instanceof LlapDataBuffer)) {
- ++index;
- continue;
- }
- LlapCacheableBuffer result = evictHeapElementUnderLock(time, index);
- // Don't advance the index - the buffer has been removed either way.
- if (result != null) {
- assert memUsage % minSize == 0;
- evictedCount += (memUsage / minSize);
- if (evicted != null) {
- evictionListener.notifyEvicted(evicted);
- }
- evicted = result;
- }
- }
- }
- if (evicted != null) {
- evictionListener.notifyEvicted(evicted);
- }
- return evictedCount;
- }
-
private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
// See heapifyDown comment.
int ix = buffer.indexInHeap;
@@ -360,7 +276,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
--heapSize;
- boolean canEvict = result.invalidate();
+ int invalidateResult = result.invalidate();
+ boolean canEvict = invalidateResult == LlapCacheableBuffer.INVALIDATE_OK;
if (heapSize > 0) {
LlapCacheableBuffer newRoot = heap[heapSize];
newRoot.indexInHeap = ix;
@@ -370,7 +287,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
heapifyDownUnderLock(newRoot, time);
}
- // Otherwise we just removed a locked item from heap; unlock will re-add it, we continue.
+ // Otherwise we just removed a locked/invalid item from heap; we continue.
return canEvict ? result : null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
index e1133cd..ff1f269 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
@@ -23,7 +23,5 @@ import java.util.concurrent.atomic.AtomicBoolean;
public interface MemoryManager extends LlapOomDebugDump {
void releaseMemory(long memUsage);
void updateMaxSize(long maxSize);
- /** TODO: temporary method until we get a better allocator. */
- long forceReservedMemory(int allocationSize, int count);
void reserveMemory(long memoryToReserve, AtomicBoolean isStopped);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index cd5bc9b..7751845 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.llap.cache;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,7 +43,7 @@ import org.apache.orc.OrcProto.ColumnEncoding;
import com.google.common.base.Function;
-public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
private final Allocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
@@ -53,6 +53,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
+ /**
+ * Allocator buffer type used for SerDe cache data; StripeData keeps these in its
+ * column index / stream type / buffer arrays.
+ */
+ public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
+ // NOTE(review): presumably set once the buffer has been put into the cache - confirm.
+ public boolean isCached = false;
+ /** Passes this buffer to the dispatcher's notifyEvicted. */
+ @Override
+ public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+ evictionDispatcher.notifyEvicted(this);
+ }
+ }
+
private static final class StripeInfoComparator implements
Comparator<StripeData> {
@Override
@@ -129,7 +137,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
private final long rowCount;
private final OrcProto.ColumnEncoding[] encodings;
- private LlapDataBuffer[][][] data; // column index, stream type, buffers
+ private LlapSerDeDataBuffer[][][] data; // column index, stream type, buffers
public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
long rowCount, ColumnEncoding[] encodings) {
@@ -139,7 +147,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
this.lastEnd = lastEnd;
this.encodings = encodings;
this.rowCount = rowCount;
- this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+ this.data = encodings == null ? null : new LlapSerDeDataBuffer[encodings.length][][];
}
@Override
@@ -172,7 +180,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
return encodings;
}
- public LlapDataBuffer[][][] getData() {
+ public LlapSerDeDataBuffer[][][] getData() {
return data;
}
@@ -191,18 +199,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
}
}
- public static String toString(LlapDataBuffer[][][] data) {
+ public static String toString(LlapSerDeDataBuffer[][][] data) {
if (data == null) return "null";
StringBuilder sb = new StringBuilder("[");
for (int i = 0; i < data.length; ++i) {
- LlapDataBuffer[][] colData = data[i];
+ LlapSerDeDataBuffer[][] colData = data[i];
if (colData == null) {
sb.append("null, ");
continue;
}
sb.append("colData [");
for (int j = 0; j < colData.length; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) {
sb.append("null, ");
continue;
@@ -220,19 +228,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
}
- public static String toString(LlapDataBuffer[][] data) {
+ public static String toString(LlapSerDeDataBuffer[][] data) {
if (data == null) return "null";
StringBuilder sb = new StringBuilder("[");
for (int j = 0; j < data.length; ++j) {
- LlapDataBuffer[] streamData = data[j];
+ LlapSerDeDataBuffer[] streamData = data[j];
if (streamData == null) {
sb.append("null, ");
continue;
}
sb.append("[");
for (int k = 0; k < streamData.length; ++k) {
- LlapDataBuffer s = streamData[k];
- sb.append(LlapDataBuffer.toDataString(s));
+ sb.append(streamData[k]);
}
sb.append("], ");
}
@@ -405,11 +412,11 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
continue;
}
stripe.encodings[colIx] = cStripe.encodings[colIx];
- LlapDataBuffer[][] cColData = cStripe.data[colIx];
+ LlapSerDeDataBuffer[][] cColData = cStripe.data[colIx];
assert cColData != null;
for (int streamIx = 0;
cColData != null && streamIx < cColData.length; ++streamIx) {
- LlapDataBuffer[] streamData = cColData[streamIx];
+ LlapSerDeDataBuffer[] streamData = cColData[streamIx];
// Note: this relies on the fact that we always evict the entire column, so if
// we have the column data, we assume we have all the streams we need.
if (streamData == null) continue;
@@ -471,7 +478,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
}
}
- private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+ private boolean lockBuffer(LlapSerDeDataBuffer buffer, boolean doNotifyPolicy) {
int rc = buffer.incRef();
if (rc > 0) {
metrics.incrCacheNumLockedBuffers();
@@ -561,28 +568,28 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
private void lockAllBuffersForPut(StripeData si, Priority priority) {
for (int i = 0; i < si.data.length; ++i) {
- LlapDataBuffer[][] colData = si.data[i];
+ LlapSerDeDataBuffer[][] colData = si.data[i];
if (colData == null) continue;
for (int j = 0; j < colData.length; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) continue;
for (int k = 0; k < streamData.length; ++k) {
boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
assert canLock;
cachePolicy.cache(streamData[k], priority);
- streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+ streamData[k].isCached = true; // unlockBuffer uses this to notify the policy on last decRef.
}
}
}
}
private void handleRemovedStripeInfo(StripeData removed) {
- for (LlapDataBuffer[][] colData : removed.data) {
+ for (LlapSerDeDataBuffer[][] colData : removed.data) {
handleRemovedColumnData(colData);
}
}
- private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+ private void handleRemovedColumnData(LlapSerDeDataBuffer[][] removed) {
// TODO: could we tell the policy that we don't care about these and have them evicted? or we
// could just deallocate them when unlocked, and free memory + handle that in eviction.
// For now, just abandon the blocks - eventually, they'll get evicted.
@@ -603,7 +610,7 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
&& !to.encodings[colIx].equals(from.encodings[colIx])) {
throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
}
- LlapDataBuffer[][] fromColData = from.data[colIx];
+ LlapSerDeDataBuffer[][] fromColData = from.data[colIx];
if (fromColData != null) {
if (to.data[colIx] != null) {
// Note: we assume here that the data that was returned to the caller from cache will not
@@ -616,12 +623,22 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
}
}
- private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+ @Override
+ public void decRefBuffer(MemoryBuffer buffer) {
+ unlockBuffer((LlapSerDeDataBuffer)buffer, true);
+ }
+
+ @Override
+ public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
+ unlockBuffer((LlapSerDeDataBuffer)b, true);
+ }
+ }
+
+ private void unlockBuffer(LlapSerDeDataBuffer buffer, boolean handleLastDecRef) {
boolean isLastDecref = (buffer.decRef() == 0);
if (handleLastDecRef && isLastDecref) {
- // This is kind of not pretty, but this is how we detect whether buffer was cached.
- // We would always set this for lookups at put time.
- if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+ if (buffer.isCached) {
cachePolicy.notifyUnlock(buffer);
} else {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -633,13 +650,6 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
metrics.decrCacheNumLockedBuffers();
}
- private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
- public static LlapDataBuffer allocateFake() {
- LlapDataBuffer fake = new LlapDataBuffer();
- fake.initialize(-1, fakeBuf, 0, 1);
- return fake;
- }
-
public final void notifyEvicted(MemoryBuffer buffer) {
newEvictions.incrementAndGet();
}
@@ -664,14 +674,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
try {
for (StripeData sd : fd.stripes) {
for (int colIx = 0; colIx < sd.data.length; ++colIx) {
- LlapDataBuffer[][] colData = sd.data[colIx];
+ LlapSerDeDataBuffer[][] colData = sd.data[colIx];
if (colData == null) continue;
boolean hasAllData = true;
for (int j = 0; (j < colData.length) && hasAllData; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) continue;
for (int k = 0; k < streamData.length; ++k) {
- LlapDataBuffer buf = streamData[k];
+ LlapSerDeDataBuffer buf = streamData[k];
hasAllData = hasAllData && lockBuffer(buf, false);
if (!hasAllData) break;
unlockBuffer(buf, true);
@@ -691,6 +701,18 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
}
@Override
+ public boolean incRefBuffer(MemoryBuffer buffer) {
+ // incRefBuffer implies that the buffer is already locked; it's also called once for new
+ // buffers that are not cached yet. Either way, don't notify the cache policy.
+ return lockBuffer(((LlapSerDeDataBuffer)buffer), false);
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return allocator;
+ }
+
+ @Override
public String debugDumpForOom() {
StringBuilder sb = new StringBuilder("File cache state ");
for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
@@ -711,25 +733,29 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
@Override
public void debugDumpShort(StringBuilder sb) {
sb.append("\nSerDe cache state ");
- int allLocked = 0, allUnlocked = 0, allEvicted = 0;
+ int allLocked = 0, allUnlocked = 0, allEvicted = 0, allMoving = 0;
for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
FileData fd = e.getValue().getCache();
- int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0;
+ int fileLocked = 0, fileUnlocked = 0, fileEvicted = 0, fileMoving = 0;
sb.append(fd.colCount).append(" columns, ").append(fd.stripes.size()).append(" stripes; ");
for (StripeData stripe : fd.stripes) {
if (stripe.data == null) continue;
for (int i = 0; i < stripe.data.length; ++i) {
- LlapDataBuffer[][] colData = stripe.data[i];
+ LlapSerDeDataBuffer[][] colData = stripe.data[i];
if (colData == null) continue;
for (int j = 0; j < colData.length; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) continue;
for (int k = 0; k < streamData.length; ++k) {
int newRc = streamData[k].incRef();
if (newRc < 0) {
- ++fileEvicted;
+ if (newRc == LlapAllocatorBuffer.INCREF_EVICTED) {
+ ++fileEvicted;
+ } else if (newRc == LlapAllocatorBuffer.INCREF_FAILED) {
+ ++fileMoving;
+ }
continue;
}
try {
@@ -748,13 +774,14 @@ public class SerDeLowLevelCacheImpl implements LlapOomDebugDump {
allLocked += fileLocked;
allUnlocked += fileUnlocked;
allEvicted += fileEvicted;
- sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, "
- + fileUnlocked + " unlocked, " + fileEvicted + " evicted");
+ allMoving += fileMoving;
+ sb.append("\n file " + e.getKey() + ": " + fileLocked + " locked, " + fileUnlocked
+ + " unlocked, " + fileEvicted + " evicted, " + fileMoving + " being moved");
} finally {
e.getValue().decRef();
}
}
- sb.append("\nSerDe cache summary: " + allLocked + " locked, "
- + allUnlocked + " unlocked, " + allEvicted + " evicted");
+ sb.append("\nSerDe cache summary: " + allLocked + " locked, " + allUnlocked + " unlocked, "
+ + allEvicted + " evicted, " + allMoving + " being moved");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index 51eb34e..f5a5f53 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -50,23 +50,32 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
}
}
+
@Override
+ @Deprecated
public void allocateMultiple(MemoryBuffer[] dest, int size) {
+ allocateMultiple(dest, size, null);
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory) {
for (int i = 0; i < dest.length; ++i) {
- LlapDataBuffer buf = null;
+ LlapAllocatorBuffer buf = null;
if (dest[i] == null) {
- dest[i] = buf = createUnallocated();
+ // Note: this is backward compat only. Should be removed with createUnallocated.
+ dest[i] = buf = (factory != null)
+ ? (LlapAllocatorBuffer)factory.create() : createUnallocated();
} else {
- buf = (LlapDataBuffer)dest[i];
+ buf = (LlapAllocatorBuffer)dest[i];
}
ByteBuffer bb = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
- buf.initialize(0, bb, 0, size);
+ buf.initialize(bb, 0, size);
}
}
@Override
public void deallocate(MemoryBuffer buffer) {
- LlapDataBuffer buf = (LlapDataBuffer)buffer;
+ LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
ByteBuffer bb = buf.byteBuffer;
buf.byteBuffer = null;
if (!bb.isDirect()) return;
@@ -86,7 +95,8 @@ public final class SimpleAllocator implements Allocator, BuddyAllocatorMXBean {
}
@Override
- public LlapDataBuffer createUnallocated() {
+ @Deprecated
+ public LlapAllocatorBuffer createUnallocated() {
return new LlapDataBuffer();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index af7cf3d..076c80f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -38,14 +38,14 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
this.metrics = metrics;
}
- private boolean lockBuffer(LlapDataBuffer buffer) {
+ private boolean lockBuffer(LlapAllocatorBuffer buffer) {
int rc = buffer.incRef();
if (rc <= 0) return false;
metrics.incrCacheNumLockedBuffers();
return true;
}
- private void unlockBuffer(LlapDataBuffer buffer) {
+ private void unlockBuffer(LlapAllocatorBuffer buffer) {
if (buffer.decRef() == 0) {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
@@ -57,19 +57,19 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
@Override
public void decRefBuffer(MemoryBuffer buffer) {
- unlockBuffer((LlapDataBuffer)buffer);
+ unlockBuffer((LlapAllocatorBuffer)buffer);
}
@Override
public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
for (MemoryBuffer b : cacheBuffers) {
- unlockBuffer((LlapDataBuffer)b);
+ unlockBuffer((LlapAllocatorBuffer)b);
}
}
@Override
public boolean incRefBuffer(MemoryBuffer buffer) {
- return lockBuffer((LlapDataBuffer)buffer);
+ return lockBuffer((LlapAllocatorBuffer)buffer);
}
@Override
@@ -88,7 +88,7 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
MemoryBuffer[] chunks, long baseOffset, Priority priority,
LowLevelCacheCounters qfCounters) {
for (int i = 0; i < chunks.length; ++i) {
- LlapDataBuffer buffer = (LlapDataBuffer)chunks[i];
+ LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Locking {} at put time (no cache)", buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 253532a..d107e67 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -116,7 +116,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
OrcMetadataCache metadataCache = null;
LowLevelCache cache = null;
SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
- BufferUsageManager bufferManager = null;
+ BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -172,12 +172,13 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
cachePolicy.setParentDebugDumper(e);
cacheImpl.startThreads(); // Start the cache threads.
- bufferManager = cacheImpl; // Cache also serves as buffer manager.
+ bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
+ bufferManagerGeneric = serdeCache;
} else {
this.allocator = new SimpleAllocator(conf);
memoryDumpRoot = null;
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
- bufferManager = sbm;
+ bufferManagerOrc = bufferManagerGeneric = sbm;
cache = sbm;
}
// IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
@@ -189,9 +190,9 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
// TODO: this should depends on input format and be in a map, or something.
FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);
this.orcCvp = new OrcColumnVectorProducer(
- metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics, tracePool);
+ metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer(
- serdeCache, bufferManager, conf, cacheMetrics, ioMetrics, tracePool) : null;
+ serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null;
LOG.info("LLAP IO initialized");
registerMXBeans();
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 655ce83..edd498e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -831,7 +833,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
stripeRgs = new boolean[stripeIxTo - stripeIxFrom][];
}
- private class DataWrapperForOrc implements DataReader, DataCache {
+ private class DataWrapperForOrc implements DataReader, DataCache, BufferObjectFactory {
private final DataReader orcDataReader;
private DataWrapperForOrc(DataWrapperForOrc other) {
@@ -951,6 +953,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
return orcDataReader.readStripeFooter(stripe);
}
+
+ @Override
+ public BufferObjectFactory getDataBufferFactory() {
+ return this;
+ }
+
+ @Override
+ public MemoryBuffer create() {
+ return new LlapDataBuffer();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 35d6178..187fa7d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
@@ -46,10 +47,12 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.FileData;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.LlapSerDeDataBuffer;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl.StripeData;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -94,7 +97,6 @@ import org.apache.orc.OrcFile.Version;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcProto.ColumnEncoding;
import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.OutStream;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.PhysicalWriter.OutputReceiver;
import org.apache.orc.impl.SchemaEvolution;
@@ -151,6 +153,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final SerDeLowLevelCacheImpl cache;
private final BufferUsageManager bufferManager;
+ private final BufferObjectFactory bufferFactory;
private final Configuration daemonConf;
private final FileSplit split;
private List<Integer> columnIds;
@@ -191,6 +194,12 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
throws IOException {
this.cache = cache;
this.bufferManager = bufferManager;
+ this.bufferFactory = new BufferObjectFactory() {
+ @Override
+ public MemoryBuffer create() {
+ return new SerDeLowLevelCacheImpl.LlapSerDeDataBuffer();
+ }
+ };
this.parts = parts;
this.daemonConf = new Configuration(daemonConf);
// Disable dictionary encoding for the writer.
@@ -303,7 +312,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private static String toString(List<MemoryBuffer> data) {
String s = "";
for (MemoryBuffer buffer : data) {
- s += LlapDataBuffer.toDataString(buffer) + ", ";
+ s += buffer + ", ";
}
return s;
}
@@ -331,6 +340,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private CacheStripeData currentStripe;
private final List<CacheStripeData> stripes = new ArrayList<>();
private final BufferUsageManager bufferManager;
+ private final Allocator.BufferObjectFactory bufferFactory;
/**
* For !doesSourceHaveIncludes case, stores global column IDs to verify writer columns.
* For doesSourceHaveIncludes case, stores source column IDs used to map things.
@@ -344,13 +354,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final AtomicBoolean isStopped;
public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
- boolean[] writerIncludes, boolean doesSourceHaveIncludes, AtomicBoolean isStopped) {
+ boolean[] writerIncludes, boolean doesSourceHaveIncludes,
+ Allocator.BufferObjectFactory bufferFactory, AtomicBoolean isStopped) {
this.bufferManager = bufferManager;
assert writerIncludes != null; // Taken care of on higher level.
this.writerIncludes = writerIncludes;
this.doesSourceHaveIncludes = doesSourceHaveIncludes;
this.columnIds = columnIds;
this.isStopped = isStopped;
+ this.bufferFactory = bufferFactory;
startStripe();
}
@@ -438,7 +450,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("Creating cache receiver for " + name);
}
- CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name, isStopped);
+ CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, bufferFactory, name, isStopped);
or = cor;
List<CacheOutputReceiver> list = colStreams.get(name.getColumn());
if (list == null) {
@@ -568,6 +580,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private static final class CacheOutputReceiver implements CacheOutput, OutputReceiver {
private final BufferUsageManager bufferManager;
+ private final BufferObjectFactory bufferFactory;
private final StreamName name;
private List<MemoryBuffer> buffers = null;
private int lastBufferPos = -1;
@@ -576,8 +589,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final StoppableAllocator allocator;
public CacheOutputReceiver(
- BufferUsageManager bufferManager, StreamName name, AtomicBoolean isStopped) {
+ BufferUsageManager bufferManager,
+ BufferObjectFactory bufferFactory, StreamName name, AtomicBoolean isStopped) {
this.bufferManager = bufferManager;
+ this.bufferFactory = bufferFactory;
Allocator alloc = bufferManager.getAllocator();
this.allocator = alloc instanceof StoppableAllocator ? (StoppableAllocator) alloc : null;
this.name = name;
@@ -598,13 +613,12 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private void allocateMultiple(MemoryBuffer[] dest, int size) {
if (allocator != null) {
- allocator.allocateMultiple(dest, size, isStopped);
+ allocator.allocateMultiple(dest, size, bufferFactory, isStopped);
} else {
- bufferManager.getAllocator().allocateMultiple(dest, size);
+ bufferManager.getAllocator().allocateMultiple(dest, size, bufferFactory);
}
}
-
@Override
public void output(ByteBuffer buffer) throws IOException {
// TODO: avoid put() by working directly in OutStream?
@@ -629,7 +643,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (isNewBuffer) {
MemoryBuffer[] dest = new MemoryBuffer[1];
allocateMultiple(dest, size);
- LlapDataBuffer newBuffer = (LlapDataBuffer)dest[0];
+ LlapSerDeDataBuffer newBuffer = (LlapSerDeDataBuffer)dest[0];
bb = newBuffer.getByteBufferRaw();
lastBufferPos = bb.position();
buffers.add(newBuffer);
@@ -720,10 +734,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private void unlockAllBuffers(StripeData si) {
for (int i = 0; i < si.getData().length; ++i) {
- LlapDataBuffer[][] colData = si.getData()[i];
+ LlapSerDeDataBuffer[][] colData = si.getData()[i];
if (colData == null) continue;
for (int j = 0; j < colData.length; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) continue;
for (int k = 0; k < streamData.length; ++k) {
bufferManager.decRefBuffer(streamData[k]);
@@ -760,10 +774,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private void lockAllBuffers(StripeData sd) {
for (int i = 0; i < sd.getData().length; ++i) {
- LlapDataBuffer[][] colData = sd.getData()[i];
+ LlapSerDeDataBuffer[][] colData = sd.getData()[i];
if (colData == null) continue;
for (int j = 0; j < colData.length; ++j) {
- LlapDataBuffer[] streamData = colData[j];
+ LlapSerDeDataBuffer[] streamData = colData[j];
if (streamData == null) continue;
for (int k = 0; k < streamData.length; ++k) {
boolean canLock = bufferManager.incRefBuffer(streamData[k]);
@@ -937,7 +951,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
logProcessOneSlice(stripeIx, diskData, cacheData);
ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
- LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+ LlapSerDeDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
long cacheRowCount = cacheData == null ? -1L : cacheData.getRowCount();
SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
StripeData sliceToCache = null;
@@ -963,7 +977,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (!hasAllData && splitIncludes[colIx]) {
// The column has been read from disk.
List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
- LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+ LlapSerDeDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
if (streams == null) continue; // Struct column, such as root?
Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
while (iter.hasNext()) {
@@ -1026,7 +1040,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
return true; // Nothing to process.
}
ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
- LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+ LlapSerDeDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
if (cacheData != null) {
// Don't validate column count - no encodings for vectors.
validateCacheAndDisk(cacheData, diskData.getRowCount(), -1, diskData);
@@ -1078,7 +1092,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (!splitIncludes[colIx]) continue;
// The column has been read from disk.
List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
- LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+ LlapSerDeDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
if (streams == null) continue; // Struct column, such as root?
Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
while (iter.hasNext()) {
@@ -1132,28 +1146,28 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
- private static LlapDataBuffer[][] createArrayToCache(
+ private static LlapSerDeDataBuffer[][] createArrayToCache(
StripeData sliceToCache, int colIx, List<CacheWriter.CacheStreamData> streams) {
if (LlapIoImpl.LOG.isTraceEnabled()) {
LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
}
- LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
- = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+ LlapSerDeDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+ = new LlapSerDeDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
return newCacheDataForCol;
}
private static int setStreamDataToCache(
- LlapDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
+ LlapSerDeDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
int streamIx = stream.name.getKind().getNumber();
- // This is kinda hacky - we "know" these are LlapDataBuffer-s.
- newCacheDataForCol[streamIx] = stream.data.toArray(new LlapDataBuffer[stream.data.size()]);
+ // Kinda hacky: we "know" these are LlapSerDeDataBuffer-s (else toArray throws ArrayStoreException).
+ newCacheDataForCol[streamIx] = stream.data.toArray(new LlapSerDeDataBuffer[stream.data.size()]);
return streamIx;
}
- private void processColumnCacheData(LlapDataBuffer[][][] cacheBuffers,
+ private void processColumnCacheData(LlapSerDeDataBuffer[][][] cacheBuffers,
OrcEncodedColumnBatch ecb, int colIx) {
// The column has been obtained from cache.
- LlapDataBuffer[][] colData = cacheBuffers[colIx];
+ LlapSerDeDataBuffer[][] colData = cacheBuffers[colIx];
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
+ SerDeLowLevelCacheImpl.toString(colData));
@@ -1178,8 +1192,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private void discardUncachedBuffers(List<MemoryBuffer> list) {
for (MemoryBuffer buffer : list) {
- boolean isInvalidated = ((LlapDataBuffer)buffer).invalidate();
- assert isInvalidated;
bufferManager.getAllocator().deallocate(buffer);
}
}
@@ -1404,7 +1416,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
// TODO: move this into ctor? EW would need to create CacheWriter then
List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
- writer.isOnlyWritingIncludedColumns(), isStopped), daemonConf, split.getPath());
+ writer.isOnlyWritingIncludedColumns(), bufferFactory, isStopped), daemonConf, split.getPath());
if (writer instanceof VectorDeserializeOrcWriter) {
VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer;
asyncWriter.startAsync(new AsyncCacheDataCallback());
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
index c9df7d9..981e52f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java
@@ -228,6 +228,9 @@ class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable {
WriteOperation op = null;
int fallbackMs = 8;
while (true) {
+ // The reason we poll here is that a blocking queue causes the query thread to spend
+ // a non-trivial amount of time signaling when an element is added; we'd rather that
+ // this time be spent on this background thread instead.
op = queue.poll();
if (op != null) break;
if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins.
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 20af0d0..dc053ee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -104,8 +104,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
}
@Override
- protected boolean invalidate() {
- return true;
+ protected int invalidate() {
+ return INVALIDATE_OK;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 2c7a234..b9d7a77 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -131,8 +131,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer
}
@Override
- protected boolean invalidate() {
- return true; // relies on GC, so it can always be evicted now.
+ protected int invalidate() {
+ return INVALIDATE_OK; // relies on GC, so it can always be evicted now.
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 1f3f7ea..17266bc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -168,8 +168,8 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt
}
@Override
- protected boolean invalidate() {
- return true;
+ protected int invalidate() {
+ return INVALIDATE_OK;
}
@Override