You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/06/14 03:16:25 UTC
hive git commit: HIVE-19866 : improve LLAP cache purge (Sergey
Shelukhin, reviewed by Gopal Vijayaraghavan)
Repository: hive
Updated Branches:
refs/heads/master 93e33e119 -> 086700efa
HIVE-19866 : improve LLAP cache purge (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/086700ef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/086700ef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/086700ef
Branch: refs/heads/master
Commit: 086700efae4a33ad6eec0e845cdb9480164450a2
Parents: 93e33e1
Author: sergey <se...@apache.org>
Authored: Wed Jun 13 19:49:04 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Jun 13 19:49:04 2018 -0700
----------------------------------------------------------------------
.../llap/cache/LowLevelCacheMemoryManager.java | 12 +++
.../llap/cache/LowLevelLrfuCachePolicy.java | 77 ++++++++++++++++++--
.../hive/llap/io/api/impl/LlapIoImpl.java | 18 +++--
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 40 ++++++++++
4 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 60d56d6..4297cfc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -133,4 +133,16 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
public void updateMaxSize(long maxSize) {
this.maxSize = maxSize;
}
+
+ public long purge() { // Purges the evictor and subtracts whatever it reclaimed from the usage counter and metrics.
+ if (evictor == null) return 0; // No evictor present: nothing can be purged.
+ long evicted = evictor.purge(); // Amount the evictor reports as evicted.
+ if (evicted == 0) return 0; // Nothing reclaimed, so the counters stay untouched.
+ long usedMem = -1;
+ do { // CAS retry: re-read the counter until the decrement is applied to an unchanged value.
+ usedMem = usedMemory.get();
+ } while (!usedMemory.compareAndSet(usedMem, usedMem - evicted));
+ metrics.incrCacheCapacityUsed(-evicted); // Report the reclaimed amount to metrics as a negative increment.
+ return evicted;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 82e1934..e552fee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -57,17 +57,18 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
* Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
* ONLY LIST REMOVAL is allowed under list lock.
*/
- private final LlapCacheableBuffer[] heap;
+ private LlapCacheableBuffer[] heap;
+ private final Object heapLock = new Object();
private final ReentrantLock listLock = new ReentrantLock();
private LlapCacheableBuffer listHead, listTail;
/** Number of elements. */
private int heapSize = 0;
+ private final int maxHeapSize;
private EvictionListener evictionListener;
public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
- int maxHeapSize = -1;
if (lambda == 0) {
maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
} else {
@@ -120,7 +121,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
}
- synchronized (heap) {
+ synchronized (heapLock) {
// First, update buffer priority - we have just been using it.
buffer.priority = (buffer.lastUpdate == -1) ? F0
: touchPriority(time, buffer.lastUpdate, buffer.priority);
@@ -174,11 +175,75 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
@Override
public long purge() { // Empties both the list and the heap, evicting every buffer that can be invalidated; returns the bytes evicted.
- long evicted = evictSomeBlocks(Long.MAX_VALUE);
- LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", LlapUtil.humanReadableByteCount(evicted));
+ long evicted = 0;
+ LlapCacheableBuffer oldTail = null;
+ listLock.lock();
+ try {
+ LlapCacheableBuffer current = oldTail = listTail; // Walk the list from the tail towards the head.
+ while (current != null) {
+ boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate(); // Evictable only when invalidation succeeded, same rule as the heap pass below.
+ current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+ if (canEvict) {
+ current = current.prev; // Stays in the detached chain so it is reported as evicted after the locks are released.
+ } else {
+ // Remove from the list.
+ LlapCacheableBuffer newCurrent = current.prev; // Saved first: removeFromLocalList clears current.prev.
+ oldTail = removeFromLocalList(oldTail, current); // Could not be invalidated: drop it from the policy without evicting.
+ current = newCurrent;
+ }
+ }
+ listHead = listTail = null; // The policy's list is now empty; the evictable chain hangs off oldTail.
+ } finally {
+ listLock.unlock();
+ }
+
+ LlapCacheableBuffer[] oldHeap = null;
+ int oldHeapSize = -1;
+ synchronized (heapLock) {
+ oldHeap = heap;
+ oldHeapSize = heapSize;
+ heap = new LlapCacheableBuffer[maxHeapSize]; // Swap in a fresh, empty heap; the old array is processed locally.
+ heapSize = 0;
+ for (int i = 0; i < oldHeapSize; ++i) {
+ LlapCacheableBuffer result = oldHeap[i];
+ result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+ int invalidateResult = result.invalidate();
+ if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) {
+ oldHeap[i] = null; // Removed from heap without evicting.
+ }
+ }
+ }
+ LlapCacheableBuffer current = oldTail; // Listener notifications happen outside both locks.
+ while (current != null) {
+ evicted += current.getMemoryUsage();
+ evictionListener.notifyEvicted(current);
+ current = current.prev;
+ }
+ for (int i = 0; i < oldHeapSize; ++i) {
+ current = oldHeap[i];
+ if (current == null) continue; // Slot was cleared above because the buffer could not be invalidated.
+ evicted += current.getMemoryUsage();
+ evictionListener.notifyEvicted(current);
+ }
+ LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy",
+ LlapUtil.humanReadableByteCount(evicted));
return evicted;
}
+ private static LlapCacheableBuffer removeFromLocalList( // Unlinks current from the chain that ends at tail and returns the resulting tail.
+ LlapCacheableBuffer tail, LlapCacheableBuffer current) {
+ if (current == tail) {
+ tail = current.prev; // Removing the tail itself: its predecessor becomes the new tail.
+ } else {
+ current.next.prev = current.prev; // Assumes current.next is non-null whenever current is not the tail.
+ }
+ if (current.prev != null) {
+ current.prev.next = current.next; // Point the predecessor past the removed node.
+ }
+ current.prev = current.next = null; // Fully detach the removed node from the chain.
+ return tail;
+ }
+
@Override
public long evictSomeBlocks(long memoryToReserve) {
@@ -190,7 +255,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
long time = timer.get();
while (evicted < memoryToReserve) {
LlapCacheableBuffer buffer = null;
- synchronized (heap) {
+ synchronized (heapLock) {
buffer = evictFromHeapUnderLock(time);
}
if (buffer == null) return evicted;
http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index e1e8a32..2fffeb8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -102,7 +102,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
private final LowLevelCache dataCache;
private final BufferUsageManager bufferManager;
private final Configuration daemonConf;
- private LowLevelCachePolicy cachePolicyWrapper;
+ private final LowLevelCacheMemoryManager memoryManager;
private List<LlapIoDebugDump> debugDumpComponents = new ArrayList<>();
@@ -147,17 +147,18 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
+ LowLevelCachePolicy cachePolicyWrapper;
if (trackUsage) {
- this.cachePolicyWrapper = new CacheContentsTracker(realCachePolicy);
+ cachePolicyWrapper = new CacheContentsTracker(realCachePolicy);
} else {
- this.cachePolicyWrapper = realCachePolicy;
+ cachePolicyWrapper = realCachePolicy;
}
// Allocator uses memory manager to request memory, so create the manager next.
- LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+ this.memoryManager = new LowLevelCacheMemoryManager(
totalMemorySize, cachePolicyWrapper, cacheMetrics);
cacheMetrics.setCacheCapacityTotal(totalMemorySize);
// Cache uses allocator to allocate and deallocate, create allocator and then caches.
- BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+ BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, cacheMetrics);
this.allocator = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicyWrapper, allocator, true);
@@ -170,7 +171,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new MetadataCache(
- allocator, memManager, cachePolicyWrapper, useGapCache, cacheMetrics);
+ allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics);
fileMetadataCache = metadataCache;
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
EvictionDispatcher e = new EvictionDispatcher(
@@ -198,6 +199,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
dataCache = sbm;
+ this.memoryManager = null;
debugDumpComponents.add(new LlapIoDebugDump() {
@Override
public void debugDumpShort(StringBuilder sb) {
@@ -234,8 +236,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
@Override
public long purge() { // Purges the cache through the memory manager when there is one; otherwise returns 0.
- if (cachePolicyWrapper != null) {
- return cachePolicyWrapper.purge();
+ if (memoryManager != null) { // The constructor sets memoryManager to null on the SimpleBufferManager path.
+ return memoryManager.purge(); // Delegates to the memory manager rather than to the cache policy directly.
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 99841aa..6eb2eb5 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -179,6 +179,46 @@ public class TestLowLevelLrfuCachePolicy {
}
@Test
+ public void testPurge() { // Checks that purging the LRFU policy invalidates unlocked buffers and leaves ref-counted ones valid.
+ final int HEAP_SIZE = 32;
+ Configuration conf = new Configuration();
+ conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f);
+ EvictionTracker et = new EvictionTracker();
+ LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf);
+ MetricsMock m = createMetricsMock();
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(
+ HEAP_SIZE, lrfu, m.metricsMock);
+ lrfu.setEvictionListener(et);
+ assertEquals(0, lrfu.purge()); // Purging an empty policy must report nothing evicted.
+ for (int testSize = 1; testSize <= HEAP_SIZE; ++testSize) { // Repeat for every population size from 1 up to HEAP_SIZE.
+ LOG.info("Starting with " + testSize);
+ ArrayList<LlapDataBuffer> purge = new ArrayList<LlapDataBuffer>(testSize),
+ dontPurge = new ArrayList<LlapDataBuffer>(testSize);
+ for (int i = 0; i < testSize; ++i) {
+ LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+ assertTrue(cache(mm, lrfu, et, buffer));
+ // Lock a few blocks without telling the policy.
+ if ((i + 1) % 3 == 0) { // Every third buffer gets an extra reference and is expected to survive the purge.
+ buffer.incRef();
+ dontPurge.add(buffer);
+ } else {
+ purge.add(buffer);
+ }
+ }
+ lrfu.purge(); // Purges the policy directly, not through mm; memory is released by hand below.
+ for (LlapDataBuffer buffer : purge) {
+ assertTrue(buffer + " " + testSize, buffer.isInvalid());
+ mm.releaseMemory(buffer.getMemoryUsage());
+ }
+ for (LlapDataBuffer buffer : dontPurge) {
+ assertFalse(buffer.isInvalid());
+ buffer.decRef(); // Drop the extra reference taken above before releasing the memory.
+ mm.releaseMemory(buffer.getMemoryUsage());
+ }
+ }
+ }
+
+ @Test
public void testDeadlockResolution() {
int heapSize = 4;
LOG.info("Testing deadlock resolution");