You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/16 04:30:47 UTC
svn commit: r1652337 - in /hive/branches/llap:
llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/
llap-server/src/java/org/apache/hadoop/hive/llap/cache/
llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/
llap-server/src/test/org...
Author: sershe
Date: Fri Jan 16 03:30:46 2015
New Revision: 1652337
URL: http://svn.apache.org/r1652337
Log:
Separated allocator and cache; unit tests for allocator, fixed a bunch of bugs
Added:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
Removed:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
Modified:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Fri Jan 16 03:30:46 2015
@@ -24,7 +24,9 @@ public abstract class LlapMemoryBuffer {
protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
initialize(byteBuffer, offset, length);
}
- public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+ protected LlapMemoryBuffer() {
+ }
+ protected void initialize(ByteBuffer byteBuffer, int offset, int length) {
this.byteBuffer = byteBuffer;
this.offset = offset;
this.length = length;
@@ -32,5 +34,4 @@ public abstract class LlapMemoryBuffer {
public ByteBuffer byteBuffer;
public int offset;
public int length;
-
}
\ No newline at end of file
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Fri Jan 16 03:30:46 2015
@@ -23,11 +23,13 @@ public interface LowLevelCache {
/**
* Gets file data for particular offsets. Null entries mean no data.
+ * @param fileName File name; MUST be interned.
*/
LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
/**
* Puts file data into cache.
+ * @param fileName File name; MUST be interned.
* @return null if all data was put; bitmask indicating which chunks were not put otherwise;
* the replacement chunks from cache are updated directly in the array.
*/
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
+final class BuddyAllocator {
+ private static final Log LOG = LogFactory.getLog(BuddyAllocator.class);
+
+ private final Arena[] arenas;
+ private AtomicInteger allocatedArenas = new AtomicInteger(0);
+
+ private final MemoryManager memoryManager;
+
+ // Config settings
+ private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
+ private final int minAllocation, maxAllocation, arenaSize;
+ private final long maxSize;
+
+ public BuddyAllocator(Configuration conf, MemoryManager memoryManager) {
+ minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
+ arenaSize = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+ maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ if (minAllocation < 8) {
+ throw new AssertionError("Min allocation must be at least 8: " + minAllocation);
+ }
+ if (maxSize < arenaSize || arenaSize < maxAllocation || maxAllocation < minAllocation) {
+ throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
+ + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
+ }
+ if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
+ || (Long.bitCount(arenaSize) != 1)) {
+ // Technically, arena size only needs to be divisible by maxAlloc
+ throw new AssertionError("Allocation and arena sizes must be powers of two: "
+ + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+ }
+ if ((maxSize % arenaSize) > 0 || (maxSize / arenaSize) > Integer.MAX_VALUE) {
+ throw new AssertionError(
+ "Cache size not consistent with arena size: " + arenaSize + "," + maxSize);
+ }
+ minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+ maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+ arenaSizeLog2 = 63 - Long.numberOfLeadingZeros(arenaSize);
+ maxArenas = (int)(maxSize / arenaSize);
+ arenas = new Arena[maxArenas];
+ for (int i = 0; i < maxArenas; ++i) {
+ arenas[i] = new Arena();
+ }
+ arenas[0].init();
+ allocatedArenas.set(1);
+ this.memoryManager = memoryManager;
+ }
+
+ // TODO: would it make sense to return buffers asynchronously?
+ public boolean allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ assert size > 0 : "size is " + size;
+ if (size > maxAllocation) {
+ throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
+ }
+ int freeListIx = 31 - Integer.numberOfLeadingZeros(size);
+ if (size != (1 << freeListIx)) ++freeListIx; // not a power of two, add one more
+ freeListIx = Math.max(freeListIx - minAllocLog2, 0);
+ int allocLog2 = freeListIx + minAllocLog2;
+ int allocationSize = 1 << allocLog2;
+ // TODO: reserving the entire thing is not ideal before we alloc anything. Interleave?
+ memoryManager.reserveMemory(dest.length << allocLog2, true);
+
+ int ix = 0;
+ for (int i = 0; i < dest.length; ++i) {
+ if (dest[i] != null) continue;
+ dest[i] = new LlapCacheableBuffer(); // TODO: pool of objects?
+ }
+ // First try to quickly lock some of the correct-sized free lists and allocate from them.
+ int arenaCount = allocatedArenas.get();
+ long threadId = arenaCount > 1 ? Thread.currentThread().getId() : 0;
+ {
+ int startIndex = (int)(threadId % arenaCount), index = startIndex;
+ do {
+ int newIx = arenas[index].allocateFast(index, freeListIx, dest, ix, allocationSize);
+ if (newIx == dest.length) return true;
+ if (newIx != -1) { // Shouldn't happen.
+ ix = newIx;
+ }
+ if ((++index) == arenaCount) {
+ index = 0;
+ }
+ } while (index != startIndex);
+ }
+
+ // Then try to split bigger blocks. TODO: again, ideally we would tryLock at least once
+ for (int i = 0; i < arenaCount; ++i) {
+ int newIx = arenas[i].allocateWithSplit(i, freeListIx, dest, ix, allocationSize);
+ if (newIx == -1) break; // Shouldn't happen.
+ if (newIx == dest.length) return true;
+ ix = newIx;
+ }
+ // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+ for (int i = arenaCount; i < arenas.length; ++i) {
+ ix = arenas[i].allocateWithExpand(i, freeListIx, dest, ix, allocationSize);
+ if (ix == dest.length) return true;
+ }
+ return false;
+ }
+
+ public static LlapCacheableBuffer allocateFake() {
+ LlapCacheableBuffer fake = new LlapCacheableBuffer();
+ fake.initialize(-1, null, -1, 1);
+ return fake;
+ }
+
+ public void deallocate(LlapCacheableBuffer buffer) {
+ arenas[buffer.arenaIndex].deallocate(buffer);
+ }
+
+ public String debugDump() {
+ StringBuilder result = new StringBuilder();
+ for (Arena arena : arenas) {
+ arena.debugDump(result);
+ }
+ result.append("\n");
+ return result.toString();
+ }
+
+ private class Arena {
+ private ByteBuffer data;
+ // Avoid storing headers with data since we expect binary size allocations.
+ // Each headers[i] is a "virtual" byte at i * minAllocation.
+ private byte[] headers;
+ private FreeList[] freeLists;
+
+ void init() {
+ data = ByteBuffer.allocateDirect(arenaSize);
+ int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+ headers = new byte[maxMinAllocs];
+ int allocLog2Diff = maxAllocLog2 - minAllocLog2, freeListCount = allocLog2Diff + 1;
+ freeLists = new FreeList[freeListCount];
+ for (int i = 0; i < freeListCount; ++i) {
+ freeLists[i] = new FreeList();
+ }
+ int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
+ headerIndex = 0, headerStep = 1 << allocLog2Diff;
+ freeLists[allocLog2Diff].listHead = 0;
+ for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+ // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
+ headers[headerIndex] = makeHeader(allocLog2Diff, false);
+ LOG.info("TODO# 1 mucking with " + System.identityHashCode(data) + ":" + offset);
+ data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerStep));
+ data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerStep));
+ headerIndex += headerStep;
+ }
+ }
+
+ /**
+  * Appends a human-readable description of this arena to result: every block that has a
+  * non-zero header (index, byte offset, size, free/allocated), followed by the contents of
+  * each free list. An arena without backing memory is reported as not allocated.
+  * @param result Builder to append the dump to.
+  */
+ public void debugDump(StringBuilder result) {
+   result.append("\nArena: ");
+   if (data == null) {
+     result.append(" not allocated");
+     return;
+   }
+   for (int i = 0; i < headers.length; ++i) {
+     byte header = headers[i];
+     if (header == 0) continue; // No block starts at this position.
+     // Inverse of makeHeader: low bit is the in-use flag, the rest is (free list index + 1).
+     int freeListIx = (header >> 1) - 1, offset = offsetFromHeaderIndex(i);
+     boolean isFree = (header & 1) == 0;
+     result.append("\n block ").append(i).append(" at ").append(offset)
+         .append(": size ").append(1 << (freeListIx + minAllocLog2))
+         .append(", ").append(isFree ? "free" : "allocated");
+   }
+   int allocSize = minAllocation;
+   for (int i = 0; i < freeLists.length; ++i, allocSize <<= 1) {
+     result.append("\n free list for size ").append(allocSize).append(": ");
+     int nextItem = freeLists[i].listHead;
+     while (nextItem >= 0) {
+       result.append(nextItem).append(", ");
+       // A free block stores its back link at +0 and its forward link at +4; follow the
+       // forward link, otherwise the walk stops right after the head (whose back link is -1).
+       nextItem = data.getInt(offsetFromHeaderIndex(nextItem) + 4);
+     }
+   }
+ }
+
+ private int allocateFast(int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ if (data == null) return -1; // not allocated yet
+ FreeList freeList = freeLists[freeListIx];
+ if (!freeList.lock.tryLock()) return ix;
+ try {
+ return allocateFromFreeListUnderLock(arenaIx, freeList, freeListIx, dest, ix, size);
+ } finally {
+ freeList.lock.unlock();
+ }
+ }
+
+ /**
+  * Fills dest from position ix onwards with blocks from this arena, waiting for locks.
+  * Blocks are first taken from the exact-size free list. If that is not enough, blocks from
+  * progressively larger free lists are cut into blocks of the requested size, and the unused
+  * tail of the last block that was cut is handed back to the smaller free lists.
+  * @param arenaIx Index of this arena; stored in the buffers that are handed out.
+  * @param freeListIx Index of the free list for the requested size.
+  * @param dest Buffers to attach memory to.
+  * @param ix Index of the first entry of dest that still needs memory.
+  * @param allocationSize Size of each block in bytes.
+  * @return index past the last entry of dest that was filled (dest.length if all were),
+  *         or -1 if this arena has no backing memory yet.
+  */
+ private int allocateWithSplit(int arenaIx, int freeListIx,
+     LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+   if (data == null) return -1; // not allocated yet
+   FreeList freeList = freeLists[freeListIx];
+   int remaining = -1;
+   freeList.lock.lock();
+   try {
+     ix = allocateFromFreeListUnderLock(
+         arenaIx, freeList, freeListIx, dest, ix, allocationSize);
+     remaining = dest.length - ix;
+     if (remaining == 0) return ix;
+   } finally {
+     freeList.lock.unlock();
+   }
+   byte headerData = makeHeader(freeListIx, true);
+   // Distance, in header slots, between two consecutive blocks of the requested size.
+   int headerStep = 1 << freeListIx;
+   int splitListIx = freeListIx + 1;
+   while (remaining > 0 && splitListIx < freeLists.length) {
+     // Number of requested-size blocks that one block of the list being split yields.
+     int splitWays = 1 << (splitListIx - freeListIx);
+     int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+     FreeList splitList = freeLists[splitListIx];
+     splitList.lock.lock();
+     try {
+       int headerIx = splitList.listHead;
+       while (headerIx >= 0 && remaining > 0) {
+         int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
+         int toTake = Math.min(splitWays, remaining);
+         remaining -= toTake;
+         lastSplitBlocksRemaining = splitWays - toTake;
+         for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
+           headers[headerIx] = headerData;
+           ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, allocationSize);
+         }
+         lastSplitNextHeader = headerIx;
+         // The forward link of the block that was just cut points at the next candidate.
+         headerIx = data.getInt(origOffset + 4);
+       }
+       replaceListHeadUnderLock(splitList, headerIx);
+     } finally {
+       splitList.lock.unlock();
+     }
+     if (remaining == 0) {
+       // We have just obtained all we needed; the last block that was cut may have an unused
+       // tail starting at lastSplitNextHeader. Put that space into the lower free lists, one
+       // block per set bit of the leftover count, smallest block first.
+       int newListIndex = freeListIx;
+       while (lastSplitBlocksRemaining > 0) {
+         if ((lastSplitBlocksRemaining & 1) == 1) {
+           FreeList newFreeList = freeLists[newListIndex];
+           newFreeList.lock.lock();
+           try {
+             // Keep every statement after lock() inside try so the lock is always released.
+             headers[lastSplitNextHeader] = makeHeader(newListIndex, false);
+             addBlockToFreeListUnderLock(newFreeList, lastSplitNextHeader);
+           } finally {
+             newFreeList.lock.unlock();
+           }
+           lastSplitNextHeader += (1 << newListIndex);
+         }
+         lastSplitBlocksRemaining >>>= 1;
+         ++newListIndex;
+       }
+     }
+     ++splitListIx;
+   }
+   return ix;
+ }
+
+ private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
+ if (headerIx == freeList.listHead) return;
+ if (headerIx >= 0) {
+ int newHeadOffset = offsetFromHeaderIndex(headerIx);
+ LOG.info("TODO# 3 mucking with " + System.identityHashCode(data) + ":" + newHeadOffset);
+ data.putInt(newHeadOffset, -1); // Remove backlink.
+ }
+ freeList.listHead = headerIx;
+ }
+
+ // Allocates from this arena, initializing it first if it has no backing memory yet.
+ // The null check is repeated under the monitor so that only one thread runs init() and
+ // increments allocatedArenas; the allocation itself is delegated to allocateWithSplit.
+ // NOTE(review): 'data' is a plain, non-volatile field that is also read without this
+ // monitor (here, in allocateFast and in allocateWithSplit) - confirm that the state
+ // written by init() is safely published to those readers.
+ private int allocateWithExpand(
+ int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ if (data == null) {
+ synchronized (this) {
+ // Never goes from non-null to null, so this is the only place we need sync.
+ if (data == null) {
+ init();
+ allocatedArenas.incrementAndGet();
+ }
+ }
+ }
+ return allocateWithSplit(arenaIx, freeListIx, dest, ix, size);
+ }
+
+ public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+ return lastSplitNextHeader << minAllocLog2;
+ }
+
+ // Takes blocks from the head of freeList and attaches them to dest[ix..] until either the
+ // list is exhausted or dest is full, then makes the first untaken block the new list head.
+ // Returns the index past the last entry of dest that was filled. As the name says, the
+ // caller is expected to hold freeList.lock.
+ public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
+ int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int current = freeList.listHead;
+ while (current >= 0 && ix < dest.length) {
+ int offset = offsetFromHeaderIndex(current);
+ // No one else has this either allocated or in a different free list; no sync needed.
+ headers[current] = makeHeader(freeListIx, true);
+ // Read the forward link (stored at +4) before the block is handed out.
+ current = data.getInt(offset + 4);
+ ((LlapCacheableBuffer)dest[ix]).initialize(arenaIx, data, offset, size);
+ ++ix;
+ }
+ replaceListHeadUnderLock(freeList, current);
+ return ix;
+ }
+
+ private byte makeHeader(int freeListIx, boolean isInUse) {
+ return (byte)(((freeListIx + 1) << 1) | (isInUse ? 1 : 0));
+ }
+
+ // Returns the block backing 'buffer' to this arena. Starting at the free list that matches
+ // the buffer length, the block is merged with its buddy for as long as the buddy's header
+ // says it is free at the same level; the resulting block is then added to the free list of
+ // the level that was reached. Only one free list lock is held at a time.
+ public void deallocate(LlapCacheableBuffer buffer) {
+ assert data != null;
+ // Free list index comes from log2 of the buffer length; header index from its offset.
+ int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.length) - minAllocLog2,
+ headerIx = buffer.offset >>> minAllocLog2;
+ while (true) {
+ FreeList freeList = freeLists[freeListIx];
+ // The buddy's header index differs from ours only in the bit for this level.
+ int bHeaderIx = headerIx ^ (1 << freeListIx);
+ freeList.lock.lock();
+ try {
+ if ((freeListIx == freeLists.length - 1)
+ || headers[bHeaderIx] != makeHeader(freeListIx, false)) {
+ // Buddy block is allocated, or it is on higher level of allocation than we are, or we
+ // have reached the top level. Add whatever we have got to the current free list.
+ addBlockToFreeListUnderLock(freeList, headerIx);
+ headers[headerIx] = makeHeader(freeListIx, false);
+ break;
+ }
+ // Buddy block is free and in the same free list we have locked. Take it out for merge.
+ removeBlockFromFreeList(freeList, bHeaderIx);
+ headers[bHeaderIx] = headers[headerIx] = 0; // Erase both headers of the blocks to merge.
+ } finally {
+ freeList.lock.unlock();
+ }
+ // Move one level up; the merged block starts at the lower of the two header indexes.
+ ++freeListIx;
+ headerIx = Math.min(headerIx, bHeaderIx);
+ }
+ }
+
+ private void addBlockToFreeListUnderLock(FreeList freeList, int headerIx) {
+ if (freeList.listHead >= 0) {
+ int oldHeadOffset = offsetFromHeaderIndex(freeList.listHead);
+ assert data.getInt(oldHeadOffset) == -1;
+ LOG.info("TODO# 4 mucking with " + System.identityHashCode(data) + ":" + oldHeadOffset);
+ data.putInt(oldHeadOffset, headerIx);
+ }
+ int offset = offsetFromHeaderIndex(headerIx);
+ LOG.info("TODO# 5 mucking with " + System.identityHashCode(data) + ":" + offset);
+ data.putInt(offset, -1);
+ data.putInt(offset + 4, freeList.listHead);
+ freeList.listHead = headerIx;
+ }
+
+ private void removeBlockFromFreeList(FreeList freeList, int headerIx) {
+ int bOffset = offsetFromHeaderIndex(headerIx),
+ bpHeaderIx = data.getInt(bOffset), bnHeaderIx = data.getInt(bOffset + 4);
+ if (freeList.listHead == headerIx) {
+ assert bpHeaderIx == -1;
+ freeList.listHead = bnHeaderIx;
+ }
+ if (bpHeaderIx != -1) {
+ data.putInt(offsetFromHeaderIndex(bpHeaderIx) + 4, bnHeaderIx);
+ LOG.info("TODO# 6 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bpHeaderIx) + " + 4");
+ }
+ if (bnHeaderIx != -1) {
+ data.putInt(offsetFromHeaderIndex(bnHeaderIx), bpHeaderIx);
+ LOG.info("TODO# 7 mucking with " + System.identityHashCode(data) + ":" + offsetFromHeaderIndex(bnHeaderIx));
+ }
+ }
+ }
+
+ private static class FreeList {
+ ReentrantLock lock = new ReentrantLock(false);
+ int listHead = -1; // Index of where the buffer is; in minAllocation units
+ // TODO: One possible improvement - store blocks arriving left over from splits, and
+ // blocks requested, to be able to wait for pending splits and reduce fragmentation.
+ // However, we are trying to increase fragmentation now, since we cater to single-size.
+ }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Fri Jan 16 03:30:46 2015
@@ -26,8 +26,12 @@ import org.apache.hadoop.hive.llap.io.ap
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
public final class LlapCacheableBuffer extends LlapMemoryBuffer {
- public LlapCacheableBuffer(ByteBuffer byteBuffer, int offset, int length) {
- super(byteBuffer, offset, length);
+ private static final int EVICTED_REFCOUNT = -1;
+ static final int IN_LIST = -2, NOT_IN_CACHE = -1;
+
+ public void initialize(int arenaIndex, ByteBuffer byteBuffer, int offset, int length) {
+ super.initialize(byteBuffer, offset, length);
+ this.arenaIndex = arenaIndex;
}
public String toStringForCache() {
@@ -35,15 +39,13 @@ public final class LlapCacheableBuffer e
+ lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
}
- private static final int EVICTED_REFCOUNT = -1;
private final AtomicInteger refCount = new AtomicInteger(0);
- // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
+ public int arenaIndex = -1;
public double priority;
public long lastUpdate = -1;
public LlapCacheableBuffer prev = null, next = null;
public int indexInHeap = NOT_IN_CACHE;
- public static final int IN_LIST = -2, NOT_IN_CACHE = -1;
@Override
public int hashCode() {
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public class LowLevelCacheImpl implements LowLevelCache, EvictionListener {
+ private final BuddyAllocator allocator;
+
+ private AtomicInteger newEvictions = new AtomicInteger(0);
+ private final Thread cleanupThread;
+ private final ConcurrentHashMap<String, FileCache> cache =
+ new ConcurrentHashMap<String, FileCache>();
+ private final LowLevelCachePolicyBase cachePolicy;
+
+ public LowLevelCacheImpl(Configuration conf) {
+ int minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ long maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
+ ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
+ : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
+ allocator = new BuddyAllocator(conf, cachePolicy);
+ cleanupThread = new CleanupThread();
+ cleanupThread.start();
+ }
+
+ @Override
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ allocator.allocateMultiple(dest, size);
+ }
+
+ @Override
+ public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
+ LlapMemoryBuffer[] result = null;
+ FileCache subCache = cache.get(fileName);
+ if (subCache == null || !subCache.incRef()) return result;
+ try {
+ for (int i = 0; i < offsets.length; ++i) {
+ while (true) { // Overwhelmingly only runs once.
+ long offset = offsets[i];
+ LlapCacheableBuffer buffer = subCache.cache.get(offset);
+ if (buffer == null) break;
+ if (lockBuffer(buffer)) {
+ if (result == null) {
+ result = new LlapCacheableBuffer[offsets.length];
+ }
+ result[i] = buffer;
+ break;
+ }
+ if (subCache.cache.remove(offset, buffer)) break;
+ }
+ }
+ } finally {
+ subCache.decRef();
+ }
+ return result;
+ }
+
+ private boolean lockBuffer(LlapCacheableBuffer buffer) {
+ int rc = buffer.incRef();
+ if (rc == 1) {
+ cachePolicy.notifyLock(buffer);
+ }
+ return rc >= 0;
+ }
+
+ /**
+  * Puts the given buffers into the cache of the given file, one buffer per offset.
+  * If a usable buffer is already cached for an offset, the new buffer is released, the
+  * cached one is locked and stored into buffers at that position, and the matching bit is
+  * set in the result. Cached entries that can no longer be locked are removed and replaced.
+  * @param fileName File whose cache is updated.
+  * @param offsets Offsets of the chunks; must have the same length as buffers.
+  * @param buffers Locked buffers to cache; entries may be replaced as described above.
+  * @return null if every buffer was cached; otherwise a bitmask (64 entries per long) in
+  *         which bit i is set if buffers[i] was replaced by an already cached buffer.
+  */
+ @Override
+ public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+   long[] result = null;
+   assert buffers.length == offsets.length;
+   FileCache subCache = getOrAddFileSubCache(fileName);
+   try {
+     for (int i = 0; i < offsets.length; ++i) {
+       LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+       long offset = offsets[i];
+       assert buffer.isLocked();
+       while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+         LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+         if (oldVal == null) {
+           // Cached successfully, add to policy.
+           cachePolicy.cache(buffer);
+           break;
+         }
+         if (DebugUtils.isTraceCachingEnabled()) {
+           LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
+               + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer);
+         }
+         if (lockBuffer(oldVal)) {
+           // We found an old, valid block for this key in the cache.
+           releaseBufferInternal(buffer);
+           buffers[i] = oldVal;
+           if (result == null) {
+             result = new long[align64(buffers.length) >>> 6];
+           }
+           // Indicate that we've replaced the value. The literal must be a long: an int
+           // shift only uses the low five bits of the distance, so bits 32..63 of each
+           // word would otherwise be recorded in the wrong position.
+           result[i >>> 6] |= (1L << (i & 63));
+           break;
+         }
+         // We found some old value but couldn't incRef it; remove it.
+         subCache.cache.remove(offset, oldVal);
+       }
+     }
+   } finally {
+     subCache.decRef();
+   }
+   return result;
+ }
+
+ /**
+ * All this mess is necessary because we want to be able to remove sub-caches for fully
+ * evicted files. It may actually be better to have non-nested map with object keys?
+ */
+ public FileCache getOrAddFileSubCache(String fileName) {
+ FileCache newSubCache = null;
+ while (true) { // Overwhelmingly executes once.
+ FileCache subCache = cache.get(fileName);
+ if (subCache != null) {
+ if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+ if (newSubCache == null) {
+ newSubCache = new FileCache();
+ newSubCache.incRef();
+ }
+ // Found a stale value we cannot incRef; try to replace it with new value.
+ if (cache.replace(fileName, subCache, newSubCache)) return newSubCache;
+ continue; // Someone else replaced/removed a stale value, try again.
+ }
+ // No value found.
+ if (newSubCache == null) {
+ newSubCache = new FileCache();
+ newSubCache.incRef();
+ }
+ FileCache oldSubCache = cache.putIfAbsent(fileName, newSubCache);
+ if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+ if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+ // Someone created one in parallel and then it went stale.
+ if (cache.replace(fileName, oldSubCache, newSubCache)) return newSubCache;
+ // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+ }
+ }
+
+ private static int align64(int number) {
+ return ((number + 63) & ~63);
+ }
+
+
+ @Override
+ public void releaseBuffer(LlapMemoryBuffer buffer) {
+ releaseBufferInternal((LlapCacheableBuffer)buffer);
+ }
+
+ @Override
+ public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
+ for (int i = 0; i < cacheBuffers.length; ++i) {
+ releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+ }
+ }
+
+ public void releaseBufferInternal(LlapCacheableBuffer buffer) {
+ if (buffer.decRef() == 0) {
+ cachePolicy.notifyUnlock(buffer);
+ }
+ }
+
+ public static LlapCacheableBuffer allocateFake() {
+ LlapCacheableBuffer fake = new LlapCacheableBuffer();
+ fake.initialize(-1, null, -1, 1);
+ return fake;
+ }
+
+ @Override
+ public void notifyEvicted(LlapCacheableBuffer buffer) {
+ allocator.deallocate(buffer);
+ newEvictions.incrementAndGet();
+ }
+
+ // Cache of the buffers of one file, keyed by a Long (the chunk offset in the callers seen
+ // here). The reference count doubles as a state: values >= 0 count the current users,
+ // EVICTING_REFCOUNT marks an eviction attempt in progress, and EVICTED_REFCOUNT marks an
+ // object that can no longer be referenced.
+ private static class FileCache {
+ private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+ // TODO: given the specific data, perhaps the nested thing should not be CHM
+ private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
+ = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+ private AtomicInteger refCount = new AtomicInteger(0);
+
+ // Takes a reference. Returns false if this object has been evicted; busy-waits while an
+ // eviction attempt is in progress.
+ boolean incRef() {
+ while (true) {
+ int value = refCount.get();
+ if (value == EVICTED_REFCOUNT) return false;
+ if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+ assert value >= 0;
+ if (refCount.compareAndSet(value, value + 1)) return true;
+ }
+ }
+
+ // Releases a reference; throws AssertionError if the count drops below zero.
+ void decRef() {
+ int value = refCount.decrementAndGet();
+ if (value < 0) {
+ throw new AssertionError("Unexpected refCount " + value);
+ }
+ }
+
+ // Tries to move from exactly one reference to the "evicting" state. Returns false, leaving
+ // the count untouched, if the count is anything other than 1.
+ boolean startEvicting() {
+ while (true) {
+ int value = refCount.get();
+ if (value != 1) return false;
+ if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+ }
+ }
+
+ // Completes an eviction started by startEvicting(); later incRef() calls return false.
+ void commitEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+ assert result;
+ }
+
+ // Cancels an eviction started by startEvicting(). The count becomes 0, not 1, so the
+ // single reference that was held when the eviction started is released as well.
+ void abortEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+ assert result;
+ }
+ }
+
+ private final class CleanupThread extends Thread {
+ private int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+ public CleanupThread() {
+ super("Llap low level cache cleanup thread");
+ setDaemon(true);
+ setPriority(1);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ doOneCleanupRound();
+ } catch (InterruptedException ex) {
+ LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+ break;
+ }
+ }
+ }
+
+ /**
+  * Runs one cleanup pass. Waits (polling every 10 seconds) until at least one eviction has
+  * been recorded since the previous pass, then walks every file cache, removing the entries
+  * whose buffer reports isInvalid() and dropping file caches that have become empty. The
+  * walk sleeps between entries so that a pass takes roughly APPROX_CLEANUP_INTERVAL_SEC.
+  * @throws InterruptedException if the thread is interrupted while waiting or sleeping.
+  */
+ private void doOneCleanupRound() throws InterruptedException {
+   while (true) {
+     int evictionsSinceLast = newEvictions.getAndSet(0);
+     if (evictionsSinceLast > 0) break;
+     synchronized (newEvictions) {
+       newEvictions.wait(10000);
+     }
+   }
+   // Duration is an estimate; if the size of the map changes, it can be very different.
+   long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+   int leftToCheck = 0; // approximate
+   for (FileCache fc : cache.values()) {
+     leftToCheck += fc.cache.size();
+   }
+   // Iterate through all the file caches. This is best-effort.
+   // If this super-long-lived iterator affects the map in some bad way,
+   // we'd need to sleep once per round instead.
+   Iterator<Map.Entry<String, FileCache>> iter = cache.entrySet().iterator();
+   while (iter.hasNext()) {
+     FileCache fc = iter.next().getValue();
+     if (!fc.incRef()) {
+       throw new AssertionError("Something other than cleanup is removing elements from map");
+     }
+     // Iterate through the file cache. This is best-effort.
+     Iterator<Map.Entry<Long, LlapCacheableBuffer>> subIter = fc.cache.entrySet().iterator();
+     boolean isEmpty = true;
+     while (subIter.hasNext()) {
+       // Spread the remaining time budget over the remaining entries. Once the budget is
+       // used up the quotient turns negative, which Thread.sleep rejects with an
+       // IllegalArgumentException (that would terminate this thread), so clamp at zero.
+       long sleepMs = (leftToCheck <= 0)
+           ? 1 : Math.max(0L, (endTime - System.nanoTime()) / (1000000L * leftToCheck));
+       Thread.sleep(sleepMs);
+       if (subIter.next().getValue().isInvalid()) {
+         subIter.remove();
+       } else {
+         isEmpty = false;
+       }
+       --leftToCheck;
+     }
+     if (!isEmpty) {
+       fc.decRef();
+       continue;
+     }
+     // FileCache might be empty; see if we can remove it. "tryWriteLock"
+     if (!fc.startEvicting()) {
+       // Someone else is using the file cache. Give back the reference taken above;
+       // otherwise the count could never return to 1 and the cache could never be evicted.
+       fc.decRef();
+       continue;
+     }
+     // Both outcomes below also dispose of the reference taken above.
+     if (fc.cache.isEmpty()) {
+       fc.commitEvicting();
+       iter.remove();
+     } else {
+       fc.abortEvicting();
+     }
+   }
+ }
+ }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Fri Jan 16 03:30:46 2015
@@ -22,5 +22,4 @@ public interface LowLevelCachePolicy {
void cache(LlapCacheableBuffer buffer);
void notifyLock(LlapCacheableBuffer buffer);
void notifyUnlock(LlapCacheableBuffer buffer);
- boolean reserveMemory(long memoryToReserve, boolean oneEviction);
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Fri Jan 16 03:30:46 2015
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.llap.cach
import java.util.concurrent.atomic.AtomicLong;
-public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy {
+public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy, MemoryManager {
private final AtomicLong usedMemory;
private final long maxSize;
private EvictionListener evictionListener;
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/MemoryManager.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+/**
+ * Abstraction for reserving memory. In this change it is passed to the
+ * BuddyAllocator constructor and is implemented by LowLevelCachePolicyBase.
+ */
+public interface MemoryManager {
+ /**
+ * Requests a reservation of the given amount of memory.
+ *
+ * @param memoryToReserve amount to reserve (presumably in bytes -- TODO confirm).
+ * @param waitForEviction NOTE(review): the name suggests the call may wait for
+ * evictions to free memory; confirm against the implementations.
+ * @return presumably true when the reservation was granted -- TODO confirm.
+ */
+ boolean reserveMemory(long memoryToReserve, boolean waitForEviction);
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Fri Jan 16 03:30:46 2015
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.Cache;
-import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.NoopCache;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.api.VectorReader;
@@ -56,7 +56,7 @@ public class LlapIoImpl implements LlapI
boolean useLowLevelCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_LOW_LEVEL_CACHE);
// High-level cache not supported yet.
Cache<OrcCacheKey> cache = useLowLevelCache ? null : new NoopCache<OrcCacheKey>();
- LowLevelBuddyCache orcCache = useLowLevelCache ? new LowLevelBuddyCache(conf) : null;
+ LowLevelCacheImpl orcCache = useLowLevelCache ? new LowLevelCacheImpl(conf) : null;
this.edp = new OrcEncodedDataProducer(orcCache, cache, conf);
this.cvp = new OrcColumnVectorProducer(edp, conf);
}
Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBuddyAllocator {
+ private static final Log LOG = LogFactory.getLog(TestBuddyAllocator.class);
+ private final Random rdm = new Random(2284);
+
+ private static class DummyMemoryManager implements MemoryManager {
+ @Override
+ public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
+ return true;
+ }
+ }
+
+ @Test
+ public void testVariableSizeAllocs() {
+ testVariableSizeInternal(1, 2, 1);
+ }
+
+ @Test
+ public void testVariableSizeMultiAllocs() {
+ testVariableSizeInternal(3, 2, 3);
+ testVariableSizeInternal(5, 2, 5);
+ }
+
+ @Test
+ public void testSameSizes() {
+ int min = 3, max = 8, maxAlloc = 1 << max;
+ Configuration conf = createConf(1 << min, maxAlloc, maxAlloc, maxAlloc);
+ BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+ for (int i = min; i <= max; i <<= 1) {
+ allocSameSize(a, 1 << (max - i), i);
+ }
+ }
+
+ @Test
+ public void testMultipleArenas() {
+ int max = 8, maxAlloc = 1 << max, allocLog2 = max - 1, arenaCount = 5;
+ Configuration conf = createConf(1 << 3, maxAlloc, maxAlloc, maxAlloc * arenaCount);
+ BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+ allocSameSize(a, arenaCount * 2, allocLog2);
+ }
+
+ @Test
+ public void testMTT() {
+ final int min = 3, max = 8, maxAlloc = 1 << max, allocsPerSize = 3;
+ Configuration conf = createConf(1 << min, maxAlloc, maxAlloc * 8, maxAlloc * 24);
+ final BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+ ExecutorService executor = Executors.newFixedThreadPool(3);
+ FutureTask<Object> upTask = new FutureTask<Object>(new Runnable() {
+ public void run() {
+ allocateUp(a, min, max, allocsPerSize, false);
+ allocateUp(a, min, max, allocsPerSize, true);
+ }
+ }, null), downTask = new FutureTask<Object>(new Runnable() {
+ public void run() {
+ allocateDown(a, min, max, allocsPerSize, false);
+ allocateDown(a, min, max, allocsPerSize, true);
+ }
+ }, null), sameTask = new FutureTask<Object>(new Runnable() {
+ public void run() {
+ for (int i = min; i <= max; i <<= 1) {
+ allocSameSize(a, (1 << (max - i)) * allocsPerSize, i);
+ }
+ }
+ }, null);
+ executor.execute(sameTask);
+ executor.execute(upTask);
+ executor.execute(downTask);
+ try {
+ upTask.get();
+ downTask.get();
+ sameTask.get();
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+
+ private void testVariableSizeInternal(int allocCount, int arenaSizeMult, int arenaCount) {
+ int min = 3, max = 8, maxAlloc = 1 << max, arenaSize = maxAlloc * arenaSizeMult;
+ Configuration conf = createConf(1 << min, maxAlloc, arenaSize, arenaSize * arenaCount);
+ BuddyAllocator a = new BuddyAllocator(conf, new DummyMemoryManager());
+ allocateUp(a, min, max, allocCount, true);
+ allocateDown(a, min, max, allocCount, true);
+ allocateDown(a, min, max, allocCount, false);
+ allocateUp(a, min, max, allocCount, true);
+ allocateUp(a, min, max, allocCount, false);
+ allocateDown(a, min, max, allocCount, true);
+ }
+
+ private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) {
+ LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[allocCount][];
+ long[][] testValues = new long[allocCount][];
+ for (int j = 0; j < allocCount; ++j) {
+ allocateAndUseBuffer(a, allocs, testValues, 1, j, sizeLog2);
+ }
+ deallocUpOrDown(a, false, allocs, testValues);
+ }
+
+ private void allocateUp(
+ BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ int sizes = max - min + 1;
+ LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+ // Put in the beginning; relies on the knowledge of internal implementation. Pave?
+ long[][] testValues = new long[sizes][];
+ for (int i = min; i <= max; ++i) {
+ allocateAndUseBuffer(a, allocs, testValues, allocPerSize, i - min, i);
+ }
+ deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
+ }
+
+ private void allocateDown(
+ BuddyAllocator a, int min, int max, int allocPerSize, boolean isSameOrderDealloc) {
+ int sizes = max - min + 1;
+ LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+ // Put in the beginning; relies on the knowledge of internal implementation. Pave?
+ long[][] testValues = new long[sizes][];
+ for (int i = max; i >= min; --i) {
+ allocateAndUseBuffer(a, allocs, testValues, allocPerSize, i - min, i);
+ }
+ deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
+ }
+
+ private void allocateAndUseBuffer(BuddyAllocator a, LlapMemoryBuffer[][] allocs,
+ long[][] testValues, int allocCount, int index, int sizeLog2) {
+ allocs[index] = new LlapMemoryBuffer[allocCount];
+ testValues[index] = new long[allocCount];
+ int size = (1 << sizeLog2) - 1;
+ if (!a.allocateMultiple(allocs[index], size)) {
+ LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDump());
+ fail();
+ }
+ // LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
+ for (int j = 0; j < allocCount; ++j) {
+ LlapMemoryBuffer mem = allocs[index][j];
+ long testValue = testValues[index][j] = rdm.nextLong();
+ mem.byteBuffer.putLong(mem.offset, testValue);
+ int halfLength = mem.length >> 1;
+ if (halfLength + 8 <= mem.length) {
+ mem.byteBuffer.putLong(mem.offset + halfLength, testValue);
+ }
+ }
+ }
+
+ private void deallocUpOrDown(BuddyAllocator a, boolean isSameOrderDealloc,
+ LlapMemoryBuffer[][] allocs, long[][] testValues) {
+ if (isSameOrderDealloc) {
+ for (int i = 0; i < allocs.length; ++i) {
+ deallocBuffers(a, allocs[i], testValues[i]);
+ }
+ } else {
+ for (int i = allocs.length - 1; i >= 0; --i) {
+ deallocBuffers(a, allocs[i], testValues[i]);
+ }
+ }
+ }
+
+ private void deallocBuffers(
+ BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) {
+ for (int j = 0; j < allocs.length; ++j) {
+ LlapCacheableBuffer mem = (LlapCacheableBuffer)allocs[j];
+ assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset));
+ int halfLength = mem.length >> 1;
+ if (halfLength + 8 <= mem.length) {
+ assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset + halfLength));
+ }
+ a.deallocate(mem);
+ }
+ }
+
+ private Configuration createConf(int min, int max, int arena, int total) {
+ Configuration conf = new Configuration();
+ conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, min);
+ conf.setInt(ConfVars.LLAP_ORC_CACHE_MAX_ALLOC.varname, max);
+ conf.setInt(ConfVars.LLAP_ORC_CACHE_ARENA_SIZE.varname, arena);
+ conf.setLong(ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, total);
+ return conf;
+ }
+}
Added: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1652337&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (added)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Fri Jan 16 03:30:46 2015
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Assume;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Placeholder test class (presumably for LowLevelCacheImpl, going by the name);
+ * it declares no test methods yet.
+ */
+public class TestLowLevelCacheImpl {
+ // Logger for this class; nothing in the class body uses it so far.
+ private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class);
+}
Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java?rev=1652337&r1=1652336&r2=1652337&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java Fri Jan 16 03:30:46 2015
@@ -38,11 +38,6 @@ public class TestLowLevelLrfuCachePolicy
}
@Test
- public void testHeapSize7() {
- testHeapSize(7);
- }
-
- @Test
public void testHeapSize8() {
testHeapSize(8);
}
@@ -52,6 +47,11 @@ public class TestLowLevelLrfuCachePolicy
testHeapSize(30);
}
+ /** Runs the shared testHeapSize scenario with a heap size of 64. */
+ @Test
+ public void testHeapSize64() {
+ testHeapSize(64);
+ }
+
private class EvictionTracker implements EvictionListener {
public List<LlapCacheableBuffer> evicted = new ArrayList<LlapCacheableBuffer>();
@@ -72,7 +72,7 @@ public class TestLowLevelLrfuCachePolicy
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
for (int i = 0; i < heapSize; ++i) {
- LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(lfu, et, buffer));
inserted.add(buffer);
}
@@ -99,7 +99,7 @@ public class TestLowLevelLrfuCachePolicy
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
for (int i = 0; i < heapSize; ++i) {
- LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(lru, et, buffer));
inserted.add(buffer);
}
@@ -122,7 +122,7 @@ public class TestLowLevelLrfuCachePolicy
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(new HiveConf(), 1, heapSize, et);
for (int i = 0; i < heapSize; ++i) {
- LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(lrfu, et, buffer));
inserted.add(buffer);
}
@@ -137,7 +137,7 @@ public class TestLowLevelLrfuCachePolicy
unlock(lrfu, locked);
}
- private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelBuddyCache.allocateFake();
+ private static final LlapCacheableBuffer CANNOT_EVICT = LowLevelCacheImpl.allocateFake();
// Buffers in test are fakes not linked to cache; notify cache policy explicitly.
public boolean cache(
LowLevelLrfuCachePolicy lrfu, EvictionTracker et, LlapCacheableBuffer buffer) {
@@ -172,7 +172,7 @@ public class TestLowLevelLrfuCachePolicy
LOG.info("Testing heap size " + heapSize);
Random rdm = new Random(1234);
HiveConf conf = new HiveConf();
- conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.05f); // very small heap? TODO#
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf, 1, heapSize, et);
// Insert the number of elements plus 2, to trigger 2 evictions.
@@ -181,7 +181,7 @@ public class TestLowLevelLrfuCachePolicy
LlapCacheableBuffer[] evicted = new LlapCacheableBuffer[toEvict];
Assume.assumeTrue(toEvict <= heapSize);
for (int i = 0; i < heapSize + toEvict; ++i) {
- LlapCacheableBuffer buffer = LowLevelBuddyCache.allocateFake();
+ LlapCacheableBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(lrfu, et, buffer));
LlapCacheableBuffer evictedBuf = getOneEvictedBuffer(et);
if (i < toEvict) {