You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/13 20:53:40 UTC
hive git commit: HIVE-11200 : LLAP: Cache BuddyAllocator throws NPE
(Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/llap 1972e8432 -> fc9f75836
HIVE-11200 : LLAP: Cache BuddyAllocator throws NPE (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc9f7583
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc9f7583
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc9f7583
Branch: refs/heads/llap
Commit: fc9f7583660adc498a1985639adb2570cee391df
Parents: 1972e84
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jul 13 11:53:45 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jul 13 11:53:45 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/BuddyAllocator.java | 87 +++++++++++++++-----
.../hive/llap/cache/TestBuddyAllocator.java | 31 +++++++
2 files changed, 97 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 3631418..fca6249 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -118,6 +118,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
// First try to quickly lock some of the correct-sized free lists and allocate from them.
int arenaCount = allocatedArenas.get();
+ if (arenaCount < 0) {
+ arenaCount = -arenaCount - 1; // Next arena is being allocated.
+ }
long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
{
int startIndex = (int)(threadId % arenaCount), index = startIndex;
@@ -317,8 +320,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
FreeList freeList = freeLists[freeListIx];
int remaining = -1;
freeList.lock.lock();
- // TODO: write some comments for this method
try {
+ // Try to allocate from target-sized free list, maybe we'll get lucky.
ix = allocateFromFreeListUnderLock(
arenaIx, freeList, freeListIx, dest, ix, allocationSize);
remaining = dest.length - ix;
@@ -326,9 +329,12 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
} finally {
freeList.lock.unlock();
}
- byte headerData = makeHeader(freeListIx, true);
- int headerStep = 1 << freeListIx;
- int splitListIx = freeListIx + 1;
+ byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
+ int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
+ int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
+ // Each iteration of this loop tries to split blocks from one level of the free list into
+ // target size blocks; if we cannot satisfy the allocation from the free list containing the
+ // blocks of a particular size, we'll try to split yet larger blocks, until we run out.
while (remaining > 0 && splitListIx < freeLists.length) {
int splitWaysLog2 = (splitListIx - freeListIx);
assert splitWaysLog2 > 0;
@@ -338,28 +344,33 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
FreeList splitList = freeLists[splitListIx];
splitList.lock.lock();
try {
- int headerIx = splitList.listHead;
+ int headerIx = splitList.listHead; // Index of the next free block to split.
while (headerIx >= 0 && remaining > 0) {
int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
- int toTake = Math.min(splitWays, remaining); // We split it splitWays and take toTake.
+ // We will split the block at headerIx [splitWays] ways, and take [toTake] blocks,
+ // which will leave [lastSplitBlocksRemaining] free blocks of target size.
+ int toTake = Math.min(splitWays, remaining);
remaining -= toTake;
lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
- // Take toTake blocks by splitting the block at origOffset.
+ // Take toTake blocks by splitting the block at offset.
for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
headers[headerIx] = headerData;
// TODO: this could be done out of the lock, we only need to take the blocks out.
((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
}
lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
- headerIx = data.getInt(origOffset + 4); // Get next item from the free list.
+ headerIx = getNextFreeListItem(origOffset);
}
replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
} finally {
splitList.lock.unlock();
}
if (remaining == 0) {
- // We have just obtained all we needed by splitting at lastSplitBlockOffset; now
- // we need to put the space remaining from that block into lower free lists.
+ // We have just obtained all we needed by splitting some block; now we need
+ // to put the space remaining from that block into lower free lists.
+ // We'll put at most one block into each list, since 2 blocks can always be combined
+ // to make a larger-level block. Each set bit in the count of remaining target-sized
+ // blocks maps to one block in the list at (target list index + bit index).
int newListIndex = freeListIx;
while (lastSplitBlocksRemaining > 0) {
if ((lastSplitBlocksRemaining & 1) == 1) {
@@ -394,17 +405,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private int allocateWithExpand(
int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
- if (data == null) {
- synchronized (this) {
- // Never goes from non-null to null, so this is the only place we need sync.
- if (data == null) {
- init();
- allocatedArenas.incrementAndGet();
- metrics.incrAllocatedArena();
+ while (true) {
+ int arenaCount = allocatedArenas.get(), allocArenaCount = arenaCount;
+ if (arenaCount < 0) {
+ allocArenaCount = -arenaCount - 1; // Someone is allocating an arena.
+ }
+ if (allocArenaCount > arenaIx) {
+ // Someone already allocated this arena; just do the usual thing.
+ return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ }
+ if ((arenaIx + 1) == -arenaCount) {
+ // Someone is allocating this arena. Wait a bit and recheck.
+ try {
+ synchronized (this) {
+ this.wait(100);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt, won't handle here.
}
+ continue;
}
+ // The arena is neither allocated nor being allocated (both handled above), so it is expected
+ // to be next; callers must wait for the previous arena before allocating another one.
+ assert arenaCount == arenaIx :
+ "Arena count " + arenaCount + " but " + arenaIx + " is not being allocated";
+ if (!allocatedArenas.compareAndSet(arenaCount, -arenaCount - 1)) {
+ continue; // CAS race, look again.
+ }
+ assert data == null;
+ init();
+ boolean isCommited = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
+ assert isCommited;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ metrics.incrAllocatedArena();
+ return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
}
- return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
}
public int offsetFromHeaderIndex(int lastSplitNextHeader) {
@@ -418,7 +455,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
int offset = offsetFromHeaderIndex(current);
// Noone else has this either allocated or in a different free list; no sync needed.
headers[current] = makeHeader(freeListIx, true);
- current = data.getInt(offset + 4);
+ current = getNextFreeListItem(offset);
((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
++ix;
}
@@ -426,6 +463,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
return ix;
}
+ private int getPrevFreeListItem(int offset) {
+ return data.getInt(offset);
+ }
+
+ private int getNextFreeListItem(int offset) {
+ return data.getInt(offset + 4);
+ }
+
private byte makeHeader(int freeListIx, boolean isInUse) {
return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
}
@@ -462,7 +507,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
if (freeList.listHead >= 0) {
int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
- assert data.getInt(oldHeadOffset) == -1;
+ assert getPrevFreeListItem(oldHeadOffset) == -1;
data.putInt(oldHeadOffset, headerIx);
}
int offset = offsetFromHeaderIndex(headerIx);
@@ -473,7 +518,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
int bOffset = offsetFromHeaderIndex(headerIx),
- bpHeaderIx = data.getInt(bOffset), bnHeaderIx = data.getInt(bOffset + 4);
+ bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
if (freeList.listHead == headerIx) {
assert bpHeaderIx == -1;
freeList.listHead = bnHeaderIx;
http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 7d265c1..50d5e19 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -136,6 +136,37 @@ public class TestBuddyAllocator {
}
}
+ @Test
+ public void testMTTArenas() {
+ final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
+ Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, (1 << min) * minAllocCount);
+ final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
+ LlapDaemonCacheMetrics.create("test", "1"));
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
+ Callable<Void> testCallable = new Callable<Void>() {
+ public Void call() throws Exception {
+ syncThreadStart(cdlIn, cdlOut);
+ allocSameSize(a, minAllocCount / threadCount, min);
+ return null;
+ }
+ };
+ @SuppressWarnings("unchecked")
+ FutureTask<Void>[] allocTasks = new FutureTask[threadCount];
+ for (int i = 0; i < threadCount; ++i) {
+ allocTasks[i] = new FutureTask<>(testCallable);
+ executor.execute(allocTasks[i]);
+ }
+ try {
+ cdlIn.await(); // Wait for all threads to be ready.
+ cdlOut.countDown(); // Release them at the same time.
+ for (int i = 0; i < threadCount; ++i) {
+ allocTasks[i].get();
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
cdlIn.countDown();
try {