You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/08 02:32:09 UTC

hive git commit: HIVE-12591 : LLAP cache counters display negative value for CacheCapacityUsed (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/branch-2.0 f9d100264 -> bfc25f42f


HIVE-12591 : LLAP cache counters display negative value for CacheCapacityUsed (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bfc25f42
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bfc25f42
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bfc25f42

Branch: refs/heads/branch-2.0
Commit: bfc25f42f7c14aae907ae7dd96ba593469fcb9db
Parents: f9d1002
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Dec 7 17:26:03 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Dec 7 17:31:54 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hive/llap/cache/BuddyAllocator.java  |  1 -
 .../llap/cache/LowLevelCacheMemoryManager.java  | 41 +++++++++------
 .../llap/metrics/LlapDaemonCacheMetrics.java    | 13 ++---
 .../llap/cache/TestLowLevelLrfuCachePolicy.java | 52 +++++++++++++++++---
 4 files changed, 78 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index 485a145..0c96efa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -216,7 +216,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
   private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
     LlapDataBuffer buf = (LlapDataBuffer)buffer;
     long memUsage = buf.getMemoryUsage();
-    metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity());
     arenas[buf.arenaIndex].deallocate(buf);
     if (doReleaseMemory) {
       memoryManager.releaseMemory(memUsage);

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
index d584ca8..8788e15 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheMemoryManager.java
@@ -43,27 +43,35 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
     this.evictor = evictor;
     this.usedMemory = new AtomicLong(0);
     this.metrics = metrics;
-    metrics.incrCacheCapacityTotal(maxSize);
+    metrics.setCacheCapacityTotal(maxSize);
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Cache memory manager initialized with max size " + maxSize);
     }
   }
 
   @Override
-  public boolean reserveMemory(long memoryToReserve, boolean waitForEviction) {
+  public boolean reserveMemory(final long memoryToReserve, boolean waitForEviction) {
     // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
     int badCallCount = 0;
     int nextLog = 4;
-    while (memoryToReserve > 0) {
-      long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
+    long evictedTotalMetric = 0, reservedTotalMetric = 0, remainingToReserve = memoryToReserve;
+    boolean result = true;
+    while (remainingToReserve > 0) {
+      long usedMem = usedMemory.get(), newUsedMem = usedMem + remainingToReserve;
       if (newUsedMem <= maxSize) {
-        if (usedMemory.compareAndSet(usedMem, newUsedMem)) break;
+        if (usedMemory.compareAndSet(usedMem, newUsedMem)) {
+          reservedTotalMetric += remainingToReserve;
+          break;
+        }
         continue;
       }
       // TODO: for one-block case, we could move notification for the last block out of the loop.
-      long evicted = evictor.evictSomeBlocks(memoryToReserve);
+      long evicted = evictor.evictSomeBlocks(remainingToReserve);
       if (evicted == 0) {
-        if (!waitForEviction) return false;
+        if (!waitForEviction) {
+          result = false;
+          break;
+        }
         ++badCallCount;
         if (badCallCount == nextLog) {
           LlapIoImpl.LOG.warn("Cannot evict blocks for " + badCallCount + " calls; cache full?");
@@ -72,24 +80,28 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
             Thread.sleep(Math.min(1000, nextLog));
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            return false;
+            result = false;
+            break;
           }
         }
         continue;
       }
+      evictedTotalMetric += evicted;
       badCallCount = 0;
       // Adjust the memory - we have to account for what we have just evicted.
       while (true) {
-        long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
-        if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reserveWithEviction)) {
-          memoryToReserve -= reserveWithEviction;
+        long availableToReserveAfterEvict = maxSize - usedMem + evicted;
+        long reservedAfterEvict = Math.min(remainingToReserve, availableToReserveAfterEvict);
+        if (usedMemory.compareAndSet(usedMem, usedMem - evicted + reservedAfterEvict)) {
+          remainingToReserve -= reservedAfterEvict;
+          reservedTotalMetric += reservedAfterEvict;
           break;
         }
         usedMem = usedMemory.get();
       }
     }
-    metrics.incrCacheCapacityUsed(memoryToReserve);
-    return true;
+    metrics.incrCacheCapacityUsed(reservedTotalMetric - evictedTotalMetric);
+    return result;
   }
 
 
@@ -103,12 +115,13 @@ public class LowLevelCacheMemoryManager implements MemoryManager {
   }
 
   @Override
-  public void releaseMemory(long memoryToRelease) {
+  public void releaseMemory(final long memoryToRelease) {
     long oldV;
     do {
       oldV = usedMemory.get();
       assert oldV >= memoryToRelease;
     } while (!usedMemory.compareAndSet(oldV, oldV - memoryToRelease));
+    metrics.incrCacheCapacityUsed(-memoryToRelease);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
index 3ffa6e0..52057e4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonCacheMetrics.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -53,9 +54,9 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
   @Metric
   MutableCounterLong cacheReadRequests;
   @Metric
-  MutableCounterLong cacheCapacityTotal;
+  MutableGaugeLong cacheCapacityTotal;
   @Metric
-  MutableCounterLong cacheCapacityUsed;
+  MutableCounterLong cacheCapacityUsed; // Kept as a counter rather than a gauge to avoid races.
   @Metric
   MutableCounterLong cacheRequestedBytes;
   @Metric
@@ -77,18 +78,14 @@ public class LlapDaemonCacheMetrics implements MetricsSource {
     return ms.register(displayName, null, new LlapDaemonCacheMetrics(displayName, sessionId));
   }
 
-  public void incrCacheCapacityTotal(long delta) {
-    cacheCapacityTotal.incr(delta);
+  public void setCacheCapacityTotal(long value) {
+    cacheCapacityTotal.set(value);
   }
 
   public void incrCacheCapacityUsed(long delta) {
     cacheCapacityUsed.incr(delta);
   }
 
-  public void decrCacheCapacityUsed(int delta) {
-    cacheCapacityUsed.incr(-delta);
-  }
-
   public void incrCacheRequestedBytes(long delta) {
     cacheRequestedBytes.incr(delta);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc25f42/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index bb530ef..d0abfa3 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -18,6 +18,13 @@
 package org.apache.hadoop.hive.llap.cache;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.mockito.invocation.InvocationOnMock;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -133,7 +140,7 @@ public class TestLowLevelLrfuCachePolicy {
         lfu.notifyUnlock(inserted.get(i));
       }
     }
-    verifyOrder(mm, lfu, et, inserted);
+    verifyOrder(mm, lfu, et, inserted, null);
   }
 
   private Configuration createConf(int min, int heapSize, Double lambda) {
@@ -176,7 +183,7 @@ public class TestLowLevelLrfuCachePolicy {
         lru.notifyUnlock(inserted.get(i));
       }
     }
-    verifyOrder(mm, lru, et, inserted);
+    verifyOrder(mm, lru, et, inserted, null);
   }
 
   @Test
@@ -236,6 +243,27 @@ public class TestLowLevelLrfuCachePolicy {
     lrfu.notifyUnlock(locked);
   }
 
+  private static class MetricsMock {
+    public MetricsMock(AtomicLong cacheUsed, LlapDaemonCacheMetrics metricsMock) {
+      this.cacheUsed = cacheUsed;
+      this.metricsMock = metricsMock;
+    }
+    public AtomicLong cacheUsed;
+    public LlapDaemonCacheMetrics metricsMock;
+  }
+
+  private MetricsMock createMetricsMock() {
+    LlapDaemonCacheMetrics metricsMock = mock(LlapDaemonCacheMetrics.class);
+    final AtomicLong cacheUsed = new AtomicLong(0);
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        cacheUsed.addAndGet((Long)invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(metricsMock).incrCacheCapacityUsed(anyLong());
+    return new MetricsMock(cacheUsed, metricsMock);
+  }
+
   private void testHeapSize(int heapSize) {
     LOG.info("Testing heap size " + heapSize);
     Random rdm = new Random(1234);
@@ -243,8 +271,8 @@ public class TestLowLevelLrfuCachePolicy {
     conf.setFloat(HiveConf.ConfVars.LLAP_LRFU_LAMBDA.varname, 0.2f); // very small heap, 14 elements
     EvictionTracker et = new EvictionTracker();
     LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(conf);
-    LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu,
-        LlapDaemonCacheMetrics.create("test", "1"));
+    MetricsMock m = createMetricsMock();
+    LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(conf, lrfu, m.metricsMock);
     lrfu.setEvictionListener(et);
     // Insert the number of elements plus 2, to trigger 2 evictions.
     int toEvict = 2;
@@ -254,6 +282,7 @@ public class TestLowLevelLrfuCachePolicy {
     for (int i = 0; i < heapSize + toEvict; ++i) {
       LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
       assertTrue(cache(mm, lrfu, et, buffer));
+      assertEquals((long)Math.min(i + 1, heapSize), m.cacheUsed.get());
       LlapDataBuffer evictedBuf = getOneEvictedBuffer(et);
       if (i < toEvict) {
         evicted[i] = buffer;
@@ -275,6 +304,7 @@ public class TestLowLevelLrfuCachePolicy {
     for (LlapDataBuffer buf : inserted) {
       lock(lrfu, buf);
     }
+    assertEquals(heapSize, m.cacheUsed.get());
     assertFalse(mm.reserveMemory(1, false));
     if (!et.evicted.isEmpty()) {
       assertTrue("Got " + et.evicted.get(0), et.evicted.isEmpty());
@@ -291,24 +321,34 @@ public class TestLowLevelLrfuCachePolicy {
         lrfu.notifyUnlock(buf);
       }
     }
-    verifyOrder(mm, lrfu, et, inserted);
+    verifyOrder(mm, lrfu, et, inserted, m.cacheUsed);
   }
 
   private void verifyOrder(LowLevelCacheMemoryManager mm, LowLevelLrfuCachePolicy lrfu,
-      EvictionTracker et, ArrayList<LlapDataBuffer> inserted) {
+      EvictionTracker et, ArrayList<LlapDataBuffer> inserted, AtomicLong cacheUsed) {
     LlapDataBuffer block;
     // Evict all blocks.
     et.evicted.clear();
     for (int i = 0; i < inserted.size(); ++i) {
       assertTrue(mm.reserveMemory(1, false));
+      if (cacheUsed != null) {
+        assertEquals(inserted.size(), cacheUsed.get());
+      }
     }
     // The map should now be empty.
     assertFalse(mm.reserveMemory(1, false));
+    if (cacheUsed != null) {
+      assertEquals(inserted.size(), cacheUsed.get());
+    }
     for (int i = 0; i < inserted.size(); ++i) {
       block = et.evicted.get(i);
       assertTrue(block.isInvalid());
       assertSame(inserted.get(i), block);
     }
+    if (cacheUsed != null) {
+      mm.releaseMemory(inserted.size());
+      assertEquals(0, cacheUsed.get());
+    }
   }
 
   private String dumpInserted(ArrayList<LlapDataBuffer> inserted) {