You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/06/14 03:16:25 UTC

hive git commit: HIVE-19866 : improve LLAP cache purge (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)

Repository: hive
Updated Branches:
  refs/heads/master 93e33e119 -> 086700efa


HIVE-19866 : improve LLAP cache purge (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/086700ef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/086700ef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/086700ef

Branch: refs/heads/master
Commit: 086700efae4a33ad6eec0e845cdb9480164450a2
Parents: 93e33e1
Author: sergey <se...@apache.org>
Authored: Wed Jun 13 19:49:04 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Wed Jun 13 19:49:04 2018 -0700

----------------------------------------------------------------------
 .../llap/cache/LowLevelCacheMemoryManager.java  | 12 +++
 .../llap/cache/LowLevelLrfuCachePolicy.java     | 77 ++++++++++++++++++--
 .../hive/llap/io/api/impl/LlapIoImpl.java       | 18 +++--
 .../llap/cache/TestLowLevelLrfuCachePolicy.java | 40 ++++++++++
 4 files changed, 133 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index 60d56d6..4297cfc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -133,4 +133,16 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
   public void updateMaxSize(long maxSize) {
     this.maxSize = maxSize;
   }
+
+  public long purge() { // Asks the evictor to purge; returns the bytes it reports (0 if none).
+    final long purgedBytes = (evictor == null) ? 0 : evictor.purge();
+    if (purgedBytes != 0) {
+      long before; // CAS loop: atomically subtract the purged bytes from used memory.
+      do {
+        before = usedMemory.get();
+      } while (!usedMemory.compareAndSet(before, before - purgedBytes));
+      metrics.incrCacheCapacityUsed(-purgedBytes);
+    }
+    return purgedBytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 82e1934..e552fee 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -57,17 +57,18 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    * Perhaps we should use ConcurrentDoubleLinkedList (in public domain).
    * ONLY LIST REMOVAL is allowed under list lock.
    */
-  private final LlapCacheableBuffer[] heap;
+  private LlapCacheableBuffer[] heap; // Not final: purge() replaces it with a fresh array.
+  private final Object heapLock = new Object(); // Guards heap and heapSize; heap itself is reassigned.
   private final ReentrantLock listLock = new ReentrantLock();
   private LlapCacheableBuffer listHead, listTail;
   /** Number of elements. */
   private int heapSize = 0;
+  private final int maxHeapSize; // Heap capacity; purge() sizes the replacement heap with it.
   private EvictionListener evictionListener;
 
   public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
     int maxBuffers = (int)Math.ceil((maxSize * 1.0) / minBufferSize);
-    int maxHeapSize = -1;
     if (lambda == 0) {
       maxHeapSize = maxBuffers; // lrfuThreshold is +inf in this case
     } else {
@@ -120,7 +121,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
       LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
     }
-    synchronized (heap) {
+    synchronized (heapLock) {
       // First, update buffer priority - we have just been using it.
       buffer.priority = (buffer.lastUpdate == -1) ? F0
           : touchPriority(time, buffer.lastUpdate, buffer.priority);
@@ -174,11 +175,75 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
   @Override
   public long purge() {
-    long evicted = evictSomeBlocks(Long.MAX_VALUE);
-    LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy", LlapUtil.humanReadableByteCount(evicted));
+    long evicted = 0; // Total bytes reported to the eviction listener.
+    LlapCacheableBuffer oldTail = null; // Tail of the detached list; holds only invalidated buffers.
+    listLock.lock();
+    try {
+      LlapCacheableBuffer current = oldTail = listTail;
+      while (current != null) {
+        boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate();
+        current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+        if (canEvict) { // Invalidated: keep it in the detached list so it gets notified.
+          current = current.prev;
+        } else {
+          // Could not invalidate (e.g. still referenced): unlink it without evicting.
+          LlapCacheableBuffer newCurrent = current.prev;
+          oldTail = removeFromLocalList(oldTail, current);
+          current = newCurrent;
+        }
+      }
+      listHead = listTail = null; // The policy's own list is now empty.
+    } finally {
+      listLock.unlock();
+    }
+
+    LlapCacheableBuffer[] oldHeap = null;
+    int oldHeapSize = -1;
+    synchronized (heapLock) { // Swap in an empty heap, then invalidate the old contents.
+      oldHeap = heap;
+      oldHeapSize = heapSize;
+      heap = new LlapCacheableBuffer[maxHeapSize];
+      heapSize = 0;
+      for (int i = 0; i < oldHeapSize; ++i) {
+        LlapCacheableBuffer result = oldHeap[i];
+        result.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
+        int invalidateResult = result.invalidate();
+        if (invalidateResult != LlapCacheableBuffer.INVALIDATE_OK) {
+          oldHeap[i] = null; // Removed from heap without evicting.
+        }
+      }
+    }
+    LlapCacheableBuffer current = oldTail; // Notify the listener outside of both locks.
+    while (current != null) {
+      evicted += current.getMemoryUsage();
+      evictionListener.notifyEvicted(current);
+      current = current.prev;
+    }
+    for (int i = 0; i < oldHeapSize; ++i) {
+      current = oldHeap[i];
+      if (current == null) continue; // Not invalidated above, so not evicted.
+      evicted += current.getMemoryUsage();
+      evictionListener.notifyEvicted(current);
+    }
+    LlapIoImpl.LOG.info("PURGE: evicted {} from LRFU policy",
+        LlapUtil.humanReadableByteCount(evicted));
     return evicted;
   }
 
+  private static LlapCacheableBuffer removeFromLocalList(
+      LlapCacheableBuffer tail, LlapCacheableBuffer current) { // Unlinks current; returns new tail.
+    final LlapCacheableBuffer before = current.prev, after = current.next;
+    final LlapCacheableBuffer newTail = (current == tail) ? before : tail;
+    if (current != tail) {
+      after.prev = before;
+    }
+    if (before != null) {
+      before.next = after;
+    }
+    current.prev = current.next = null; // Fully detach the removed node.
+    return newTail;
+  }
+
 
   @Override
   public long evictSomeBlocks(long memoryToReserve) {
@@ -190,7 +255,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     long time = timer.get();
     while (evicted < memoryToReserve) {
       LlapCacheableBuffer buffer = null;
-      synchronized (heap) {
+      synchronized (heapLock) {
         buffer = evictFromHeapUnderLock(time);
       }
       if (buffer == null) return evicted;

http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index e1e8a32..2fffeb8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -102,7 +102,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
   private final LowLevelCache dataCache;
   private final BufferUsageManager bufferManager;
   private final Configuration daemonConf;
-  private LowLevelCachePolicy cachePolicyWrapper;
+  private final LowLevelCacheMemoryManager memoryManager; // Null when SimpleBufferManager is used; see purge().
 
   private List<LlapIoDebugDump> debugDumpComponents = new ArrayList<>();
 
@@ -147,17 +147,18 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
       LowLevelCachePolicy realCachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
           minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
       boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
+      LowLevelCachePolicy cachePolicyWrapper;
       if (trackUsage) {
-        this.cachePolicyWrapper = new CacheContentsTracker(realCachePolicy);
+        cachePolicyWrapper = new CacheContentsTracker(realCachePolicy);
       } else {
-        this.cachePolicyWrapper = realCachePolicy;
+        cachePolicyWrapper = realCachePolicy;
       }
       // Allocator uses memory manager to request memory, so create the manager next.
-      LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+      this.memoryManager = new LowLevelCacheMemoryManager(
           totalMemorySize, cachePolicyWrapper, cacheMetrics);
       cacheMetrics.setCacheCapacityTotal(totalMemorySize);
       // Cache uses allocator to allocate and deallocate, create allocator and then caches.
-      BuddyAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+      BuddyAllocator allocator = new BuddyAllocator(conf, memoryManager, cacheMetrics);
       this.allocator = allocator;
       LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
           cacheMetrics, cachePolicyWrapper, allocator, true);
@@ -170,7 +171,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
 
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
       metadataCache = new MetadataCache(
-          allocator, memManager, cachePolicyWrapper, useGapCache, cacheMetrics);
+          allocator, memoryManager, cachePolicyWrapper, useGapCache, cacheMetrics);
       fileMetadataCache = metadataCache;
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
       EvictionDispatcher e = new EvictionDispatcher(
@@ -198,6 +199,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
       SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
       bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
       dataCache = sbm;
+      this.memoryManager = null;
       debugDumpComponents.add(new LlapIoDebugDump() {
         @Override
         public void debugDumpShort(StringBuilder sb) {
@@ -234,8 +236,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
 
   @Override
   public long purge() {
-    if (cachePolicyWrapper != null) {
-      return cachePolicyWrapper.purge();
+    if (memoryManager != null) { // memoryManager is null on the SimpleBufferManager path.
+      return memoryManager.purge(); // Unlike the policy's purge(), also updates used-memory accounting.
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/086700ef/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 99841aa..6eb2eb5 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -179,6 +179,46 @@ public class TestLowLevelLrfuCachePolicy {
   }
 
   @Test
+  public void testPurge() {
+    final int HEAP_SIZE = 32;
+    Configuration conf = new Configuration();
+    conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f);
+    EvictionTracker et = new EvictionTracker();
+    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, HEAP_SIZE, conf);
+    MetricsMock m = createMetricsMock();
+    LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(HEAP_SIZE, lrfu, m.metricsMock);
+    lrfu.setEvictionListener(et);
+    assertEquals(0, lrfu.purge());
+    for (int testSize = 1; testSize <= HEAP_SIZE; ++testSize) {
+      LOG.info("Starting with " + testSize);
+      ArrayList<LlapDataBuffer> purge = new ArrayList<>(testSize), dontPurge = new ArrayList<>(testSize);
+      for (int i = 0; i < testSize; ++i) {
+        LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+        assertTrue(cache(mm, lrfu, et, buffer));
+        // Lock a few blocks without telling the policy.
+        if ((i + 1) % 3 == 0) {
+          buffer.incRef();
+          dontPurge.add(buffer);
+        } else {
+          purge.add(buffer);
+        }
+      }
+      long evicted = lrfu.purge(), expectedEvicted = 0; // Locked buffers must not be counted.
+      for (LlapDataBuffer buffer : purge) {
+        assertTrue(buffer + " " + testSize, buffer.isInvalid());
+        expectedEvicted += buffer.getMemoryUsage();
+        mm.releaseMemory(buffer.getMemoryUsage());
+      }
+      assertEquals("purged bytes, size " + testSize, expectedEvicted, evicted);
+      for (LlapDataBuffer buffer : dontPurge) {
+        assertFalse(buffer.isInvalid());
+        buffer.decRef();
+        mm.releaseMemory(buffer.getMemoryUsage());
+      }
+    }
+  }
+
+  @Test
   public void testDeadlockResolution() {
     int heapSize = 4;
     LOG.info("Testing deadlock resolution");