Posted to commits@hive.apache.org by se...@apache.org on 2015/07/13 20:53:40 UTC

hive git commit: HIVE-11200 : LLAP: Cache BuddyAllocator throws NPE (Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/llap 1972e8432 -> fc9f75836


HIVE-11200 : LLAP: Cache BuddyAllocator throws NPE (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc9f7583
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc9f7583
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc9f7583

Branch: refs/heads/llap
Commit: fc9f7583660adc498a1985639adb2570cee391df
Parents: 1972e84
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jul 13 11:53:45 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jul 13 11:53:45 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  | 87 +++++++++++++++-----
 .../hive/llap/cache/TestBuddyAllocator.java     | 31 +++++++
 2 files changed, 97 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
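
The interesting part of the fix is in allocateWithExpand below: the per-arena double-checked
init on the data field is replaced with a small state machine on the allocatedArenas counter,
in which a non-negative value n means n arenas are fully initialized and a negative value
-(k + 1) means arena k is mid-initialization. Publishing the count only after init() completes
closes the window in which a reader could index into an arena whose buffer is still null,
which appears to be the NPE the commit title refers to. A minimal, self-contained sketch of
the handshake (hypothetical class and method names, not the committed code; the committed
code additionally asserts that callers request arenas in order, whereas this sketch simply
initializes arenas in order until the requested one is ready):

import java.util.concurrent.atomic.AtomicInteger;

public class ArenaGate {
  private final AtomicInteger allocatedArenas = new AtomicInteger(0);

  // Returns once arena arenaIx is fully initialized, doing the work if we win the CAS.
  public void ensureArena(int arenaIx) throws InterruptedException {
    while (true) {
      int count = allocatedArenas.get();
      int ready = (count < 0) ? (-count - 1) : count; // Arenas fully initialized so far.
      if (ready > arenaIx) {
        return; // Someone already finished this arena.
      }
      if (count < 0) {
        // Some arena is mid-initialization; wait briefly, then re-read the state.
        synchronized (this) { this.wait(100); }
        continue;
      }
      // No init in flight; try to claim arena 'count' by flipping the sign.
      if (!allocatedArenas.compareAndSet(count, -count - 1)) {
        continue; // Lost the race; re-read the state.
      }
      initArena(count);               // The expensive part: allocate the arena's buffer.
      allocatedArenas.set(count + 1); // Commit: only the CAS winner leaves the negative state.
      synchronized (this) { this.notifyAll(); } // Wake any threads parked above.
      if (count == arenaIx) {
        return;
      }
    }
  }

  private void initArena(int ix) {
    // Stand-in for Arena.init(): allocate and wire up the arena's ByteBuffer.
  }
}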


http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 3631418..fca6249 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -118,6 +118,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     }
     // First try to quickly lock some of the correct-sized free lists and allocate from them.
     int arenaCount = allocatedArenas.get();
+    if (arenaCount < 0) {
+      arenaCount = -arenaCount - 1; // Next arena is being allocated.
+    }
     long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
     {
       int startIndex = (int)(threadId % arenaCount), index = startIndex;
@@ -317,8 +320,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       FreeList freeList = freeLists[freeListIx];
       int remaining = -1;
       freeList.lock.lock();
-      // TODO: write some comments for this method
       try {
+        // Try to allocate from the target-sized free list; maybe we'll get lucky.
         ix = allocateFromFreeListUnderLock(
             arenaIx, freeList, freeListIx, dest, ix, allocationSize);
         remaining = dest.length - ix;
@@ -326,9 +329,12 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       } finally {
         freeList.lock.unlock();
       }
-      byte headerData = makeHeader(freeListIx, true);
-      int headerStep = 1 << freeListIx;
-      int splitListIx = freeListIx + 1;
+      byte headerData = makeHeader(freeListIx, true); // Header for newly allocated used blocks.
+      int headerStep = 1 << freeListIx; // Number of headers (smallest blocks) per target block.
+      int splitListIx = freeListIx + 1; // Next free list from which we will be splitting.
+      // Each iteration of this loop tries to split blocks from one free-list level into
+      // target-sized blocks; if the allocation cannot be satisfied from the free list for
+      // blocks of a particular size, we try to split still larger blocks, until we run out.
       while (remaining > 0 && splitListIx < freeLists.length) {
         int splitWaysLog2 = (splitListIx - freeListIx);
         assert splitWaysLog2 > 0;
@@ -338,28 +344,33 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
         FreeList splitList = freeLists[splitListIx];
         splitList.lock.lock();
         try {
-          int headerIx = splitList.listHead;
+          int headerIx = splitList.listHead; // Index of the next free block to split.
           while (headerIx >= 0 && remaining > 0) {
             int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
-            int toTake = Math.min(splitWays, remaining); // We split it splitWays and take toTake.
+            // We will split the block at headerIx [splitWays] ways, and take [toTake] blocks,
+            // which will leave [lastSplitBlocksRemaining] free blocks of target size.
+            int toTake = Math.min(splitWays, remaining);
             remaining -= toTake;
             lastSplitBlocksRemaining = splitWays - toTake; // Whatever remains.
-            // Take toTake blocks by splitting the block at origOffset.
+            // Take toTake blocks by splitting the block at offset.
             for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
               headers[headerIx] = headerData;
               // TODO: this could be done out of the lock, we only need to take the blocks out.
               ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
             }
             lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
-            headerIx = data.getInt(origOffset + 4); // Get next item from the free list.
+            headerIx = getNextFreeListItem(origOffset);
           }
           replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
         } finally {
           splitList.lock.unlock();
         }
         if (remaining == 0) {
-          // We have just obtained all we needed by splitting at lastSplitBlockOffset; now
-          // we need to put the space remaining from that block into lower free lists.
+          // We have just obtained all we needed by splitting some block; now we need
+          // to put the space remaining from that block into lower free lists.
+          // We'll put at most one block into each list, since two blocks can always be
+          // combined into one block at the next level. Each set bit i of the remaining
+          // target-sized block count becomes one block in the list at freeListIx + i.
           int newListIndex = freeListIx;
           while (lastSplitBlocksRemaining > 0) {
             if ((lastSplitBlocksRemaining & 1) == 1) {
@@ -394,17 +405,43 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
     private int allocateWithExpand(
         int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
-      if (data == null) {
-        synchronized (this) {
-          // Never goes from non-null to null, so this is the only place we need sync.
-          if (data == null) {
-            init();
-            allocatedArenas.incrementAndGet();
-            metrics.incrAllocatedArena();
+      while (true) {
+        int arenaCount = allocatedArenas.get(), allocArenaCount = arenaCount;
+        if (arenaCount < 0) {
+          allocArenaCount = -arenaCount - 1; // Someone is allocating an arena.
+        }
+        if (allocArenaCount > arenaIx) {
+          // Someone already allocated this arena; just do the usual thing.
+          return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+        }
+        if ((arenaIx + 1) == -arenaCount) {
+          // Someone is allocating this arena. Wait a bit and recheck.
+          try {
+            synchronized (this) {
+              this.wait(100);
+            }
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // Restore interrupt, won't handle here.
           }
+          continue;
         }
+        // By now this arena must be the next one to allocate; the other cases were handled
+        // above, and the caller should not allocate an arena before the previous one is done.
+        assert arenaCount == arenaIx :
+          "Arena count is " + arenaCount + " but arena " + arenaIx + " is not being allocated";
+        if (!allocatedArenas.compareAndSet(arenaCount, -arenaCount - 1)) {
+          continue; // CAS race, look again.
+        }
+        assert data == null;
+        init();
+        boolean isCommitted = allocatedArenas.compareAndSet(-arenaCount - 1, arenaCount + 1);
+        assert isCommitted;
+        synchronized (this) {
+          this.notifyAll();
+        }
+        metrics.incrAllocatedArena();
+        return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
       }
-      return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
     }
 
     public int offsetFromHeaderIndex(int lastSplitNextHeader) {
@@ -418,7 +455,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
         int offset = offsetFromHeaderIndex(current);
         // No one else has this block allocated or in a different free list; no sync needed.
         headers[current] = makeHeader(freeListIx, true);
-        current = data.getInt(offset + 4);
+        current = getNextFreeListItem(offset);
         ((LlapDataBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
         ++ix;
       }
@@ -426,6 +463,14 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
       return ix;
     }
 
+    private int getPrevFreeListItem(int offset) {
+      return data.getInt(offset);
+    }
+
+    private int getNextFreeListItem(int offset) {
+      return data.getInt(offset + 4);
+    }
+
     private byte makeHeader(int freeListIx, boolean isInUse) {
       return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
     }
@@ -462,7 +507,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
     private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
       if (freeList.listHead >= 0) {
         int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
-        assert data.getInt(oldHeadOffset) == -1;
+        assert getPrevFreeListItem(oldHeadOffset) == -1;
         data.putInt(oldHeadOffset, headerIx);
       }
       int offset = offsetFromHeaderIndex(headerIx);
@@ -473,7 +518,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
 
     private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
       int bOffset = offsetFromHeaderIndex(headerIx),
-          bpHeaderIx = data.getInt(bOffset), bnHeaderIx = data.getInt(bOffset + 4);
+          bpHeaderIx = getPrevFreeListItem(bOffset), bnHeaderIx = getNextFreeListItem(bOffset);
       if (freeList.listHead == headerIx) {
         assert bpHeaderIx == -1;
         freeList.listHead = bnHeaderIx;

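The leftover-filing loop near the end of allocateWithSplit above relies on a small bit
trick: after a split, the count of unused target-sized blocks is decomposed by its binary
representation, and each set bit i contributes exactly one block to the free list i levels
above the target list, since two equal buddy blocks always combine into one block at the
next level. A short standalone demonstration of the mapping (illustrative names and
printout only, not the committed code):

public class SplitLeftoverDemo {
  // Each set bit i of the leftover target-sized block count maps to one block in the
  // free list i levels above the target list, mirroring the lastSplitBlocksRemaining
  // loop in BuddyAllocator.allocateWithSplit.
  static void fileLeftovers(int freeListIx, int leftover) {
    for (int listIx = freeListIx; leftover > 0; leftover >>>= 1, ++listIx) {
      if ((leftover & 1) == 1) {
        System.out.println("free list " + listIx + ": one block, "
            + (1 << (listIx - freeListIx)) + "x the target block size");
      }
    }
  }

  public static void main(String[] args) {
    // Splitting a block 8 ways and taking 3 leaves 5 = 0b101 target-sized blocks:
    // one goes back at the target level and one goes back two levels up as a
    // single 4x block.
    fileLeftovers(2, 5);
  }
}
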
http://git-wip-us.apache.org/repos/asf/hive/blob/fc9f7583/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 7d265c1..50d5e19 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -136,6 +136,37 @@ public class TestBuddyAllocator {
     }
   }
 
+  @Test
+  public void testMTTArenas() {
+    final int min = 3, max = 4, maxAlloc = 1 << max, minAllocCount = 2048, threadCount = 4;
+    Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, (1 << min) * minAllocCount);
+    final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager(),
+        LlapDaemonCacheMetrics.create("test", "1"));
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    final CountDownLatch cdlIn = new CountDownLatch(threadCount), cdlOut = new CountDownLatch(1);
+    Callable<Void> testCallable = new Callable<Void>() {
+      public Void call() throws Exception {
+        syncThreadStart(cdlIn, cdlOut);
+        allocSameSize(a, minAllocCount / threadCount, min);
+        return null;
+      }
+    };
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] allocTasks = new FutureTask[threadCount];
+    for (int i = 0; i < threadCount; ++i) {
+      allocTasks[i] = new FutureTask<>(testCallable);
+      executor.execute(allocTasks[i]);
+    }
+    try {
+      cdlIn.await(); // Wait for all threads to be ready.
+      cdlOut.countDown(); // Release them at the same time.
+      for (int i = 0; i < threadCount; ++i) {
+        allocTasks[i].get();
+      }
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
   private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
     cdlIn.countDown();
     try {