You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/08/24 18:32:07 UTC
[3/3] hive git commit: HIVE-16233 : llap: Query failed with
AllocatorOutOfMemoryException (Sergey Shelukhin,
reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
HIVE-16233 : llap: Query failed with AllocatorOutOfMemoryException (Sergey Shelukhin, reviewed by Prasanth Jayachandran and Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bd32deb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bd32deb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bd32deb4
Branch: refs/heads/branch-2
Commit: bd32deb44b8b9baf96f2cdbbed8d9a58c3ec73d4
Parents: a4b9133
Author: sergey <se...@apache.org>
Authored: Thu Aug 23 14:23:44 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Aug 24 11:26:04 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 1499 +++++++++++++++---
.../hive/llap/cache/LlapAllocatorBuffer.java | 396 +++++
.../hive/llap/cache/LlapCacheableBuffer.java | 6 +-
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 119 +-
.../hive/llap/cache/LowLevelCacheImpl.java | 26 +-
.../llap/cache/LowLevelCacheMemoryManager.java | 54 +-
.../hive/llap/cache/LowLevelCachePolicy.java | 2 -
.../llap/cache/LowLevelFifoCachePolicy.java | 16 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 95 +-
.../hadoop/hive/llap/cache/MemoryManager.java | 2 -
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 115 +-
.../hadoop/hive/llap/cache/SimpleAllocator.java | 22 +-
.../hive/llap/cache/SimpleBufferManager.java | 12 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 11 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 14 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 70 +-
.../io/encoded/VectorDeserializeOrcWriter.java | 3 +
.../llap/io/metadata/OrcFileEstimateErrors.java | 4 +-
.../hive/llap/io/metadata/OrcFileMetadata.java | 4 +-
.../llap/io/metadata/OrcStripeMetadata.java | 4 +-
.../hive/llap/cache/TestBuddyAllocator.java | 73 +-
.../cache/TestBuddyAllocatorForceEvict.java | 470 ++++++
.../hive/llap/cache/TestLowLevelCacheImpl.java | 29 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 56 -
.../hive/llap/cache/TestOrcMetadataCache.java | 11 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 16 +-
.../ql/io/orc/encoded/StoppableAllocator.java | 3 +-
.../apache/hadoop/hive/common/io/Allocator.java | 17 +
.../apache/hadoop/hive/common/io/DataCache.java | 6 +
30 files changed, 2405 insertions(+), 760 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53dfdd9..f884eda 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2947,7 +2947,7 @@ public class HiveConf extends Configuration {
"LLAP IO memory usage; 'cache' (the default) uses data and metadata cache with a\n" +
"custom off-heap allocator, 'none' doesn't use either (this mode may result in\n" +
"significant performance degradation)"),
- LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "256Kb", new SizeValidator(),
+ LLAP_ALLOCATOR_MIN_ALLOC("hive.llap.io.allocator.alloc.min", "16Kb", new SizeValidator(),
"Minimum allocation possible from LLAP buddy allocator. Allocations below that are\n" +
"padded to minimum allocation. For ORC, should generally be the same as the expected\n" +
"compression buffer size, or next lowest power of 2. Must be a power of 2."),
@@ -2974,6 +2974,14 @@ public class HiveConf extends Configuration {
LLAP_ALLOCATOR_MAPPED_PATH("hive.llap.io.allocator.mmap.path", "/tmp",
new WritableDirectoryValidator(),
"The directory location for mapping NVDIMM/NVMe flash storage into the ORC low-level cache."),
+ LLAP_ALLOCATOR_DISCARD_METHOD("hive.llap.io.allocator.discard.method", "both",
+ new StringSet("freelist", "brute", "both"),
+ "Which method to use to force-evict blocks to deal with fragmentation:\n" +
+ "freelist - use half-size free list (discards less, but also less reliable); brute -\n" +
+ "brute force, discard whatever we can; both - first try free list, then brute force."),
+ LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
+ "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
+ "The allocator would further cap it to a fraction of total memory."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,
http://git-wip-us.apache.org/repos/asf/hive/blob/bd32deb4/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index af9243a..abe3fc8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
+import org.apache.hive.common.util.FixedSizedObjectPool;
public final class BuddyAllocator
implements EvictionAwareAllocator, StoppableAllocator, BuddyAllocatorMXBean, LlapOomDebugDump {
@@ -55,6 +57,8 @@ public final class BuddyAllocator
private final MemoryManager memoryManager;
private static final long MAX_DUMP_INTERVAL_NS = 300 * 1000000000L; // 5 minutes.
private final AtomicLong lastLog = new AtomicLong(-1);
+ private final LlapDaemonCacheMetrics metrics;
+ private static final int MAX_DISCARD_ATTEMPTS = 10, LOG_DISCARD_ATTEMPTS = 5;
// Config settings
private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
@@ -63,19 +67,24 @@ public final class BuddyAllocator
private final boolean isDirect;
private final boolean isMapped;
private final Path cacheDir;
- private final LlapDaemonCacheMetrics metrics;
+
+ // These are only used for tests.
+ private boolean enableDefragShortcut = true, oomLogging = true;
// We don't know the acceptable size for Java array, so we'll use 1Gb boundary.
// That is guaranteed to fit any maximum allocation.
private static final int MAX_ARENA_SIZE = 1024*1024*1024;
// Don't try to operate with less than MIN_SIZE allocator space, it will just give you grief.
private static final int MIN_TOTAL_MEMORY_SIZE = 64*1024*1024;
+ // Maximum reasonable defragmentation headroom. Mostly kicks in on very small caches.
+ private static final float MAX_DEFRAG_HEADROOM_FRACTION = 0.01f;
private static final FileAttribute<Set<PosixFilePermission>> RWX = PosixFilePermissions
.asFileAttribute(PosixFilePermissions.fromString("rwx------"));
- private static final FileAttribute<Set<PosixFilePermission>> RW_ = PosixFilePermissions
- .asFileAttribute(PosixFilePermissions.fromString("rw-------"));
-
+ private final AtomicLong[] defragCounters;
+ private final boolean doUseFreeListDiscard, doUseBruteDiscard;
+ private final FixedSizedObjectPool<DiscardContext> ctxPool;
+ private final static boolean assertsEnabled = areAssertsEnabled();
public BuddyAllocator(Configuration conf, MemoryManager mm, LlapDaemonCacheMetrics metrics) {
this(HiveConf.getBoolVar(conf, ConfVars.LLAP_ALLOCATOR_DIRECT),
@@ -83,9 +92,16 @@ public final class BuddyAllocator
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC),
(int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MAX_ALLOC),
HiveConf.getIntVar(conf, ConfVars.LLAP_ALLOCATOR_ARENA_COUNT),
- getMaxTotalMemorySize(conf),
+ getMaxTotalMemorySize(conf),
+ HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_DEFRAG_HEADROOM),
HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_MAPPED_PATH),
- mm, metrics);
+ mm, metrics, HiveConf.getVar(conf, ConfVars.LLAP_ALLOCATOR_DISCARD_METHOD));
+ }
+
+ private static boolean areAssertsEnabled() {
+ boolean assertsEnabled = false;
+ assert assertsEnabled = true;
+ return assertsEnabled;
}
private static long getMaxTotalMemorySize(Configuration conf) {
@@ -96,29 +112,21 @@ public final class BuddyAllocator
throw new RuntimeException("Allocator space is too small for reasonable operation; "
+ ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname + "=" + maxSize + ", but at least "
+ MIN_TOTAL_MEMORY_SIZE + " is required. If you cannot spare any memory, you can "
- + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname + "; or set "
- + ConfVars.LLAP_IO_MEMORY_MODE.varname + " to 'none'");
+ + "disable LLAP IO entirely via " + ConfVars.LLAP_IO_ENABLED.varname);
}
@VisibleForTesting
- public BuddyAllocator(boolean isDirectVal, int minAllocVal, int maxAllocVal, int arenaCount,
- long maxSizeVal, MemoryManager memoryManager, LlapDaemonCacheMetrics metrics) {
- this(isDirectVal, false /*isMapped*/, minAllocVal, maxAllocVal, arenaCount, maxSizeVal,
- null /* mapping path */, memoryManager, metrics);
- }
-
- @VisibleForTesting
- public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal, int maxAllocVal,
- int arenaCount, long maxSizeVal, String mapPath, MemoryManager memoryManager,
- LlapDaemonCacheMetrics metrics) {
+ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
+ int maxAllocVal, int arenaCount, long maxSizeVal, long defragHeadroom, String mapPath,
+ MemoryManager memoryManager, LlapDaemonCacheMetrics metrics, String discardMethod) {
isDirect = isDirectVal;
isMapped = isMappedVal;
minAllocation = minAllocVal;
maxAllocation = maxAllocVal;
if (isMapped) {
try {
- cacheDir =
- Files.createTempDirectory(FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
+ cacheDir = Files.createTempDirectory(
+ FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
} catch (IOException ioe) {
// conf validator already checks this, so it will never trigger usually
throw new AssertionError("Configured mmap directory should be writable", ioe);
@@ -126,6 +134,69 @@ public final class BuddyAllocator
} else {
cacheDir = null;
}
+
+ arenaSize = validateAndDetermineArenaSize(arenaCount, maxSizeVal);
+ maxSize = validateAndDetermineMaxSize(maxSizeVal);
+ memoryManager.updateMaxSize(determineMaxMmSize(defragHeadroom, maxSize));
+
+ minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+ maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+ arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
+ maxArenas = (int)(maxSize / arenaSize);
+ arenas = new Arena[maxArenas];
+ for (int i = 0; i < maxArenas; ++i) {
+ arenas[i] = new Arena();
+ }
+ Arena firstArena = arenas[0];
+ firstArena.init(0);
+ allocatedArenas.set(1);
+ this.memoryManager = memoryManager;
+ defragCounters = new AtomicLong[maxAllocLog2 - minAllocLog2 + 1];
+ for (int i = 0; i < defragCounters.length; ++i) {
+ defragCounters[i] = new AtomicLong(0);
+ }
+ this.metrics = metrics;
+ metrics.incrAllocatedArena();
+ boolean isBoth = null == discardMethod || "both".equalsIgnoreCase(discardMethod);
+ doUseFreeListDiscard = isBoth || "freelist".equalsIgnoreCase(discardMethod);
+ doUseBruteDiscard = isBoth || "brute".equalsIgnoreCase(discardMethod);
+ ctxPool = new FixedSizedObjectPool<DiscardContext>(32,
+ new FixedSizedObjectPool.PoolObjectHelper<DiscardContext>() {
+ @Override
+ public DiscardContext create() {
+ return new DiscardContext();
+ }
+ @Override
+ public void resetBeforeOffer(DiscardContext t) {
+ }
+ });
+ }
+
+ public long determineMaxMmSize(long defragHeadroom, long maxMmSize) {
+ if (defragHeadroom > 0) {
+ long maxHeadroom = (long) Math.floor(maxSize * MAX_DEFRAG_HEADROOM_FRACTION);
+ defragHeadroom = Math.min(maxHeadroom, defragHeadroom);
+ LlapIoImpl.LOG.info("Leaving " + defragHeadroom + " of defragmentation headroom");
+ maxMmSize -= defragHeadroom;
+ }
+ return maxMmSize;
+ }
+
+ public long validateAndDetermineMaxSize(long maxSizeVal) {
+ if ((maxSizeVal % arenaSize) > 0) {
+ long oldMaxSize = maxSizeVal;
+ maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
+ LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
+ + " to be divisible by arena size " + arenaSize);
+ }
+ if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
+ throw new RuntimeException(
+ "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
+ }
+ return maxSizeVal;
+ }
+
+ public int validateAndDetermineArenaSize(int arenaCount, long maxSizeVal) {
long arenaSizeVal = (arenaCount == 0) ? MAX_ARENA_SIZE : maxSizeVal / arenaCount;
// The math.min, and the fact that maxAllocation is an int, ensures we don't overflow.
arenaSizeVal = Math.max(maxAllocation, Math.min(arenaSizeVal, MAX_ARENA_SIZE));
@@ -156,91 +227,53 @@ public final class BuddyAllocator
LlapIoImpl.LOG.warn("Rounding arena size to " + arenaSizeVal + " from " + oldArenaSize
+ " to be divisible by allocation size " + maxAllocation);
}
- arenaSize = (int)arenaSizeVal;
- if ((maxSizeVal % arenaSize) > 0) {
- long oldMaxSize = maxSizeVal;
- maxSizeVal = (maxSizeVal / arenaSize) * arenaSize;
- LlapIoImpl.LOG.warn("Rounding cache size to " + maxSizeVal + " from " + oldMaxSize
- + " to be divisible by arena size " + arenaSize);
- }
- if ((maxSizeVal / arenaSize) > Integer.MAX_VALUE) {
- throw new RuntimeException(
- "Too many arenas needed to allocate the cache: " + arenaSize + ", " + maxSizeVal);
- }
- maxSize = maxSizeVal;
- memoryManager.updateMaxSize(maxSize);
- minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
- maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
- arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
- maxArenas = (int)(maxSize / arenaSize);
- arenas = new Arena[maxArenas];
- for (int i = 0; i < maxArenas; ++i) {
- arenas[i] = new Arena();
- }
- arenas[0].init();
- allocatedArenas.set(1);
- this.memoryManager = memoryManager;
-
- this.metrics = metrics;
- metrics.incrAllocatedArena();
+ return (int)arenaSizeVal;
}
-
@Override
public void allocateMultiple(MemoryBuffer[] dest, int size)
throws AllocatorOutOfMemoryException {
- allocateMultiple(dest, size, null);
+ allocateMultiple(dest, size, null, null);
}
@Override
- public void allocateMultiple(MemoryBuffer[] dest, int size, AtomicBoolean isStopped)
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory)
+ throws AllocatorOutOfMemoryException {
+ allocateMultiple(dest, size, factory, null);
+ }
+
+ @Override
+ public void allocateMultiple(MemoryBuffer[] dest, int size, BufferObjectFactory factory, AtomicBoolean isStopped)
throws AllocatorOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
}
- int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
- if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
- freeListIx = Math.max(freeListIx - minAllocLog2, 0);
+ int freeListIx = determineFreeListForAllocation(size);
int allocLog2 = freeListIx + minAllocLog2;
int allocationSize = 1 << allocLog2;
// TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
memoryManager.reserveMemory(dest.length << allocLog2, isStopped);
- int destAllocIx = 0;
+
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
- dest[i] = createUnallocated(); // TODO: pool of objects?
+ // Note: this is backward compat only. Should be removed with createUnallocated.
+ dest[i] = factory != null ? factory.create() : createUnallocated();
}
+
// First try to quickly lock some of the correct-sized free lists and allocate from them.
int arenaCount = allocatedArenas.get();
if (arenaCount < 0) {
arenaCount = -arenaCount - 1; // Next arena is being allocated.
}
+
+ // Note: we might want to be smarter if threadId-s are low and there are more arenas than threads.
long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
- {
- int startArenaIx = (int)(threadId % arenaCount), index = startArenaIx;
- do {
- int newDestIx = arenas[index].allocateFast(
- index, freeListIx, dest, destAllocIx, allocationSize);
- if (newDestIx == dest.length) return;
- assert newDestIx != -1;
- destAllocIx = newDestIx;
- if ((++index) == arenaCount) {
- index = 0;
- }
- } while (index != startArenaIx);
- }
-
- // 1) We can get fragmented on large blocks of uncompressed data. The memory might be
- // in there, but it might be in separate small blocks. This is a complicated problem, and
- // several solutions (in order of decreasing ugliness and increasing complexity) are: just
- // ask to evict the exact-sized block (there may be no such block), evict from a particular
- // arena (policy would know allocator internals somewhat), store buffer mapping and ask to
- // evict from specific choice of blocks next to each other or next to already-evicted block,
- // and finally do a compaction (requires a block mapping and complex sync). For now we'd just
- // force-evict some memory and avoid both complexity and ugliness, since large blocks are rare.
- // 2) Fragmentation aside (TODO: and this is a very hacky solution for that),
- // we called reserveMemory so we know that there's memory waiting for us somewhere.
+ int destAllocIx = allocateFast(dest, null, 0, dest.length,
+ freeListIx, allocationSize, (int)(threadId % arenaCount), arenaCount);
+ if (destAllocIx == dest.length) return;
+
+ // We called reserveMemory so we know that there's memory waiting for us somewhere.
// However, we have a class of rare race conditions related to the order of locking/checking of
// different allocation areas. Simple case - say we have 2 arenas, 256Kb available in arena 2.
// We look at arena 1; someone deallocs 256Kb from arena 1 and allocs the same from arena 2;
@@ -255,63 +288,361 @@ public final class BuddyAllocator
// allocator thread (or threads per arena).
// The 2nd one is probably much simpler and will allow us to get rid of a lot of sync code.
// But for now we will just retry. We will evict more each time.
- long forceReserved = 0;
int attempt = 0;
+ boolean isFailed = false;
+ int memoryForceReleased = 0;
try {
+ int discardFailed = 0;
while (true) {
- // Try to split bigger blocks. TODO: again, ideally we would tryLock at least once
- {
- int startArenaIx = (int)((threadId + attempt) % arenaCount), arenaIx = startArenaIx;
- do {
- int newDestIx = arenas[arenaIx].allocateWithSplit(
- arenaIx, freeListIx, dest, destAllocIx, allocationSize);
- if (newDestIx == dest.length) return;
- assert newDestIx != -1;
- destAllocIx = newDestIx;
- if ((++arenaIx) == arenaCount) {
- arenaIx = 0;
- }
- } while (arenaIx != startArenaIx);
- }
+ // Try to split bigger blocks.
+ int startArenaIx = (int)((threadId + attempt) % arenaCount);
+ destAllocIx = allocateWithSplit(dest, null, destAllocIx, dest.length,
+ freeListIx, allocationSize, startArenaIx, arenaCount, -1);
+ if (destAllocIx == dest.length) return;
if (attempt == 0) {
// Try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
- for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
- destAllocIx = arenas[arenaIx].allocateWithExpand(
- arenaIx, freeListIx, dest, destAllocIx, allocationSize);
+ destAllocIx = allocateWithExpand(
+ dest, destAllocIx, freeListIx, allocationSize, arenaCount);
+ if (destAllocIx == dest.length) return;
+ }
+
+ // Try to force-evict the fragments of the requisite size.
+ boolean hasDiscardedAny = false;
+ DiscardContext ctx = ctxPool.take();
+ try {
+ // Brute force may discard up to twice as many buffers.
+ int maxListSize = 1 << (doUseBruteDiscard ? freeListIx : (freeListIx - 1));
+ int requiredBlocks = dest.length - destAllocIx;
+ ctx.init(maxListSize, requiredBlocks);
+ // First, try to use the blocks of half size in every arena.
+ if (doUseFreeListDiscard && freeListIx > 0) {
+ discardBlocksBasedOnFreeLists(freeListIx, startArenaIx, arenaCount, ctx);
+ memoryForceReleased += ctx.memoryReleased;
+ hasDiscardedAny = ctx.resultCount > 0;
+ destAllocIx = allocateFromDiscardResult(
+ dest, destAllocIx, freeListIx, allocationSize, ctx);
if (destAllocIx == dest.length) return;
}
+ // Then, try the brute force search for something to throw away.
+ if (doUseBruteDiscard) {
+ ctx.resetResults();
+ discardBlocksBruteForce(freeListIx, startArenaIx, arenaCount, ctx);
+ memoryForceReleased += ctx.memoryReleased;
+ hasDiscardedAny = hasDiscardedAny || (ctx.resultCount > 0);
+ destAllocIx = allocateFromDiscardResult(
+ dest, destAllocIx, freeListIx, allocationSize, ctx);
+
+ if (destAllocIx == dest.length) return;
+ }
+ } finally {
+ ctxPool.offer(ctx);
}
- int numberToForce = (dest.length - destAllocIx) * (attempt + 1);
- long newReserved = memoryManager.forceReservedMemory(allocationSize, numberToForce);
- forceReserved += newReserved;
- if (newReserved == 0) {
- // Cannot force-evict anything, give up.
+ if (hasDiscardedAny) {
+ discardFailed = 0;
+ } else if (++discardFailed > MAX_DISCARD_ATTEMPTS) {
String msg = "Failed to allocate " + size + "; at " + destAllocIx + " out of "
+ dest.length + " (entire cache is fragmented and locked, or an internal issue)";
logOomErrorMessage(msg);
+ isFailed = true;
throw new AllocatorOutOfMemoryException(msg);
}
- if (attempt == 0) {
- LlapIoImpl.LOG.warn("Failed to allocate despite reserved memory; will retry");
- }
++attempt;
}
} finally {
- if (attempt > 4) {
- LlapIoImpl.LOG.warn("Allocation of " + dest.length + " buffers of size " + size
- + " took " + attempt + " attempts to evict enough memory");
+ memoryManager.releaseMemory(memoryForceReleased);
+ if (!isFailed && attempt >= LOG_DISCARD_ATTEMPTS) {
+ LlapIoImpl.LOG.info("Allocation of " + dest.length + " buffers of size " + size + " took "
+ + attempt + " attempts to free enough memory; force-released " + memoryForceReleased);
+ }
+ }
+ }
+
+ /** The context for the forced eviction of buffers. */
+ private static final class DiscardContext {
+ long[] results;
+ int resultCount;
+ int memoryReleased;
+
+ /**
+ * The headers for blocks we've either locked to move (if allocated), or have taken out
+ * of the free lists (if not) so that nobody allocates them while we are freeing space.
+ * All the headers will be from the arena currently being processed.
+ */
+ int[] victimHeaders;
+ int victimCount; // The count of the elements of the above that are set.
+
+ /**
+ * List-based: the base free buffers that will be paired with the space freed from
+ * victimHeaders to create the buffers of allocation size.
+ * Brute force: the buffers (that do not exist as separate buffers) composed of victimHeaders
+ * buffers; the future result buffers.
+ * All the headers will be from the arena currently being processed.
+ */
+ int[] baseHeaders;
+ int baseCount; // The count of the elements of the above that are set.
+
+ /**
+ * How many more results (or base headers) do we need to find?
+ * This object is reused between arenas; this is the only counter that is preserved.
+ */
+ int remainingToFind;
+
+ /** The headers from abandoned move attempts that cannot yet be returned to the
+ * free lists, or unlocked, due to some lock being held and deadlock potential. */
+ int[] abandonedHeaders;
+ int abandonedCount;
+
+ void init(int headersPerOneReq, int reqCount) {
+ resetResults();
+ remainingToFind = reqCount;
+ if (results == null || results.length < reqCount) {
+ results = new long[reqCount];
+ baseHeaders = new int[reqCount];
+ }
+ int maxVictimCount = headersPerOneReq * reqCount;
+ if (victimHeaders == null || victimHeaders.length < maxVictimCount) {
+ victimHeaders = new int[maxVictimCount];
+ }
+ }
+
+ void resetResults() {
+ resetBetweenArenas();
+ resultCount = memoryReleased = 0;
+ }
+
+ void resetBetweenArenas() {
+ // Reset everything for the next arena; assume everything has been cleaned.
+ victimCount = baseCount = abandonedCount = 0;
+ }
+
+ public void addResult(int arenaIx, int freeHeaderIx) {
+ results[resultCount] = makeIntPair(arenaIx, freeHeaderIx);
+ ++resultCount;
+ }
+
+ public void addBaseHeader(int headerIx) {
+ baseHeaders[baseCount] = headerIx;
+ ++baseCount;
+ --remainingToFind;
+ }
+
+ @Override
+ public String toString() {
+ return "[victimHeaders=" + Arrays.toString(victimHeaders) + ", victimCount="
+ + victimCount + ", baseHeaders=" + Arrays.toString(baseHeaders) + ", baseCount="
+ + baseCount + ", remainingToFind=" + remainingToFind + "]";
+ }
+ }
+
+ private void discardBlocksBasedOnFreeLists(
+ int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+ defragCounters[freeListIx].incrementAndGet();
+ // The free list level from which the blocks to be merged are taken.
+ final int mergeListIx = freeListIx - 1;
+
+ // Try to allocate using base-buffer approach from each arena.
+ int arenaIx = startArenaIx;
+ do {
+ Arena arena = arenas[arenaIx];
+ // Reserve blocks in this arena that would empty the sections of requisite size.
+ arena.reserveDiscardBlocksBasedOnFreeList(mergeListIx, ctx);
+ // Discard the blocks.
+ discardFromCtxBasedOnFreeList(arena, ctx, freeListIx);
+
+ if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+ ctx.resetBetweenArenas();
+ arenaIx = getNextIx(arenaIx, arenaCount, 1);
+ } while (arenaIx != startArenaIx);
+ }
+
+ private void discardBlocksBruteForce(
+ int freeListIx, int startArenaIx, int arenaCount, DiscardContext ctx) {
+ // We are going to use this counter as a pseudo-random number for the start of the search.
+ // This is to avoid churning at the beginning of the arena all the time.
+ long counter = defragCounters[freeListIx].incrementAndGet();
+ // How many blocks of this size comprise an arena.
+ int positionsPerArena = 1 << (arenaSizeLog2 - (minAllocLog2 + freeListIx));
+ // Compute the pseudo-random position from the above, then derive the actual header.
+ int startHeaderIx = ((int) (counter % positionsPerArena)) << freeListIx;
+
+ // Try to allocate using brute force approach from each arena.
+ int arenaIx = startArenaIx;
+ do {
+ Arena arena = arenas[arenaIx];
+ // Reserve blocks in this arena that would empty the sections of requisite size.
+ arena.reserveDiscardBruteForce(freeListIx, ctx, startHeaderIx);
+ // Discard the blocks.
+ discardFromCtxBruteForce(arena, ctx, freeListIx);
+
+ if (ctx.remainingToFind == 0) return; // Reserved as much as we needed.
+ ctx.resetBetweenArenas();
+ arenaIx = getNextIx(arenaIx, arenaCount, 1);
+ } while (arenaIx != startArenaIx);
+ }
+
+ /**
+ * Frees up memory by deallocating based on base and victim buffers in DiscardContext.
+ * @param freeListIx The list for which the blocks are being merged.
+ */
+ private void discardFromCtxBasedOnFreeList(Arena arena, DiscardContext ctx, int freeListIx) {
+ // Discard all the locked blocks.
+ discardAllBuffersFromCtx(arena, ctx);
+ // Finalize the headers.
+ for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+ int baseHeaderIx = ctx.baseHeaders[baseIx];
+ int minHeaderIx = Math.min(baseHeaderIx, getBuddyHeaderIx(freeListIx - 1, baseHeaderIx));
+ finalizeDiscardResult(arena, ctx, freeListIx, minHeaderIx);
+ }
+ }
+
+ /**
+ * Frees up memory by deallocating based on base and victim buffers in DiscardContext.
+ * @param freeListIx The list for which the blocks are being merged.
+ */
+ private void discardFromCtxBruteForce(Arena arena, DiscardContext ctx, int freeListIx) {
+ // Discard all the locked blocks.
+ discardAllBuffersFromCtx(arena, ctx);
+ // Finalize the headers.
+ for (int baseIx = ctx.baseCount - 1; baseIx >= 0; --baseIx) {
+ finalizeDiscardResult(arena, ctx, freeListIx, ctx.baseHeaders[baseIx]);
+ }
+ }
+
+ /**
+ * Sets the headers correctly for a newly-freed buffer after discarding stuff.
+ */
+ private void finalizeDiscardResult(
+ Arena arena, DiscardContext ctx, int freeListIx, int newlyFreeHeaderIx) {
+ int maxHeaderIx = newlyFreeHeaderIx + (1 << freeListIx);
+ if (assertsEnabled) {
+ arena.checkHeader(newlyFreeHeaderIx, -1, true);
+ }
+ arena.unsetHeaders(newlyFreeHeaderIx + 1, maxHeaderIx, CasLog.Src.CLEARED_VICTIM);
+ // Set the leftmost header of the base and its buddy (that are now being merged).
+ arena.setHeaderNoBufAlloc(newlyFreeHeaderIx, freeListIx, CasLog.Src.NEWLY_CLEARED);
+ ctx.addResult(arena.arenaIx, newlyFreeHeaderIx);
+ }
+
+ /**
+ * Discards all the victim buffers in the context.
+ */
+ private void discardAllBuffersFromCtx(Arena arena, DiscardContext ctx) {
+ for (int victimIx = 0; victimIx < ctx.victimCount; ++victimIx) {
+ int victimHeaderIx = ctx.victimHeaders[victimIx];
+ // Note: no location check here; the buffer is always locked for move.
+ LlapAllocatorBuffer buf = arena.buffers[victimHeaderIx];
+ if (buf == null) continue;
+ if (assertsEnabled) {
+ arena.checkHeader(victimHeaderIx, -1, true);
+ byte header = arena.headers[victimHeaderIx];
+ assertBufferLooksValid(freeListFromHeader(header), buf, arena.arenaIx, victimHeaderIx);
}
- // After we succeed (or fail), release the force-evicted memory to memory manager. We have
- // previously reserved enough to allocate all we need, so we don't take our allocation out
- // of this - as per the comment above, we basically just wasted a bunch of cache (and CPU).
- if (forceReserved > 0) {
- memoryManager.releaseMemory(forceReserved);
+ // We do not modify the header here; the caller will use this space.
+ arena.buffers[victimHeaderIx] = null;
+ long memUsage = buf.getMemoryUsage();
+ Boolean result = buf.endDiscard();
+ if (result == null) {
+ ctx.memoryReleased += memUsage; // We have essentially deallocated this.
+ } else if (result) {
+ // There was a parallel deallocate; it didn't account for the memory.
+ memoryManager.releaseMemory(memUsage);
+ } else {
+ // There was a parallel cache eviction - the evictor is accounting for the memory.
}
}
}
+ /**
+ * Unlocks the buffer after the discard has been abandoned.
+ */
+ private void cancelDiscard(LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+ Boolean result = buf.cancelDiscard();
+ if (result == null) return;
+ // If the result is not null, the buffer was evicted during the move.
+ if (result) {
+ long memUsage = buf.getMemoryUsage(); // Release memory - simple deallocation.
+ arenas[arenaIx].deallocate(buf, true);
+ memoryManager.releaseMemory(memUsage);
+ } else {
+ arenas[arenaIx].deallocate(buf, true); // No need to release memory - cache eviction.
+ }
+ }
+
+ /**
+ * Tries to allocate destCount - destIx blocks, using best-effort fast allocation.
+ * @param dest Option 1 - memory allocated is stored in these buffers.
+ * @param destHeaders Option 2 - memory allocated is reserved and headers returned via this.
+ * @param destIx The start index in either array where allocations are to be saved.
+ * @param destCount The end index in either array where allocations are to be saved.
+ * @param freeListIx The free list from which to allocate.
+ * @param allocSize Allocation size.
+ * @param startArenaIx From which arena to start allocating.
+ * @param arenaCount The active arena count.
+ * @return The index in the array until which the memory has been allocated.
+ */
+ private int allocateFast(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+ int freeListIx, int allocSize, int startArenaIx, int arenaCount) {
+ int index = startArenaIx;
+ do {
+ int newDestIx = arenas[index].allocateFast(
+ freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+ if (newDestIx == destCount) return newDestIx;
+ assert newDestIx != -1;
+ destIx = newDestIx;
+ index = getNextIx(index, arenaCount, 1);
+ } while (index != startArenaIx);
+ return destIx;
+ }
+
+ /**
+ * Tries to allocate destCount - destIx blocks by allocating new arenas, if needed. Same args
+ * as allocateFast, except the allocations start at arenaCount (the first unallocated arena).
+ */
+ private int allocateWithExpand(MemoryBuffer[] dest, int destIx,
+ int freeListIx, int allocSize, int arenaCount) {
+ for (int arenaIx = arenaCount; arenaIx < arenas.length; ++arenaIx) {
+ destIx = arenas[arenaIx].allocateWithExpand(
+ arenaIx, freeListIx, dest, destIx, allocSize);
+ if (destIx == dest.length) return destIx;
+ }
+ return destIx;
+ }
+
+ /**
+ * Tries to allocate destCount - destIx blocks, waiting for locks and splitting the larger
+ * blocks if the correct sized blocks are not available. Args the same as allocateFast.
+ */
+ private int allocateWithSplit(MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount,
+ int freeListIx, int allocSize, int startArenaIx, int arenaCount, int maxSplitFreeListIx) {
+ int arenaIx = startArenaIx;
+ do {
+ int newDestIx = arenas[arenaIx].allocateWithSplit(
+ freeListIx, dest, destHeaders, destIx, destCount, allocSize, maxSplitFreeListIx);
+ if (newDestIx == destCount) return newDestIx;
+ assert newDestIx != -1;
+ destIx = newDestIx;
+ arenaIx = getNextIx(arenaIx, arenaCount, 1);
+ } while (arenaIx != startArenaIx);
+ return destIx;
+ }
+
+ /**
+ * Tries to allocate destCount - destIx blocks after the forced eviction of some other buffers.
+ * Args the same as allocateFast.
+ */
+ private int allocateFromDiscardResult(MemoryBuffer[] dest, int destAllocIx,
+ int freeListIx, int allocationSize, DiscardContext discardResult) {
+ for (int i = 0; i < discardResult.resultCount; ++i) {
+ long result = discardResult.results[i];
+ destAllocIx = arenas[getFirstInt(result)].allocateFromDiscard(
+ dest, destAllocIx, getSecondInt(result), freeListIx, allocationSize);
+ }
+ return destAllocIx;
+ }
+
private void logOomErrorMessage(String msg) {
+ if (!oomLogging) return;
while (true) {
long time = System.nanoTime();
long lastTime = lastLog.get();
@@ -321,7 +652,7 @@ public final class BuddyAllocator
continue;
}
if (shouldLog) {
- LlapIoImpl.LOG.error(msg + debugDumpForOom());
+ LlapIoImpl.LOG.error(msg + debugDumpForOomInternal());
} else {
LlapIoImpl.LOG.error(msg);
}
@@ -329,7 +660,7 @@ public final class BuddyAllocator
}
}
- /**
+ /**
* Arbitrarily, we start getting the state from Allocator. Allocator calls MM which calls
* the policies that call the eviction dispatcher that calls the caches. See init - these all
* are connected in a cycle, so we need to make sure the who-calls-whom order is definite.
@@ -337,6 +668,10 @@ public final class BuddyAllocator
@Override
public void debugDumpShort(StringBuilder sb) {
memoryManager.debugDumpShort(sb);
+ sb.append("\nDefrag counters: ");
+ for (int i = 0; i < defragCounters.length; ++i) {
+ sb.append(defragCounters[i].get()).append(", ");
+ }
sb.append("\nAllocator state:");
int unallocCount = 0, fullCount = 0;
long totalFree = 0;
@@ -358,21 +693,24 @@ public final class BuddyAllocator
@Override
public void deallocate(MemoryBuffer buffer) {
- deallocateInternal(buffer, true);
+ LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+ int arenaToRelease = buf.invalidateAndRelease();
+ if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+ long memUsage = buf.getMemoryUsage();
+ arenas[arenaToRelease].deallocate(buf, false);
+ memoryManager.releaseMemory(memUsage);
}
@Override
public void deallocateEvicted(MemoryBuffer buffer) {
- deallocateInternal(buffer, false);
- }
-
- private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
- LlapDataBuffer buf = (LlapDataBuffer)buffer;
- long memUsage = buf.getMemoryUsage();
- arenas[buf.arenaIndex].deallocate(buf);
- if (doReleaseMemory) {
- memoryManager.releaseMemory(memUsage);
- }
+ LlapAllocatorBuffer buf = (LlapAllocatorBuffer)buffer;
+ assert buf.isInvalid();
+ int arenaToRelease = buf.releaseInvalidated();
+ if (arenaToRelease < 0) return; // The block is being moved; the move will release memory.
+ arenas[arenaToRelease].deallocate(buf, false);
+ // Note: for deallocateEvicted, we do not release the memory to memManager; it may
+ // happen that the evictor tries to use the allowance before the move finishes.
+ // Retrying/more defrag should take care of that.
}
@Override
@@ -380,42 +718,7 @@ public final class BuddyAllocator
return isDirect;
}
- public String debugDumpForOomInternal() {
- StringBuilder result = new StringBuilder(
- "NOTE: with multiple threads the dump is not guaranteed to be consistent");
- for (Arena arena : arenas) {
- arena.debugDump(result);
- }
- return result.toString();
- }
-
- // BuddyAllocatorMXBean
- @Override
- public boolean getIsDirect() {
- return isDirect;
- }
-
- @Override
- public int getMinAllocation() {
- return minAllocation;
- }
-
- @Override
- public int getMaxAllocation() {
- return maxAllocation;
- }
-
- @Override
- public int getArenaSize() {
- return arenaSize;
- }
-
- @Override
- public long getMaxCacheSize() {
- return maxSize;
- }
-
- private ByteBuffer preallocate(int arenaSize) {
+ private ByteBuffer preallocateArenaBuffer(int arenaSize) {
if (isMapped) {
RandomAccessFile rwf = null;
File rf = null;
@@ -445,20 +748,28 @@ public final class BuddyAllocator
}
private class Arena {
+ private static final int FAILED_TO_RESERVE = Integer.MAX_VALUE;
+
+ private int arenaIx;
private ByteBuffer data;
// Avoid storing headers with data since we expect binary size allocations.
// Each headers[i] is a "virtual" byte at i * minAllocation.
- private byte[] headers;
+ private LlapAllocatorBuffer[] buffers;
+ // The TS rule for headers is - a header and buffer array element for some freeList
+ // can only be modified if the corresponding freeList lock is held.
+ private byte[] headers; // Free list indices of each unallocated block, for quick lookup.
private FreeList[] freeLists;
- void init() {
+ void init(int arenaIx) {
+ this.arenaIx = arenaIx;
try {
- data = preallocate(arenaSize);
+ data = preallocateArenaBuffer(arenaSize);
} catch (OutOfMemoryError oom) {
throw new OutOfMemoryError("Cannot allocate " + arenaSize + " bytes: " + oom.getMessage()
+ "; make sure your xmx and process size are set correctly.");
}
int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+ buffers = new LlapAllocatorBuffer[maxMinAllocs];
headers = new byte[maxMinAllocs];
int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1;
freeLists = new FreeList[freeListCount];
@@ -469,14 +780,350 @@ public final class BuddyAllocator
headerIndex = 0, headerStep = 1 << allocLog2Diff;
freeLists[allocLog2Diff].listHead = 0;
for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
- // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
- headers[headerIndex] = makeHeader(allocLog2Diff, false);
+ setHeaderFree(headerIndex, allocLog2Diff, CasLog.Src.CTOR);
data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep));
headerIndex += headerStep;
}
}
+ public void checkHeader(int headerIx, int freeListIx, boolean isLocked) {
+ checkHeaderByte(arenaIx, headerIx, freeListIx, isLocked, headers[headerIx]);
+ }
+
+ /**
+ * Reserves the blocks to use to merge into larger blocks.
+ * @param freeListIx The free list to reserve for.
+ * @param startHeaderIx The header index at which to start looking (to avoid churn at 0).
+ */
+ public void reserveDiscardBruteForce(int freeListIx, DiscardContext ctx, int startHeaderIx) {
+ if (data == null) return; // not allocated yet
+ int headerStep = 1 << freeListIx;
+ int headerIx = startHeaderIx;
+ do {
+ long reserveResult = reserveBlockContents(
+ freeListIx, headerIx, ctx.victimHeaders, ctx.victimCount, true);
+ int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+ if (moveSize == FAILED_TO_RESERVE) {
+ for (int i = ctx.victimCount; i < ctx.victimCount + reservedCount; ++i) {
+ abandonOneHeaderBeingMoved(ctx.victimHeaders[i], CasLog.Src.ABANDON_MOVE);
+ }
+ } else {
+ ctx.victimCount += reservedCount;
+ ctx.addBaseHeader(headerIx);
+ }
+ headerIx = getNextIx(headerIx, headers.length, headerStep);
+ } while (ctx.remainingToFind > 0 && headerIx != startHeaderIx);
+ }
+
+ /**
+ * Reserves the blocks to use to merge into larger blocks.
+ * @param mergeListIx The free list to reserve base blocks from.
+ */
+ public void reserveDiscardBlocksBasedOnFreeList(int mergeListIx, DiscardContext ctx) {
+ if (data == null) return; // not allocated yet
+ FreeList freeList = freeLists[mergeListIx];
+ freeList.lock.lock();
+ try {
+ int freeHeaderIx = freeList.listHead;
+ while (freeHeaderIx >= 0) {
+ boolean reserved = false;
+ if (ctx.remainingToFind > 0) {
+ int headerToFreeIx = getBuddyHeaderIx(mergeListIx, freeHeaderIx);
+ long reserveResult = reserveBlockContents(mergeListIx, headerToFreeIx,
+ ctx.victimHeaders, ctx.victimCount, true);
+ int reservedCount = getFirstInt(reserveResult), moveSize = getSecondInt(reserveResult);
+ reserved = (moveSize != FAILED_TO_RESERVE);
+ if (!reserved) {
+ // We cannot abandon the attempt here; the concurrent operations might have released
+ // all the buffers comprising our buddy block, necessitating a merge into a higher
+ // list. That may deadlock with another thread locking its own victims (one can only
+ // take list locks separately, or moving DOWN). The alternative would be to release
+ // the free list lock before reserving, however iterating the list that way is
+ // difficult (we'd have to keep track of things on the main path to avoid re-trying
+ // the same headers repeatedly - we'd rather keep track of extra things on failure).
+ prepareAbandonUnfinishedMoveAttempt(ctx, reservedCount);
+ } else {
+ ctx.victimCount += reservedCount;
+ ctx.addBaseHeader(freeHeaderIx);
+ }
+ }
+ int nextFreeHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(freeHeaderIx));
+ if (reserved) {
+ removeBlockFromFreeList(freeList, freeHeaderIx, mergeListIx);
+ if (assertsEnabled) {
+ checkHeader(freeHeaderIx, mergeListIx, false);
+ }
+ setHeaderNoBufAlloc(freeHeaderIx, mergeListIx, CasLog.Src.NEW_BASE);
+ }
+ if (ctx.remainingToFind == 0) break;
+ freeHeaderIx = nextFreeHeaderIx;
+ }
+ } finally {
+ freeList.lock.unlock();
+ }
+ // See the above. Release the headers after unlocking.
+ for (int i = 0; i < ctx.abandonedCount; ++i) {
+ abandonOneHeaderBeingMoved(ctx.abandonedHeaders[i], CasLog.Src.ABANDON_AT_END);
+ }
+ ctx.abandonedCount = 0;
+ }
+
+ /**
+ * Saves the victim headers from a failed reserve into a separate array in the context.
+ * See the comment at the call site; this is to prevent deadlocks.
+ * @param count the number of victim headers the failed reserve wrote, starting at ctx.victimCount.
+ */
+ private void prepareAbandonUnfinishedMoveAttempt(DiscardContext ctx, int count) {
+ if (count == 0) return; // Nothing to do.
+ int startIx = ctx.victimCount, start;
+ if (ctx.abandonedHeaders == null) {
+ start = 0;
+ ctx.abandonedHeaders = new int[count];
+ } else {
+ start = ctx.abandonedCount;
+ int newLen = start + count;
+ if (newLen > ctx.abandonedHeaders.length) {
+ ctx.abandonedHeaders = Arrays.copyOf(ctx.abandonedHeaders, newLen);
+ }
+ }
+ System.arraycopy(ctx.victimHeaders, startIx, ctx.abandonedHeaders, start, count);
+ ctx.abandonedCount += count;
+ ctx.victimCount = startIx;
+ }
+
+ /**
+ * Reserve all the contents of a particular block to merge them together.
+ * @param freeListIx The list to which the hypothetical block belongs.
+ * @param headerToFreeIx The header index of the block whose contents are to be reserved.
+ */
+ private long reserveBlockContents(int freeListIx, int headerToFreeIx,
+ int[] victimHeaders, int victimsOffset, boolean isDiscard) {
+ // Try opportunistically for the common case - the same-sized, allocated buddy.
+ if (enableDefragShortcut) {
+ LlapAllocatorBuffer buffer = buffers[headerToFreeIx];
+ byte header = headers[headerToFreeIx];
+ if (buffer != null && freeListFromHeader(header) == freeListIx) {
+ // Double-check the header under lock.
+ FreeList freeList = freeLists[freeListIx];
+ freeList.lock.lock();
+ try {
+ // No one can take this buffer out and thus change the level after we lock, and if
+ // they take it out before we lock, then we will fail to lock (same as
+ // prepareOneHeaderForMove).
+ if (headers[headerToFreeIx] == header
+ && buffer.startMoveOrDiscard(arenaIx, headerToFreeIx, isDiscard)) {
+ if (assertsEnabled) {
+ assertBufferLooksValid(freeListIx, buffer, arenaIx, headerToFreeIx);
+ CasLog.logMove(arenaIx, headerToFreeIx, System.identityHashCode(buffer));
+ }
+ victimHeaders[victimsOffset] = headerToFreeIx;
+ return makeIntPair(1, buffer.allocSize);
+ }
+ } finally {
+ freeList.lock.unlock();
+ }
+ // We don't bail on failure - try in detail below.
+ }
+ }
+ // Examine the buddy block and its sub-blocks in detail.
+ long[] stack = new long[freeListIx + 1]; // Can never have more than this in elements.
+ int stackSize = 1;
+ // Seed with the buddy of this block (so the first iteration would target this block).
+ stack[0] = makeIntPair(freeListIx, getBuddyHeaderIx(freeListIx, headerToFreeIx));
+ int victimCount = 0;
+ int totalMoveSize = 0;
+ // We traverse the leaf nodes of the tree. The stack entries indicate the existing leaf
+ // nodes that we need to see siblings for, and sibling levels.
+ while (stackSize > 0) {
+ --stackSize;
+ long next = stack[stackSize];
+ int listLevel = getFirstInt(next); // This is not an actual list; see intermList.
+ int sourceHeaderIx = getSecondInt(next);
+ // Find the buddy of the header at list level. We don't know what list it is actually in.
+ int levelBuddyHeaderIx = getBuddyHeaderIx(listLevel, sourceHeaderIx);
+ // First, handle the actual thing we found.
+ long result = prepareOneHeaderForMove(levelBuddyHeaderIx, isDiscard, freeListIx);
+ if (result == -1) {
+ // We have failed to reserve a single header. Do not undo the previous ones here,
+ // the caller has to handle this to avoid races.
+ return makeIntPair(victimCount, FAILED_TO_RESERVE);
+ }
+ int allocSize = getFirstInt(result);
+ totalMoveSize += allocSize;
+ victimHeaders[victimsOffset + victimCount] = levelBuddyHeaderIx;
+ ++victimCount;
+ // Explaining this would really require a picture. Basically if the level is lower than
+ // our level, that means (imagine a tree) we are on the leftmost leaf node of the sub-tree
+ // under our sibling in the tree. So we'd need to look at the buddies of that leftmost leaf
+ // block on all the intermediate levels (aka all intermediate levels of the tree between
+ // this guy and our sibling). Including its own buddy on its own level. And so on for every
+ // sub-tree where our buddy is not on the same level as us (i.e. does not cover the entire
+ // sub-tree).
+ int actualBuddyListIx = getSecondInt(result);
+ for (int intermListIx = listLevel - 1; intermListIx >= actualBuddyListIx; --intermListIx) {
+ stack[stackSize++] = makeIntPair(intermListIx, levelBuddyHeaderIx);
+ }
+ }
+ return makeIntPair(victimCount, totalMoveSize);
+ }
+
+ /**
+ * Abandons the move attempt for a single header that may be free or allocated.
+ */
+ private void abandonOneHeaderBeingMoved(int headerIx, CasLog.Src src) {
+ byte header = headers[headerIx];
+ int freeListIx = freeListFromHeader(header);
+ if ((header & 1) != 1) failWithLog("Victim header not in use");
+ LlapAllocatorBuffer buf = buffers[headerIx];
+ if (buf != null) {
+ // Note: no location check; the buffer is always locked for move here.
+ if (assertsEnabled) {
+ assertBufferLooksValid(freeListIx, buf, arenaIx, headerIx);
+ }
+ cancelDiscard(buf, arenaIx, headerIx);
+ } else {
+ if (assertsEnabled) {
+ checkHeader(headerIx, -1, true);
+ }
+ addToFreeListWithMerge(headerIx, freeListIx, null, src);
+ }
+ }
+
+ /**
+ * Prepares victimHeaderIx header to be moved - locks if it's allocated, takes out of
+ * the free list if not.
+ * @return an int pair of (size to move, list level of the header) if this succeeded, -1 if not.
+ */
+ private long prepareOneHeaderForMove(int victimHeaderIx, boolean isDiscard, int maxListIx) {
+ byte header = headers[victimHeaderIx];
+ if (header == 0) return -1;
+ int freeListIx = freeListFromHeader(header);
+ if (freeListIx > maxListIx) {
+ // This can only come from a brute force discard; for now we don't discard blocks larger
+ // than the target block. We could discard it and add remainder to free lists.
+ // By definition if we are fragmented there should be a smaller buffer somewhere.
+ return -1;
+ }
+ if (buffers[victimHeaderIx] == null && (header & 1) == 1) {
+ return -1; // There's no buffer and another move is reserving this.
+ }
+ FreeList freeList = freeLists[freeListIx];
+ freeList.lock.lock();
+ try {
+ if (headers[victimHeaderIx] != header) {
+ // We bail if there are any changes. Note that we don't care about ABA here - all the
+ // stuff on the left has been taken out already so no one can touch it, and all the stuff
+ // on the right is yet to be seen so we don't care if they changed with this - if it's
+ // in the same free list, the processing sequence will remain the same going right.
+ return -1;
+ }
+ LlapAllocatorBuffer buffer = buffers[victimHeaderIx];
+ if (buffer == null && (header & 1) == 1) {
+ return -1; // The only ABA problem we care about. Ok to have another buffer in there;
+ // not ok to have a location locked by someone else.
+ }
+ int allocSize = 0;
+ if (buffer != null) {
+ // The buffer can only be removed after the removed flag has been set. If we are able to
+ // lock it here, no one can set the removed flag and thus remove it. That would also mean
+ // that the header is not free, and no one will touch the header either.
+ if (!buffer.startMoveOrDiscard(arenaIx, victimHeaderIx, isDiscard)) {
+ return -1;
+ }
+ CasLog.logMove(arenaIx, victimHeaderIx, System.identityHashCode(buffer));
+ allocSize = allocSizeFromFreeList(freeListIx);
+ } else {
+ // Take the empty buffer out of the free list.
+ setHeaderNoBufAlloc(victimHeaderIx, freeListIx, CasLog.Src.EMPTY_V);
+ removeBlockFromFreeList(freeList, victimHeaderIx, freeListIx);
+ }
+ return makeIntPair(allocSize, freeListIx);
+ } finally {
+ freeList.lock.unlock();
+ }
+ }
+
+ /** Allocates into an empty block obtained via a forced eviction. Same args as allocateFast. */
+ public int allocateFromDiscard(MemoryBuffer[] dest, int destIx,
+ int headerIx, int freeListIx, int allocationSize) {
+ LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+ initializeNewlyAllocated(buffer, allocationSize, headerIx, offsetFromHeaderIndex(headerIx));
+ if (assertsEnabled) {
+ checkHeader(headerIx, freeListIx, true);
+ }
+ setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_DEFRAG);
+ return destIx + 1;
+ }
+
+ /** Sets the header at an index to refer to an allocated buffer. */
+ private void setHeaderAlloc(int headerIx, int freeListIx, LlapAllocatorBuffer alloc,
+ CasLog.Src src) {
+ assert alloc != null;
+ headers[headerIx] = makeHeader(freeListIx, true);
+ buffers[headerIx] = alloc;
+ CasLog.logSet(src, arenaIx, headerIx, System.identityHashCode(alloc));
+ }
+
+ /** Sets the header at an index to refer to free space in a certain free list. */
+ private void setHeaderFree(int headerIndex, int freeListIx, CasLog.Src src) {
+ headers[headerIndex] = makeHeader(freeListIx, false);
+ buffers[headerIndex] = null;
+ CasLog.logSetFree(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+ }
+
+ /** Sets the header at an index to refer to some space in use, without an allocation. */
+ private void setHeaderNoBufAlloc(int headerIndex, int freeListIx, CasLog.Src src) {
+ headers[headerIndex] = makeHeader(freeListIx, true);
+ CasLog.logSetNb(src, arenaIx, headerIndex, allocSizeFromFreeList(freeListIx));
+ }
+
+ /** Unsets the header at an index (meaning this does not refer to a buffer). */
+ private void unsetHeader(int headerIndex, CasLog.Src src) {
+ headers[headerIndex] = 0;
+ CasLog.logUnset(src, arenaIx, headerIndex, headerIndex);
+ }
+
+ /** Unsets the headers (meaning this does not refer to a buffer). */
+ private void unsetHeaders(int fromHeaderIx, int toHeaderIx, CasLog.Src src) {
+ Arrays.fill(headers, fromHeaderIx, toHeaderIx, (byte)0);
+ CasLog.logUnset(src, arenaIx, fromHeaderIx, toHeaderIx - 1);
+ }
+
+ private void debugDump(StringBuilder result) {
+ result.append("\nArena: ");
+ if (data == null) {
+ result.append(" not allocated");
+ return;
+ }
+ // Try to get as consistent a view as we can; make a copy of the headers.
+ byte[] headers = new byte[this.headers.length];
+ System.arraycopy(this.headers, 0, headers, 0, headers.length);
+ int allocSize = minAllocation;
+ for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
+ result.append("\n free list for size " + allocSize + ": ");
+ FreeList freeList = freeLists[i];
+ freeList.lock.lock();
+ try {
+ int nextHeaderIx = freeList.listHead;
+ while (nextHeaderIx >= 0) {
+ result.append(nextHeaderIx + ", ");
+ nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
+ }
+ } finally {
+ freeList.lock.unlock();
+ }
+ }
+ for (int i = 0; i < headers.length; ++i) {
+ byte header = headers[i];
+ if (header == 0) continue;
+ int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
+ boolean isFree = buffers[i] == null;
+ result.append("\n block " + i + " at " + offset + ": size "
+ + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+ }
+ }
+
public Integer debugDumpShort(StringBuilder result) {
if (data == null) {
return null;
@@ -507,78 +1154,67 @@ public final class BuddyAllocator
return total;
}
- public void debugDump(StringBuilder result) {
- result.append("\nArena: ");
+ private void testDump(StringBuilder result) {
+ result.append("{");
if (data == null) {
- result.append(" not allocated");
+ result.append("}, ");
return;
}
// Try to get as consistent view as we can; make copy of the headers.
byte[] headers = new byte[this.headers.length];
System.arraycopy(this.headers, 0, headers, 0, headers.length);
- int allocSize = minAllocation;
- for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
- result.append("\n free list for size " + allocSize + ": ");
- FreeList freeList = freeLists[i];
- freeList.lock.lock();
- try {
- int nextHeaderIx = freeList.listHead;
- while (nextHeaderIx >= 0) {
- result.append(nextHeaderIx + ", ");
- nextHeaderIx = getNextFreeListItem(offsetFromHeaderIndex(nextHeaderIx));
- }
- } finally {
- freeList.lock.unlock();
- }
- }
for (int i = 0; i < headers.length; ++i) {
byte header = headers[i];
if (header == 0) continue;
- int freeListIx = freeListFromHeader(header), offset = offsetFromHeaderIndex(i);
- boolean isFree = (header & 1) == 0;
- result.append("\n block " + i + " at " + offset + ": size "
- + (1 << (freeListIx + minAllocLog2)) + ", " + (isFree ? "free" : "allocated"));
+ String allocState = ".";
+ if (buffers[i] != null) {
+ allocState = "*"; // Allocated
+ } else if ((header & 1) == 1) {
+ allocState = "!"; // Locked for defrag
+ }
+ int size = 1 << (freeListFromHeader(header) + minAllocLog2);
+ result.append("[").append(size).append(allocState).append("@").append(i).append("]");
}
+ result.append("}, ");
}
- private int freeListFromHeader(byte header) {
- return (header >> 1) - 1;
- }
-
- private int allocateFast(
- int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+ private int allocateFast(int freeListIx, MemoryBuffer[] dest, long[] destHeaders,
+ int destIx, int destCount, int allocSize) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
- if (!freeList.lock.tryLock()) return ix;
+ if (!freeList.lock.tryLock()) return destIx;
try {
- return allocateFromFreeListUnderLock(arenaIx, freeList, freeListIx, dest, ix, size);
+ return allocateFromFreeListUnderLock(
+ freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
} finally {
freeList.lock.unlock();
}
}
- private int allocateWithSplit(int arenaIx, int freeListIx,
- MemoryBuffer[] dest, int ix, int allocationSize) {
+ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
+ long[] destHeaders, int destIx, int destCount, int allocSize, int maxSplitFreeListIx) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
int remaining = -1;
freeList.lock.lock();
try {
// Try to allocate from target-sized free list, maybe we'll get lucky.
- ix = allocateFromFreeListUnderLock(
- arenaIx, freeList, freeListIx, dest, ix, allocationSize);
- remaining = dest.length - ix;
- if (remaining == 0) return ix;
+ destIx = allocateFromFreeListUnderLock(
+ freeList, freeListIx, dest, destHeaders, destIx, destCount, allocSize);
+ remaining = destCount - destIx;
+ if (remaining == 0) return destIx;
} finally {
freeList.lock.unlock();
}
- byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
// Each iteration of this loop tries to split blocks from one level of the free list into
// target size blocks; if we cannot satisfy the allocation from the free list containing the
// blocks of a particular size, we'll try to split yet larger blocks, until we run out.
- while (remaining > 0 && splitListIx < freeLists.length) {
+ if (maxSplitFreeListIx == -1) {
+ maxSplitFreeListIx = freeLists.length - 1;
+ }
+ while (remaining > 0 && splitListIx <= maxSplitFreeListIx) {
int splitWaysLog2 = (splitListIx - freeListIx);
assert splitWaysLog2 > 0;
int splitWays = 1 << splitWaysLog2; // How many ways each block splits into target size.
@@ -596,35 +1232,39 @@ public final class BuddyAllocator
remaining -= toTake;
lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
// Take toTake blocks by splitting the block at offset.
- for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
- headers[headerIx] = headerData;
- // TODO: this could be done out of the lock, we only need to take the blocks out.
- ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
+ for (; toTake > 0; ++destIx, --toTake, headerIx += headerStep, offset += allocSize) {
+ if (assertsEnabled) {
+ checkHeader(headerIx, -1, false); // Cannot validate the list, it may be unset
+ }
+ if (dest != null) {
+ LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+ initializeNewlyAllocated(buffer, allocSize, headerIx, offset);
+ setHeaderAlloc(headerIx, freeListIx, buffer, CasLog.Src.ALLOC_SPLIT_BUF);
+ } else {
+ destHeaders[destIx] = makeIntPair(arenaIx, headerIx);
+ setHeaderNoBufAlloc(headerIx, freeListIx, CasLog.Src.ALLOC_SPLIT_DEFRAG);
+ }
}
lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
headerIx = getNextFreeListItem(origOffset);
}
- replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
+ replaceListHeadUnderLock(splitList, headerIx, splitListIx); // In the end, update free list head.
} finally {
splitList.lock.unlock();
}
+ CasLog.Src src = (dest != null) ? CasLog.Src.SPLIT_AFTER_BUF : CasLog.Src.SPLIT_AFTER_DEFRAG;
if (remaining == 0) {
// We have just obtained all we needed by splitting some block; now we need
// to put the space remaining from that block into lower free lists.
// We'll put at most one block into each list, since 2 blocks can always be combined
// to make a larger-level block. Each bit in the remaining target-sized blocks count
// is one block in a list offset from target-sized list by bit index.
+ // We do the merges here too, since the block we just allocated could immediately be
+ // moved out, then the resulting free space abandoned.
int newListIndex = freeListIx;
while (lastSplitBlocksRemaining > 0) {
if ((lastSplitBlocksRemaining & 1) == 1) {
- FreeList newFreeList = freeLists[newListIndex];
- newFreeList.lock.lock();
- headers[lastSplitNextHeader] = makeHeader(newListIndex, false);
- try {
- addBlockToFreeListUnderLock(newFreeList, lastSplitNextHeader);
- } finally {
- newFreeList.lock.unlock();
- }
+ addToFreeListWithMerge(lastSplitNextHeader, newListIndex, null, src);
lastSplitNextHeader += (1 << newListIndex);
}
lastSplitBlocksRemaining >>>= 1;
@@ -634,10 +1274,16 @@ public final class BuddyAllocator
}
++splitListIx;
}
- return ix;
+ return destIx;
+ }
+
+ private void initializeNewlyAllocated(
+ LlapAllocatorBuffer buffer, int allocSize, int headerIx, int offset) {
+ buffer.initialize(data, offset, allocSize);
+ buffer.setNewAllocLocation(arenaIx, headerIx);
}
- private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
+ private void replaceListHeadUnderLock(FreeList freeList, int headerIx, int ix) {
if (headerIx == freeList.listHead) return;
if (headerIx >= 0) {
int newHeadOffset = offsetFromHeaderIndex(headerIx);
@@ -655,7 +1301,7 @@ public final class BuddyAllocator
}
if (allocArenaCount > arenaIx) {
// Someone already allocated this arena; just do the usual thing.
- return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
}
if ((arenaIx + 1) == -arenaCount) {
// Someone is allocating this arena. Wait a bit and recheck.
@@ -676,34 +1322,39 @@ public final class BuddyAllocator
continue; // CAS race, look again.
}
assert data == null;
- init();
+ init(arenaIx);
boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
assert isCommited;
synchronized (this) {
this.notifyAll();
}
metrics.incrAllocatedArena();
- return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ return allocateWithSplit(freeListIx, dest, null, ix, dest.length, size, -1);
}
}
- public int offsetFromHeaderIndex(int lastSplitNextHeader) {
- return lastSplitNextHeader << minAllocLog2;
- }
-
- public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
- int freeListIx, MemoryBuffer[] dest, int ix, int size) {
+ public int allocateFromFreeListUnderLock(FreeList freeList, int freeListIx,
+ MemoryBuffer[] dest, long[] destHeaders, int destIx, int destCount, int allocSize) {
int current = freeList.listHead;
- while (current >= 0 && ix < dest.length) {
- int offset = offsetFromHeaderIndex(current);
- // Noone else has this either allocated or in a different free list; no sync needed.
- headers[current] = makeHeader(freeListIx, true);
+ assert (dest == null) != (destHeaders == null);
+ while (current >= 0 && destIx < destCount) {
+ int offset = offsetFromHeaderIndex(current), allocHeaderIx = current;
current = getNextFreeListItem(offset);
- ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
- ++ix;
+ if (assertsEnabled) {
+ checkHeader(allocHeaderIx, freeListIx, false);
+ }
+ if (dest != null) {
+ LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)dest[destIx];
+ initializeNewlyAllocated(buffer, allocSize, allocHeaderIx, offset);
+ setHeaderAlloc(allocHeaderIx, freeListIx, buffer, CasLog.Src.ALLOC_FREE_BUF);
+ } else {
+ destHeaders[destIx] = makeIntPair(arenaIx, allocHeaderIx);
+ setHeaderNoBufAlloc(allocHeaderIx, freeListIx, CasLog.Src.ALLOC_FREE_DEFRAG);
+ }
+ ++destIx;
}
- replaceListHeadUnderLock(freeList, current);
- return ix;
+ replaceListHeadUnderLock(freeList, current, freeListIx);
+ return destIx;
}
private int getPrevFreeListItem(int offset) {
@@ -714,32 +1365,44 @@ public final class BuddyAllocator
return data.getInt(offset + 4);
}
- private byte makeHeader(int freeListIx, boolean isInUse) {
- return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+ public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
+ assert data != null;
+ int pos = buffer.byteBuffer.position();
+ // Note: this is called by someone who has ensured the buffer is not going to be moved.
+ int headerIx = pos >>> minAllocLog2;
+ int freeListIx = freeListFromAllocSize(buffer.allocSize);
+ if (assertsEnabled && !isAfterMove) {
+ LlapAllocatorBuffer buf = buffers[headerIx];
+ if (buf != buffer) {
+ failWithLog(arenaIx + ":" + headerIx + " => "
+ + toDebugString(buffer) + ", " + toDebugString(buf));
+ }
+ assertBufferLooksValid(freeListFromHeader(headers[headerIx]), buf, arenaIx, headerIx);
+ checkHeader(headerIx, freeListIx, true);
+ }
+ buffers[headerIx] = null;
+ addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
}
- public void deallocate(LlapDataBuffer buffer) {
- assert data != null;
- int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
- freeListIx = freeListFromHeader(headers[headerIx]);
- assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
- : buffer.allocSize + " " + freeListIx;
+ private void addToFreeListWithMerge(int headerIx, int freeListIx,
+ LlapAllocatorBuffer buffer, CasLog.Src src) {
while (true) {
FreeList freeList = freeLists[freeListIx];
- int bHeaderIx = headerIx ^ (1 << freeListIx);
+ int bHeaderIx = getBuddyHeaderIx(freeListIx, headerIx);
freeList.lock.lock();
try {
if ((freeListIx == freeLists.length - 1)
|| headers[bHeaderIx] != makeHeader(freeListIx, false)) {
// Buddy block is allocated, or it is on higher level of allocation than we are, or we
// have reached the top level. Add whatever we have got to the current free list.
- addBlockToFreeListUnderLock(freeList, headerIx);
- headers[headerIx] = makeHeader(freeListIx, false);
+ addBlockToFreeListUnderLock(freeList, headerIx, freeListIx);
+ setHeaderFree(headerIx, freeListIx, src);
break;
}
// Buddy block is free and in the same free list we have locked. Take it out for merge.
- removeBlockFromFreeList(freeList, bHeaderIx);
- headers[bHeaderIx] = headers[headerIx] = 0; // Erase both headers of the blocks to merge.
+ removeBlockFromFreeList(freeList, bHeaderIx, freeListIx);
+ unsetHeader(bHeaderIx, src); // Erase both headers of the blocks to merge.
+ unsetHeader(headerIx, src);
} finally {
freeList.lock.unlock();
}
@@ -748,7 +1411,8 @@ public final class BuddyAllocator
}
}
- private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
+ private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx, int ix) {
+ CasLog.logAddToList(arenaIx, headerIx, ix, freeList.listHead);
if (freeList.listHead >= 0) {
int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
assert getPrevFreeListItem(oldHeadOffset) == -1;
@@ -760,13 +1424,15 @@ public final class BuddyAllocator
freeList.listHead = headerIx;
}
- private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
+ private void removeBlockFromFreeList(FreeList freeList, int headerIx, int ix) {
int bOffset = offsetFromHeaderIndex(headerIx),
bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
+ CasLog.logRemoveFromList(arenaIx, headerIx, ix, freeList.listHead);
if (freeList.listHead == headerIx) {
assert bpHeaderIx == -1;
freeList.listHead = bnHeaderIx;
}
+ // Unnecessary: data.putInt(bOffset, -1); data.putInt(bOffset + 4, -1);
if (bpHeaderIx != -1) {
data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
}
@@ -778,20 +1444,341 @@ public final class BuddyAllocator
+ /**
+ * One free list: a lock plus the head of the list of free blocks. Callers in this class take
+ * {@code lock} before adding blocks to, or removing blocks from, the list.
+ */
private static class FreeList {
+ // Created with the fairness flag set to false.
ReentrantLock lock = new ReentrantLock(false);
- int listHead = -1; // Index of where the buffer is; in minAllocation units
- // TODO: One possible improvement - store blocks arriving left over from splits, and
- // blocks requested, to be able to wait for pending splits and reduce fragmentation.
- // However, we are trying to increase fragmentation now, since we cater to single-size.
+ // Starts at -1; addBlockToFreeListUnderLock only treats values >= 0 as an existing head.
+ int listHead = -1; // Index of where the buffer is; in minAllocation units (headers array).
}
+ /**
+ * Returns a freshly constructed {@link LlapDataBuffer}; nothing else is done here.
+ * NOTE(review): this change marks the method deprecated, but the intended replacement is not
+ * visible in this part of the file - confirm and document it.
+ */
@Override
+ @Deprecated
public MemoryBuffer createUnallocated() {
return new LlapDataBuffer();
}
+ // BuddyAllocatorMXBean
+ /** @return the current value of the {@code isDirect} field. */
+ @Override
+ public boolean getIsDirect() {
+ return this.isDirect;
+ }
+
+ /** @return the current value of the {@code minAllocation} field. */
+ @Override
+ public int getMinAllocation() {
+ return this.minAllocation;
+ }
+
+ /** @return the current value of the {@code maxAllocation} field. */
+ @Override
+ public int getMaxAllocation() {
+ return this.maxAllocation;
+ }
+
+ /** @return the current value of the {@code arenaSize} field. */
+ @Override
+ public int getArenaSize() {
+ return this.arenaSize;
+ }
+
+ /** @return the current value of the {@code maxSize} field. */
+ @Override
+ public long getMaxCacheSize() {
+ return this.maxSize;
+ }
+
+ // Various helper methods.
+ private static int getBuddyHeaderIx(int freeListIx, int headerIx) {
+ return headerIx ^ (1 << freeListIx);
+ }
+
+ private static int getNextIx(int ix, int count, int step) {
+ ix += step;
+ assert ix <= count; // We expect the start at 0 and count divisible by step.
+ return ix == count ? 0 : ix;
+ }
+
+ private static int freeListFromHeader(byte header) {
+ return (header >> 1) - 1;
+ }
+
+ private int freeListFromAllocSize(int allocSize) {
+ return (31 - Integer.numberOfLeadingZeros(allocSize)) - minAllocLog2;
+ }
+
+ private int allocSizeFromFreeList(int freeListIx) {
+ return 1 << (freeListIx + minAllocLog2);
+ }
+
+ /**
+ * Converts a header index into an offset by shifting it left by {@code minAllocLog2},
+ * i.e. multiplying it by 2^minAllocLog2.
+ */
+ public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+ final int offset = lastSplitNextHeader << minAllocLog2;
+ return offset;
+ }
+
+ /**
+ * Encodes a header byte: (freeListIx + 1) in the bits above the lowest one, and the in-use
+ * flag in the lowest bit.
+ */
+ private static byte makeHeader(int freeListIx, boolean isInUse) {
+ int bits = (freeListIx + 1) << 1;
+ if (isInUse) {
+ bits |= 1;
+ }
+ return (byte) bits;
+ }
+
+ private int determineFreeListForAllocation(int size) {
+ int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
+ if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
+ return Math.max(freeListIx - minAllocLog2, 0);
+ }
+
+ // Utility methods used to store pairs of ints as long.
+ /**
+ * Packs two ints into one long: {@code first} in the upper 32 bits, {@code second} in the
+ * lower 32 bits. Use getFirstInt/getSecondInt to unpack.
+ */
+ private static long makeIntPair(int first, int second) {
+ // Mask the low half: widening an int to long sign-extends, so a negative 'second' would
+ // otherwise set all of the upper 32 bits and overwrite 'first'.
+ return (((long) first) << 32) | (second & 0xFFFFFFFFL);
+ }
+ private static int getFirstInt(long result) {
+ return (int) (result >>> 32);
+ }
+ private static int getSecondInt(long result) {
+ return (int) (result & ((1L << 32) - 1));
+ }
+
+ // Debug/test related methods.
+ private void assertBufferLooksValid(
+ int freeListIx, LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
+ if (buf.allocSize == allocSizeFromFreeList(freeListIx)) return;
+ failWithLog("Race; allocation size " + buf.allocSize + ", not "
+ + allocSizeFromFreeList(freeListIx) + " for free list "
+ + freeListIx + " at " + arenaIx + ":" + headerIx);
+ }
+
+ /** Null-safe wrapper: returns the string "null" for a null buffer, else its debug string. */
+ private static String toDebugString(LlapAllocatorBuffer buffer) {
+ if (buffer == null) {
+ return "null";
+ }
+ return buffer.toDebugString();
+ }
+
+ private void checkHeaderByte(
+ int arenaIx, int headerIx, int freeListIx, boolean isLocked, byte header) {
+ if (isLocked != ((header & 1) == 1)) {
+ failWithLog("Expected " + arenaIx + ":" + headerIx + " "
+ + (isLocked ? "" : "not ") + "locked: " + header);
+ }
+ if (freeListIx < 0) return;
+ if (freeListFromHeader(header) != freeListIx) {
+ failWithLog("Expected " + arenaIx + ":" + headerIx + " in list " + freeListIx
+ + ": " + freeListFromHeader(header));
+ }
+ }
+
+ /** Test hook: sets the {@code enableDefragShortcut} flag to false. */
+ @VisibleForTesting
+ void disableDefragShortcutForTest() {
+ enableDefragShortcut = false;
+ }
+
+ /** Test hook: sets the {@code oomLogging} flag to the given value. */
+ @VisibleForTesting
+ void setOomLoggingForTest(boolean b) {
+ oomLogging = b;
+ }
+
+ @VisibleForTesting
+ String testDump() {
+ StringBuilder sb = new StringBuilder();
+ for (Arena a : arenas) {
+ a.testDump(sb);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Builds the out-of-memory debug dump: this allocator's own state (from
+ * debugDumpForOomInternal) followed by the dump reported by {@code memoryManager}.
+ */
@Override
public String debugDumpForOom() {
return "\nALLOCATOR STATE:\n" + debugDumpForOomInternal()
+ "\nPARENT STATE:\n" + memoryManager.debugDumpForOom();
}
+
+ private String debugDumpForOomInternal() {
+ StringBuilder sb = new StringBuilder();
+ for (Arena a : arenas) {
+ a.debugDump(sb);
+ }
+ return sb.toString();
+ }
+
+ private void failWithLog(String string) {
+ CasLog.logError();
+ throw new AssertionError(string);
+ }
+
+ /** Test hook: dumps the CAS log (with the pre-dump sleep enabled) if the log exists. */
+ @VisibleForTesting
+ public void dumpTestLog() {
+ if (CasLog.casLog == null) {
+ return; // Logging is switched off; nothing to dump.
+ }
+ CasLog.casLog.dumpLog(true);
+ }
+
+ /**
+ * Optional in-memory event log for debugging header and free list updates.
+ * Each event occupies consecutive slots of a long array: slot 0 holds the pair
+ * (event type, event argument), slot 1 holds the id of the logging thread, and the remaining
+ * slots (if any) hold event-specific payload. The event type determines the entry length.
+ * The {@code casLog} singleton is currently null, so every static log method returns at once.
+ */
+ private final static class CasLog {
+ // TODO: enable this for production debug, switching between two small buffers?
+ private final static CasLog casLog = null; //new CasLog();
+ /** Origin of an event; stored in the log by ordinal and printed by name. */
+ public enum Src {
+ NEWLY_CLEARED,
+ SPLIT_AFTER_BUF,
+ SPLIT_AFTER_DEFRAG,
+ ALLOC_SPLIT_DEFRAG,
+ ALLOC_SPLIT_BUF,
+ ALLOC_DEFRAG,
+ EMPTY_V,
+ NEW_BASE,
+ CTOR,
+ MOVE_TO_NESTED,
+ MOVE_TO_ALLOC,
+ ABANDON_MOVE,
+ ABANDON_AT_END,
+ ABANDON_BASE,
+ CLEARED_BASE,
+ CLEARED_VICTIM,
+ UNUSABLE_NESTED,
+ ABANDON_NESTED,
+ DEALLOC,
+ ALLOC_FREE_DEFRAG,
+ ALLOC_FREE_BUF
+ }
+ private final int size;
+ private final long[] log;
+ // Index of the next unused slot in the log array; writers reserve slots via addAndGet.
+ private final AtomicInteger offset = new AtomicInteger(0);
+
+ public CasLog() {
+ size = 50000000;
+ log = new long[size];
+ }
+
+ // Event type codes, stored as the first int of slot 0 of every entry.
+ public static final int START_MOVE = 0, SET_NB = 1, SET_BUF = 2, SET_FREE = 3,
+ ADD_TO_LIST = 4, REMOVE_FROM_LIST = 5, ERROR = 6, UNSET = 7;
+
+ /** Logs a START_MOVE event (3 slots): hash code, thread id, (arena, header) pair. */
+ public static void logMove(int arenaIx, int buddyHeaderIx, int identityHashCode) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(3) - 3;
+ casLog.log[ix] = makeIntPair(START_MOVE, identityHashCode);
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, buddyHeaderIx);
+ }
+
+ /** Logs a SET_NB event (4 slots): source, thread id, (arena, header) pair, size. */
+ public static void logSetNb(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(SET_NB, src.ordinal());
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+ casLog.log[ix + 3] = size;
+ }
+
+ /** Logs a SET_FREE event (4 slots): source, thread id, (arena, header) pair, size. */
+ public static void logSetFree(CasLog.Src src, int arenaIx, int headerIndex, int size) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(SET_FREE, src.ordinal());
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+ casLog.log[ix + 3] = size;
+ }
+
+ /**
+ * Logs an UNSET event (4 slots) for the header range [from, to]; nothing is logged when
+ * {@code from > to}.
+ */
+ public static void logUnset(CasLog.Src src, int arenaIx, int from, int to) {
+ if (casLog == null) return;
+ if (from > to) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(UNSET, src.ordinal());
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, from);
+ casLog.log[ix + 3] = makeIntPair(arenaIx, to);
+ }
+
+ /** Logs a SET_BUF event (4 slots): source, thread id, (arena, header) pair, hash code. */
+ public static void logSet(CasLog.Src src, int arenaIx, int headerIndex, int identityHashCode) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(SET_BUF, src.ordinal());
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, headerIndex);
+ casLog.log[ix + 3] = identityHashCode;
+ }
+
+ /** Logs a REMOVE_FROM_LIST event (4 slots): list, thread id, (arena, header), list head. */
+ public static void logRemoveFromList(int arenaIx, int headerIx, int listIx, int listHead) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(REMOVE_FROM_LIST, listIx);
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+ casLog.log[ix + 3] = listHead;
+ }
+
+ /** Logs an ADD_TO_LIST event (4 slots): list, thread id, (arena, header), list head. */
+ public static void logAddToList(int arenaIx, int headerIx, int listIx, int listHead) {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(4) - 4;
+ casLog.log[ix] = makeIntPair(ADD_TO_LIST, listIx);
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ casLog.log[ix + 2] = makeIntPair(arenaIx, headerIx);
+ casLog.log[ix + 3] = listHead;
+ }
+
+ /** Logs an ERROR event (2 slots): event type and thread id. */
+ public static void logError() {
+ if (casLog == null) return;
+ int ix = casLog.offset.addAndGet(2) - 2;
+ casLog.log[ix] = makeIntPair(ERROR, 0);
+ casLog.log[ix + 1] = Thread.currentThread().getId();
+ }
+
+ /**
+ * Prints the entry that starts at slot {@code ix} and returns the index of the next entry.
+ * List add/remove entries are skipped without output.
+ * @throws AssertionError if the slot does not hold a known event type
+ */
+ private int dumpOneLine(int ix) {
+ int event = getFirstInt(log[ix]);
+ switch (event) {
+ case START_MOVE: {
+ LlapIoImpl.LOG.info(prefix(ix) + " started to move "
+ + header(log[ix + 2]) + " " + Integer.toHexString(getSecondInt(log[ix])));
+ return ix + 3;
+ }
+ case SET_NB: {
+ LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+ + header(log[ix + 2]) + " to taken of size " + log[ix + 3]);
+ return ix + 4;
+ }
+ case SET_FREE: {
+ LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+ + header(log[ix + 2]) + " to free of size " + log[ix + 3]);
+ return ix + 4;
+ }
+ case UNSET: {
+ LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " unset ["
+ + header(log[ix + 2]) + ", " + header(log[ix + 3]) + "]");
+ return ix + 4;
+ }
+ case SET_BUF: {
+ LlapIoImpl.LOG.info(prefix(ix) + " " + src(getSecondInt(log[ix])) + " set "
+ + header(log[ix + 2]) + " to " + Integer.toHexString((int)log[ix + 3]));
+ return ix + 4;
+ }
+ case ADD_TO_LIST: {
+ //LlapIoImpl.LOG.info(prefix(ix) + " adding " + header(log[ix + 2]) + " to "
+ // + getSecondInt(log[ix]) + " before " + log[ix + 3]);
+ return ix + 4;
+ }
+ case REMOVE_FROM_LIST: {
+ // LlapIoImpl.LOG.info(prefix(ix) + " removing " + header(log[ix + 2]) + " from "
+ // + getSecondInt(log[ix]) + " head " + log[ix + 3]);
+ return ix + 4;
+ }
+ case ERROR: {
+ LlapIoImpl.LOG.error(prefix(ix) + " failed");
+ return ix + 2;
+ }
+ default: throw new AssertionError("Unknown " + event);
+ }
+ }
+
+ /** Line prefix: the entry's start index and the id of the thread that logged it. */
+ private String prefix(int ix) {
+ return ix + " thread-" + log[ix + 1];
+ }
+
+ /** Maps a logged ordinal back to the name of its Src constant. */
+ private String src(int val) {
+ return Src.values()[val].name();
+ }
+
+ /** Formats a packed int pair as "first:second". */
+ private String header(long l) {
+ return getFirstInt(l) + ":" + getSecondInt(l);
+ }
+
+ /**
+ * Prints every entry logged so far and then resets the write offset to 0.
+ * @param doSleep whether to sleep for 100 ms before dumping
+ */
+ public synchronized void dumpLog(boolean doSleep) {
+ if (doSleep) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Do not lose the interrupt: restore the flag for the caller, then dump anyway.
+ Thread.currentThread().interrupt();
+ }
+ }
+ int logSize = offset.get();
+ int ix = 0;
+ while (ix < logSize) {
+ ix = dumpOneLine(ix);
+ }
+ offset.set(0);
+ }
+ }
}