Posted to commits@hive.apache.org by se...@apache.org on 2018/03/16 18:54:50 UTC
[2/2] hive git commit: HIVE-16992 : LLAP: monitoring and better default lambda for LRFU policy (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
HIVE-16992 : LLAP: monitoring and better default lambda for LRFU policy (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f40d9447
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f40d9447
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f40d9447
Branch: refs/heads/master
Commit: f40d94476718df6de42caef3b18811f024fe8713
Parents: 1b074bc
Author: sergey <se...@apache.org>
Authored: Fri Mar 16 11:49:07 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Mar 16 11:49:07 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 +-
.../org/apache/hadoop/hive/llap/LlapUtil.java | 67 ++++++
.../hive/llap/cache/CacheContentsTracker.java | 215 +++++++++++++++++++
.../hive/llap/cache/LlapCacheableBuffer.java | 2 +
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 10 +
.../hadoop/hive/llap/cache/LowLevelCache.java | 2 +-
.../hive/llap/cache/LowLevelCacheImpl.java | 3 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 3 +-
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 15 +-
.../hive/llap/cache/SimpleBufferManager.java | 5 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 15 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 65 ++----
.../llap/io/encoded/SerDeEncodedDataReader.java | 6 +-
.../hive/llap/io/metadata/MetadataCache.java | 45 ++--
.../llap/io/metadata/OrcFileEstimateErrors.java | 6 +
.../hive/llap/cache/TestLowLevelCacheImpl.java | 32 +--
.../hadoop/hive/llap/LlapCacheAwareFs.java | 12 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 17 +-
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 2 +-
.../hive/ql/io/orc/encoded/ReaderImpl.java | 4 +-
.../vector/VectorizedParquetRecordReader.java | 22 +-
.../apache/hadoop/hive/common/io/DataCache.java | 43 ++--
.../hive/common/io/FileMetadataCache.java | 24 ++-
23 files changed, 485 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06efd02..f8e715d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3190,11 +3190,17 @@ public class HiveConf extends Configuration {
LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
"How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
"The allocator would further cap it to a fraction of total memory."),
+ LLAP_TRACK_CACHE_USAGE("hive.llap.io.track.cache.usage", true,
+ "Whether to tag LLAP cache contents, mapping them to Hive entities (paths for\n" +
+ "partitions and tables) for reporting."),
LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
"Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
- LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,
+ LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f,
"Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" +
- "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly."),
+ "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" +
+ "The meaning of this parameter is the inverse of the number of time ticks (cache\n" +
+ " operations, currently) that cause the combined recency-frequency of a block in cache\n" +
+ " to be halved."),
LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", false,
"Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" +
"like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index aecaacc..50c0e22 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -299,4 +301,69 @@ public class LlapUtil {
LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddressVal);
return server;
}
+
+ // Copied from AcidUtils so we don't have to put the code using this into ql.
+ // TODO: Ideally, AcidUtils class and various constants should be in common.
+ private static final String BASE_PREFIX = "base_", DELTA_PREFIX = "delta_",
+ DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_",
+ DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_";
+
+ public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/';
+
+ public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) {
+ String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
+ int dbIx = -1;
+ // Try to find the default db postfix; don't check two last components - at least there
+ // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
+ for (int i = 0; i < parts.length - 2; ++i) {
+ if (!parts[i].endsWith(DATABASE_PATH_SUFFIX)) continue;
+ if (dbIx >= 0) {
+ dbIx = -1; // Let's not guess which one is correct.
+ break;
+ }
+ dbIx = i;
+ }
+ if (dbIx >= 0) {
+ String dbAndTable = parts[dbIx].substring(
+ 0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
+ if (!includeParts) return dbAndTable;
+ for (int i = dbIx + 2; i < parts.length; ++i) {
+ if (!parts[i].contains("=")) break;
+ dbAndTable += "/" + parts[i];
+ }
+ return dbAndTable;
+ }
+
+ // Just go from the back and throw away everything we think is wrong; skip last item, the file.
+ boolean isInPartFields = false;
+ for (int i = parts.length - 2; i >= 0; --i) {
+ String p = parts[i];
+ boolean isPartField = p.contains("=");
+ if ((isInPartFields && !isPartField) || (!isPartField && !isSomeHiveDir(p))) {
+ dbIx = i - 1; // Assume this is the table we are at now.
+ break;
+ }
+ isInPartFields = isPartField;
+ }
+ // If we found something before we ran out of components, use it.
+ if (dbIx >= 0) {
+ String dbName = parts[dbIx];
+ if (dbName.endsWith(DATABASE_PATH_SUFFIX)) {
+ dbName = dbName.substring(0, dbName.length() - 3);
+ }
+ String dbAndTable = dbName + "." + parts[dbIx + 1];
+ if (!includeParts) return dbAndTable;
+ for (int i = dbIx + 2; i < parts.length; ++i) {
+ if (!parts[i].contains("=")) break;
+ dbAndTable += "/" + parts[i];
+ }
+ return dbAndTable;
+ }
+ return "unknown";
+ }
+
+ private static boolean isSomeHiveDir(String p) {
+ return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX)
+ || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX);
+ }
}
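[Editor's note: for illustration, a hypothetical call showing what the new helper derives from a typical warehouse path. The path below is made up; per the logic above, the component ending in ".db" yields the database, the next component the table, and "key=value" components are appended as partitions when includeParts is true:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapUtil;

public class CacheTagExample {
  public static void main(String[] args) {
    Path p = new Path("/user/hive/warehouse/mydb.db/mytable/ds=2018-03-16/000000_0");
    // Partition-level tag, as used for cache content tracking:
    System.out.println(LlapUtil.getDbAndTableNameForMetrics(p, true));  // mydb.mytable/ds=2018-03-16
    // Table-level name, as used for the TABLE fragment counter:
    System.out.println(LlapUtil.getDbAndTableNameForMetrics(p, false)); // mydb.mytable
  }
}]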
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
new file mode 100644
index 0000000..4fbaac1
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+
+/**
+ * A wrapper around cache eviction policy that tracks cache contents via tags.
+ */
+public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener {
+ private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L;
+
+ private final ConcurrentSkipListMap<String, TagState> tagInfo = new ConcurrentSkipListMap<>();
+ private EvictionListener evictionListener;
+ private LowLevelCachePolicy realPolicy;
+ private final Thread cleanupThread;
+
+ public CacheContentsTracker(LowLevelCachePolicy realPolicy) {
+ this.realPolicy = realPolicy;
+ realPolicy.setEvictionListener(this);
+ this.cleanupThread = new Thread(new CleanupRunnable());
+ this.cleanupThread.start();
+ }
+
+ private final class CleanupRunnable implements Runnable {
+ @Override
+ public void run() {
+ final long cleanupTimeNs = CLEANUP_TIME_MS * 1000000L;
+ long sleepTimeMs = CLEANUP_TIME_MS;
+ try {
+ while (true) {
+ Thread.sleep(sleepTimeMs);
+ long timeNs = System.nanoTime();
+ long nextCleanupInNs = cleanupTimeNs;
+ Iterator<TagState> iter = tagInfo.values().iterator();
+ while (iter.hasNext()) {
+ TagState v = iter.next();
+ synchronized (v) {
+ if (v.bufferCount > 0) continue; // The file is still cached.
+ long deltaNs = timeNs - v.emptyTimeNs;
+ if (deltaNs < cleanupTimeNs) {
+ nextCleanupInNs = Math.min(nextCleanupInNs, deltaNs);
+ continue;
+ } else {
+ iter.remove();
+ }
+ }
+ }
+ sleepTimeMs = Math.max(MIN_TIME_MS, nextCleanupInNs / 1000000L);
+ }
+ } catch (InterruptedException ex) {
+ return; // Interrupted.
+ }
+ }
+ }
+
+ private static class TagState {
+ public TagState(String name) {
+ this.name = name;
+ }
+ public final String name;
+ public long emptyTimeNs;
+ public long bufferCount, totalSize, maxCount, maxSize;
+ public boolean isRemoved = false;
+ }
+
+
+ private void reportCached(LlapCacheableBuffer buffer) {
+ long size = buffer.getMemoryUsage();
+ TagState state;
+ do {
+ state = getTagState(buffer);
+ } while (!reportCached(state, size));
+ state = null;
+ do {
+ state = getParentTagState(buffer);
+ if (state == null) break;
+ } while (!reportCached(state, size));
+ }
+
+ private boolean reportCached(TagState state, long size) {
+ synchronized (state) {
+ if (state.isRemoved) return false;
+ ++state.bufferCount;
+ state.totalSize += size;
+ state.maxSize = Math.max(state.maxSize, state.totalSize);
+ state.maxCount = Math.max(state.maxCount, state.bufferCount);
+ }
+ return true;
+ }
+
+ private void reportRemoved(LlapCacheableBuffer buffer) {
+ long size = buffer.getMemoryUsage();
+ TagState state;
+ do {
+ state = getTagState(buffer);
+ } while (!reportRemoved(state, size));
+ state = null;
+ do {
+ state = getParentTagState(buffer);
+ if (state == null) break;
+ } while (!reportRemoved(state, size));
+ }
+
+ private boolean reportRemoved(TagState state, long size) {
+ synchronized (state) {
+ if (state.isRemoved) return false;
+ --state.bufferCount;
+ assert state.bufferCount >= 0;
+ state.totalSize -= size;
+ if (state.bufferCount == 0) {
+ state.emptyTimeNs = System.nanoTime();
+ }
+ }
+ return true;
+ }
+
+ private TagState getTagState(LlapCacheableBuffer buffer) {
+ return getTagState(buffer.getTag());
+ }
+
+ private TagState getParentTagState(LlapCacheableBuffer buffer) {
+ String tag = buffer.getTag();
+ int ix = tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR);
+ if (ix <= 0) return null;
+ return getTagState(tag.substring(0, ix));
+ }
+
+ private TagState getTagState(String tag) {
+ TagState state = tagInfo.get(tag);
+ if (state == null) {
+ state = new TagState(tag);
+ TagState old = tagInfo.putIfAbsent(tag, state);
+ state = (old == null) ? state : old;
+ }
+ return state;
+ }
+
+
+ @Override
+ public void cache(LlapCacheableBuffer buffer, Priority priority) {
+ realPolicy.cache(buffer, priority);
+ reportCached(buffer);
+ }
+
+ @Override
+ public void notifyLock(LlapCacheableBuffer buffer) {
+ realPolicy.notifyLock(buffer);
+ }
+
+ @Override
+ public void notifyUnlock(LlapCacheableBuffer buffer) {
+ realPolicy.notifyUnlock(buffer);
+ }
+
+ @Override
+ public void setEvictionListener(EvictionListener listener) {
+ evictionListener = listener;
+ }
+
+ @Override
+ public void setParentDebugDumper(LlapOomDebugDump dumper) {
+ realPolicy.setParentDebugDumper(dumper);
+ }
+
+
+ @Override
+ public long evictSomeBlocks(long memoryToReserve) {
+ return realPolicy.evictSomeBlocks(memoryToReserve);
+ }
+
+ @Override
+ public String debugDumpForOom() {
+ return realPolicy.debugDumpForOom();
+ }
+
+ @Override
+ public void debugDumpShort(StringBuilder sb) {
+ sb.append("\nCache state: ");
+ for (TagState state : tagInfo.values()) {
+ synchronized (state) {
+ sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/")
+ .append(state.maxCount).append(", ").append(state.totalSize).append("/")
+ .append(state.maxSize);
+ }
+ }
+ realPolicy.debugDumpShort(sb);
+ }
+
+ @Override
+ public void notifyEvicted(LlapCacheableBuffer buffer) {
+ evictionListener.notifyEvicted(buffer);
+ reportRemoved(buffer);
+ }
+}
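[Editor's note: a minimal wiring sketch, mirroring the LlapIoImpl change further down. The tracker decorates the real policy and registers itself as that policy's eviction listener, so every cache() and notifyEvicted() call updates the per-tag counters; a partition-level tag such as "mydb.mytable/ds=1" is additionally rolled up under its table-level parent "mydb.mytable", since getParentTagState() splits on DERIVED_ENTITY_PARTITION_SEPARATOR. This is a sketch, not the daemon's actual startup code:

import org.apache.hadoop.hive.llap.cache.CacheContentsTracker;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;

public class TrackerWiringSketch {
  public static void main(String[] args) {
    LowLevelCachePolicy cachePolicy = new CacheContentsTracker(new LowLevelFifoCachePolicy());
    // The allocator, data cache and metadata cache are then built against
    // cachePolicy as usual; its debugDumpShort() output now starts with a
    // per-tag "bufferCount/maxCount, totalSize/maxSize" report before the
    // wrapped policy's own dump.
    StringBuilder sb = new StringBuilder();
    cachePolicy.debugDumpShort(sb);
    System.out.println(sb);
    // Note: the tracker starts a non-daemon cleanup thread in its constructor,
    // so a real program would have to interrupt it to let the JVM exit.
  }
}]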
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index e976090..f91a5d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -56,5 +56,7 @@ public abstract class LlapCacheableBuffer {
+ lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
}
+ public abstract String getTag();
+
protected abstract boolean isLocked();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 266f46e..405fca2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -24,9 +24,19 @@ public final class LlapDataBuffer extends LlapAllocatorBuffer {
/** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
* the lookup is on compressed ranges, so we need to know this. */
public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
+ private String tag;
@Override
public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
evictionDispatcher.notifyEvicted(this);
}
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ @Override
+ public String getTag() {
+ return tag;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index a6330a3..af1b699 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -58,7 +58,7 @@ public interface LowLevelCache extends LlapOomDebugDump {
* the replacement chunks from cache are updated directly in the array.
*/
long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
- long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag);
/** Notifies the cache that a particular buffer should be removed due to eviction. */
void notifyEvicted(MemoryBuffer buffer);
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index a5494c7..5e102d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -288,7 +288,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
- long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
long[] result = null;
assert buffers.length == ranges.length;
FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
@@ -304,6 +304,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
long offset = ranges[i].getOffset() + baseOffset;
assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
buffer.declaredCachedLength = ranges[i].getLength();
+ buffer.setTag(tag);
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
if (oldVal == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 2659d9e..b42f761 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -153,7 +154,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
} finally {
listLock.unlock();
}
- // Now insert the buffer in its place and restore heap property.
+ // Now insert the new buffer in its place and restore heap property.
buffer.indexInHeap = 0;
heapifyDownUnderLock(buffer, time);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 0871391..cb89d12 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -55,10 +55,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
public boolean isCached = false;
+ private String tag;
@Override
public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
evictionDispatcher.notifyEvicted(this);
}
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+ @Override
+ public String getTag() {
+ return tag;
+ }
}
private static final class StripeInfoComparator implements
@@ -491,7 +499,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
}
public void putFileData(final FileData data, Priority priority,
- LowLevelCacheCounters qfCounters) {
+ LowLevelCacheCounters qfCounters, String tag) {
// TODO: buffers are accounted for at allocation time, but ideally we should report the memory
// overhead from the java objects to memory manager and remove it when discarding file.
if (data.stripes == null || data.stripes.isEmpty()) {
@@ -521,7 +529,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
}
try {
for (StripeData si : data.stripes) {
- lockAllBuffersForPut(si, priority);
+ lockAllBuffersForPut(si, priority, tag);
}
if (data == cached) {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -566,7 +574,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
}
}
- private void lockAllBuffersForPut(StripeData si, Priority priority) {
+ private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) {
for (int i = 0; i < si.data.length; ++i) {
LlapSerDeDataBuffer[][] colData = si.data[i];
if (colData == null) continue;
@@ -576,6 +584,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
for (int k = 0; k < streamData.length; ++k) {
boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
assert canLock;
+ streamData[k].setTag(tag);
cachePolicy.cache(streamData[k], priority);
streamData[k].isCached = true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index 027e414..a1b6cae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -84,9 +84,8 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
}
@Override
- public long[] putFileData(Object fileKey, DiskRange[] ranges,
- MemoryBuffer[] chunks, long baseOffset, Priority priority,
- LowLevelCacheCounters qfCounters) {
+ public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
for (int i = 0; i < chunks.length; ++i) {
LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 5a397be..e5bc3c2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.CacheContentsTracker;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
@@ -76,6 +77,7 @@ import org.apache.hive.common.util.FixedSizedObjectPool;
+
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -139,6 +141,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
+ boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
+ if (trackUsage) {
+ cachePolicy = new CacheContentsTracker(cachePolicy);
+ }
// Allocator uses memory manager to request memory, so create the manager next.
LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
totalMemorySize, cachePolicy, cacheMetrics);
@@ -256,7 +262,14 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
- return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null);
+ return putFileData(fileKey, ranges, data, baseOffset, null);
+ }
+
+ @Override
+ public long[] putFileData(Object fileKey, DiskRange[] ranges,
+ MemoryBuffer[] data, long baseOffset, String tag) {
+ return lowLevelCache.putFileData(
+ fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 89df943..afb8fc5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
@@ -166,6 +167,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private EncodedReader stripeReader;
private CompressionCodec codec;
private Object fileKey;
+ private final String cacheTag;
private FileSystem fs;
/**
@@ -206,6 +208,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
orcReader = null;
+ cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
+ ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
fs = split.getPath().getFileSystem(jobConf);
@@ -268,7 +272,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
recordReaderTime(startTime);
return null;
}
- counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
+ counters.setDesc(QueryFragmentCounters.Desc.TABLE,
+ LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false));
counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
+ (fileKey == null ? "" : " (" + fileKey + ")"));
try {
@@ -429,7 +434,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// Reader creation updates HDFS counters, don't do it here.
DataWrapperForOrc dw = new DataWrapperForOrc();
stripeReader = orcReader.encodedReader(
- fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool);
+ fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
}
@@ -437,49 +442,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
}
- private static String getDbAndTableName(Path path) {
- // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's
- // actually pretty hard to get cause even split generator only uses paths. We only need this
- // for metrics; therefore, brace for BLACK MAGIC!
- String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
- int dbIx = -1;
- // Try to find the default db postfix; don't check two last components - at least there
- // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
- for (int i = 0; i < parts.length - 2; ++i) {
- if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
- if (dbIx >= 0) {
- dbIx = -1; // Let's not guess.
- break;
- }
- dbIx = i;
- }
- if (dbIx >= 0) {
- return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
- }
-
- // Just go from the back and throw away everything we think is wrong; skip last item, the file.
- boolean isInPartFields = false;
- for (int i = parts.length - 2; i >= 0; --i) {
- String p = parts[i];
- boolean isPartField = p.contains("=");
- if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX)
- && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) {
- dbIx = i - 1;
- break;
- }
- isInPartFields = isPartField;
- }
- // If we found something before we ran out of components, use it.
- if (dbIx >= 0) {
- String dbName = parts[dbIx];
- if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
- dbName = dbName.substring(0, dbName.length() - 3);
- }
- return dbName + "." + parts[dbIx + 1];
- }
- return "unknown";
- }
-
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
@@ -525,6 +487,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if (rawDataReader != null && isRawDataReaderOpen) {
try {
rawDataReader.close();
+ rawDataReader = null;
} catch (IOException ex) {
// Ignore.
}
@@ -620,7 +583,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
if (hasCache) {
- tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb);
+ tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer.
}
FileTail ft = orcReader.getFileTail();
@@ -713,7 +676,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
if (hasCache) {
LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
- stripeKey, footerRange.getData().duplicate());
+ stripeKey, footerRange.getData().duplicate(), cacheTag);
metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
}
ByteBuffer bb = footerRange.getData().duplicate();
@@ -941,9 +904,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
+ return putFileData(fileKey, ranges, data, baseOffset, null);
+ }
+
+ @Override
+ public long[] putFileData(Object fileKey, DiskRange[] ranges,
+ MemoryBuffer[] data, long baseOffset, String tag) {
if (data != null) {
return lowLevelCache.putFileData(
- fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+ fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
} else if (metadataCache != null) {
metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 506146b..8b89ae9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
@@ -146,6 +147,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final Map<Path, PartitionDesc> parts;
private final Object fileKey;
+ private final String cacheTag;
private final FileSystem fs;
private volatile boolean isStopped = false;
@@ -212,6 +214,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
fileKey = determineFileId(fs, split,
HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
+ cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
+ ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
this.sourceInputFormat = sourceInputFormat;
this.sourceSerDe = sourceSerDe;
this.reporter = reporter;
@@ -735,7 +739,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
FileData fd = new FileData(fileKey, encodings.length);
fd.addStripe(sd);
- cache.putFileData(fd, Priority.NORMAL, counters);
+ cache.putFileData(fd, Priority.NORMAL, counters, cacheTag);
} else {
lockAllBuffers(sd);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index cfb3e42..0184e30 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.llap.io.metadata;
import org.apache.hadoop.hive.common.FileUtils;
-
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import java.io.IOException;
@@ -158,22 +157,33 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
@Override
public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) {
- return putInternal(fileKey, tailBuffer);
+ return putInternal(fileKey, tailBuffer, null);
}
- public LlapBufferOrBuffers putStripeTail(OrcBatchKey stripeKey, ByteBuffer tailBuffer) {
- return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer);
+ @Override
+ public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) {
+ return putInternal(fileKey, tailBuffer, tag);
}
+ public LlapBufferOrBuffers putStripeTail(
+ OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) {
+ return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag);
+ }
@Override
public LlapBufferOrBuffers putFileMetadata(
Object fileKey, int length, InputStream is) throws IOException {
+ return putFileMetadata(fileKey, length, is, null);
+ }
+
+ @Override
+ public LlapBufferOrBuffers putFileMetadata(
+ Object fileKey, int length, InputStream is, String tag) throws IOException {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(fileKey);
if (oldVal == null) {
- result = wrapBbForFile(result, fileKey, length, is);
+ result = wrapBbForFile(result, fileKey, length, is, tag);
if (!lockBuffer(result, false)) {
throw new AssertionError("Cannot lock a newly created value " + result);
}
@@ -194,14 +204,14 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
@SuppressWarnings({ "rawtypes", "unchecked" })
private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
- Object fileKey, int length, InputStream stream) throws IOException {
+ Object fileKey, int length, InputStream stream, String tag) throws IOException {
if (result != null) return result;
int maxAlloc = allocator.getMaxAllocation();
LlapMetadataBuffer<Object>[] largeBuffers = null;
if (maxAlloc < length) {
largeBuffers = new LlapMetadataBuffer[length / maxAlloc];
for (int i = 0; i < largeBuffers.length; ++i) {
- largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey);
+ largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey, tag);
}
allocator.allocateMultiple(largeBuffers, maxAlloc, null);
for (int i = 0; i < largeBuffers.length; ++i) {
@@ -213,7 +223,7 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
return new LlapMetadataBuffers(largeBuffers);
} else {
LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
- smallBuffer[0] = new LlapMetadataBuffer(fileKey);
+ smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag);
allocator.allocateMultiple(smallBuffer, length, null);
readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
if (largeBuffers == null) {
@@ -239,12 +249,12 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
bb.position(pos);
}
- private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer) {
+ private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(key);
if (oldVal == null) {
- result = wrapBb(result, key, tailBuffer);
+ result = wrapBb(result, key, tailBuffer, tag);
oldVal = metadata.putIfAbsent(key, result);
if (oldVal == null) {
cacheInPolicy(result); // Cached successfully, add to policy.
@@ -302,17 +312,17 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
}
private <T> LlapBufferOrBuffers wrapBb(
- LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer) {
+ LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) {
if (result != null) return result;
if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
// The common case by far.
- return wrapSmallBb(new LlapMetadataBuffer<T>(key), tailBuffer);
+ return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer);
} else {
int allocCount = determineAllocCount(tailBuffer);
@SuppressWarnings("unchecked")
LlapMetadataBuffer<T>[] results = new LlapMetadataBuffer[allocCount];
for (int i = 0; i < allocCount; ++i) {
- results[i] = new LlapMetadataBuffer<T>(key);
+ results[i] = new LlapMetadataBuffer<T>(key, tag);
}
wrapLargeBb(results, tailBuffer);
return new LlapMetadataBuffers<T>(results);
@@ -470,9 +480,11 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
public final static class LlapMetadataBuffer<T>
extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
private final T key;
+ private String tag;
- public LlapMetadataBuffer(T key) {
+ public LlapMetadataBuffer(T key, String tag) {
this.key = key;
+ this.tag = tag;
}
@Override
@@ -504,6 +516,11 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
return null;
}
+
+ @Override
+ public String getTag() {
+ return tag;
+ }
}
public final static class LlapMetadataBuffers<T> implements LlapBufferOrBuffers {
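[Editor's note: the callers above (see OrcEncodedDataReader) use the tagged put in a consistent pattern: the tag travels with the metadata into the cache, and the returned, already-locked buffers are immediately decRef'd because the caller keeps working with its own copy. A hedged fragment of that pattern, with variable names taken from the ORC reader:

// tailBufferBb is the serialized ORC file footer; cacheTag comes from
// LlapUtil.getDbAndTableNameForMetrics (null when tracking is disabled).
LlapBufferOrBuffers tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
metadataCache.decRefBuffer(tailBuffers); // we don't use the cache's copy of the buffer]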
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index fd8839a..2f7fa24 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -121,4 +121,10 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
protected boolean isLocked() {
return false;
}
+
+ @Override
+ public String getTag() {
+ // We don't care about these.
+ return "OrcEstimates";
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 6a4b598..2c87bc2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -130,9 +130,9 @@ Example code to test specific scenarios:
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), true, -1); // no cleanup thread
final int FILE = 1;
- cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null);
- cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null);
- cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null);
+ cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null, null);
+ cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null, null);
+ cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null, null);
DiskRangeList dr = dr(3756206, 7313562);
MutateHelper mh = new MutateHelper(dr);
dr = dr.insertAfter(dr(7790545, 11051556));
@@ -149,14 +149,14 @@ Example code to test specific scenarios:
long fn1 = 1, fn2 = 2;
MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
- assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
- assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null));
+ assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null, null));
verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]);
verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4));
verifyRefcount(fakes, 3, 4, 3, 3, 1, 1);
MemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
- long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null);
+ long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null, null);
assertEquals(1, mask.length);
assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
@@ -207,7 +207,7 @@ Example code to test specific scenarios:
long fn = 1;
MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
assertNull(cache.putFileData(
- fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
+ fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null));
verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9));
verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]);
verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5));
@@ -226,7 +226,7 @@ Example code to test specific scenarios:
long fn = 1;
MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
assertNull(cache.putFileData(
- fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
+ fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null));
// We expect cache requests from the middle here
verifyCacheGet(cache, fn, 3, 4, fakes[0]);
verifyCacheGet(cache, fn, 3, 7, fakes[0], dr(4, 6), fakes[1]);
@@ -239,8 +239,8 @@ Example code to test specific scenarios:
new DummyAllocator(), true, -1); // no cleanup thread
long fn1 = 1, fn2 = 2;
MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null, null));
verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
verifyCacheGet(cache, fn2, 1, 2, fakes[2]);
verifyRefcount(fakes, 3, 3, 3);
@@ -259,15 +259,15 @@ Example code to test specific scenarios:
long fn1 = 1, fn2 = 2;
MemoryBuffer[] fakes = new MemoryBuffer[] {
fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null));
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null, null));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null, null));
evict(cache, fakes[0]);
evict(cache, fakes[3]);
long[] mask = cache.putFileData(
- fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null);
+ fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null, null);
assertEquals(1, mask.length);
assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null, null));
verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]);
}
@@ -304,7 +304,7 @@ Example code to test specific scenarios:
long fn = 1;
MemoryBuffer[] fakes = new MemoryBuffer[]{fb(), fb(), fb()};
cache.putFileData(fn, new DiskRange[]{dr(0, 100), dr(300, 500), dr(800, 1000)},
- fakes, 0, Priority.NORMAL, null);
+ fakes, 0, Priority.NORMAL, null, null);
assertEquals(0, metrics.getCacheRequestedBytes());
assertEquals(0, metrics.getCacheHitBytes());
list = new CreateHelper();
@@ -390,7 +390,7 @@ Example code to test specific scenarios:
buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0);
buffers[j] = buf;
}
- long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null);
+ long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null, null);
puts += buffers.length;
long maskVal = 0;
if (mask != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index 62a174a..c75dd70 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -61,10 +61,10 @@ public class LlapCacheAwareFs extends FileSystem {
new ConcurrentHashMap<>();
public static Path registerFile(DataCache cache, Path path, Object fileKey,
- TreeMap<Long, Long> index, Configuration conf) throws IOException {
+ TreeMap<Long, Long> index, Configuration conf, String tag) throws IOException {
long splitId = currentSplitId.incrementAndGet();
CacheAwareInputStream stream = new CacheAwareInputStream(
- cache, conf, index, path, fileKey, -1);
+ cache, conf, index, path, fileKey, -1, tag);
if (files.putIfAbsent(splitId, stream) != null) {
throw new IOException("Record already exists for " + splitId);
}
@@ -166,23 +166,25 @@ public class LlapCacheAwareFs extends FileSystem {
private final TreeMap<Long, Long> chunkIndex;
private final Path path;
private final Object fileKey;
+ private final String tag;
private final Configuration conf;
private final DataCache cache;
private final int bufferSize;
private long position = 0;
public CacheAwareInputStream(DataCache cache, Configuration conf,
- TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize) {
+ TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, String tag) {
this.cache = cache;
this.fileKey = fileKey;
this.chunkIndex = chunkIndex;
this.path = path;
this.conf = conf;
this.bufferSize = bufferSize;
+ this.tag = tag;
}
public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int bufferSize) {
- return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize);
+ return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize, tag);
}
@Override
@@ -307,7 +309,7 @@ public class LlapCacheAwareFs extends FileSystem {
}
smallBuffer = null;
}
- cache.putFileData(fileKey, cacheRanges, newCacheData, 0);
+ cache.putFileData(fileKey, cacheRanges, newCacheData, 0, tag);
} finally {
// We do not use the new cache buffers for the actual read, given the way read() API is.
// Therefore, we don't need to handle cache collisions - just decref all the buffers.
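[Editor's note: for completeness, a hypothetical call site for the updated registerFile signature; the variable names here are assumptions, and the Parquet reader below computes cacheTag the same way (via LlapUtil.getDbAndTableNameForMetrics) before exposing a file through the cache-aware FS:

// chunkIndex maps chunk start offsets to end offsets for this file (assumed).
Path cachePath = LlapCacheAwareFs.registerFile(
    dataCache, file, cacheKey, chunkIndex, configuration, cacheTag);]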
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 462c62f..4e17394 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -62,6 +62,8 @@ import org.apache.orc.impl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
import org.apache.orc.OrcProto;
import com.google.common.annotations.VisibleForTesting;
@@ -130,7 +132,7 @@ class EncodedReaderImpl implements EncodedReader {
private final Object fileKey;
private final DataReader dataReader;
private boolean isDataReaderOpen = false;
- private final CompressionCodec codec;
+ private CompressionCodec codec;
private final boolean isCodecFromPool;
private boolean isCodecFailure = false;
private final boolean isCompressed;
@@ -143,11 +145,12 @@ class EncodedReaderImpl implements EncodedReader {
private final IoTrace trace;
private final TypeDescription fileSchema;
private final WriterVersion version;
+ private final String tag;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
this.fileKey = fileKey;
this.compressionKind = kind;
this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
@@ -161,6 +164,7 @@ class EncodedReaderImpl implements EncodedReader {
this.cacheWrapper = cacheWrapper;
this.dataReader = dataReader;
this.trace = trace;
+ this.tag = tag;
if (POOLS != null) return;
if (pf == null) {
pf = new NoopPoolFactory();
@@ -686,6 +690,7 @@ class EncodedReaderImpl implements EncodedReader {
} else {
codec.close();
}
+ codec = null;
} catch (Exception ex) {
LOG.error("Ignoring error from codec", ex);
} finally {
@@ -849,7 +854,7 @@ class EncodedReaderImpl implements EncodedReader {
if (badEstimates != null && !badEstimates.isEmpty()) {
// Relies on the fact that cache does not actually store these.
DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
- long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
+ long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset, tag);
assert result == null; // We don't expect conflicts from bad estimates.
}
@@ -909,7 +914,7 @@ class EncodedReaderImpl implements EncodedReader {
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(
- fileKey, cacheKeys, targetBuffers, baseOffset);
+ fileKey, cacheKeys, targetBuffers, baseOffset, tag);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
}
@@ -1163,7 +1168,8 @@ class EncodedReaderImpl implements EncodedReader {
// 5. Put uncompressed data to cache.
if (fileKey != null) {
- long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+ long[] collisionMask = cacheWrapper.putFileData(
+ fileKey, cacheKeys, targetBuffers, baseOffset, tag);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
}
@@ -1261,6 +1267,7 @@ class EncodedReaderImpl implements EncodedReader {
}
codec.reset(); // We always need to call reset on the codec.
codec.decompress(src, dest);
+
dest.position(startPos);
int newLim = dest.limit();
if (newLim > startLim) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 57fb63b..210c987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
* @return The reader.
*/
EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException;
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException;
/** The factory that can create (or return) the pools used by encoded reader. */
public interface PoolFactory {
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index 49cd9ba..a9a9f10 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
@Override
public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
- bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool);
+ bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
}
}
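A hedged call-site sketch for the widened factory method; every argument except the new trailing tag stands in for what existing callers (such as OrcEncodedDataReader) already pass, and the example tag merely illustrates the db/table form produced by LlapUtil.getDbAndTableNameForMetrics:

    // Sketch: obtaining an encoded reader with the new cache tag argument.
    EncodedReader encoded = orcReader.encodedReader(
        fileKey, dataCache, dataReader, poolFactory, ioTrace,
        true /* useCodecPool */, "default.mytable" /* tag */);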
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 7b77eee..f64efe2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -188,6 +188,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
// if task.side.metadata is set, rowGroupOffsets is null
Object cacheKey = null;
+ String cacheTag = null;
// TODO: also support fileKey in splits, like OrcSplit does
if (metadataCache != null) {
cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
@@ -195,6 +196,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
}
if (cacheKey != null) {
+ if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
+ cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
+ }
// If we are going to use cache, change the path to depend on file ID for extra consistency.
FileSystem fs = file.getFileSystem(configuration);
if (cacheKey instanceof Long && HiveConf.getBoolVar(
@@ -207,13 +211,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
//TODO check whether rowGroupOffSets can be null
// then we need to apply the predicate push down filter
footer = readSplitFooter(
- configuration, file, cacheKey, range(split.getStart(), split.getEnd()));
+ configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag);
MessageType fileSchema = footer.getFileMetaData().getSchema();
FilterCompat.Filter filter = getFilter(configuration);
blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
} else {
// otherwise we find the row groups that were selected on the client
- footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER);
+ footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag);
Set<Long> offsets = new HashSet<>();
for (long offset : rowGroupOffsets) {
offsets.add(offset);
@@ -250,13 +254,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
requestedSchema = DataWritableReadSupport
.getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
- Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
+ Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
this.reader = new ParquetFileReader(
configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
}
private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
- List<BlockMetaData> blocks) throws IOException {
+ List<BlockMetaData> blocks, String tag) throws IOException {
if (fileKey == null || cache == null) {
return path;
}
@@ -277,13 +281,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
// Register the cache-aware path so that Parquet reader would go thru it.
configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl",
LlapCacheAwareFs.class.getCanonicalName());
- path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration);
+ path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration, tag);
this.cacheFsPath = path;
return path;
}
- private ParquetMetadata readSplitFooter(
- JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException {
+ private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
+ Object cacheKey, MetadataFilter filter, String tag) throws IOException {
MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
: metadataCache.getFileMetadata(cacheKey);
if (footerData != null) {
@@ -313,7 +317,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
if (LOG.isInfoEnabled()) {
LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
}
- footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream);
+ footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag);
try {
return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
} finally {
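Pulling the scattered Parquet hunks together: the tag is derived once, gated on the new hive.llap.io.track.cache.usage setting, and then threaded to both the footer read and the cache-aware path wrapper. A consolidated sketch, with names taken from the surrounding code and the wiring assumed:

    // Consolidated sketch of the tag flow in VectorizedParquetRecordReader.
    String cacheTag = null;
    if (cacheKey != null
        && HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
      // Maps cached buffers back to a Hive entity (db/table) for reporting.
      cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
    }
    // Both consumers accept a null tag, so behavior is unchanged when tracking is off.
    ParquetMetadata footer = readSplitFooter(
        configuration, file, cacheKey, filter, cacheTag);
    Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);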
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index 795739b..2ac0a18 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -60,25 +60,7 @@ public interface DataCache {
DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
DiskRangeListFactory factory, BooleanRef gotAllData);
- /**
- * Puts file data into cache, or gets older data in case of collisions.
- *
- * The memory buffers provided MUST be allocated via an allocator returned by getAllocator
- * method, to allow cache implementations that evict and then de-allocate the buffer.
- *
- * It is assumed that the caller will use the data immediately, therefore any buffers provided
- * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction,
- * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the
- * caller is done with it. Buffers rejected due to conflict will neither be locked, nor
- * automatically deallocated. The caller must take care to discard these buffers.
- *
- * @param fileKey Unique ID of the target file on the file system.
- * @param ranges The ranges for which the data is being cached. These objects will not be stored.
- * @param data The data for the corresponding ranges.
- * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
- * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
- * the replacement chunks from cache are updated directly in the array.
- */
+ @Deprecated
long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset);
/**
@@ -106,4 +88,27 @@ public interface DataCache {
* @return the factory
*/
Allocator.BufferObjectFactory getDataBufferFactory();
+
+
+ /**
+ * Puts file data into cache, or gets older data in case of collisions.
+ *
+ * The memory buffers provided MUST be allocated via an allocator returned by the getAllocator
+ * method, to allow cache implementations to evict and then de-allocate the buffer.
+ *
+ * It is assumed that the caller will use the data immediately; therefore, any buffers provided
+ * to putFileData (or returned due to a cache collision) are locked in the cache to prevent
+ * eviction, and must be released back to the cache via a corresponding call (releaseBuffer)
+ * when the caller is done with them. Buffers rejected due to a conflict are neither locked nor
+ * automatically deallocated; the caller must take care to discard them.
+ *
+ * @param fileKey Unique ID of the target file on the file system.
+ * @param ranges The ranges for which the data is being cached. These objects will not be stored.
+ * @param data The data for the corresponding ranges.
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
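+ * @param tag A tag identifying the source entity (e.g. a table or partition) for cache reporting.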
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+ * the replacement chunks from cache are updated directly in the array.
+ */
+ long[] putFileData(Object fileKey, DiskRange[] ranges,
+ MemoryBuffer[] data, long baseOffset, String tag);
}
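A hedged usage sketch of the tagged overload, illustrating the lock/release contract the javadoc spells out; the helper itself is hypothetical, while putFileData, releaseBuffer, and the collision-mask semantics come from this interface:

    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.common.io.DiskRange;
    import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;

    // Hypothetical helper: after putFileData, every buffer in data[] is locked
    // (ours, or an older cached replacement), so each must be released once.
    static void putAndRelease(DataCache cache, Object fileKey, DiskRange[] ranges,
        MemoryBuffer[] data, long baseOffset, String tag) {
      long[] mask = cache.putFileData(fileKey, ranges, data, baseOffset, tag);
      try {
        // If mask != null, the marked slots of data[] now hold older cached
        // buffers; the rejected originals (to which the caller must keep its
        // own references) are neither locked nor deallocated by the cache.
        // ... consume data[] here while the buffers are locked ...
      } finally {
        for (MemoryBuffer buf : data) {
          cache.releaseBuffer(buf); // unlock each buffer exactly once
        }
      }
    }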
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index b417bd3..d1da7f5 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -32,17 +32,11 @@ public interface FileMetadataCache {
*/
MemoryBufferOrBuffers getFileMetadata(Object fileKey);
- /**
- * Puts the metadata for a given file (e.g. a footer buffer into cache).
- * @param fileKey The file key.
- * @param length The footer length.
- * @param is The stream to read the footer from.
- * @return The buffer or buffers representing the cached footer.
- * The caller must decref this buffer when done.
- */
+ @Deprecated
MemoryBufferOrBuffers putFileMetadata(
Object fileKey, int length, InputStream is) throws IOException;
+ @Deprecated
MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer);
/**
@@ -50,4 +44,18 @@ public interface FileMetadataCache {
* @param buffer The buffer to release.
*/
void decRefBuffer(MemoryBufferOrBuffers buffer);
+
+
+ /**
+ * Puts the metadata for a given file (e.g. a footer buffer) into the cache.
+ * @param fileKey The file key.
+ * @param length The footer length.
+ * @param is The stream to read the footer from.
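+ * @param tag A tag mapping the cached footer to a Hive entity (table or partition) for reporting.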
+ * @return The buffer or buffers representing the cached footer.
+ * The caller must decref this buffer when done.
+ */
+ MemoryBufferOrBuffers putFileMetadata(
+ Object fileKey, int length, InputStream is, String tag) throws IOException;
+
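+ /** Same as the stream-based overload above, but reads the footer from an in-memory tail buffer. */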
+ MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
}
\ No newline at end of file
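Finally, a hedged sketch of the tagged metadata path, mirroring the Parquet footer flow earlier in this patch; the helper is hypothetical scaffolding around getFileMetadata, putFileMetadata, and decRefBuffer:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hive.common.io.FileMetadataCache;
    import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;

    // Hypothetical helper: cache a footer under a tag and use it while holding
    // the reference; both get and put return a locked buffer to decref later.
    static void withCachedFooter(FileMetadataCache cache, Object fileKey,
        int footerLength, InputStream is, String tag) throws IOException {
      MemoryBufferOrBuffers footer = cache.getFileMetadata(fileKey);
      if (footer == null) {
        footer = cache.putFileMetadata(fileKey, footerLength, is, tag);
      }
      try {
        // ... parse the footer from the locked buffer(s) here ...
      } finally {
        cache.decRefBuffer(footer); // release the reference when done
      }
    }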