Posted to commits@hive.apache.org by sz...@apache.org on 2019/10/15 09:09:52 UTC

[hive] branch master updated: HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fa1e91  HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)
6fa1e91 is described below

commit 6fa1e911979d0bf0d4cd98216f3b9f89bd95b706
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Tue Oct 15 11:01:19 2019 +0200

    HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)
---
 .../java/org/apache/hadoop/hive/llap/LlapUtil.java |   3 +-
 .../hive/llap/cache/CacheContentsTracker.java      | 110 +++++----
 .../hive/llap/cache/LlapCacheableBuffer.java       |   4 +-
 .../hadoop/hive/llap/cache/LlapDataBuffer.java     |   8 +-
 .../hadoop/hive/llap/cache/LowLevelCache.java      |   3 +-
 .../hadoop/hive/llap/cache/LowLevelCacheImpl.java  |  11 +-
 .../hive/llap/cache/SerDeLowLevelCacheImpl.java    |  11 +-
 .../hive/llap/cache/SimpleBufferManager.java       |   3 +-
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |   3 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java    |  40 +---
 .../llap/io/decode/OrcColumnVectorProducer.java    |   4 +-
 .../hive/llap/io/encoded/OrcEncodedDataReader.java |  18 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java    |  16 +-
 .../hive/llap/io/metadata/MetadataCache.java       |  25 +-
 .../llap/io/metadata/OrcFileEstimateErrors.java    |   5 +-
 .../hive/llap/cache/TestCacheContentsTracker.java  | 173 ++++++++++++++
 .../apache/hadoop/hive/llap/LlapCacheAwareFs.java  |   7 +-
 .../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 110 +++++++++
 .../hive/ql/io/orc/encoded/EncodedReaderImpl.java  |   5 +-
 .../hadoop/hive/ql/io/orc/encoded/Reader.java      |   3 +-
 .../hadoop/hive/ql/io/orc/encoded/ReaderImpl.java  |   3 +-
 .../vector/VectorizedParquetRecordReader.java      |  24 +-
 .../org/apache/hadoop/hive/common/io/CacheTag.java | 263 +++++++++++++++++++++
 .../apache/hadoop/hive/common/io/DataCache.java    |   2 +-
 .../hadoop/hive/common/io/FileMetadataCache.java   |   8 +-
 25 files changed, 708 insertions(+), 154 deletions(-)
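
Before the full diff, a brief orientation sketch (not part of the commit): the change replaces the plain String cache tags with the new hierarchical CacheTag type under org.apache.hadoop.hive.common.io, and CacheContentsTracker now rolls statistics up the tag hierarchy (partition -> table -> database) when dumping cache state. The snippet below uses only the CacheTag.build(...) and CacheTag.createParentCacheTag(...) calls exercised by the new TestCacheContentsTracker further down; the class name and example values are illustrative, not taken from the commit.

import java.util.Arrays;
import java.util.LinkedList;

import org.apache.hadoop.hive.common.io.CacheTag;

public class CacheTagSketch {
  public static void main(String[] args) {
    // Table-level tags are built from a "db.table" string, e.g. CacheTag.build("otherdb.testtable");
    // partition-level tags additionally carry "key=value" partition path elements.
    LinkedList<String> partitions = new LinkedList<>(Arrays.asList("p=v1", "pp=vv1"));
    CacheTag partition = CacheTag.build("otherdb.testtable", partitions);

    // Parent tags are derived level by level until the DB level, where null is returned.
    // CacheContentsTracker.debugDumpShort() walks this chain to aggregate per-partition
    // counters into table- and DB-level summary lines.
    int levels = 0;
    for (CacheTag tag = partition; tag != null; tag = CacheTag.createParentCacheTag(tag)) {
      levels++;
    }
    System.out.println("Tag levels from second-level partition up to DB: " + levels); // prints 4
  }
}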

diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index a351a19..c26ab62 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -314,8 +314,7 @@ public class LlapUtil {
       DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_",
       DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_";
 
-  public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/';
-
+  @Deprecated
   public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) {
     String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
     int dbIx = -1;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
index 64c0125..733b30c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -18,19 +18,24 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 
+import static java.util.stream.Collectors.joining;
+
 /**
  * A wrapper around cache eviction policy that tracks cache contents via tags.
  */
 public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener {
   private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L;
 
-  private final ConcurrentSkipListMap<String, TagState> tagInfo = new ConcurrentSkipListMap<>();
+  private final ConcurrentSkipListMap<CacheTag, TagState> tagInfo = new ConcurrentSkipListMap<>();
   private EvictionListener evictionListener;
   private LowLevelCachePolicy realPolicy;
   private final Thread cleanupThread;
@@ -75,56 +80,37 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
   }
 
   private static class TagState {
-    public TagState(String name) {
-      this.name = name;
+    TagState(CacheTag cacheTag) {
+      this.cacheTag = cacheTag;
     }
-    public final String name;
+    public final CacheTag cacheTag;
     public long emptyTimeNs;
     public long bufferCount, totalSize, maxCount, maxSize;
-    public boolean isRemoved = false;
   }
 
-
   private void reportCached(LlapCacheableBuffer buffer) {
     long size = buffer.getMemoryUsage();
-    TagState state;
-    do {
-       state = getTagState(buffer);
-    } while (!reportCached(state, size));
-    state = null;
-    do {
-      state = getParentTagState(buffer);
-      if (state == null) break;
-    } while (!reportCached(state, size));
-  }
-
-  private boolean reportCached(TagState state, long size) {
+    TagState state = getTagState(buffer);
+    reportCached(state, size);
+  }
+
+  private void reportCached(TagState state, long size) {
     synchronized (state) {
-      if (state.isRemoved) return false;
       ++state.bufferCount;
       state.totalSize += size;
       state.maxSize = Math.max(state.maxSize, state.totalSize);
       state.maxCount = Math.max(state.maxCount, state.bufferCount);
     }
-    return true;
   }
 
   private void reportRemoved(LlapCacheableBuffer buffer) {
     long size = buffer.getMemoryUsage();
-    TagState state;
-    do {
-       state = getTagState(buffer);
-    } while (!reportRemoved(state, size));
-    state = null;
-    do {
-      state = getParentTagState(buffer);
-      if (state == null) break;
-    } while (!reportRemoved(state, size));
-  }
-
-  private boolean reportRemoved(TagState state, long size) {
+    TagState state = getTagState(buffer);
+    reportRemoved(state, size);
+  }
+
+  private void reportRemoved(TagState state, long size) {
     synchronized (state) {
-      if (state.isRemoved) return false;
       --state.bufferCount;
       assert state.bufferCount >= 0;
       state.totalSize -= size;
@@ -132,21 +118,13 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
         state.emptyTimeNs = System.nanoTime();
       }
     }
-    return true;
   }
 
   private TagState getTagState(LlapCacheableBuffer buffer) {
     return getTagState(buffer.getTag());
   }
 
-  private TagState getParentTagState(LlapCacheableBuffer buffer) {
-    String tag = buffer.getTag();
-    int ix = tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR);
-    if (ix <= 0) return null;
-    return getTagState(tag.substring(0, ix));
-  }
-
-  private TagState getTagState(String tag) {
+  private TagState getTagState(CacheTag tag) {
     TagState state = tagInfo.get(tag);
     if (state == null) {
       state = new TagState(tag);
@@ -191,14 +169,51 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
 
   @Override
   public void debugDumpShort(StringBuilder sb) {
-    sb.append("\nCache state: ");
+    ArrayList<String> endResult = new ArrayList<>();
+    Map<CacheTag, TagState> summaries = new TreeMap<>();
+
     for (TagState state : tagInfo.values()) {
       synchronized (state) {
-        sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/")
-          .append(state.maxCount).append(", ").append(state.totalSize).append("/")
-          .append(state.maxSize);
+        endResult.add(unsafePrintTagState(state));
+
+        // Handle summary calculation
+        CacheTag parentTag = CacheTag.createParentCacheTag(state.cacheTag);
+        while (parentTag != null) {
+          if (!summaries.containsKey(parentTag)) {
+            summaries.put(parentTag, new TagState(parentTag));
+          }
+          TagState parentState = summaries.get(parentTag);
+          parentState.bufferCount += state.bufferCount;
+          parentState.maxCount += state.maxCount;
+          parentState.totalSize += state.totalSize;
+          parentState.maxSize += state.maxSize;
+          parentTag = CacheTag.createParentCacheTag(parentTag);
+        }
       }
     }
+    for (TagState state : summaries.values()) {
+      endResult.add(unsafePrintTagState(state));
+    }
+    sb.append("\nCache state: \n");
+    sb.append(endResult.stream().sorted().collect(joining("\n")));
+  }
+
+  /**
+   * Constructs a String by pretty printing a TagState instance - for Web UI consumption.
+   * Note: does not lock on TagState instance.
+   * @param state
+   * @return
+   */
+  private String unsafePrintTagState(TagState state) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(state.cacheTag.getTableName());
+    if (state.cacheTag instanceof CacheTag.PartitionCacheTag) {
+      sb.append("/").append(String.join("/",
+          ((CacheTag.PartitionCacheTag) state.cacheTag).partitionDescToString()));
+    }
+    sb.append(" : ").append(state.bufferCount).append("/").append(state.maxCount).append(", ")
+            .append(state.totalSize).append("/").append(state.maxSize);
+    return sb.toString();
   }
 
   @Override
@@ -206,4 +221,5 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
     evictionListener.notifyEvicted(buffer);
     reportRemoved(buffer);
   }
+
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index f91a5d9..0526033 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import org.apache.hadoop.hive.common.io.CacheTag;
+
 /**
  * Buffer that can be managed by LowLevelEvictionPolicy.
  * We want to have cacheable and non-allocator buffers, as well as allocator buffers with no
@@ -56,7 +58,7 @@ public abstract class LlapCacheableBuffer {
         + lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
   }
 
-  public abstract String getTag();
+  public abstract CacheTag getTag();
 
   protected abstract boolean isLocked();
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 405fca2..3d5e08e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -18,25 +18,27 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import org.apache.hadoop.hive.common.io.CacheTag;
+
 public final class LlapDataBuffer extends LlapAllocatorBuffer {
   public static final int UNKNOWN_CACHED_LENGTH = -1;
 
   /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
    * the lookup is on compressed ranges, so we need to know this. */
   public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
-  private String tag;
+  private CacheTag tag;
 
   @Override
   public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
     evictionDispatcher.notifyEvicted(this);
   }
 
-  public void setTag(String tag) {
+  public void setTag(CacheTag tag) {
     this.tag = tag;
   }
 
   @Override
-  public String getTag() {
+  public CacheTag getTag() {
     return tag;
   }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 4dd3826..f780b29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -58,7 +59,7 @@ public interface LowLevelCache {
    *         the replacement chunks from cache are updated directly in the array.
    */
   long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
-      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag);
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag);
 
   /** Notifies the cache that a particular buffer should be removed due to eviction. */
   void notifyEvicted(MemoryBuffer buffer);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 62d7e55..5d0f2ab 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.orc.impl.RecordReaderUtils;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -29,19 +27,20 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.common.io.Allocator;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.RecordReaderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 
 public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapIoDebugDump {
   private static final int DEFAULT_CLEANUP_INTERVAL = 600;
@@ -290,7 +289,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
 
   @Override
   public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
-      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) {
     long[] result = null;
     assert buffers.length == ranges.length;
     FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 2a39d2d..7930fd9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -75,16 +76,16 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
 
   public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
     public boolean isCached = false;
-    private String tag;
+    private CacheTag tag;
     @Override
     public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
       evictionDispatcher.notifyEvicted(this);
     }
-    public void setTag(String tag) {
+    public void setTag(CacheTag tag) {
       this.tag = tag;
     }
     @Override
-    public String getTag() {
+    public CacheTag getTag() {
       return tag;
     }
   }
@@ -523,7 +524,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
   }
 
   public void putFileData(final FileData data, Priority priority,
-      LowLevelCacheCounters qfCounters, String tag) {
+      LowLevelCacheCounters qfCounters, CacheTag tag) {
     // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
     //       overhead from the java objects to memory manager and remove it when discarding file.
     if (data.stripes == null || data.stripes.isEmpty()) {
@@ -598,7 +599,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
     }
   }
 
-  private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) {
+  private void lockAllBuffersForPut(StripeData si, Priority priority, CacheTag tag) {
     for (int i = 0; i < si.data.length; ++i) {
       LlapSerDeDataBuffer[][] colData = si.data[i];
       if (colData == null) continue;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index 41855e1..b2f606a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
 import org.apache.hadoop.hive.common.io.DiskRange;
@@ -85,7 +86,7 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
 
   @Override
   public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
-      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
+      long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) {
     for (int i = 0; i < chunks.length; ++i) {
       LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
       if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index c63ee5f..0d9077c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.management.ObjectName;
 
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -299,7 +300,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
 
     @Override
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
-        MemoryBuffer[] data, long baseOffset, String tag) {
+        MemoryBuffer[] data, long baseOffset, CacheTag tag) {
       return lowLevelCache.putFileData(
           fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag);
     }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 1378a01..8cc81cc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -41,8 +41,6 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -55,7 +53,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -75,8 +72,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.google.common.collect.Lists;
-
 class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
@@ -116,7 +111,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
       List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp,
       ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
       Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
-    MapWork mapWork = findMapWork(job);
+    MapWork mapWork = LlapHiveUtils.findMapWork(job);
     if (mapWork == null) return null; // No compatible MapWork.
     LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName,
         cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
@@ -302,37 +297,6 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     return Math.max(bestEffortSize, queueLimitMin);
   }
 
-
-  private static MapWork findMapWork(JobConf job) throws HiveException {
-    String inputName = job.get(Utilities.INPUT_NAME, null);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Initializing for input " + inputName);
-    }
-    String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
-    if (prefixes != null && !StringUtils.isBlank(prefixes)) {
-      // Currently SMB is broken, so we cannot check if it's  compatible with IO elevator.
-      // So, we don't use the below code that would get the correct MapWork. See HIVE-16985.
-      return null;
-    }
-
-    BaseWork work = null;
-    // HIVE-16985: try to find the fake merge work for SMB join, that is really another MapWork.
-    if (inputName != null) {
-      if (prefixes == null ||
-          !Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
-        inputName = null;
-      }
-    }
-    if (inputName != null) {
-      work = Utilities.getMergeWork(job, inputName);
-    }
-
-    if (!(work instanceof MapWork)) {
-      work = Utilities.getMapWork(job);
-    }
-    return (MapWork) work;
-  }
-
   /**
    * Starts the data read pipeline
    */
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 2a0c5ca..17c4821 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -88,12 +88,12 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
       Consumer<ColumnVectorBatch> consumer, FileSplit split, Includes includes,
       SearchArgument sarg, QueryFragmentCounters counters, SchemaEvolutionFactory sef,
       InputFormat<?, ?> unused0, Deserializer unused1, Reporter reporter, JobConf job,
-      Map<Path, PartitionDesc> unused2) throws IOException {
+      Map<Path, PartitionDesc> parts) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
         consumer, includes, _skipCorrupt, counters, ioMetrics);
     OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
-        metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool);
+        metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool, parts);
     edc.init(reader, reader, reader.getTrace());
     return edc;
   }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 85a42f9..3fcf0dc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -41,7 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -167,7 +170,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private EncodedReader stripeReader;
   private CompressionCodec codec;
   private Object fileKey;
-  private final String cacheTag;
+  private final CacheTag cacheTag;
+  private final Map<Path, PartitionDesc> parts;
 
   private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
 
@@ -187,7 +191,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
       MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
       FileSplit split, Includes includes, SearchArgument sarg, OrcEncodedDataConsumer consumer,
-      QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool<IoTrace> tracePool)
+      QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool<IoTrace> tracePool,
+      Map<Path, PartitionDesc> parts)
           throws IOException {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
@@ -199,6 +204,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     this.counters = counters;
     this.trace = tracePool.take();
     this.tracePool = tracePool;
+    this.parts = parts;
     try {
       this.ugi = UserGroupInformation.getCurrentUser();
     } catch (IOException e) {
@@ -210,7 +216,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
     orcReader = null;
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
+        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
@@ -278,7 +284,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       return null;
     }
     counters.setDesc(QueryFragmentCounters.Desc.TABLE,
-        LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false));
+        LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts));
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
         + (fileKey == null ? "" : " (" + fileKey + ")"));
     try {
@@ -927,7 +933,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     @Override
     public long[] putFileData(Object fileKey, DiskRange[] ranges,
-        MemoryBuffer[] data, long baseOffset, String tag) {
+        MemoryBuffer[] data, long baseOffset, CacheTag tag) {
       if (data != null) {
         return lowLevelCache.putFileData(
             fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index d414b14..c73ba2c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.llap.io.encoded;
 
-import org.apache.orc.impl.MemoryManager;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -37,16 +35,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
@@ -89,14 +88,15 @@ import org.apache.hive.common.util.Ref;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
 import org.apache.orc.OrcFile.EncodingStrategy;
 import org.apache.orc.OrcFile.Version;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcProto.ColumnEncoding;
-import org.apache.orc.TypeDescription;
+import org.apache.orc.OrcUtils;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.PhysicalWriter.OutputReceiver;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManager;
 import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.impl.StreamName;
 import org.apache.tez.common.CallableWithNdc;
@@ -149,7 +149,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   private final Map<Path, PartitionDesc> parts;
 
   private final Object fileKey;
-  private final String cacheTag;
+  private final CacheTag cacheTag;
   private final FileSystem fs;
 
   private AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -219,7 +219,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
         HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
         !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
-        ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
+        ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
     this.sourceInputFormat = sourceInputFormat;
     this.sourceSerDe = sourceSerDe;
     this.reporter = reporter;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index 8400fe9..10bd736 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
-import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
 import org.apache.hadoop.hive.llap.cache.LlapIoDebugDump;
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
 
 public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
@@ -173,7 +172,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
 
   @Override
   public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
-      ByteBuffer tailBuffer, String tag) {
+      ByteBuffer tailBuffer, CacheTag tag) {
     return putInternal(fileKey, tailBuffer, tag, null);
   }
 
@@ -184,26 +183,26 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   }
 
   public LlapBufferOrBuffers putStripeTail(
-      OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+      OrcBatchKey stripeKey, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
     return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped);
   }
 
   @Override
   public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
-      InputStream is, String tag) throws IOException {
+      InputStream is, CacheTag tag) throws IOException {
     return putFileMetadata(fileKey, length, is, tag, null);
   }
 
 
   @Override
   public LlapBufferOrBuffers putFileMetadata(Object fileKey,
-      ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+      ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
     return putInternal(fileKey, tailBuffer, tag, isStopped);
   }
 
   @Override
   public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is,
-      String tag, AtomicBoolean isStopped) throws IOException {
+      CacheTag tag, AtomicBoolean isStopped) throws IOException {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(fileKey);
@@ -229,7 +228,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
-      Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException {
+      Object fileKey, int length, InputStream stream, CacheTag tag, AtomicBoolean isStopped) throws IOException {
     if (result != null) return result;
     int maxAlloc = allocator.getMaxAllocation();
     LlapMetadataBuffer<Object>[] largeBuffers = null;
@@ -274,7 +273,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
     bb.position(pos);
   }
 
-  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+  private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
     LlapBufferOrBuffers result = null;
     while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
       LlapBufferOrBuffers oldVal = metadata.get(key);
@@ -337,7 +336,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   }
 
   private <T> LlapBufferOrBuffers wrapBb(
-      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+      LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
     if (result != null) return result;
     if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
       // The common case by far.
@@ -507,9 +506,9 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
   public final static class LlapMetadataBuffer<T>
       extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
     private final T key;
-    private String tag;
+    private CacheTag tag;
 
-    public LlapMetadataBuffer(T key, String tag) {
+    public LlapMetadataBuffer(T key, CacheTag tag) {
       this.key = key;
       this.tag = tag;
     }
@@ -545,7 +544,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
     }
 
     @Override
-    public String getTag() {
+    public CacheTag getTag() {
       return tag;
     }
   }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 30dc1b9..33e1680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
 import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
@@ -123,8 +124,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
   }
 
   @Override
-  public String getTag() {
+  public CacheTag getTag() {
     // We don't care about these.
-    return "OrcEstimates";
+    return CacheTag.build("OrcEstimates");
   }
 }
\ No newline at end of file
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
new file mode 100644
index 0000000..1d242e0
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import org.apache.hadoop.hive.common.io.CacheTag;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toCollection;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for TestCacheContentsTracker functions.
+ */
+public class TestCacheContentsTracker {
+
+  private static CacheContentsTracker tracker;
+
+  @BeforeClass
+  public static void setup() {
+    LowLevelCachePolicy lowLevelCachePolicyMock = mock(LowLevelCachePolicy.class);
+    EvictionListener evictionListenerMock = mock(EvictionListener.class);
+    tracker = new CacheContentsTracker(lowLevelCachePolicyMock);
+    tracker.setEvictionListener(evictionListenerMock);
+  }
+
+  /**
+   * Tests parent CacheTag generation by checking each step when traversing from 3rd level
+   * partition to DB level.
+   */
+  @Test
+  public void testParentCacheTagGeneration() {
+    CacheTag db = cacheTagBuilder("dbname");
+    CacheTag table = cacheTagBuilder("dbname.tablename");
+    CacheTag p = cacheTagBuilder("dbname.tablename", "p=v1");
+    CacheTag pp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1");
+    CacheTag ppp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1", "ppp=vvv1");
+
+    assertTrue(pp.compareTo(CacheTag.createParentCacheTag(ppp)) == 0);
+    assertTrue(p.compareTo(CacheTag.createParentCacheTag(pp)) == 0);
+    assertTrue(table.compareTo(CacheTag.createParentCacheTag(p)) == 0);
+    assertTrue(db.compareTo(CacheTag.createParentCacheTag(table)) == 0);
+    assertNull(CacheTag.createParentCacheTag(db));
+  }
+
+  /**
+   * Caches some mock buffers and checks summary produced by CacheContentsTracker. Later this is
+   * done again after some mock buffers were evicted.
+   */
+  @Test
+  public void testAggregatedStatsGeneration() {
+    cacheTestBuffers();
+    StringBuilder sb = new StringBuilder();
+    tracker.debugDumpShort(sb);
+    assertEquals(EXPECTED_CACHE_STATE_WHEN_FULL, sb.toString());
+
+    evictSomeTestBuffers();
+    sb = new StringBuilder();
+    tracker.debugDumpShort(sb);
+    assertEquals(EXPECTED_CACHE_STATE_AFTER_EVICTION, sb.toString());
+  }
+
+  private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag) {
+    LlapCacheableBuffer llapCacheableBufferMock = mock(LlapCacheableBuffer.class);
+
+    doAnswer(invocationOnMock -> {
+      return size;
+    }).when(llapCacheableBufferMock).getMemoryUsage();
+
+    doAnswer(invocationOnMock -> {
+      return cacheTag;
+    }).when(llapCacheableBufferMock).getTag();
+
+    return llapCacheableBufferMock;
+  }
+
+  private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
+    if (partitions != null && partitions.length > 0) {
+      LinkedList<String> parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new));
+      return CacheTag.build(dbAndTable, parts);
+    } else {
+      return CacheTag.build(dbAndTable);
+    }
+  }
+
+  private static void cacheTestBuffers() {
+    tracker.cache(createMockBuffer(4 * 1024L,
+        cacheTagBuilder("default.testtable")), null);
+    tracker.cache(createMockBuffer(2 * 1024L,
+        cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null);
+    tracker.cache(createMockBuffer(32 * 1024L,
+        cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null);
+    tracker.cache(createMockBuffer(64 * 1024L,
+        cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv2")), null);
+    tracker.cache(createMockBuffer(128 * 1024L,
+        cacheTagBuilder("otherdb.testtable", "p=v2", "pp=vv1")), null);
+    tracker.cache(createMockBuffer(256 * 1024L,
+        cacheTagBuilder("otherdb.testtable2", "p=v3")), null);
+    tracker.cache(createMockBuffer(512 * 1024 * 1024L,
+        cacheTagBuilder("otherdb.testtable2", "p=v3")), null);
+    tracker.cache(createMockBuffer(1024 * 1024 * 1024L,
+        cacheTagBuilder("otherdb.testtable3")), null);
+    tracker.cache(createMockBuffer(2 * 1024 * 1024L,
+        cacheTagBuilder("default.testtable")), null);
+  }
+
+  private static void evictSomeTestBuffers() {
+    tracker.notifyEvicted(createMockBuffer(32 * 1024L,
+        cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")));
+    tracker.notifyEvicted(createMockBuffer(512 * 1024 * 1024L,
+        cacheTagBuilder("otherdb.testtable2", "p=v3")));
+    tracker.notifyEvicted(createMockBuffer(2 * 1024 * 1024L,
+        cacheTagBuilder("default.testtable")));
+    tracker.notifyEvicted(createMockBuffer(4 * 1024L,
+        cacheTagBuilder("default.testtable")));
+  }
+
+  private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
+      "\n" +
+          "Cache state: \n" +
+          "default : 2/2, 2101248/2101248\n" +
+          "default.testtable : 2/2, 2101248/2101248\n" +
+          "otherdb : 7/7, 1611106304/1611106304\n" +
+          "otherdb.testtable : 4/4, 231424/231424\n" +
+          "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
+          "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
+          "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+          "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+          "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+          "otherdb.testtable2 : 2/2, 537133056/537133056\n" +
+          "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
+          "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+
+  private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
+      "\n" +
+          "Cache state: \n" +
+          "default : 0/2, 0/2101248\n" +
+          "default.testtable : 0/2, 0/2101248\n" +
+          "otherdb : 5/7, 1074202624/1611106304\n" +
+          "otherdb.testtable : 3/4, 198656/231424\n" +
+          "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
+          "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
+          "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+          "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+          "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+          "otherdb.testtable2 : 1/2, 262144/537133056\n" +
+          "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
+          "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index af04a51..c887394 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -61,7 +62,7 @@ public class LlapCacheAwareFs extends FileSystem {
       new ConcurrentHashMap<>();
 
   public static Path registerFile(DataCache cache, Path path, Object fileKey,
-      TreeMap<Long, Long> index, Configuration conf, String tag) throws IOException {
+      TreeMap<Long, Long> index, Configuration conf, CacheTag tag) throws IOException {
     long splitId = currentSplitId.incrementAndGet();
     CacheAwareInputStream stream = new CacheAwareInputStream(
         cache, conf, index, path, fileKey, -1, tag);
@@ -170,14 +171,14 @@ public class LlapCacheAwareFs extends FileSystem {
     private final TreeMap<Long, Long> chunkIndex;
     private final Path path;
     private final Object fileKey;
-    private final String tag;
+    private final CacheTag tag;
     private final Configuration conf;
     private final DataCache cache;
     private final int bufferSize;
     private long position = 0;
 
     public CacheAwareInputStream(DataCache cache, Configuration conf,
-        TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, String tag) {
+        TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, CacheTag tag) {
       this.cache = cache;
       this.fileKey = fileKey;
       this.chunkIndex = chunkIndex;
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
new file mode 100644
index 0000000..a041426
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Covers utility functions that are used by LLAP code and depend on Hive constructs e.g. ql code.
+ */
+public final class LlapHiveUtils {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LlapHiveUtils.class);
+
+  private LlapHiveUtils() {
+    // Not to be used;
+  }
+
+  public static CacheTag getDbAndTableNameForMetrics(Path path, boolean includeParts,
+        Map<Path, PartitionDesc> parts) {
+
+    assert(parts != null);
+
+    // Look for PartitionDesc instance matching our Path
+    Path parentPath = path;
+    PartitionDesc part = parts.get(parentPath);
+    while (!parentPath.isRoot() && part == null) {
+      parentPath = parentPath.getParent();
+      part = parts.get(parentPath);
+    }
+
+    // Fallback to legacy cache tag creation logic.
+    if (part == null) {
+      return CacheTag.build(LlapUtil.getDbAndTableNameForMetrics(path, includeParts));
+    }
+
+    if (!includeParts || !part.isPartitioned()) {
+      return CacheTag.build(part.getTableName());
+    } else {
+      return CacheTag.build(part.getTableName(), part.getPartSpec());
+    }
+  }
+
+  /**
+   * Returns MapWork based what is serialized in the JobConf instance provided.
+   * @param job
+   * @return the MapWork instance. Might be null if missing.
+   * @throws HiveException
+   */
+  public static MapWork findMapWork(JobConf job) throws HiveException {
+    String inputName = job.get(Utilities.INPUT_NAME, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing for input " + inputName);
+    }
+    String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+    if (prefixes != null && !StringUtils.isBlank(prefixes)) {
+      // Currently SMB is broken, so we cannot check if it's  compatible with IO elevator.
+      // So, we don't use the below code that would get the correct MapWork. See HIVE-16985.
+      return null;
+    }
+
+    BaseWork work = null;
+    // HIVE-16985: try to find the fake merge work for SMB join, that is really another MapWork.
+    if (inputName != null) {
+      if (prefixes == null ||
+              !Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
+        inputName = null;
+      }
+    }
+    if (inputName != null) {
+      work = Utilities.getMergeWork(job, inputName);
+    }
+
+    if (!(work instanceof MapWork)) {
+      work = Utilities.getMapWork(job);
+    }
+    return (MapWork) work;
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 241a300..117e4e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -148,14 +149,14 @@ class EncodedReaderImpl implements EncodedReader {
   private final IoTrace trace;
   private final TypeDescription fileSchema;
   private final WriterVersion version;
-  private final String tag;
+  private final CacheTag tag;
   private AtomicBoolean isStopped;
   private StoppableAllocator allocator;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 210c987..8d3336f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -46,7 +47,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
    * @return The reader.
    */
   EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException;
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException;
 
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index a9a9f10..e137c24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.DataReader;
@@ -35,7 +36,7 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
 
   @Override
   public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
     return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
         bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 61e2556..ea6dfb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -22,13 +22,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.FileMetadataCache;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
 import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -78,6 +82,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -104,6 +109,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private Object[] partitionValues;
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
+  private Map<Path, PartitionDesc> parts;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -170,13 +176,19 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   @SuppressWarnings("deprecation")
   public void initialize(
     InputSplit oldSplit,
-    JobConf configuration) throws IOException, InterruptedException {
+    JobConf configuration) throws IOException, InterruptedException, HiveException {
     // the oldSplit may be null during the split phase
     if (oldSplit == null) {
       return;
     }
     ParquetMetadata footer;
     List<BlockMetaData> blocks;
+
+    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
+    if (mapWork != null) {
+      parts = mapWork.getPathToPartitionInfo();
+    }
+
     ParquetInputSplit split = (ParquetInputSplit) oldSplit;
     boolean indexAccess =
       configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
@@ -190,7 +202,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
 
     // if task.side.metadata is set, rowGroupOffsets is null
     Object cacheKey = null;
-    String cacheTag = null;
+    CacheTag cacheTag = null;
     // TODO: also support fileKey in splits, like OrcSplit does
     if (metadataCache != null) {
       cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
@@ -200,7 +212,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
     if (cacheKey != null) {
       if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
-        cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
+        cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, parts);
       }
       // If we are going to use cache, change the path to depend on file ID for extra consistency.
       FileSystem fs = file.getFileSystem(configuration);
@@ -265,7 +277,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   }
 
   private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
-      List<BlockMetaData> blocks, String tag) throws IOException {
+      List<BlockMetaData> blocks, CacheTag tag) throws IOException {
     if (fileKey == null || cache == null) {
       return path;
     }
@@ -292,7 +304,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   }
 
   private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
-      Object cacheKey, MetadataFilter filter, String tag) throws IOException {
+      Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException {
     MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
         : metadataCache.getFileMetadata(cacheKey);
     if (footerData != null) {
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
new file mode 100644
index 0000000..623c181
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Identifies the object (table or partition) that a buffer stored in the cache belongs to.
+ * Comes in 3 flavours to minimize memory overhead:
+ * - TableCacheTag for tables without partitions: DB/table level
+ * - SinglePartitionCacheTag for tables with 1 partition level: DB/table/1st_partition
+ * - MultiPartitionCacheTag for tables with >1 partition levels:
+ *     DB/table/1st_partition/.../nth_partition
+ */
+public abstract class CacheTag implements Comparable<CacheTag> {
+  /**
+   * Table name, prefixed with the DB name and '.' .
+   */
+  protected final String tableName;
+
+  private CacheTag(String tableName) {
+    this.tableName = tableName.intern();
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public int compareTo(CacheTag o) {
+    return tableName.compareTo(o.tableName);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof CacheTag)) {
+      return false;
+    } else {
+      return this.compareTo((CacheTag) obj) == 0;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return tableName.hashCode();
+  }
+
+  public static final CacheTag build(String tableName) {
+    if (StringUtils.isEmpty(tableName)) {
+      throw new IllegalArgumentException();
+    }
+    return new TableCacheTag(tableName);
+  }
+
+  public static final CacheTag build(String tableName, Map<String, String> partDescMap) {
+    if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) {
+      throw new IllegalArgumentException();
+    }
+
+    LinkedList<String> partDescList = new LinkedList<>();
+
+    for (Map.Entry<String, String> entry : partDescMap.entrySet()) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(entry.getKey()).append("=").append(entry.getValue());
+      partDescList.add(sb.toString());
+    }
+
+    if (partDescList.size() == 1) {
+      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+    } else {
+      // In this case it must be >1
+      return new MultiPartitionCacheTag(tableName, partDescList);
+    }
+  }
+
+  // Assumes elements of partDescList are already in p1=v1 format
+  public static final CacheTag build(String tableName, LinkedList<String> partDescList) {
+    if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) {
+      throw new IllegalArgumentException();
+    }
+
+    if (partDescList.size() == 1) {
+      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+    } else {
+      // In this case it must be >1
+      return new MultiPartitionCacheTag(tableName, partDescList);
+    }
+  }
+
+  /**
+   * Constructs a (fake) parent CacheTag instance by walking back in the hierarchy, i.e. stepping
+   * from inner to outer partition levels, then producing a CacheTag for the table and finally
+   * for the DB. Returns null once there is nothing left to step back to.
+   */
+  public static final CacheTag createParentCacheTag(CacheTag tag) {
+    if (tag == null) {
+      throw new IllegalArgumentException();
+    }
+
+    if (tag instanceof MultiPartitionCacheTag) {
+      MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag;
+      if (multiPartitionCacheTag.partitionDesc.size() > 2) {
+        LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc);
+        subList.removeLast();
+        return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList);
+      } else {
+        return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
+            multiPartitionCacheTag.partitionDesc.get(0));
+      }
+    }
+
+    if (tag instanceof SinglePartitionCacheTag) {
+      return new TableCacheTag(tag.tableName);
+    } else {
+      // DB level
+      int ix = tag.tableName.indexOf(".");
+      if (ix <= 0) {
+        return null;
+      }
+      return new TableCacheTag(tag.tableName.substring(0, ix));
+    }
+
+  }
+
+  /**
+   * CacheTag for tables without partitions.
+   */
+  public static final class TableCacheTag extends CacheTag {
+
+    private TableCacheTag(String tableName) {
+      super(tableName);
+    }
+
+    @Override
+    public int compareTo(CacheTag o) {
+      if (o instanceof SinglePartitionCacheTag || o instanceof MultiPartitionCacheTag) {
+        return -1;
+      } else {
+        return super.compareTo(o);
+      }
+    }
+
+  }
+
+  /**
+   * CacheTag for tables with partitions.
+   */
+  public abstract static class PartitionCacheTag extends CacheTag {
+
+    private PartitionCacheTag(String tableName) {
+      super(tableName);
+    }
+
+    /**
+     * Returns a pretty-printed String version of the partitionDesc in the format p1=v1/p2=v2...
+     * @return the pretty printed String
+     */
+    public abstract String partitionDescToString();
+
+  }
+
+  /**
+   * CacheTag for tables with exactly one partition level.
+   */
+  public static final class SinglePartitionCacheTag extends PartitionCacheTag {
+
+    private final String partitionDesc;
+
+    private SinglePartitionCacheTag(String tableName, String partitionDesc) {
+      super(tableName);
+      if (StringUtils.isEmpty(partitionDesc)) {
+        throw new IllegalArgumentException();
+      }
+      this.partitionDesc = partitionDesc.intern();
+    }
+
+    @Override
+    public String partitionDescToString() {
+      return this.partitionDesc;
+    }
+
+    @Override
+    public int compareTo(CacheTag o) {
+      if (o instanceof TableCacheTag) {
+        return 1;
+      } else if (o instanceof MultiPartitionCacheTag) {
+        return -1;
+      }
+      SinglePartitionCacheTag other = (SinglePartitionCacheTag) o;
+      // Compare by table name first and use the partition spec only as a tie-breaker;
+      // summing the two comparison results could flip the sign and break equals().
+      int tableCmp = super.compareTo(o);
+      if (tableCmp != 0) {
+        return tableCmp;
+      }
+      return partitionDesc.compareTo(other.partitionDesc);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode() + partitionDesc.hashCode();
+    }
+  }
+
+  /**
+   * CacheTag for tables with more than one partition level.
+   */
+  public static final class MultiPartitionCacheTag extends PartitionCacheTag {
+
+    private final LinkedList<String> partitionDesc;
+
+    private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) {
+      super(tableName);
+      if (partitionDesc == null || partitionDesc.size() <= 1) {
+        throw new IllegalArgumentException();
+      }
+      // String.intern() returns the canonical instance, so the result has to be written back.
+      partitionDesc.replaceAll(String::intern);
+      this.partitionDesc = partitionDesc;
+    }
+
+    @Override
+    public int compareTo(CacheTag o) {
+      if (o instanceof TableCacheTag || o instanceof SinglePartitionCacheTag) {
+        return 1;
+      }
+      MultiPartitionCacheTag other = (MultiPartitionCacheTag) o;
+      // As above: the table name takes precedence, partition specs only break ties.
+      int tableCmp = super.compareTo(o);
+      if (tableCmp != 0) {
+        return tableCmp;
+      }
+      return partitionDescToString().compareTo(other.partitionDescToString());
+    }
+
+    @Override
+    public int hashCode() {
+      int res = super.hashCode();
+      for (String p : partitionDesc) {
+        res += p.hashCode();
+      }
+      return res;
+    }
+
+    @Override
+    public String partitionDescToString() {
+      return String.join("/", this.partitionDesc);
+    }
+
+  }
+
+}
+
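A usage sketch for the factories above (the table and partition names are made up): a one-entry
partition spec yields a SinglePartitionCacheTag, a larger one a MultiPartitionCacheTag, and
createParentCacheTag steps back towards the table and DB levels:

    CacheTag tableTag = CacheTag.build("db.tab");
    Map<String, String> spec = new LinkedHashMap<>();
    spec.put("p", "1");
    spec.put("q", "2");
    CacheTag partTag = CacheTag.build("db.tab", spec);          // MultiPartitionCacheTag
    CacheTag parent = CacheTag.createParentCacheTag(partTag);   // SinglePartitionCacheTag: p=1
    CacheTag dbTag = CacheTag.createParentCacheTag(tableTag);   // TableCacheTag: db
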
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index 2ac0a18..9b23a71 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -110,5 +110,5 @@ public interface DataCache {
    *         the replacement chunks from cache are updated directly in the array.
    */
   long[] putFileData(Object fileKey, DiskRange[] ranges,
-      MemoryBuffer[] data, long baseOffset, String tag);
+      MemoryBuffer[] data, long baseOffset, CacheTag tag);
 }
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index d7de361..4691722 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -42,10 +42,10 @@ public interface FileMetadataCache {
 
   @Deprecated
   MemoryBufferOrBuffers putFileMetadata(
-      Object fileKey, int length, InputStream is, String tag) throws IOException;
+      Object fileKey, int length, InputStream is, CacheTag tag) throws IOException;
 
   @Deprecated
-  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
+  MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, CacheTag tag);
 
   /**
    * Releases the buffer returned from getFileMetadata or putFileMetadata method.
@@ -61,8 +61,8 @@ public interface FileMetadataCache {
    *         The caller must decref this buffer when done.
    */
   MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer,
-      String tag, AtomicBoolean isStopped);
+      CacheTag tag, AtomicBoolean isStopped);
 
   MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
-      InputStream is, String tag, AtomicBoolean isStopped) throws IOException;
+      InputStream is, CacheTag tag, AtomicBoolean isStopped) throws IOException;
 }
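
A hedged sketch of what a caller of the updated interface might look like (the helper name and the
table name are made up; the types come from the interfaces above and their imports are assumed):

    static MemoryBufferOrBuffers cacheFooter(FileMetadataCache cache, Object fileKey,
        ByteBuffer tailBuffer, AtomicBoolean isStopped) {
      // Tag the footer with the table it belongs to, so cache statistics can be grouped by it.
      CacheTag tag = CacheTag.build("db.tab");
      return cache.putFileMetadata(fileKey, tailBuffer, tag, isStopped);
    }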