You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by bs...@apache.org on 2019/11/18 18:30:45 UTC

[hive] branch master updated: HIVE-22492: Amortize lock contention due to LRFU accounting (Slim Bouguerra via Gopal V)

This is an automated email from the ASF dual-hosted git repository.

bslim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0781cf2  HIVE-22492: Amortize lock contention due to LRFU accounting (Slim Bouguerra via Gopal V)
0781cf2 is described below

commit 0781cf2c5104dafd0c5496631cafabac9d59df67
Author: Slim Bouguerra <bs...@apache.org>
AuthorDate: Mon Nov 18 10:29:20 2019 -0800

    HIVE-22492: Amortize lock contention due to LRFU accounting (Slim Bouguerra via Gopal V)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 +-
 .../hive/llap/cache/LowLevelLrfuCachePolicy.java   | 86 +++++++++++++++++-----
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |  7 +-
 .../cache/TestCacheAllocationsEvictionsCycles.java |  1 +
 .../llap/cache/TestLowLevelLrfuCachePolicy.java    |  6 ++
 5 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8aaf546..9162253 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -359,6 +359,7 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_ALLOCATOR_DIRECT.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_USE_LRFU.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_LAMBDA.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_USE_FILEID_PATH.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_IO_DECODING_METRICS_PERCENTILE_INTERVALS.varname);
@@ -4119,12 +4120,15 @@ public class HiveConf extends Configuration {
          "partitions and tables) for reporting."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
-    LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f,
+    LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.1f,
         "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" +
         "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" +
         "The meaning of this parameter is the inverse of the number of time ticks (cache\n" +
         " operations, currently) that cause the combined recency-frequency of a block in cache\n" +
         " to be halved."),
+    LLAP_LRFU_BP_WRAPPER_SIZE("hive.llap.io.lrfu.bp.wrapper.size", 64, "thread local queue "
+        + "used to amortize the lock contention, the idea hear is to try locking as soon we reach max size / 2 "
+        + "and block when max queue size reached"),
     LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", true,
         "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" +
         "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" +
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 759819d..d1d6acd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.metrics2.impl.MsInfo;
  * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
  * Additionally, buffer locking has to be handled (locked buffer cannot be evicted).
  */
-public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
+public final class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private final double lambda;
   private double f(long x) {
     return Math.pow(0.5, lambda * x);
@@ -67,7 +67,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    * ONLY LIST REMOVAL is allowed under list lock.
    */
   private LlapCacheableBuffer[] heap;
-  private final Object heapLock = new Object();
+  private final ReentrantLock heapLock = new ReentrantLock();
   private final ReentrantLock listLock = new ReentrantLock();
   private LlapCacheableBuffer listHead, listTail;
   /** Number of elements. */
@@ -75,9 +75,15 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private final int maxHeapSize;
   private EvictionListener evictionListener;
   private final PolicyMetrics metrics;
+  private final ThreadLocal<LlapCacheableBuffer[]> threadLocalBuffers;
+  private final ThreadLocal<Integer> threadLocalCount;
+  private final int maxQueueSize;
 
   public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
-    lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+
+    this.maxQueueSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE);
+    this.lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+
     int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
     if (lambda == 0) {
       maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
@@ -99,8 +105,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
     // register new metrics provider for this cache policy
     metrics = new PolicyMetrics(sessID);
-    LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(),
-                                          null, metrics);
+    LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(), null, metrics);
+    threadLocalBuffers = ThreadLocal.withInitial(() -> new LlapCacheableBuffer[maxQueueSize]);
+    threadLocalCount = ThreadLocal.withInitial(() -> 0);
   }
 
   @Override
@@ -140,11 +147,47 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
   @Override
   public void notifyUnlock(LlapCacheableBuffer buffer) {
-    long time = timer.incrementAndGet();
-    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-      LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+
+    int count = threadLocalCount.get();
+    final LlapCacheableBuffer[] cacheableBuffers = threadLocalBuffers.get() ;
+    if (count < maxQueueSize) {
+      cacheableBuffers[count] = buffer;
+      threadLocalCount.set(++count);
+    }
+    if (count <= maxQueueSize / 2) {
+      // case too early to flush
+      return;
+    }
+
+    if (count == maxQueueSize) {
+      // case we have to flush thus block on heap lock
+      heapLock.lock();
+      try {
+        doNotifyUnderHeapLock(count, cacheableBuffers);
+      } finally {
+        threadLocalCount.set(0);
+        heapLock.unlock();
+      }
+      return;
     }
-    synchronized (heapLock) {
+    if (heapLock.tryLock()) {
+      try {
+        doNotifyUnderHeapLock(count, cacheableBuffers);
+      } finally {
+        threadLocalCount.set(0);
+        heapLock.unlock();
+      }
+    }
+  }
+
+  private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuffers) {
+    LlapCacheableBuffer buffer;
+    for (int i = 0; i < count; i++) {
+      buffer = cacheableBuffers[i];
+      long time = timer.incrementAndGet();
+      if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+      }
       // First, update buffer priority - we have just been using it.
       buffer.priority = (buffer.lastUpdate == -1) ? F0
           : touchPriority(time, buffer.lastUpdate, buffer.priority);
@@ -200,7 +243,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   @Override
   public long purge() {
     long evicted = 0;
-    LlapCacheableBuffer oldTail = null;
+    LlapCacheableBuffer oldTail;
     listLock.lock();
     try {
       LlapCacheableBuffer current = listTail;
@@ -223,9 +266,10 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
       listLock.unlock();
     }
 
-    LlapCacheableBuffer[] oldHeap = null;
-    int oldHeapSize = -1;
-    synchronized (heapLock) {
+    LlapCacheableBuffer[] oldHeap;
+    int oldHeapSize;
+    heapLock.lock();
+    try {
       oldHeap = heap;
       oldHeapSize = heapSize;
       heap = new LlapCacheableBuffer[maxHeapSize];
@@ -238,6 +282,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
           oldHeap[i] = null; // Removed from heap without evicting.
         }
       }
+    } finally {
+      heapLock.unlock();
     }
     LlapCacheableBuffer current = oldTail;
     while (current != null) {
@@ -285,9 +331,12 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     // there's a small number of buffers and they all live in the heap).
     long time = timer.get();
     while (evicted < memoryToReserve) {
-      LlapCacheableBuffer buffer = null;
-      synchronized (heapLock) {
+      LlapCacheableBuffer buffer;
+      heapLock.lock();
+      try {
         buffer = evictFromHeapUnderLock(time);
+      } finally {
+        heapLock.unlock();
       }
       if (buffer == null) {
         return evicted;
@@ -300,7 +349,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
   private long evictFromList(long memoryToReserve) {
     long evicted = 0;
-    LlapCacheableBuffer nextCandidate = null, firstCandidate = null;
+    LlapCacheableBuffer nextCandidate, firstCandidate;
     listLock.lock();
     // We assume that there are no locked blocks in the list; or if they are, they can be dropped.
     // Therefore we always evict one contiguous sequence from the tail. We can find it in one pass,
@@ -702,7 +751,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
       long lockedMeta = 0L;   // number of bytes in locked metadata buffers
 
       // aggregate values on the heap
-      synchronized (heapLock) {
+      heapLock.lock();
+      try {
         for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) {
           LlapCacheableBuffer buff = heap[heapIdx];
 
@@ -720,6 +770,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
             }
           }
         }
+      } finally {
+        heapLock.unlock();
       }
 
       // aggregate values on the evicition short list
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 0d9077c..fadefa2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -144,9 +144,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
       // Memory manager uses cache policy to trigger evictions, so create the policy first.
       boolean useLrfu = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU);
       long totalMemorySize = HiveConf.getSizeVar(conf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
-      int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
-      LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
-          minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
+      int minAllocSize = (int) HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
+      LowLevelCachePolicy
+          realCachePolicy =
+          useLrfu ? new LowLevelLrfuCachePolicy(minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
       boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
       LowLevelCachePolicy cachePolicyWrapper;
       if (trackUsage) {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
index f4d9057..d7f02d4 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheAllocationsEvictionsCycles.java
@@ -57,6 +57,7 @@ public class TestCacheAllocationsEvictionsCycles {
     Configuration conf = new Configuration();
     // Set lambda to 1 so the heap size becomes 1 (LRU).
     conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     int minBufferSize = 1;
     cachePolicy = new LowLevelLrfuCachePolicy(minBufferSize, maxSize, conf);
     memoryManager = new LowLevelCacheMemoryManager(maxSize, cachePolicy, CACHE_METRICS);
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 923042d..fbe58ff 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -57,6 +57,7 @@ public class TestLowLevelLrfuCachePolicy {
     Configuration conf = new Configuration();
     // Set lambda to 1 so the heap size becomes 1 (LRU).
     conf.setDouble(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     final LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, memSize, conf);
     Field f = LowLevelLrfuCachePolicy.class.getDeclaredField("listLock");
     f.setAccessible(true);
@@ -127,6 +128,7 @@ public class TestLowLevelLrfuCachePolicy {
     Configuration conf = new Configuration();
     ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.0f);
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
     LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lfu,
@@ -157,6 +159,7 @@ public class TestLowLevelLrfuCachePolicy {
     Configuration conf = new Configuration();
     ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 1.0f);
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lru = new LowLevelLrfuCachePolicy(1, heapSize, conf);
     LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lru,
@@ -183,6 +186,7 @@ public class TestLowLevelLrfuCachePolicy {
     final int HEAP_SIZE = 32;
     Configuration conf = new Configuration();
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f);
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf);
     MetricsMock m = createMetricsMock();
@@ -225,6 +229,7 @@ public class TestLowLevelLrfuCachePolicy {
     ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
     EvictionTracker et = new EvictionTracker();
     Configuration conf = new Configuration();
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
     LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
         LlapDaemonCacheMetrics.create("test", "1"));
@@ -302,6 +307,7 @@ public class TestLowLevelLrfuCachePolicy {
     Configuration conf = new Configuration();
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
     EvictionTracker et = new EvictionTracker();
+    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 1);
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
     MetricsMock m = createMetricsMock();
     LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu, m.metricsMock);