You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2019/03/25 16:17:46 UTC
[hive] 01/02: HIVE-21422: Add metrics to LRFU cache policy (Oliver
Draese, reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
commit 741630c1b81cb6928b935e3ca8c0bbbbed46838b
Author: Olli Draese <od...@cloudera.com>
AuthorDate: Mon Mar 25 09:16:37 2019 -0700
HIVE-21422: Add metrics to LRFU cache policy (Oliver Draese, reviewed by Gopal V)
Signed-off-by: Gopal V <go...@apache.org>
---
.../hive/llap/cache/LowLevelLrfuCachePolicy.java | 287 ++++++++++++++++++---
1 file changed, 251 insertions(+), 36 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 704f2f1..759819d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -27,6 +27,15 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.impl.MsInfo;
/**
* Implementation of the algorithm from "On the Existence of a Spectrum of Policies
@@ -35,14 +44,14 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
*/
public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private final double lambda;
- private final double f(long x) {
+ private double f(long x) {
return Math.pow(0.5, lambda * x);
}
private static final double F0 = 1; // f(0) is always 1
- private final double touchPriority(long time, long lastAccess, double previous) {
+ private double touchPriority(long time, long lastAccess, double previous) {
return F0 + f(time - lastAccess) * previous;
}
- private final double expirePriority(long time, long lastAccess, double previous) {
+ private double expirePriority(long time, long lastAccess, double previous) {
return f(time - lastAccess) * previous;
}
@@ -65,6 +74,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private int heapSize = 0;
private final int maxHeapSize;
private EvictionListener evictionListener;
+ private final PolicyMetrics metrics;
public LowLevelLrfuCachePolicy(int minBufferSize, long maxSize, Configuration conf) {
lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
@@ -79,7 +89,18 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
minBufferSize, lambda, maxHeapSize);
heap = new LlapCacheableBuffer[maxHeapSize];
- listHead = listTail = null;
+ listHead = null;
+ listTail = null;
+
+ String sessID = conf.get("llap.daemon.metrics.sessionid");
+ if (null == sessID) {
+ sessID = "<unknown>";
+ }
+
+ // register new metrics provider for this cache policy
+ metrics = new PolicyMetrics(sessID);
+ LlapMetricsSystem.instance().register("LowLevelLrfuCachePolicy-" + MetricsUtils.getHostName(),
+ null, metrics);
}
@Override
@@ -110,8 +131,10 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
// a locked item in either, it will remove it from cache; when we unlock, we are going to
// put it back or update it, depending on whether this has happened. This should cause
// most of the expensive cache update work to happen in unlock, not blocking processing.
- if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
- if (!listLock.tryLock()) return;
+ if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST || !listLock.tryLock()) {
+ return;
+ }
+
removeFromListAndUnlock(buffer);
}
@@ -149,7 +172,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
listHead.prev = demoted;
listHead = demoted;
} else {
- listHead = listTail = demoted;
+ listHead = demoted;
+ listTail = demoted;
demoted.next = null;
}
} finally {
@@ -179,7 +203,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
LlapCacheableBuffer oldTail = null;
listLock.lock();
try {
- LlapCacheableBuffer current = oldTail = listTail;
+ LlapCacheableBuffer current = listTail;
+ oldTail = listTail;
while (current != null) {
boolean canEvict = LlapCacheableBuffer.INVALIDATE_OK == current.invalidate();
current.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
@@ -192,7 +217,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
current = newCurrent;
}
}
- listHead = listTail = null;
+ listHead = null;
+ listTail = null;
} finally {
listLock.unlock();
}
@@ -221,7 +247,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
for (int i = 0; i < oldHeapSize; ++i) {
current = oldHeap[i];
- if (current == null) continue;
+ if (current == null) {
+ continue;
+ }
evicted += current.getMemoryUsage();
evictionListener.notifyEvicted(current);
}
@@ -240,7 +268,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
if (current.prev != null) {
current.prev.next = current.next;
}
- current.prev = current.next = null;
+ current.prev = null;
+ current.next = null;
return tail;
}
@@ -249,7 +278,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
public long evictSomeBlocks(long memoryToReserve) {
// In normal case, we evict the items from the list.
long evicted = evictFromList(memoryToReserve);
- if (evicted >= memoryToReserve) return evicted;
+ if (evicted >= memoryToReserve) {
+ return evicted;
+ }
// This should not happen unless we are evicting a lot at once, or buffers are large (so
// there's a small number of buffers and they all live in the heap).
long time = timer.get();
@@ -258,7 +289,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
synchronized (heapLock) {
buffer = evictFromHeapUnderLock(time);
}
- if (buffer == null) return evicted;
+ if (buffer == null) {
+ return evicted;
+ }
evicted += buffer.getMemoryUsage();
evictionListener.notifyEvicted(buffer);
}
@@ -273,7 +306,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
// Therefore we always evict one contiguous sequence from the tail. We can find it in one pass,
// splice it out and then finalize the eviction outside of the list lock.
try {
- nextCandidate = firstCandidate = listTail;
+ nextCandidate = listTail;
+ firstCandidate = listTail;
while (evicted < memoryToReserve && nextCandidate != null) {
if (LlapCacheableBuffer.INVALIDATE_OK != nextCandidate.invalidate()) {
// Locked, or invalidated, buffer was in the list - just drop it;
@@ -293,7 +327,8 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
if (firstCandidate != nextCandidate) {
if (nextCandidate == null) {
- listHead = listTail = null; // We have evicted the entire list.
+ listHead = null;
+ listTail = null; // We have evicted the entire list.
} else {
// Splice the section that we have evicted out of the list.
// We have already updated the state above so no need to do that again.
@@ -313,9 +348,13 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
// Note: rarely called (unless buffers are very large or we evict a lot, or in LFU case).
private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
while (true) {
- if (heapSize == 0) return null;
+ if (heapSize == 0) {
+ return null;
+ }
LlapCacheableBuffer result = evictHeapElementUnderLock(time, 0);
- if (result != null) return result;
+ if (result != null) {
+ return result;
+ }
}
}
@@ -324,11 +363,15 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
int ix = buffer.indexInHeap;
double priority = buffer.priority;
while (true) {
- if (ix == 0) break; // Buffer is at the top of the heap.
+ if (ix == 0) {
+ break; // Buffer is at the top of the heap.
+ }
int parentIx = (ix - 1) >>> 1;
LlapCacheableBuffer parent = heap[parentIx];
double parentPri = getHeapifyPriority(parent, time);
- if (priority >= parentPri) break;
+ if (priority >= parentPri) {
+ break;
+ }
heap[ix] = parent;
parent.indexInHeap = ix;
ix = parentIx;
@@ -369,7 +412,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
double priority = buffer.priority;
while (true) {
int newIx = moveMinChildUp(ix, time, priority);
- if (newIx == -1) break;
+ if (newIx == -1) {
+ break;
+ }
ix = newIx;
}
buffer.indexInHeap = ix;
@@ -384,7 +429,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
*/
private int moveMinChildUp(int targetPos, long time, double comparePri) {
int leftIx = (targetPos << 1) + 1, rightIx = leftIx + 1;
- if (leftIx >= heapSize) return -1; // Buffer is at the leaf node.
+ if (leftIx >= heapSize) {
+ return -1; // Buffer is at the leaf node.
+ }
LlapCacheableBuffer left = heap[leftIx], right = null;
if (rightIx < heapSize) {
right = heap[rightIx];
@@ -405,7 +452,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
}
private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
- if (buf == null) return Double.MAX_VALUE;
+ if (buf == null) {
+ return Double.MAX_VALUE;
+ }
if (buf.lastUpdate != time && time >= 0) {
buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
buf.lastUpdate = time;
@@ -415,7 +464,9 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
private void removeFromListAndUnlock(LlapCacheableBuffer buffer) {
try {
- if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) return;
+ if (buffer.indexInHeap != LlapCacheableBuffer.IN_LIST) {
+ return;
+ }
removeFromListUnderLock(buffer);
} finally {
listLock.unlock();
@@ -547,20 +598,184 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
@Override
public void debugDumpShort(StringBuilder sb) {
- sb.append("\nLRFU eviction list: ");
- LlapCacheableBuffer listHeadLocal = listHead, listTailLocal = listTail;
- if (listHeadLocal == null) {
- sb.append("0 items");
- } else {
- LlapCacheableBuffer listItem = listHeadLocal;
- int c = 0;
- while (listItem != null) {
- ++c;
- if (listItem == listTailLocal) break;
- listItem = listItem.next;
+ final long[] stats = metrics.getUsageStats(); // takes heapLock, then listLock
+ sb.append("\nLRFU eviction list: ").append(stats[PolicyMetrics.LISTSIZE])
+ .append(" items")
+ .append("\nLRFU eviction heap: ").append(heapSize)
+ .append(" items (of max ").append(maxHeapSize).append(")")
+ .append("\nLRFU data on heap: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.DATAONHEAP]))
+ .append("\nLRFU metadata on heap: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.METAONHEAP]))
+ .append("\nLRFU data on eviction list: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.DATAONLIST]))
+ .append("\nLRFU metadata on eviction list: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.METAONLIST]))
+ .append("\nLRFU data locked: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.LOCKEDDATA]))
+ .append("\nLRFU metadata locked: ")
+ .append(LlapUtil.humanReadableByteCount(stats[PolicyMetrics.LOCKEDMETA]));
+ }
+
+ /**
+ * Metrics information for the LRFU cache policy.
+ * This enumeration is used by the {@code PolicyMetrics} instance to
+ * define and describe the metrics.
+ */
+ private enum PolicyInformation implements MetricsInfo {
+ PolicyMetrics("LRFU cache policy based metrics"),
+ DataOnHeap("Amount of bytes used for data on min-heap"),
+ DataOnList("Amount of bytes used for data on eviction short list"),
+ MetaOnHeap("Amount of bytes used for meta data on min-heap"),
+ MetaOnList("Amount of bytes used for meta data on eviction short list"),
+ DataLocked("Amount of locked data in bytes (in use)"),
+ MetaLocked("Amount of locked meta data in bytes (in use)"),
+ HeapSize("Number of buffers on the min-heap"),
+ HeapSizeMax("Capacity (number of buffers) of the min-heap"),
+ ListSize("Number of buffers on the eviction short list"),
+ TotalData("Total amount of bytes, used for data"),
+ TotalMeta("Total amount of bytes, used for meta data");
+
+ private final String description; // metric explanation
+
+ /**
+ * Creates a new enum value.
+ *
+ * @param description The explanation of the metric
+ */
+ PolicyInformation(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String description() {
+ return description;
+ }
+ }
+
+ /**
+ * Metrics provider for the LRFU cache policy.
+ * An instance of this class provides JMX statistics (through Hadoop metrics)
+ * for monitoring the LRFU cache policy.
+ */
+ @Metrics(about = "LRFU Cache Policy Metrics", context = "cache")
+ private class PolicyMetrics implements MetricsSource {
+ public static final int DATAONHEAP = 0;
+ public static final int DATAONLIST = 1;
+ public static final int METAONHEAP = 2;
+ public static final int METAONLIST = 3;
+ public static final int LISTSIZE = 4;
+ public static final int LOCKEDDATA = 5;
+ public static final int LOCKEDMETA = 6;
+
+ private final String session; // identifier for the LLAP daemon
+
+ /**
+ * Creates a new metrics producer.
+ *
+ * @param session The LLAP daemon identifier
+ */
+ PolicyMetrics(String session) {
+ this.session = session;
+ }
+
+ /**
+ * Helper to get some basic LRFU usage statistics.
+ * This method returns a long array with the following content:
+ * - amount of data (bytes) on min-heap
+ * - amount of data (bytes) on eviction short list
+ * - amount of metadata (bytes) on min-heap
+ * - amount of metadata (bytes) on eviction short list
+ * - size of the eviction short list
+ * - amount of locked bytes for data
+ * - amount of locked bytes for metadata
+ *
+ * @return long array with LRFU stats
+ */
+ public long[] getUsageStats() {
+ long dataOnHeap = 0L; // all non-meta related buffers on min-heap
+ long dataOnList = 0L; // all non-meta related buffers on eviction list
+ long metaOnHeap = 0L; // meta data buffers on min-heap
+ long metaOnList = 0L; // meta data buffers on eviction list
+ long listSize = 0L; // number of entries on eviction list
+ long lockedData = 0L; // number of bytes in locked data buffers
+ long lockedMeta = 0L; // number of bytes in locked metadata buffers
+
+ // aggregate values on the heap
+ synchronized (heapLock) {
+ for (int heapIdx = 0; heapIdx < heapSize; ++heapIdx) {
+ LlapCacheableBuffer buff = heap[heapIdx];
+
+ if (null != buff) {
+ if (buff instanceof LlapMetadataBuffer) {
+ metaOnHeap += buff.getMemoryUsage();
+ if (buff.isLocked()) {
+ lockedMeta += buff.getMemoryUsage();
+ }
+ } else {
+ dataOnHeap += buff.getMemoryUsage();
+ if (buff.isLocked()) {
+ lockedData += buff.getMemoryUsage();
+ }
+ }
+ }
+ }
}
- sb.append(c + " items");
+
+ // aggregate values on the eviction short list
+ listLock.lock(); // acquire before try, so finally never unlocks a lock we do not hold
+ try {
+ LlapCacheableBuffer scan = listHead;
+ while (null != scan) {
+ if (scan instanceof LlapMetadataBuffer) {
+ metaOnList += scan.getMemoryUsage();
+ if (scan.isLocked()) {
+ lockedMeta += scan.getMemoryUsage();
+ }
+ } else {
+ dataOnList += scan.getMemoryUsage();
+ if (scan.isLocked()) {
+ lockedData += scan.getMemoryUsage();
+ }
+ }
+
+ ++listSize;
+ scan = scan.next;
+ }
+ } finally {
+ listLock.unlock();
+ }
+
+ return new long[] {dataOnHeap, dataOnList,
+ metaOnHeap, metaOnList, listSize,
+ lockedData, lockedMeta};
+ }
+
+ @Override
+ public synchronized void getMetrics(MetricsCollector collector, boolean all) {
+ long[] usageStats = getUsageStats();
+
+ // start a new record
+ MetricsRecordBuilder mrb = collector.addRecord(PolicyInformation.PolicyMetrics)
+ .setContext("cache")
+ .tag(MsInfo.ProcessName,
+ MetricsUtils.METRICS_PROCESS_NAME)
+ .tag(MsInfo.SessionId, session);
+
+ // add the values as gauges: sizes can shrink, so they must not be published as counters
+ mrb.addGauge(PolicyInformation.DataOnHeap, usageStats[DATAONHEAP])
+ .addGauge(PolicyInformation.DataOnList, usageStats[DATAONLIST])
+ .addGauge(PolicyInformation.MetaOnHeap, usageStats[METAONHEAP])
+ .addGauge(PolicyInformation.MetaOnList, usageStats[METAONLIST])
+ .addGauge(PolicyInformation.DataLocked, usageStats[LOCKEDDATA])
+ .addGauge(PolicyInformation.MetaLocked, usageStats[LOCKEDMETA])
+ .addGauge(PolicyInformation.HeapSize, heapSize)
+ .addGauge(PolicyInformation.HeapSizeMax, maxHeapSize)
+ .addGauge(PolicyInformation.ListSize, usageStats[LISTSIZE])
+ .addGauge(PolicyInformation.TotalData, usageStats[DATAONHEAP]
+ + usageStats[DATAONLIST])
+ .addGauge(PolicyInformation.TotalMeta, usageStats[METAONHEAP]
+ + usageStats[METAONLIST]);
}
- sb.append("\nLRFU eviction heap: " + heapSize + " items");
}
}