You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/08 02:32:09 UTC
hive git commit: HIVE-12591 : LLAP cache counters display a negative value
for CacheCapacityUsed (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-2.0 f9d100264 -> bfc25f42f
HIVE-12591 : LLAP cache counters display a negative value for CacheCapacityUsed (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bfc25f42
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bfc25f42
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bfc25f42
Branch: refs/heads/branch-2.0
Commit: bfc25f42f7c14aae907ae7dd96ba593469fcb9db
Parents: f9d1002
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Dec 7 17:26:03 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Dec 7 17:31:54 2015 -0800
----------------------------------------------------------------------
.../hadoop/hive/llap/cache/BuddyAllocator.java | 1 -
.../llap/cache/LowLevelCacheMemoryManager.java | 41 +++++++++------
.../llap/metrics/LlapDaemonCacheMetrics.java | 13 ++---
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 52 +++++++++++++++++---
4 files changed, 78 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 485a145..0c96efa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -216,7 +216,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
LlapDataBuffer buf = (LlapDataBuffer)buffer;
long memUsage = buf.getMemoryUsage();
- metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity());
arenas[buf.arenaIndex].deallocate(buf);
if (doReleaseMemory) {
memoryManager.releaseMemory(memUsage);
http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index d584ca8..8788e15 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -43,27 +43,35 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
this.evictor = evictor;
this.usedMemory = new AtomicLong(0);
this.metrics = metrics;
- metrics.incrCacheCapacityTotal(maxSize);
+ metrics.setCacheCapacityTotal(maxSize);
if (LlapIoImpl.LOGL.isInfoEnabled()) {
LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
}
}
@Override
- public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
+ public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) {
// TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
int badCallCount = 0;
int nextLog = 4;
- while (memoryToReserve > 0) {
- long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
+ long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve;
+ boolean result = true;
+ while (remainingToReserve > 0) {
+ long usedMem = usedMemory.get(), newUsedMem = usedMem + remainingToReserve;
if (newUsedMem <= maxSize) {
- if (usedMemory.compareAndSet(usedMem, newUsedMem)) break;
+ if (usedMemory.compareAndSet(usedMem, newUsedMem)) {
+ reservedTotalMetric += remainingToReserve;
+ break;
+ }
continue;
}
// TODO: for one-block case, we could move notification for the last block out of the loop.
- long evicted = evictor.evictSomeBlocks(memoryToReserve);
+ long evicted = evictor.evictSomeBlocks(remainingToReserve);
if (evicted == 0) {
- if (!waitForEviction) return false;
+ if (!waitForEviction) {
+ result = false;
+ break;
+ }
++badCallCount;
if (badCallCount == nextLog) {
LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
@@ -72,24 +80,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
Thread.sleep(Math.min(1000, nextLog));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- return false;
+ result = false;
+ break;
}
}
continue;
}
+ evictedTotalMetric += evicted;
badCallCount = 0;
// Adjust the memory - we have to account for what we have just evicted.
while (true) {
- long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
- if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
- memoryToReserve -= reserveWithEviction;
+ long availableToReserveAfterEvict = maxSize - usedMem + evicted;
+ long reservedAfterEvict = Math.min(remainingToReserve, availableToReserveAfterEvict);
+ if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reservedAfterEvict)) {
+ remainingToReserve -= reservedAfterEvict;
+ reservedTotalMetric += reservedAfterEvict;
break;
}
usedMem = usedMemory.get();
}
}
- metrics.incrCacheCapacityUsed(memoryToReserve);
- return true;
+ metrics.incrCacheCapacityUsed(reservedTotalMetric - evictedTotalMetric);
+ return result;
}
@@ -103,12 +115,13 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
}
@Override
- public void releaseMemory(long memoryToRelease) {
+ public void releaseMemory(final long memoryToRelease) {
long oldV;
do {
oldV = usedMemory.get();
assert oldV >= memoryToRelease;
} while (!usedMemory.compareAndSet(oldV, oldV - memoryToRelease));
+ metrics.incrCacheCapacityUsed(-memoryToRelease);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
index 3ffa6e0..52057e4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import com.google.common.annotations.VisibleForTesting;
@@ -53,9 +54,9 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
@Metric
MutableCounterLong cacheReadRequests;
@Metric
- MutableCounterLong cacheCapacityTotal;
+ MutableGaugeLong cacheCapacityTotal;
@Metric
- MutableCounterLong cacheCapacityUsed;
+ MutableCounterLong cacheCapacityUsed; // Not using the gauge to avoid races.
@Metric
MutableCounterLong cacheRequestedBytes;
@Metric
@@ -77,18 +78,14 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
return ms.register(displayName, null, new LlapDaemonCacheMetrics(displayName, sessionId));
}
- public void incrCacheCapacityTotal(long delta) {
- cacheCapacityTotal.incr(delta);
+ public void setCacheCapacityTotal(long value) {
+ cacheCapacityTotal.set(value);
}
public void incrCacheCapacityUsed(long delta) {
cacheCapacityUsed.incr(delta);
}
- public void decrCacheCapacityUsed(int delta) {
- cacheCapacityUsed.incr(-delta);
- }
-
public void incrCacheRequestedBytes(long delta) {
cacheRequestedBytes.incr(delta);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index bb530ef..d0abfa3 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -18,6 +18,13 @@
package org.apache.hadoop.hive.llap.cache;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.mockito.invocation.InvocationOnMock;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -133,7 +140,7 @@ public class TestLowLevelLrfuCachePolicy {
lfu.notifyUnlock(inserted.get(i));
}
}
- verifyOrder(mm, lfu, et, inserted);
+ verifyOrder(mm, lfu, et, inserted, null);
}
private Configuration createConf(int min, int heapSize, Double lambda) {
@@ -176,7 +183,7 @@ public class TestLowLevelLrfuCachePolicy {
lru.notifyUnlock(inserted.get(i));
}
}
- verifyOrder(mm, lru, et, inserted);
+ verifyOrder(mm, lru, et, inserted, null);
}
@Test
@@ -236,6 +243,27 @@ public class TestLowLevelLrfuCachePolicy {
lrfu.notifyUnlock(locked);
}
+ private static class MetricsMock {
+ public MetricsMock(AtomicLong cacheUsed, LlapDaemonCacheMetrics metricsMock) {
+ this.cacheUsed = cacheUsed;
+ this.metricsMock = metricsMock;
+ }
+ public AtomicLong cacheUsed;
+ public LlapDaemonCacheMetrics metricsMock;
+ }
+
+ private MetricsMock createMetricsMock() {
+ LlapDaemonCacheMetrics metricsMock = mock(LlapDaemonCacheMetrics.class);
+ final AtomicLong cacheUsed = new AtomicLong(0);
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ cacheUsed.addAndGet((Long)invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(metricsMock).incrCacheCapacityUsed(anyLong());
+ return new MetricsMock(cacheUsed, metricsMock);
+ }
+
private void testHeapSize(int heapSize) {
LOG.info("Testing heap size " + heapSize);
Random rdm = new Random(1234);
@@ -243,8 +271,8 @@ public class TestLowLevelLrfuCachePolicy {
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf);
- LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu,
- LlapDaemonCacheMetrics.create("test", "1"));
+ MetricsMock m = createMetricsMock();
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, m.metricsMock);
lrfu.setEvictionListener(et);
// Insert the number of elements plus 2, to trigger 2 evictions.
int toEvict = 2;
@@ -254,6 +282,7 @@ public class TestLowLevelLrfuCachePolicy {
for (int i = 0; i < heapSize + toEvict; ++i) {
LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
assertTrue(cache(mm, lrfu, et, buffer));
+ assertEquals((long)Math.min(i + 1, heapSize), m.cacheUsed.get());
LlapDataBuffer evictedBuf = getOneEvictedBuffer(et);
if (i < toEvict) {
evicted[i] = buffer;
@@ -275,6 +304,7 @@ public class TestLowLevelLrfuCachePolicy {
for (LlapDataBuffer buf : inserted) {
lock(lrfu, buf);
}
+ assertEquals(heapSize, m.cacheUsed.get());
assertFalse(mm.reserveMemory(1, false));
if (!et.evicted.isEmpty()) {
assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
@@ -291,24 +321,34 @@ public class TestLowLevelLrfuCachePolicy {
lrfu.notifyUnlock(buf);
}
}
- verifyOrder(mm, lrfu, et, inserted);
+ verifyOrder(mm, lrfu, et, inserted, m.cacheUsed);
}
private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu,
- EvictionTracker et, ArrayList<LlapDataBuffer> inserted) {
+ EvictionTracker et, ArrayList<LlapDataBuffer> inserted, AtomicLong cacheUsed) {
LlapDataBuffer block;
// Evict all blocks.
et.evicted.clear();
for (int i = 0; i < inserted.size(); ++i) {
assertTrue(mm.reserveMemory(1, false));
+ if (cacheUsed != null) {
+ assertEquals(inserted.size(), cacheUsed.get());
+ }
}
// The map should now be empty.
assertFalse(mm.reserveMemory(1, false));
+ if (cacheUsed != null) {
+ assertEquals(inserted.size(), cacheUsed.get());
+ }
for (int i = 0; i < inserted.size(); ++i) {
block = et.evicted.get(i);
assertTrue(block.isInvalid());
assertSame(inserted.get(i), block);
}
+ if (cacheUsed != null) {
+ mm.releaseMemory(inserted.size());
+ assertEquals(0, cacheUsed.get());
+ }
}
private String dumpInserted(ArrayList<LlapDataBuffer> inserted) {