You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2019/03/25 16:17:46 UTC

[hive] 01/02: HIVE-21422: Add metrics to LRFU cache policy (Oliver Draese, reviewed by Gopal V)

This is an automated email from the ASF dual-hosted git repository.

gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 741630c1b81cb6928b935e3ca8c0bbbbed46838b
Author: Olli Draese <od...@cloudera.com>
AuthorDate: Mon Mar 25 09:16:37 2019 -0700

    HIVE-21422: Add metrics to LRFU cache policy (Oliver Draese, reviewed by Gopal V)
    
    Signed-off-by: Gopal V <go...@apache.org>
---
 .../hive/llap/cache/LowLevelLrfuCachePolicy.java   | 287 ++++++++++++++++++---
 1 file changed, 251 insertions(+), 36 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 704f2f1..759819d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -27,6 +27,15 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.impl.MsInfo;
 
 /**
  * Implementation of the algorithm from "On the Existence of a Spectrum of Policies
@@ -35,14 +44,14 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
  */
 public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private final double lambda;
-  private final double f(long x) {
+  private double f(long x) {
     return Math.pow(0.5, lambda * x);
   }
   private static final double F0 = 1; // f(0) is always 1
-  private final double touchPriority(long time, long lastAccess, double previous) {
+  private double touchPriority(long time, long lastAccess, double previous) {
     return F0 + f(time - lastAccess) * previous;
   }
-  private final double expirePriority(long time, long lastAccess, double previous) {
+  private double expirePriority(long time, long lastAccess, double previous) {
     return f(time - lastAccess) * previous;
   }
 
@@ -65,6 +74,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   private int heapSize = 0;
   private final int maxHeapSize;
   private EvictionListener evictionListener;
+  private final PolicyMetrics metrics;
 
   public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
     lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
@@ -79,7 +89,18 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
         minBufferSize, lambda, maxHeapSize);
 
     heap = new LlapCacheableBuffer[maxHeapSize];
-    listHead = listTail = null;
+    listHead = null;
+    listTail = null;
+
+    String sessID = conf.get("llap.daemon.metrics.sessionid");
+    if (null == sessID) {
+      sessID = "<unknown>";
+    }
+
+    // register new metrics provider for this cache policy
+    metrics = new PolicyMetrics(sessID);
+    LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(),
+                                          null, metrics);
   }
 
   @Override
@@ -110,8 +131,10 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     // a locked item in either, it will remove it from cache; when we unlock, we are going to
     // put it back or update it, depending on whether this has happened. This should cause
     // most of the expensive cache update work to happen in unlock, not blocking processing.
-    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
-    if (!listLock.tryLock()) return;
+    if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST || !listLock.tryLock()) {
+      return;
+    }
+
     removeFromListAndUnlock(buffer);
   }
 
@@ -149,7 +172,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
             listHead.prev = demoted;
             listHead = demoted;
           } else {
-            listHead = listTail = demoted;
+            listHead = demoted;
+            listTail = demoted;
             demoted.next = null;
           }
         } finally {
@@ -179,7 +203,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     LlapCacheableBuffer oldTail = null;
     listLock.lock();
     try {
-      LlapCacheableBuffer current = oldTail = listTail;
+      LlapCacheableBuffer current = listTail;
+      oldTail = listTail;
       while (current != null) {
         boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate();
         current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
@@ -192,7 +217,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
           current = newCurrent;
         }
       }
-      listHead = listTail = null;
+      listHead = null;
+      listTail = null;
     } finally {
       listLock.unlock();
     }
@@ -221,7 +247,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     }
     for (int i = 0; i < oldHeapSize; ++i) {
       current = oldHeap[i];
-      if (current == null) continue;
+      if (current == null) {
+        continue;
+      }
       evicted += current.getMemoryUsage();
       evictionListener.notifyEvicted(current);
     }
@@ -240,7 +268,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     if (current.prev != null) {
       current.prev.next = current.next;
     }
-    current.prev = current.next = null;
+    current.prev = null;
+    current.next = null;
     return tail;
   }
 
@@ -249,7 +278,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   public long evictSomeBlocks(long memoryToReserve) {
     // In normal case, we evict the items from the list.
     long evicted = evictFromList(memoryToReserve);
-    if (evicted >= memoryToReserve) return evicted;
+    if (evicted >= memoryToReserve) {
+      return evicted;
+    }
     // This should not happen unless we are evicting a lot at once, or buffers are large (so
     // there's a small number of buffers and they all live in the heap).
     long time = timer.get();
@@ -258,7 +289,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
       synchronized (heapLock) {
         buffer = evictFromHeapUnderLock(time);
       }
-      if (buffer == null) return evicted;
+      if (buffer == null) {
+        return evicted;
+      }
       evicted += buffer.getMemoryUsage();
       evictionListener.notifyEvicted(buffer);
     }
@@ -273,7 +306,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     // Therefore we always evict one contiguous sequence from the tail. We can find it in one pass,
     // splice it out and then finalize the eviction outside of the list lock.
     try {
-      nextCandidate = firstCandidate = listTail;
+      nextCandidate = listTail;
+      firstCandidate = listTail;
       while (evicted < memoryToReserve && nextCandidate != null) {
         if (LlapCacheableBuffer.INVALIDATE_OK != nextCandidate.invalidate()) {
           // Locked, or invalidated, buffer was in the list - just drop it;
@@ -293,7 +327,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
       }
       if (firstCandidate != nextCandidate) {
         if (nextCandidate == null) {
-          listHead = listTail = null; // We have evicted the entire list.
+          listHead = null;
+          listTail = null; // We have evicted the entire list.
         } else {
           // Splice the section that we have evicted out of the list.
           // We have already updated the state above so no need to do that again.
@@ -313,9 +348,13 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   // Note: rarely called (unless buffers are very large or we evict a lot, or in LFU case).
   private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
     while (true) {
-      if (heapSize == 0) return null;
+      if (heapSize == 0) {
+        return null;
+      }
       LlapCacheableBuffer result = evictHeapElementUnderLock(time, 0);
-      if (result != null) return result;
+      if (result != null) {
+        return result;
+      }
     }
   }
 
@@ -324,11 +363,15 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     int ix = buffer.indexInHeap;
     double priority = buffer.priority;
     while (true) {
-      if (ix == 0) break; // Buffer is at the top of the heap.
+      if (ix == 0) {
+        break; // Buffer is at the top of the heap.
+      }
       int parentIx = (ix - 1) >>> 1;
       LlapCacheableBuffer parent = heap[parentIx];
       double parentPri = getHeapifyPriority(parent, time);
-      if (priority >= parentPri) break;
+      if (priority >= parentPri) {
+        break;
+      }
       heap[ix] = parent;
       parent.indexInHeap = ix;
       ix = parentIx;
@@ -369,7 +412,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
     double priority = buffer.priority;
     while (true) {
       int newIx = moveMinChildUp(ix, time, priority);
-      if (newIx == -1) break;
+      if (newIx == -1) {
+        break;
+      }
       ix = newIx;
     }
     buffer.indexInHeap = ix;
@@ -384,7 +429,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
    */
   private int moveMinChildUp(int targetPos, long time, double comparePri) {
     int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1;
-    if (leftIx >= heapSize) return -1; // Buffer is at the leaf node.
+    if (leftIx >= heapSize) {
+      return -1; // Buffer is at the leaf node.
+    }
     LlapCacheableBuffer left = heap[leftIx], right = null;
     if (rightIx < heapSize) {
       right = heap[rightIx];
@@ -405,7 +452,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
   }
 
   private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
-    if (buf == null) return Double.MAX_VALUE;
+    if (buf == null) {
+      return Double.MAX_VALUE;
+    }
     if (buf.lastUpdate != time && time >= 0) {
       buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
       buf.lastUpdate = time;
@@ -415,7 +464,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
   private void removeFromListAndUnlock(LlapCacheableBuffer buffer) {
     try {
-      if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+      if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) {
+        return;
+      }
       removeFromListUnderLock(buffer);
     } finally {
       listLock.unlock();
@@ -547,20 +598,184 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
 
   @Override
   public void debugDumpShort(StringBuilder sb) {
-    sb.append("\nLRFU eviction list: ");
-    LlapCacheableBuffer listHeadLocal = listHead, listTailLocal = listTail;
-    if (listHeadLocal == null) {
-      sb.append("0 items");
-    } else {
-      LlapCacheableBuffer listItem = listHeadLocal;
-      int c = 0;
-      while (listItem != null) {
-        ++c;
-        if (listItem == listTailLocal) break;
-        listItem = listItem.next;
+    long[] metricData = metrics.getUsageStats();
+    sb.append("\nLRFU eviction list: ")
+      .append(metricData[PolicyMetrics.LISTSIZE]).append(" items");
+    sb.append("\nLRFU eviction heap: ")
+      .append(heapSize).append(" items (of max ").append(maxHeapSize).append(")");
+    sb.append("\nLRFU data on heap: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONHEAP]));
+    sb.append("\nLRFU metadata on heap: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONHEAP]));
+    sb.append("\nLRFU data on eviction list: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.DATAONLIST]));
+    sb.append("\nLRFU metadata on eviction list: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.METAONLIST]));
+    sb.append("\nLRFU data locked: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDDATA]));
+    sb.append("\nLRFU metadata locked: ")
+      .append(LlapUtil.humanReadableByteCount(metricData[PolicyMetrics.LOCKEDMETA]));
+  }
+
+  /**
+   * Metrics Information for LRFU specific policy information.
+   * This enumeration is used by the @code PolicyMetrics instance to
+   * define and describe the metrics.
+   */
+  private enum PolicyInformation implements MetricsInfo {
+    PolicyMetrics("LRFU cache policy based metrics"),
+    DataOnHeap("Amount of bytes used for data on min-heap"),
+    DataOnList("Amount of bytes used for data on eviction short list"),
+    MetaOnHeap("Amount of bytes used for meta data on min-heap"),
+    MetaOnList("Amount of bytes used for meta data on eviction short list"),
+    DataLocked("Amount of locked data in bytes (in use)"),
+    MetaLocked("Amount of locked meta data in bytes (in use)"),
+    HeapSize("Number of buffers on the min-heap"),
+    HeapSizeMax("Capacity (number of buffers) of the min-heap"),
+    ListSize("Number of buffers on the eviction short list"),
+    TotalData("Total amount of bytes, used for data"),
+    TotalMeta("Total amount of bytes, used for meta data");
+
+    private final String description; // metric explaination
+
+    /**
+     * Creates a new enum value.
+     *
+     * @param description The explaination of the metric
+     */
+    PolicyInformation(String description) {
+      this.description = description;
+    }
+
+    @Override
+    public String description() {
+      return description;
+    }
+  }
+
+  /**
+   * Metrics provider for the LRFU cache policy.
+   * An instance of this class is providing JMX (through haddoop metrics)
+   * statistics for the LRFU cache policy for monitoring.
+   */
+  @Metrics(about = "LRFU Cache Policy Metrics", context = "cache")
+  private class PolicyMetrics implements MetricsSource {
+    public static final int DATAONHEAP = 0;
+    public static final int DATAONLIST = 1;
+    public static final int METAONHEAP = 2;
+    public static final int METAONLIST = 3;
+    public static final int LISTSIZE   = 4;
+    public static final int LOCKEDDATA = 5;
+    public static final int LOCKEDMETA = 6;
+
+    private final String session;  // identifier for the LLAP daemon
+
+    /**
+     * Creates a new metrics producer.
+     *
+     * @param session The LLAP daemon identifier
+     */
+    PolicyMetrics(String session) {
+      this.session = session;
+    }
+
+    /**
+     * Helper to get some basic LRFU usage statistics.
+     * This method returns a long array with the following content:
+     * - amount of data (bytes) on min-heap
+     * - amount of data (bytes) on eviction short list
+     * - amount of metadata (bytes) on min-heap
+     * - amount of metadata (bytes) on eviction short list
+     * - size of the eviction short list
+     * - amount of locked bytes for data
+     * - amount of locked bytes for metadata
+     *
+     * @return long array with LRFU stats
+     */
+    public long[] getUsageStats() {
+      long dataOnHeap = 0L;   // all non-meta related buffers on min-heap
+      long dataOnList = 0L;   // all non-meta related buffers on eviction list
+      long metaOnHeap = 0L;   // meta data buffers on min-heap
+      long metaOnList = 0L;   // meta data buffers on eviction list
+      long listSize   = 0L;   // number of entries on eviction list
+      long lockedData = 0L;   // number of bytes in locked data buffers
+      long lockedMeta = 0L;   // number of bytes in locked metadata buffers
+
+      // aggregate values on the heap
+      synchronized (heapLock) {
+        for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) {
+          LlapCacheableBuffer buff = heap[heapIdx];
+
+          if (null != buff) {
+            if (buff instanceof LlapMetadataBuffer) {
+              metaOnHeap += buff.getMemoryUsage();
+              if (buff.isLocked()) {
+                lockedMeta += buff.getMemoryUsage();
+              }
+            } else {
+              dataOnHeap += buff.getMemoryUsage();
+              if (buff.isLocked()) {
+                lockedData += buff.getMemoryUsage();
+              }
+            }
+          }
+        }
       }
-      sb.append(c + " items");
+
+      // aggregate values on the evicition short list
+      try {
+        listLock.lock();
+        LlapCacheableBuffer scan = listHead;
+        while (null != scan) {
+          if (scan instanceof LlapMetadataBuffer) {
+            metaOnList += scan.getMemoryUsage();
+            if (scan.isLocked()) {
+              lockedMeta += scan.getMemoryUsage();
+            }
+          } else {
+            dataOnList += scan.getMemoryUsage();
+            if (scan.isLocked()) {
+              lockedData += scan.getMemoryUsage();
+            }
+          }
+
+          ++listSize;
+          scan = scan.next;
+        }
+      } finally {
+        listLock.unlock();
+      }
+
+      return new long[] {dataOnHeap, dataOnList,
+                         metaOnHeap, metaOnList, listSize,
+                         lockedData, lockedMeta};
+    }
+
+    @Override
+    public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+      long[] usageStats = getUsageStats();
+
+      // start a new record
+      MetricsRecordBuilder mrb = collector.addRecord(PolicyInformation.PolicyMetrics)
+                                          .setContext("cache")
+                                          .tag(MsInfo.ProcessName,
+                                               MetricsUtils.METRICS_PROCESS_NAME)
+                                          .tag(MsInfo.SessionId, session);
+
+      // add the values to the new record
+      mrb.addCounter(PolicyInformation.DataOnHeap,   usageStats[DATAONHEAP])
+          .addCounter(PolicyInformation.DataOnList,  usageStats[DATAONLIST])
+          .addCounter(PolicyInformation.MetaOnHeap,  usageStats[METAONHEAP])
+          .addCounter(PolicyInformation.MetaOnList,  usageStats[METAONLIST])
+          .addCounter(PolicyInformation.DataLocked,  usageStats[LOCKEDDATA])
+          .addCounter(PolicyInformation.MetaLocked,  usageStats[LOCKEDMETA])
+          .addCounter(PolicyInformation.HeapSize,    heapSize)
+          .addCounter(PolicyInformation.HeapSizeMax, maxHeapSize)
+          .addCounter(PolicyInformation.ListSize,    usageStats[LISTSIZE])
+          .addCounter(PolicyInformation.TotalData,   usageStats[DATAONHEAP]
+                                                     + usageStats[DATAONLIST])
+          .addCounter(PolicyInformation.TotalMeta,   usageStats[METAONHEAP]
+                                                     + usageStats[METAONLIST]);
     }
-    sb.append("\nLRFU eviction heap: " + heapSize + " items");
   }
 }