Posted to commits@hive.apache.org by se...@apache.org on 2018/03/16 18:54:50 UTC

[2/2] hive git commit: HIVE-16992 : LLAP: monitoring and better default lambda for LRFU policy (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)

HIVE-16992 : LLAP: monitoring and better default lambda for LRFU policy (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f40d9447
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f40d9447
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f40d9447

Branch: refs/heads/master
Commit: f40d94476718df6de42caef3b18811f024fe8713
Parents: 1b074bc
Author: sergey <se...@apache.org>
Authored: Fri Mar 16 11:49:07 2018 -0700
Committer: sergey <se...@apache.org>
Committed: Fri Mar 16 11:49:07 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +-
 .../org/apache/hadoop/hive/llap/LlapUtil.java   |  67 ++++++
 .../hive/llap/cache/CacheContentsTracker.java   | 215 +++++++++++++++++++
 .../hive/llap/cache/LlapCacheableBuffer.java    |   2 +
 .../hadoop/hive/llap/cache/LlapDataBuffer.java  |  10 +
 .../hadoop/hive/llap/cache/LowLevelCache.java   |   2 +-
 .../hive/llap/cache/LowLevelCacheImpl.java      |   3 +-
 .../llap/cache/LowLevelLrfuCachePolicy.java     |   3 +-
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java |  15 +-
 .../hive/llap/cache/SimpleBufferManager.java    |   5 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  15 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  65 ++----
 .../llap/io/encoded/SerDeEncodedDataReader.java |   6 +-
 .../hive/llap/io/metadata/MetadataCache.java    |  45 ++--
 .../llap/io/metadata/OrcFileEstimateErrors.java |   6 +
 .../hive/llap/cache/TestLowLevelCacheImpl.java  |  32 +--
 .../hadoop/hive/llap/LlapCacheAwareFs.java      |  12 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  17 +-
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |   2 +-
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |   4 +-
 .../vector/VectorizedParquetRecordReader.java   |  22 +-
 .../apache/hadoop/hive/common/io/DataCache.java |  43 ++--
 .../hive/common/io/FileMetadataCache.java       |  24 ++-
 23 files changed, 485 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06efd02..f8e715d 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3190,11 +3190,17 @@ public class HiveConf extends Configuration {
     LLAP_ALLOCATOR_DEFRAG_HEADROOM("hive.llap.io.allocator.defrag.headroom", "1Mb",
         "How much of a headroom to leave to allow allocator more flexibility to defragment.\n" +
         "The allocator would further cap it to a fraction of total memory."),
+    LLAP_TRACK_CACHE_USAGE("hive.llap.io.track.cache.usage", true,
+         "Whether to tag LLAP cache contents, mapping them to Hive entities (paths for\n" +
+         "partitions and tables) for reporting."),
     LLAP_USE_LRFU("hive.llap.io.use.lrfu", true,
         "Whether ORC low-level cache should use LRFU cache policy instead of default (FIFO)."),
-    LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.01f,
+    LLAP_LRFU_LAMBDA("hive.llap.io.lrfu.lambda", 0.000001f,
         "Lambda for ORC low-level cache LRFU cache policy. Must be in [0, 1]. 0 makes LRFU\n" +
-        "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly."),
+        "behave like LFU, 1 makes it behave like LRU, values in between balance accordingly.\n" +
+        "The meaning of this parameter is the inverse of the number of time ticks (cache\n" +
+        " operations, currently) that cause the combined recency-frequency of a block in cache\n" +
+        " to be halved."),
     LLAP_CACHE_ALLOW_SYNTHETIC_FILEID("hive.llap.cache.allow.synthetic.fileid", false,
         "Whether LLAP cache should use synthetic file ID if real one is not available. Systems\n" +
         "like HDFS, Isilon, etc. provide a unique file/inode ID. On other FSes (e.g. local\n" +

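For reference (not part of the patch): under the standard LRFU decay that the description above refers to, a block's combined recency-frequency (CRF) contribution decays as 0.5^(lambda * ticks), so it halves every 1/lambda time ticks. A minimal sketch, assuming only that formula:

  // Minimal sketch of the LRFU decay described above; not code from the patch.
  public class LrfuLambdaDemo {
    static double decayedCrf(double crf, double lambda, long ticksElapsed) {
      // Each access's contribution is halved every 1/lambda time ticks.
      return crf * Math.pow(0.5, lambda * ticksElapsed);
    }
    public static void main(String[] args) {
      // Old default 0.01: a block's CRF halves after only 100 cache operations.
      System.out.println(decayedCrf(1.0, 0.01, 100));           // ~0.5
      // New default 0.000001: halving takes 1,000,000 cache operations,
      // so the policy behaves much closer to LFU for typical workloads.
      System.out.println(decayedCrf(1.0, 0.000001, 1_000_000)); // ~0.5
    }
  }
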
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index aecaacc..50c0e22 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -299,4 +301,69 @@ public class LlapUtil {
     LOG.info("Instantiated " + protocolClass.getSimpleName() + " at " + bindAddressVal);
     return server;
   }
+
+  // Copied from AcidUtils so we don't have to put the code using this into ql.
+  // TODO: Ideally, AcidUtils class and various constants should be in common.
+  private static final String BASE_PREFIX = "base_", DELTA_PREFIX = "delta_",
+      DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_",
+      DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_";
+
+  public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/';
+
+  public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) {
+    String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
+    int dbIx = -1;
+    // Try to find the default db postfix; don't check two last components - at least there
+    // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
+    for (int i = 0; i < parts.length - 2; ++i) {
+      if (!parts[i].endsWith(DATABASE_PATH_SUFFIX)) continue;
+      if (dbIx >= 0) {
+        dbIx = -1; // Let's not guess which one is correct.
+        break;
+      }
+      dbIx = i;
+    }
+    if (dbIx >= 0) {
+      String dbAndTable = parts[dbIx].substring(
+          0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
+      if (!includeParts) return dbAndTable;
+      for (int i = dbIx + 2; i < parts.length; ++i) {
+        if (!parts[i].contains("=")) break;
+        dbAndTable += "/" + parts[i];
+      }
+      return dbAndTable;
+    }
+
+    // Just go from the back and throw away everything we think is wrong; skip last item, the file.
+    boolean isInPartFields = false;
+    for (int i = parts.length - 2; i >= 0; --i) {
+      String p = parts[i];
+      boolean isPartField = p.contains("=");
+      if ((isInPartFields && !isPartField) || (!isPartField && !isSomeHiveDir(p))) {
+        dbIx = i - 1; // Assume this is the table we are at now.
+        break;
+      }
+      isInPartFields = isPartField;
+    }
+    // If we found something before we ran out of components, use it.
+    if (dbIx >= 0) {
+      String dbName = parts[dbIx];
+      if (dbName.endsWith(DATABASE_PATH_SUFFIX)) {
+        dbName = dbName.substring(0, dbName.length() - 3);
+      }
+      String dbAndTable = dbName + "." + parts[dbIx + 1];
+      if (!includeParts) return dbAndTable;
+      for (int i = dbIx + 2; i < parts.length; ++i) {
+        if (!parts[i].contains("=")) break;
+        dbAndTable += "/" + parts[i];
+      }
+      return dbAndTable;
+    }
+    return "unknown";
+  }
+
+  private static boolean isSomeHiveDir(String p) {
+    return p.startsWith(BASE_PREFIX) || p.startsWith(DELTA_PREFIX) || p.startsWith(BUCKET_PREFIX)
+        || p.startsWith(UNION_SUDBIR_PREFIX) || p.startsWith(DELETE_DELTA_PREFIX);
+  }
 }

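For reference (not part of the patch), a sketch of what the new helper produces for a hypothetical warehouse path: the ".db" suffix anchors the database name, the next component is taken as the table, and key=value components are appended while present.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.llap.LlapUtil;

  public class CacheTagDemo {
    public static void main(String[] args) {
      // Hypothetical path; not taken from the patch or its tests.
      Path p = new Path("/warehouse/tpcds.db/store_sales/ss_date=2018-03-16/000000_0");
      System.out.println(LlapUtil.getDbAndTableNameForMetrics(p, true));
      // -> tpcds.store_sales/ss_date=2018-03-16
      System.out.println(LlapUtil.getDbAndTableNameForMetrics(p, false));
      // -> tpcds.store_sales (paths with no recognizable structure yield "unknown")
    }
  }
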
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
new file mode 100644
index 0000000..4fbaac1
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+
+/**
+ * A wrapper around cache eviction policy that tracks cache contents via tags.
+ */
+public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener {
+  private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L;
+
+  private final ConcurrentSkipListMap<String, TagState> tagInfo = new ConcurrentSkipListMap<>();
+  private EvictionListener evictionListener;
+  private LowLevelCachePolicy realPolicy;
+  private final Thread cleanupThread;
+
+  public CacheContentsTracker(LowLevelCachePolicy realPolicy) {
+    this.realPolicy = realPolicy;
+    realPolicy.setEvictionListener(this);
+    this.cleanupThread = new Thread(new CleanupRunnable());
+    this.cleanupThread.start();
+  }
+
+  private final class CleanupRunnable implements Runnable {
+    @Override
+    public void run() {
+      final long cleanupTimeNs = CLEANUP_TIME_MS * 1000000L;
+      long sleepTimeMs = CLEANUP_TIME_MS;
+      try {
+        while (true) {
+          Thread.sleep(sleepTimeMs);
+          long timeNs = System.nanoTime();
+          long nextCleanupInNs = cleanupTimeNs;
+          Iterator<TagState> iter = tagInfo.values().iterator();
+          while (iter.hasNext()) {
+            TagState v = iter.next();
+            synchronized (v) {
+              if (v.bufferCount > 0) continue; // The file is still cached.
+              long deltaNs = timeNs - v.emptyTimeNs;
+              if (deltaNs < cleanupTimeNs) {
+                nextCleanupInNs = Math.min(nextCleanupInNs, deltaNs);
+                continue;
+              } else {
+                iter.remove();
+              }
+            }
+          }
+          sleepTimeMs = Math.max(MIN_TIME_MS, nextCleanupInNs / 1000000L);
+        }
+      } catch (InterruptedException ex) {
+        return; // Interrupted.
+      }
+    }
+  }
+
+  private static class TagState {
+    public TagState(String name) {
+      this.name = name;
+    }
+    public final String name;
+    public long emptyTimeNs;
+    public long bufferCount, totalSize, maxCount, maxSize;
+    public boolean isRemoved = false;
+  }
+
+
+  private void reportCached(LlapCacheableBuffer buffer) {
+    long size = buffer.getMemoryUsage();
+    TagState state;
+    do {
+       state = getTagState(buffer);
+    } while (!reportCached(state, size));
+    state = null;
+    do {
+      state = getParentTagState(buffer);
+      if (state == null) break;
+    } while (!reportCached(state, size));
+  }
+
+  private boolean reportCached(TagState state, long size) {
+    synchronized (state) {
+      if (state.isRemoved) return false;
+      ++state.bufferCount;
+      state.totalSize += size;
+      state.maxSize = Math.max(state.maxSize, state.totalSize);
+      state.maxCount = Math.max(state.maxCount, state.bufferCount);
+    }
+    return true;
+  }
+
+  private void reportRemoved(LlapCacheableBuffer buffer) {
+    long size = buffer.getMemoryUsage();
+    TagState state;
+    do {
+       state = getTagState(buffer);
+    } while (!reportRemoved(state, size));
+    state = null;
+    do {
+      state = getParentTagState(buffer);
+      if (state == null) break;
+    } while (!reportRemoved(state, size));
+  }
+
+  private boolean reportRemoved(TagState state, long size) {
+    synchronized (state) {
+      if (state.isRemoved) return false;
+      --state.bufferCount;
+      assert state.bufferCount >= 0;
+      state.totalSize -= size;
+      if (state.bufferCount == 0) {
+        state.emptyTimeNs = System.nanoTime();
+      }
+    }
+    return true;
+  }
+
+  private TagState getTagState(LlapCacheableBuffer buffer) {
+    return getTagState(buffer.getTag());
+  }
+
+  private TagState getParentTagState(LlapCacheableBuffer buffer) {
+    String tag = buffer.getTag();
+    int ix = tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR);
+    if (ix <= 0) return null;
+    return getTagState(tag.substring(0, ix));
+  }
+
+  private TagState getTagState(String tag) {
+    TagState state = tagInfo.get(tag);
+    if (state == null) {
+      state = new TagState(tag);
+      TagState old = tagInfo.putIfAbsent(tag, state);
+      state = (old == null) ? state : old;
+    }
+    return state;
+  }
+
+
+  @Override
+  public void cache(LlapCacheableBuffer buffer, Priority priority) {
+    realPolicy.cache(buffer, priority);
+    reportCached(buffer);
+  }
+
+  @Override
+  public void notifyLock(LlapCacheableBuffer buffer) {
+    realPolicy.notifyLock(buffer);
+  }
+
+  @Override
+  public void notifyUnlock(LlapCacheableBuffer buffer) {
+    realPolicy.notifyUnlock(buffer);
+  }
+
+  @Override
+  public void setEvictionListener(EvictionListener listener) {
+    evictionListener = listener;
+  }
+
+  @Override
+  public void setParentDebugDumper(LlapOomDebugDump dumper) {
+    realPolicy.setParentDebugDumper(dumper);
+  }
+
+
+  @Override
+  public long evictSomeBlocks(long memoryToReserve) {
+    return realPolicy.evictSomeBlocks(memoryToReserve);
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    return realPolicy.debugDumpForOom();
+  }
+
+  @Override
+  public void debugDumpShort(StringBuilder sb) {
+    sb.append("\nCache state: ");
+    for (TagState state : tagInfo.values()) {
+      synchronized (state) {
+        sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/")
+          .append(state.maxCount).append(", ").append(state.totalSize).append("/")
+          .append(state.maxSize);
+      }
+    }
+    realPolicy.debugDumpShort(sb);
+  }
+
+  @Override
+  public void notifyEvicted(LlapCacheableBuffer buffer) {
+    evictionListener.notifyEvicted(buffer);
+    reportRemoved(buffer);
+  }
+}

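For reference (not part of the patch): the tracker is a decorator over the real eviction policy, and debugDumpShort emits one line per tag plus a rollup line per parent tag (the part of the tag before the '/' partition separator). A sketch with hypothetical tag names and sizes:

  // Mirrors the LlapIoImpl wiring further down; tags and sizes are hypothetical.
  LowLevelCachePolicy policy = new CacheContentsTracker(new LowLevelFifoCachePolicy());
  StringBuilder sb = new StringBuilder();
  policy.debugDumpShort(sb);
  // Each tag line reads "name: bufferCount/maxCount, totalSize/maxSize", e.g.:
  //   tpcds.store_sales: 1500/2048, 805306368/1073741824
  //   tpcds.store_sales/ss_date=2018-03-16: 12/20, 6291456/10485760
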
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index e976090..f91a5d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -56,5 +56,7 @@ public abstract class LlapCacheableBuffer {
         + lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
   }
 
+  public abstract String getTag();
+
   protected abstract boolean isLocked();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 266f46e..405fca2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -24,9 +24,19 @@ public final class LlapDataBuffer extends LlapAllocatorBuffer {
   /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
    * the lookup is on compressed ranges, so we need to know this. */
   public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
+  private String tag;
 
   @Override
   public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
     evictionDispatcher.notifyEvicted(this);
   }
+
+  public void setTag(String tag) {
+    this.tag = tag;
+  }
+
+  @Override
+  public String getTag() {
+    return tag;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index a6330a3..af1b699 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -58,7 +58,7 @@ public interface LowLevelCache extends LlapOomDebugDump {
    *         the replacement chunks from cache are updated directly in the array.
    */
   long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
-      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag);
 
   /** Notifies the cache that a particular buffer should be removed due to eviction. */
   void notifyEvicted(MemoryBuffer buffer);

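For reference (not part of the patch), a hypothetical call showing the new trailing tag parameter; passing null keeps the old untagged behavior, which is what the updated unit tests below do:

  long[] mask = cache.putFileData(fileKey, ranges, buffers, 0L,
      Priority.NORMAL, null /* qfCounters */, "tpcds.store_sales" /* tag */);
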
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index a5494c7..5e102d9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -288,7 +288,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   @Override
   public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
-      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
     long[] result = null;
     assert buffers.length == ranges.length;
     FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
@@ -304,6 +304,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         long offset = ranges[i].getOffset() + baseOffset;
         assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
         buffer.declaredCachedLength = ranges[i].getLength();
+        buffer.setTag(tag);
         while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
           LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
           if (oldVal == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 2659d9e..b42f761 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
 
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -153,7 +154,7 @@ public class LowLevelLrfuCachePolicy implements LowLevelCachePolicy {
         } finally {
           listLock.unlock();
         }
-        // Now insert the buffer in its place and restore heap property.
+        // Now insert the new buffer in its place and restore heap property.
         buffer.indexInHeap = 0;
         heapifyDownUnderLock(buffer, time);
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 0871391..cb89d12 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -55,10 +55,18 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
 
   public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
     public boolean isCached = false;
+    private String tag;
     @Override
     public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
       evictionDispatcher.notifyEvicted(this);
     }
+    public void setTag(String tag) {
+      this.tag = tag;
+    }
+    @Override
+    public String getTag() {
+      return tag;
+    }
   }
 
   private static final class StripeInfoComparator implements
@@ -491,7 +499,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
   }
 
   public void putFileData(final FileData data, Priority priority,
-      LowLevelCacheCounters qfCounters) {
+      LowLevelCacheCounters qfCounters, String tag) {
     // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
     //       overhead from the java objects to memory manager and remove it when discarding file.
     if (data.stripes == null || data.stripes.isEmpty()) {
@@ -521,7 +529,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
       }
       try {
         for (StripeData si : data.stripes) {
-          lockAllBuffersForPut(si, priority);
+          lockAllBuffersForPut(si, priority, tag);
         }
         if (data == cached) {
           if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
@@ -566,7 +574,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
     }
   }
 
-  private void lockAllBuffersForPut(StripeData si, Priority priority) {
+  private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) {
     for (int i = 0; i < si.data.length; ++i) {
       LlapSerDeDataBuffer[][] colData = si.data[i];
       if (colData == null) continue;
@@ -576,6 +584,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
         for (int k = 0; k < streamData.length; ++k) {
           boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
           assert canLock;
+          streamData[k].setTag(tag);
           cachePolicy.cache(streamData[k], priority);
           streamData[k].isCached = true;
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index 027e414..a1b6cae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -84,9 +84,8 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
   }
 
   @Override
-  public long[] putFileData(Object fileKey, DiskRange[] ranges,
-      MemoryBuffer[] chunks, long baseOffset, Priority priority,
-      LowLevelCacheCounters qfCounters) {
+  public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
     for (int i = 0; i < chunks.length; ++i) {
       LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
       if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 5a397be..e5bc3c2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.CacheContentsTracker;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
 import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
@@ -76,6 +77,7 @@ import org.apache.hive.common.util.FixedSizedObjectPool;
 
 
 
+
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -139,6 +141,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       int minAllocSize = (int)HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
       LowLevelCachePolicy cachePolicy = useLrfu ? new LowLevelLrfuCachePolicy(
           minAllocSize, totalMemorySize, conf) : new LowLevelFifoCachePolicy();
+      boolean trackUsage = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE);
+      if (trackUsage) {
+        cachePolicy = new CacheContentsTracker(cachePolicy);
+      }
       // Allocator uses memory manager to request memory, so create the manager next.
       LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
           totalMemorySize, cachePolicy, cacheMetrics);
@@ -256,7 +262,14 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     @Override
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
-      return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null);
+      return putFileData(fileKey, ranges, data, baseOffset, null);
+    }
+
+    @Override
+    public long[] putFileData(Object fileKey, DiskRange[] ranges,
+        MemoryBuffer[] data, long baseOffset, String tag) {
+      return lowLevelCache.putFileData(
+          fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 89df943..afb8fc5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
@@ -166,6 +167,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private EncodedReader stripeReader;
   private CompressionCodec codec;
   private Object fileKey;
+  private final String cacheTag;
   private FileSystem fs;
 
   /**
@@ -206,6 +208,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     // LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
     orcReader = null;
+    cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
+        ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fs = split.getPath().getFileSystem(jobConf);
@@ -268,7 +272,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       recordReaderTime(startTime);
       return null;
     }
-    counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
+    counters.setDesc(QueryFragmentCounters.Desc.TABLE,
+        LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false));
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
         + (fileKey == null ? "" : " (" + fileKey + ")"));
     try {
@@ -429,7 +434,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // Reader creation updates HDFS counters, don't do it here.
     DataWrapperForOrc dw = new DataWrapperForOrc();
     stripeReader = orcReader.encodedReader(
-        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool);
+        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
   }
 
@@ -437,49 +442,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
-  private static String getDbAndTableName(Path path) {
-    // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's
-    // actually pretty hard to get cause even split generator only uses paths. We only need this
-    // for metrics; therefore, brace for BLACK MAGIC!
-    String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
-    int dbIx = -1;
-    // Try to find the default db postfix; don't check two last components - at least there
-    // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
-    for (int i = 0; i < parts.length - 2; ++i) {
-      if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
-      if (dbIx >= 0) {
-        dbIx = -1; // Let's not guess.
-        break;
-      }
-      dbIx = i;
-    }
-    if (dbIx >= 0) {
-      return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
-    }
-
-    // Just go from the back and throw away everything we think is wrong; skip last item, the file.
-    boolean isInPartFields = false;
-    for (int i = parts.length - 2; i >= 0; --i) {
-      String p = parts[i];
-      boolean isPartField = p.contains("=");
-      if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX)
-          && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) {
-        dbIx = i - 1;
-        break;
-      }
-      isInPartFields = isPartField;
-    }
-    // If we found something before we ran out of components, use it.
-    if (dbIx >= 0) {
-      String dbName = parts[dbIx];
-      if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
-        dbName = dbName.substring(0, dbName.length() - 3);
-      }
-      return dbName + "." + parts[dbIx + 1];
-    }
-    return "unknown";
-  }
-
   private void validateFileMetadata() throws IOException {
     if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
     int bufferSize = fileMetadata.getCompressionBufferSize();
@@ -525,6 +487,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     if (rawDataReader != null && isRawDataReaderOpen) {
       try {
         rawDataReader.close();
+        rawDataReader = null;
       } catch (IOException ex) {
         // Ignore.
       }
@@ -620,7 +583,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     ensureOrcReader();
     ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
     if (hasCache) {
-      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb);
+      tailBuffers = metadataCache.putFileMetadata(fileKey, tailBufferBb, cacheTag);
       metadataCache.decRefBuffer(tailBuffers); // We don't use the cache's copy of the buffer.
     }
     FileTail ft = orcReader.getFileTail();
@@ -713,7 +676,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     assert footerRange.next == null; // Can only happens w/zcr for a single input buffer.
     if (hasCache) {
       LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
-          stripeKey, footerRange.getData().duplicate());
+          stripeKey, footerRange.getData().duplicate(), cacheTag);
       metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
     }
     ByteBuffer bb = footerRange.getData().duplicate();
@@ -941,9 +904,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     @Override
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
+      return putFileData(fileKey, ranges, data, baseOffset, null);
+    }
+
+    @Override
+    public long[] putFileData(Object fileKey, DiskRange[] ranges,
+        MemoryBuffer[] data, long baseOffset, String tag) {
       if (data != null) {
         return lowLevelCache.putFileData(
-            fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+            fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
       } else if (metadataCache != null) {
         metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 506146b..8b89ae9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
@@ -146,6 +147,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final Map<Path, PartitionDesc> parts;
 
   private final Object fileKey;
+  private final String cacheTag;
   private final FileSystem fs;
 
   private volatile boolean isStopped = false;
@@ -212,6 +214,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     fileKey = determineFileId(fs, split,
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
+    cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
+        ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;
@@ -735,7 +739,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
       }
       FileData fd = new FileData(fileKey, encodings.length);
       fd.addStripe(sd);
-      cache.putFileData(fd, Priority.NORMAL, counters);
+      cache.putFileData(fd, Priority.NORMAL, counters, cacheTag);
     } else {
       lockAllBuffers(sd);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index cfb3e42..0184e30 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.llap.io.metadata;
 
 import org.apache.hadoop.hive.common.FileUtils;
-
 import org.apache.hadoop.hive.common.io.FileMetadataCache;
 
 import java.io.IOException;
@@ -158,22 +157,33 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
 
   @Override
   public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer) {
-    return putInternal(fileKey, tailBuffer);
+    return putInternal(fileKey, tailBuffer, null);
   }
 
-  public LlapBufferOrBuffers putStripeTail(OrcBatchKey stripeKey, ByteBuffer tailBuffer) {
-    return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer);
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag) {
+    return putInternal(fileKey, tailBuffer, tag);
   }
 
+  public LlapBufferOrBuffers putStripeTail(
+      OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag) {
+    return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag);
+  }
 
   @Override
   public LlapBufferOrBuffers putFileMetadata(
       Object fileKey, int length, InputStream is) throws IOException {
+    return putFileMetadata(fileKey, length, is, null);
+  }
+
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(
+      Object fileKey, int length, InputStream is, String tag) throws IOException {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(fileKey);
       if (oldVal == null) {
-        result = wrapBbForFile(result, fileKey, length, is);
+        result = wrapBbForFile(result, fileKey, length, is, tag);
         if (!lockBuffer(result, false)) {
           throw new AssertionError("Cannot lock a newly created value " + result);
         }
@@ -194,14 +204,14 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
-      Object fileKey, int length, InputStream stream) throws IOException {
+      Object fileKey, int length, InputStream stream, String tag) throws IOException {
     if (result != null) return result;
     int maxAlloc = allocator.getMaxAllocation();
     LlapMetadataBuffer<Object>[] largeBuffers = null;
     if (maxAlloc < length) {
       largeBuffers = new LlapMetadataBuffer[length / maxAlloc];
       for (int i = 0; i < largeBuffers.length; ++i) {
-        largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey);
+        largeBuffers[i] = new LlapMetadataBuffer<Object>(fileKey, tag);
       }
       allocator.allocateMultiple(largeBuffers, maxAlloc, null);
       for (int i = 0; i < largeBuffers.length; ++i) {
@@ -213,7 +223,7 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
       return new LlapMetadataBuffers(largeBuffers);
     } else {
       LlapMetadataBuffer<Object>[] smallBuffer = new LlapMetadataBuffer[1];
-      smallBuffer[0] = new LlapMetadataBuffer(fileKey);
+      smallBuffer[0] = new LlapMetadataBuffer(fileKey, tag);
       allocator.allocateMultiple(smallBuffer, length, null);
       readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
       if (largeBuffers == null) {
@@ -239,12 +249,12 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
     bb.position(pos);
   }
 
-  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer) {
+  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag) {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(key);
       if (oldVal == null) {
-        result = wrapBb(result, key, tailBuffer);
+        result = wrapBb(result, key, tailBuffer, tag);
         oldVal = metadata.putIfAbsent(key, result);
         if (oldVal == null) {
           cacheInPolicy(result); // Cached successfully, add to policy.
@@ -302,17 +312,17 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
   }
 
   private <T> LlapBufferOrBuffers wrapBb(
-      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer) {
+      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag) {
     if (result != null) return result;
     if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
       // The common case by far.
-      return wrapSmallBb(new LlapMetadataBuffer<T>(key), tailBuffer);
+      return wrapSmallBb(new LlapMetadataBuffer<T>(key, tag), tailBuffer);
     } else {
       int allocCount = determineAllocCount(tailBuffer);
       @SuppressWarnings("unchecked")
       LlapMetadataBuffer<T>[] results = new LlapMetadataBuffer[allocCount];
       for (int i = 0; i < allocCount; ++i) {
-        results[i] = new LlapMetadataBuffer<T>(key);
+        results[i] = new LlapMetadataBuffer<T>(key, tag);
       }
       wrapLargeBb(results, tailBuffer);
       return new LlapMetadataBuffers<T>(results);
@@ -470,9 +480,11 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
   public final static class LlapMetadataBuffer<T>
       extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
     private final T key;
+    private String tag;
 
-    public LlapMetadataBuffer(T key) {
+    public LlapMetadataBuffer(T key, String tag) {
       this.key = key;
+      this.tag = tag;
     }
 
     @Override
@@ -504,6 +516,11 @@ public class MetadataCache implements LlapOomDebugDump, FileMetadataCache {
     public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
       return null;
     }
+
+    @Override
+    public String getTag() {
+      return tag;
+    }
   }
 
   public final static class LlapMetadataBuffers<T> implements LlapBufferOrBuffers {

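For reference (not part of the patch), a hypothetical use of the new tagged overload; the untagged variants now simply delegate with a null tag:

  LlapBufferOrBuffers buf = metadataCache.putFileMetadata(
      fileKey, tailBufferBb, "tpcds.store_sales");
  metadataCache.decRefBuffer(buf); // drop the cache's ref if its copy isn't used
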
http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index fd8839a..2f7fa24 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -121,4 +121,10 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
   protected boolean isLocked() {
     return false;
   }
+
+  @Override
+  public String getTag() {
+    // We don't care about these.
+    return "OrcEstimates";
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 6a4b598..2c87bc2 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -130,9 +130,9 @@ Example code to test specific scenarios:
         LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
         new DummyAllocator(), true, -1); // no cleanup thread
     final int FILE = 1;
-    cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null);
-    cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null);
-    cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null);
+    cache.putFileData(FILE, gaps(3756206, 4261729, 7294767, 7547564), fbs(3), 0, Priority.NORMAL, null, null);
+    cache.putFileData(FILE, gaps(7790545, 11051556), fbs(1), 0, Priority.NORMAL, null, null);
+    cache.putFileData(FILE, gaps(11864971, 11912961, 13350968, 13393630), fbs(3), 0, Priority.NORMAL, null, null);
     DiskRangeList dr = dr(3756206, 7313562);
     MutateHelper mh = new MutateHelper(dr);
     dr = dr.insertAfter(dr(7790545, 11051556));
@@ -149,14 +149,14 @@ Example code to test specific scenarios:
     long fn1 = 1, fn2 = 2;
     MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
     verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
-    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
-    assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null));
+    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null));
+    assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null, null));
     verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
     verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]);
     verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4));
     verifyRefcount(fakes, 3, 4, 3, 3, 1, 1);
     MemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
-    long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null);
+    long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null, null);
     assertEquals(1, mask.length);
     assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
     assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
@@ -207,7 +207,7 @@ Example code to test specific scenarios:
     long fn = 1;
     MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
     assertNull(cache.putFileData(
-        fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
+        fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null));
     verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9));
     verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]);
     verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5));
@@ -226,7 +226,7 @@ Example code to test specific scenarios:
     long fn = 1;
     MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
     assertNull(cache.putFileData(
-        fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
+        fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null, null));
     // We expect cache requests from the middle here
     verifyCacheGet(cache, fn, 3, 4, fakes[0]);
     verifyCacheGet(cache, fn, 3, 7, fakes[0], dr(4, 6), fakes[1]);
@@ -239,8 +239,8 @@ Example code to test specific scenarios:
         new DummyAllocator(), true, -1); // no cleanup thread
     long fn1 = 1, fn2 = 2;
     MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb() };
-    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
-    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null));
+    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null, null));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null, null));
     verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
     verifyCacheGet(cache, fn2, 1, 2, fakes[2]);
     verifyRefcount(fakes, 3, 3, 3);
@@ -259,15 +259,15 @@ Example code to test specific scenarios:
     long fn1 = 1, fn2 = 2;
     MemoryBuffer[] fakes = new MemoryBuffer[] {
         fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
-    assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null));
-    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null));
+    assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null, null));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null, null));
     evict(cache, fakes[0]);
     evict(cache, fakes[3]);
     long[] mask = cache.putFileData(
-        fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null);
+        fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null, null);
     assertEquals(1, mask.length);
     assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't
-    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null, null));
     verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]);
   }
 
@@ -304,7 +304,7 @@ Example code to test specific scenarios:
     long fn = 1;
     MemoryBuffer[] fakes = new MemoryBuffer[]{fb(), fb(), fb()};
     cache.putFileData(fn, new DiskRange[]{dr(0, 100), dr(300, 500), dr(800, 1000)},
-        fakes, 0, Priority.NORMAL, null);
+        fakes, 0, Priority.NORMAL, null, null);
     assertEquals(0, metrics.getCacheRequestedBytes());
     assertEquals(0, metrics.getCacheHitBytes());
     list = new CreateHelper();
@@ -390,7 +390,7 @@ Example code to test specific scenarios:
                 buf.setNewAllocLocation(makeFakeArenaIndex(fileIndex, offsets[j]), 0);
                 buffers[j] = buf;
               }
-              long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null);
+              long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null, null);
               puts += buffers.length;
               long maskVal = 0;
               if (mask != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index 62a174a..c75dd70 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -61,10 +61,10 @@ public class LlapCacheAwareFs extends FileSystem {
       new ConcurrentHashMap<>();
 
   public static Path registerFile(DataCache cache, Path path, Object fileKey,
-      TreeMap<Long, Long> index, Configuration conf) throws IOException {
+      TreeMap<Long, Long> index, Configuration conf, String tag) throws IOException {
     long splitId = currentSplitId.incrementAndGet();
     CacheAwareInputStream stream = new CacheAwareInputStream(
-        cache, conf, index, path, fileKey, -1);
+        cache, conf, index, path, fileKey, -1, tag);
     if (files.putIfAbsent(splitId, stream) != null) {
       throw new IOException("Record already exists for " + splitId);
     }
@@ -166,23 +166,25 @@ public class LlapCacheAwareFs extends FileSystem {
     private final TreeMap<Long, Long> chunkIndex;
     private final Path path;
     private final Object fileKey;
+    private final String tag;
     private final Configuration conf;
     private final DataCache cache;
     private final int bufferSize;
     private long position = 0;
 
     public CacheAwareInputStream(DataCache cache, Configuration conf,
-        TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize) {
+        TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, String tag) {
       this.cache = cache;
       this.fileKey = fileKey;
       this.chunkIndex = chunkIndex;
       this.path = path;
       this.conf = conf;
       this.bufferSize = bufferSize;
+      this.tag = tag;
     }
 
     public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int bufferSize) {
-      return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize);
+      return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize, tag);
     }
 
     @Override
@@ -307,7 +309,7 @@ public class LlapCacheAwareFs extends FileSystem {
               }
               smallBuffer = null;
             }
-            cache.putFileData(fileKey, cacheRanges, newCacheData, 0);
+            cache.putFileData(fileKey, cacheRanges, newCacheData, 0, tag);
           } finally {
             // We do not use the new cache buffers for the actual read, given the way read() API is.
             // Therefore, we don't need to handle cache collisions - just decref all the buffers.

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 462c62f..4e17394 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -62,6 +62,8 @@ import org.apache.orc.impl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
 import org.apache.orc.OrcProto;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -130,7 +132,7 @@ class EncodedReaderImpl implements EncodedReader {
   private final Object fileKey;
   private final DataReader dataReader;
   private boolean isDataReaderOpen = false;
-  private final CompressionCodec codec;
+  private CompressionCodec codec;
   private final boolean isCodecFromPool;
   private boolean isCodecFailure = false;
   private final boolean isCompressed;
@@ -143,11 +145,12 @@ class EncodedReaderImpl implements EncodedReader {
   private final IoTrace trace;
   private final TypeDescription fileSchema;
   private final WriterVersion version;
+  private final String tag;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
@@ -161,6 +164,7 @@ class EncodedReaderImpl implements EncodedReader {
     this.cacheWrapper = cacheWrapper;
     this.dataReader = dataReader;
     this.trace = trace;
+    this.tag = tag;
     if (POOLS != null) return;
     if (pf == null) {
       pf = new NoopPoolFactory();
@@ -686,6 +690,7 @@ class EncodedReaderImpl implements EncodedReader {
       } else {
         codec.close();
       }
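+      // Drop the reference so the disposed codec cannot be reused past this point.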
+      codec = null;
     } catch (Exception ex) {
       LOG.error("Ignoring error from codec", ex);
     } finally {
@@ -849,7 +854,7 @@ class EncodedReaderImpl implements EncodedReader {
     if (badEstimates != null && !badEstimates.isEmpty()) {
      // Relies on the fact that the cache does not actually store these.
       DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
-      long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
+      long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset, tag);
       assert result == null; // We don't expect conflicts from bad estimates.
     }
 
@@ -909,7 +914,7 @@ class EncodedReaderImpl implements EncodedReader {
     // 6. Finally, put uncompressed data to cache.
     if (fileKey != null) {
       long[] collisionMask = cacheWrapper.putFileData(
-          fileKey, cacheKeys, targetBuffers, baseOffset);
+          fileKey, cacheKeys, targetBuffers, baseOffset, tag);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
     }
 
@@ -1163,7 +1168,8 @@ class EncodedReaderImpl implements EncodedReader {
 
     // 5. Put uncompressed data to cache.
     if (fileKey != null) {
-      long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+      long[] collisionMask = cacheWrapper.putFileData(
+          fileKey, cacheKeys, targetBuffers, baseOffset, tag);
       processCacheCollisions(collisionMask, toCache, targetBuffers, null);
     }
 
@@ -1261,6 +1267,7 @@ class EncodedReaderImpl implements EncodedReader {
     }
     codec.reset(); // We always need to call reset on the codec.
     codec.decompress(src, dest);
+
     dest.position(startPos);
     int newLim = dest.limit();
     if (newLim > startLim) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 57fb63b..210c987 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
    * @return The reader.
    */
   EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException;
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException;
 
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index 49cd9ba..a9a9f10 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
 
   @Override
   public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
     return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
-        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool);
+        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
   }
 }
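
A hedged sketch of a call site for the extended encodedReader() signature (caller-side
names such as 'conf', 'split' and 'orcReader' are assumptions; the LlapUtil helper is
the one used by the Parquet reader below):

    boolean trackUsage = HiveConf.getBoolVar(conf, ConfVars.LLAP_TRACK_CACHE_USAGE);
    String tag = trackUsage
        ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
    EncodedReader encoded = orcReader.encodedReader(
        fileKey, dataCache, dataReader, poolFactory, ioTrace, useCodecPool, tag);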

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 7b77eee..f64efe2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.FileMetadataCache;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -188,6 +188,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
 
     // if task.side.metadata is set, rowGroupOffsets is null
     Object cacheKey = null;
+    String cacheTag = null;
     // TODO: also support fileKey in splits, like OrcSplit does
     if (metadataCache != null) {
       cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
@@ -195,6 +196,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
     }
     if (cacheKey != null) {
+      if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
+        cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
+      }
      // If we are going to use the cache, change the path to depend on file ID for extra consistency.
       FileSystem fs = file.getFileSystem(configuration);
       if (cacheKey instanceof Long && HiveConf.getBoolVar(
@@ -207,13 +211,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
      // TODO: check whether rowGroupOffsets can be null
       // then we need to apply the predicate push down filter
       footer = readSplitFooter(
-          configuration, file, cacheKey, range(split.getStart(), split.getEnd()));
+          configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag);
       MessageType fileSchema = footer.getFileMetaData().getSchema();
       FilterCompat.Filter filter = getFilter(configuration);
       blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
     } else {
       // otherwise we find the row groups that were selected on the client
-      footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER);
+      footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag);
       Set<Long> offsets = new HashSet<>();
       for (long offset : rowGroupOffsets) {
         offsets.add(offset);
@@ -250,13 +254,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     requestedSchema = DataWritableReadSupport
       .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
  
-    Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
+    Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
   }
 
   private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
-      List<BlockMetaData> blocks) throws IOException {
+      List<BlockMetaData> blocks, String tag) throws IOException {
     if (fileKey == null || cache == null) {
       return path;
     }
@@ -277,13 +281,13 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
    // Register the cache-aware path so that the Parquet reader will go through it.
     configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl",
         LlapCacheAwareFs.class.getCanonicalName());
-    path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration);
+    path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration, tag);
     this.cacheFsPath = path;
     return path;
   }
 
-  private ParquetMetadata readSplitFooter(
-      JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException {
+  private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
+      Object cacheKey, MetadataFilter filter, String tag) throws IOException {
     MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
         : metadataCache.getFileMetadata(cacheKey);
     if (footerData != null) {
@@ -313,7 +317,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       if (LOG.isInfoEnabled()) {
         LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
       }
-      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream);
+      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag);
       try {
         return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
       } finally {
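
In outline, the tagged footer path above behaves as follows (a condensed sketch of the
surrounding method; 'stream' and 'footerLength' are the assumed local names for the
footer input and its size):

    MemoryBufferOrBuffers footerData = metadataCache.getFileMetadata(cacheKey);
    if (footerData == null) {
      // Cache miss: read the footer bytes and publish them under the cache tag.
      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag);
    }
    try {
      return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
    } finally {
      metadataCache.decRefBuffer(footerData); // release the pin taken by get/put
    }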

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index 795739b..2ac0a18 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -60,25 +60,7 @@ public interface DataCache {
   DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
       DiskRangeListFactory factory, BooleanRef gotAllData);
 
-  /**
-   * Puts file data into cache, or gets older data in case of collisions.
-   *
-   * The memory buffers provided MUST be allocated via an allocator returned by getAllocator
-   * method, to allow cache implementations that evict and then de-allocate the buffer.
-   *
-   * It is assumed that the caller will use the data immediately, therefore any buffers provided
-   * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction,
-   * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the
-   * caller is done with it. Buffers rejected due to conflict will neither be locked, nor
-   * automatically deallocated. The caller must take care to discard these buffers.
-   *
-   * @param fileKey Unique ID of the target file on the file system.
-   * @param ranges The ranges for which the data is being cached. These objects will not be stored.
-   * @param data The data for the corresponding ranges.
-   * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
-   * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
-   *         the replacement chunks from cache are updated directly in the array.
-   */
+  @Deprecated
   long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset);
 
   /**
@@ -106,4 +88,27 @@ public interface DataCache {
    * @return the factory
    */
   Allocator.BufferObjectFactory getDataBufferFactory();
+
+
+  /**
+   * Puts file data into cache, or gets older data in case of collisions.
+   *
+   * The memory buffers provided MUST be allocated via an allocator returned by the
+   * getAllocator method, to allow cache implementations that evict and then de-allocate
+   * the buffer.
+   *
+   * It is assumed that the caller will use the data immediately; therefore, any buffers
+   * provided to putFileData (or returned due to a cache collision) are locked in the cache
+   * to prevent eviction, and must be released back to the cache via a corresponding call
+   * (releaseBuffer) when the caller is done with them. Buffers rejected due to conflicts
+   * are neither locked nor automatically deallocated; the caller must take care to
+   * discard them.
+   *
+   * @param fileKey Unique ID of the target file on the file system.
+   * @param ranges The ranges for which the data is being cached. These objects will not be stored.
+   * @param data The data for the corresponding ranges.
+   * @param baseOffset Base offset for the ranges (stripe/stream offset in case of ORC).
+   * @param tag A tag identifying the source of the data (e.g. the DB and table name) for
+   *        cache usage tracking; may be null if tracking is disabled.
+   * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+   *         the replacement chunks from cache are updated directly in the array.
+   */
+  long[] putFileData(Object fileKey, DiskRange[] ranges,
+      MemoryBuffer[] data, long baseOffset, String tag);
 }
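
As a usage illustration of this contract (a minimal sketch; 'cache', 'fileKey', 'ranges',
'buffers', 'baseOffset' and 'tag' are assumed caller-side names, and releaseBuffer is the
call referenced in the javadoc above):

    long[] collisionMask = cache.putFileData(fileKey, ranges, buffers, baseOffset, tag);
    // A null mask means every chunk was stored; otherwise set bits mark collisions, and
    // for those positions 'buffers' now holds the cache's existing buffers, while the
    // rejected local copies must be discarded by the caller.
    try {
      // ... consume the (possibly replaced) buffers immediately ...
    } finally {
      for (MemoryBuffer buffer : buffers) {
        cache.releaseBuffer(buffer); // unlock each buffer once done with it
      }
    }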

http://git-wip-us.apache.org/repos/asf/hive/blob/f40d9447/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index b417bd3..d1da7f5 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -32,17 +32,11 @@ public interface FileMetadataCache {
    */
   MemoryBufferOrBuffers getFileMetadata(Object fileKey);
 
-  /**
-   * Puts the metadata for a given file (e.g. a footer buffer into cache).
-   * @param fileKey The file key.
-   * @param length The footer length.
-   * @param is The stream to read the footer from.
-   * @return The buffer or buffers representing the cached footer.
-   *         The caller must decref this buffer when done.
-   */
+  @Deprecated
   MemoryBufferOrBuffers putFileMetadata(
       Object fileKey, int length, InputStream is) throws IOException;
 
+  @Deprecated
   MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer);
 
   /**
@@ -50,4 +44,18 @@ public interface FileMetadataCache {
    * @param buffer The buffer to release.
    */
   void decRefBuffer(MemoryBufferOrBuffers buffer);
+
+
+  /**
+   * Puts the metadata for a given file (e.g. a footer buffer) into the cache.
+   * @param fileKey The file key.
+   * @param length The footer length.
+   * @param is The stream to read the footer from.
+   * @param tag A tag identifying the source of the metadata (e.g. the DB and table name)
+   *        for cache usage tracking; may be null if tracking is disabled.
+   * @return The buffer or buffers representing the cached footer.
+   *         The caller must decref this buffer when done.
+   */
+  MemoryBufferOrBuffers putFileMetadata(
+      Object fileKey, int length, InputStream is, String tag) throws IOException;
+
+  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
 } 
\ No newline at end of file
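
Similarly, a hedged sketch for the tagged ByteBuffer overload (names assumed):

    MemoryBufferOrBuffers cached = metadataCache.putFileMetadata(fileKey, tailBuffer, cacheTag);
    try {
      // ... read the cached tail immediately; it stays pinned for this caller ...
    } finally {
      metadataCache.decRefBuffer(cached);
    }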