You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by bs...@apache.org on 2019/11/18 18:30:45 UTC
[hive] branch master updated: HIVE-22492: Amortize lock contention
due to LRFU accounting (Slim Bouguerra via Gopal V)
This is an automated email from the ASF dual-hosted git repository.
bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0781cf2 HIVE-22492: Amortize lock contention due to LRFU accounting (Slim Bouguerra via Gopal V)
0781cf2 is described below
commit 0781cf2c5104dafd0c5496631cafabac9d59df67
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Mon Nov 18 10:29:20 2019 -0800
HIVE-22492: Amortize lock contention due to LRFU accounting (Slim Bouguerra via Gopal V)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 6 +-
.../hive/llap/cache/LowLevelLrfuCachePolicy.java | 86 +++++++++++++++++-----
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 7 +-
.../cache/TestCacheAllocationsEvictionsCycles.java | 1 +
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 6 ++
5 files changed, 85 insertions(+), 21 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8aaf546..9162253 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -359,6 +359,7 @@ public class HiveConf extends Configuration {
llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_DIRECT.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_USE_LRFU.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_LAMBDA.varname);
+ llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_USE_FILEID_PATH.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS.varname);
@@ -4119,12 +4120,15 @@ public class HiveConf extends Configuration {
"partitions and tables) for reporting."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
- LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f,
+ LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.1f,
"Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" +
"behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" +
"The meaning of this parameter is the inverse of the number of time ticks (cache\n" +
" operations, currently) that cause the combined recency-frequency of a block in cache\n" +
" to be halved."),
+ LLAP_LRFU_BP_WRAPPER_SIZE("hive.llap.io.lrfu.bp.wrapper.size", 64, "Size of the thread-local queue "
+ + "used to amortize the lock contention; the idea here is to try locking as soon as we reach max size / 2 "
+ + "and to block when the max queue size is reached"),
LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", true,
"Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" +
"like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" +
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 759819d..d1d6acd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.metrics2.impl.MsInfo;
* that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
* Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
*/
-public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
+public final class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private final double lambda;
private double f(long x) {
return Math.pow(0.5, lambda * x);
@@ -67,7 +67,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
* ONLY LIST REMOVAL is allowed under list lock.
*/
private LlapCacheableBuffer[] heap;
- private final Object heapLock = new Object();
+ private final ReentrantLock heapLock = new ReentrantLock();
private final ReentrantLock listLock = new ReentrantLock();
private LlapCacheableBuffer listHead, listTail;
/** Number of elements. */
@@ -75,9 +75,15 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private final int maxHeapSize;
private EvictionListener evictionListener;
private final PolicyMetrics metrics;
+ private final ThreadLocal<LlapCacheableBuffer[]> threadLocalBuffers;
+ private final ThreadLocal<Integer> threadLocalCount;
+ private final int maxQueueSize;
public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
- lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+
+ this.maxQueueSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE);
+ this.lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+
int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
if (lambda == 0) {
maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
@@ -99,8 +105,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
// register new metrics provider for this cache policy
metrics = new PolicyMetrics(sessID);
- LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(),
- null, metrics);
+ LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), null, metrics);
+ threadLocalBuffers = ThreadLocal.withInitial(() -> new LlapCacheableBuffer[maxQueueSize]);
+ threadLocalCount = ThreadLocal.withInitial(() -> 0);
}
@Override
@@ -140,11 +147,47 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
@Override
public void notifyUnlock(LlapCacheableBuffer buffer) {
- long time = timer.incrementAndGet();
- if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
- LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+
+ int count = threadLocalCount.get();
+ final LlapCacheableBuffer[] cacheableBuffers = threadLocalBuffers.get() ;
+ if (count < maxQueueSize) {
+ cacheableBuffers[count] = buffer;
+ threadLocalCount.set(++count);
+ }
+ if (count <= maxQueueSize / 2) {
+ // Queue is at most half full: too early to flush, so return without touching the heap lock.
+ return;
+ }
+
+ if (count == maxQueueSize) {
+ // Queue is full: we have to flush, so block on the heap lock.
+ heapLock.lock();
+ try {
+ doNotifyUnderHeapLock(count, cacheableBuffers);
+ } finally {
+ threadLocalCount.set(0);
+ heapLock.unlock();
+ }
+ return;
}
- synchronized (heapLock) {
+ if (heapLock.tryLock()) {
+ try {
+ doNotifyUnderHeapLock(count, cacheableBuffers);
+ } finally {
+ threadLocalCount.set(0);
+ heapLock.unlock();
+ }
+ }
+ }
+
+ private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuffers) {
+ LlapCacheableBuffer buffer;
+ for (int i = 0; i < count; i++) {
+ buffer = cacheableBuffers[i];
+ long time = timer.incrementAndGet();
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+ }
// First, update buffer priority - we have just been using it.
buffer.priority = (buffer.lastUpdate == -1) ? F0
: touchPriority(time, buffer.lastUpdate, buffer.priority);
@@ -200,7 +243,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
@Override
public long purge() {
long evicted = 0;
- LlapCacheableBuffer oldTail = null;
+ LlapCacheableBuffer oldTail;
listLock.lock();
try {
LlapCacheableBuffer current = listTail;
@@ -223,9 +266,10 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
listLock.unlock();
}
- LlapCacheableBuffer[] oldHeap = null;
- int oldHeapSize = -1;
- synchronized (heapLock) {
+ LlapCacheableBuffer[] oldHeap;
+ int oldHeapSize;
+ heapLock.lock();
+ try {
oldHeap = heap;
oldHeapSize = heapSize;
heap = new LlapCacheableBuffer[maxHeapSize];
@@ -238,6 +282,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
oldHeap[i] = null; // Removed from heap without evicting.
}
}
+ } finally {
+ heapLock.unlock();
}
LlapCacheableBuffer current = oldTail;
while (current != null) {
@@ -285,9 +331,12 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
// there's a small number of buffers and they all live in the heap).
long time = timer.get();
while (evicted < memoryToReserve) {
- LlapCacheableBuffer buffer = null;
- synchronized (heapLock) {
+ LlapCacheableBuffer buffer;
+ heapLock.lock();
+ try {
buffer = evictFromHeapUnderLock(time);
+ } finally {
+ heapLock.unlock();
}
if (buffer == null) {
return evicted;
@@ -300,7 +349,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private long evictFromList(long memoryToReserve) {
long evicted = 0;
- LlapCacheableBuffer nextCandidate = null, firstCandidate = null;
+ LlapCacheableBuffer nextCandidate, firstCandidate;
listLock.lock();
// We assume that there are no locked blocks in the list; or if they are, they can be dropped.
// Therefore we always evict one contiguous sequence from the tail. We can find it in one pass,
@@ -702,7 +751,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
long lockedMeta = 0L; // number of bytes in locked metadata buffers
// aggregate values on the heap
- synchronized (heapLock) {
+ heapLock.lock();
+ try {
for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) {
LlapCacheableBuffer buff = heap[heapIdx];
@@ -720,6 +770,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
}
}
+ } finally {
+ heapLock.unlock();
}
// aggregate values on the evicition short list
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 0d9077c..fadefa2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -144,9 +144,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
- int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
- LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
- minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
+ int minAllocSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+ LowLevelCachePolicy
+ realCachePolicy =
+ useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
LowLevelCachePolicy cachePolicyWrapper;
if (trackUsage) {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
index f4d9057..d7f02d4 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
@@ -57,6 +57,7 @@ public class TestCacheAllocationsEvictionsCycles {
Configuration conf = new Configuration();
// Set lambda to 1 so the heap size becomes 1 (LRU).
conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
int minBufferSize = 1;
cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf);
memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, CACHE_METRICS);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 923042d..fbe58ff 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -57,6 +57,7 @@ public class TestLowLevelLrfuCachePolicy {
Configuration conf = new Configuration();
// Set lambda to 1 so the heap size becomes 1 (LRU).
conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
final LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, memSize, conf);
Field f = LowLevelLrfuCachePolicy.class.getDeclaredField("listLock");
f.setAccessible(true);
@@ -127,6 +128,7 @@ public class TestLowLevelLrfuCachePolicy {
Configuration conf = new Configuration();
ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lfu,
@@ -157,6 +159,7 @@ public class TestLowLevelLrfuCachePolicy {
Configuration conf = new Configuration();
ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(1, heapSize, conf);
LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lru,
@@ -183,6 +186,7 @@ public class TestLowLevelLrfuCachePolicy {
final int HEAP_SIZE = 32;
Configuration conf = new Configuration();
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f);
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
EvictionTracker et = new EvictionTracker();
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf);
MetricsMock m = createMetricsMock();
@@ -225,6 +229,7 @@ public class TestLowLevelLrfuCachePolicy {
ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
EvictionTracker et = new EvictionTracker();
Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
LlapDaemonCacheMetrics.create("test", "1"));
@@ -302,6 +307,7 @@ public class TestLowLevelLrfuCachePolicy {
Configuration conf = new Configuration();
conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
EvictionTracker et = new EvictionTracker();
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
MetricsMock m = createMetricsMock();
LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, m.metricsMock);