Posted to commits@hive.apache.org by sz...@apache.org on 2019/10/15 09:09:52 UTC
[hive] branch master updated: HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6fa1e91 HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)
6fa1e91 is described below
commit 6fa1e911979d0bf0d4cd98216f3b9f89bd95b706
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Tue Oct 15 11:01:19 2019 +0200
HIVE-22284: Improve LLAP CacheContentsTracker to collect and display correct statistics (Adam Szita, reviewed by Peter Vary)
---
.../java/org/apache/hadoop/hive/llap/LlapUtil.java | 3 +-
.../hive/llap/cache/CacheContentsTracker.java | 110 +++++----
.../hive/llap/cache/LlapCacheableBuffer.java | 4 +-
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 8 +-
.../hadoop/hive/llap/cache/LowLevelCache.java | 3 +-
.../hadoop/hive/llap/cache/LowLevelCacheImpl.java | 11 +-
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 11 +-
.../hive/llap/cache/SimpleBufferManager.java | 3 +-
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 3 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 40 +---
.../llap/io/decode/OrcColumnVectorProducer.java | 4 +-
.../hive/llap/io/encoded/OrcEncodedDataReader.java | 18 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 16 +-
.../hive/llap/io/metadata/MetadataCache.java | 25 +-
.../llap/io/metadata/OrcFileEstimateErrors.java | 5 +-
.../hive/llap/cache/TestCacheContentsTracker.java | 173 ++++++++++++++
.../apache/hadoop/hive/llap/LlapCacheAwareFs.java | 7 +-
.../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 110 +++++++++
.../hive/ql/io/orc/encoded/EncodedReaderImpl.java | 5 +-
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 3 +-
.../hadoop/hive/ql/io/orc/encoded/ReaderImpl.java | 3 +-
.../vector/VectorizedParquetRecordReader.java | 24 +-
.../org/apache/hadoop/hive/common/io/CacheTag.java | 263 +++++++++++++++++++++
.../apache/hadoop/hive/common/io/DataCache.java | 2 +-
.../hadoop/hive/common/io/FileMetadataCache.java | 8 +-
25 files changed, 708 insertions(+), 154 deletions(-)
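
[Editor's note, not part of the patch] The change replaces the plain String cache tags with the new structured CacheTag class (see CacheTag.java below), which can derive a parent tag for each level of the db/table/partition hierarchy. A minimal sketch of that traversal follows, mirroring the calls exercised in TestCacheContentsTracker (CacheTag.build, CacheTag.createParentCacheTag, CacheTag.getTableName); the class and method names CacheTagExample/main are only illustrative.

import java.util.Arrays;
import java.util.LinkedList;
import org.apache.hadoop.hive.common.io.CacheTag;

public class CacheTagExample {
  public static void main(String[] args) {
    // Partition-level tag for dbname.tablename with two partition key/value pairs.
    LinkedList<String> parts = new LinkedList<>(Arrays.asList("p=v1", "pp=vv1"));
    CacheTag partitionTag = CacheTag.build("dbname.tablename", parts);

    // Walking up the hierarchy yields the coarser tags (partition -> table -> db);
    // createParentCacheTag returns null once the db level has been passed.
    CacheTag parent = CacheTag.createParentCacheTag(partitionTag);
    while (parent != null) {
      System.out.println(parent.getTableName());
      parent = CacheTag.createParentCacheTag(parent);
    }
  }
}
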
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index a351a19..c26ab62 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -314,8 +314,7 @@ public class LlapUtil {
DELETE_DELTA_PREFIX = "delete_delta_", BUCKET_PREFIX = "bucket_",
DATABASE_PATH_SUFFIX = ".db", UNION_SUDBIR_PREFIX = "HIVE_UNION_SUBDIR_";
- public static final char DERIVED_ENTITY_PARTITION_SEPARATOR = '/';
-
+ @Deprecated
public static String getDbAndTableNameForMetrics(Path path, boolean includeParts) {
String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
int dbIx = -1;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
index 64c0125..733b30c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/CacheContentsTracker.java
@@ -18,19 +18,24 @@
package org.apache.hadoop.hive.llap.cache;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import static java.util.stream.Collectors.joining;
+
/**
* A wrapper around cache eviction policy that tracks cache contents via tags.
*/
public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListener {
private static final long CLEANUP_TIME_MS = 3600 * 1000L, MIN_TIME_MS = 300 * 1000L;
- private final ConcurrentSkipListMap<String, TagState> tagInfo = new ConcurrentSkipListMap<>();
+ private final ConcurrentSkipListMap<CacheTag, TagState> tagInfo = new ConcurrentSkipListMap<>();
private EvictionListener evictionListener;
private LowLevelCachePolicy realPolicy;
private final Thread cleanupThread;
@@ -75,56 +80,37 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
}
private static class TagState {
- public TagState(String name) {
- this.name = name;
+ TagState(CacheTag cacheTag) {
+ this.cacheTag = cacheTag;
}
- public final String name;
+ public final CacheTag cacheTag;
public long emptyTimeNs;
public long bufferCount, totalSize, maxCount, maxSize;
- public boolean isRemoved = false;
}
-
private void reportCached(LlapCacheableBuffer buffer) {
long size = buffer.getMemoryUsage();
- TagState state;
- do {
- state = getTagState(buffer);
- } while (!reportCached(state, size));
- state = null;
- do {
- state = getParentTagState(buffer);
- if (state == null) break;
- } while (!reportCached(state, size));
- }
-
- private boolean reportCached(TagState state, long size) {
+ TagState state = getTagState(buffer);
+ reportCached(state, size);
+ }
+
+ private void reportCached(TagState state, long size) {
synchronized (state) {
- if (state.isRemoved) return false;
++state.bufferCount;
state.totalSize += size;
state.maxSize = Math.max(state.maxSize, state.totalSize);
state.maxCount = Math.max(state.maxCount, state.bufferCount);
}
- return true;
}
private void reportRemoved(LlapCacheableBuffer buffer) {
long size = buffer.getMemoryUsage();
- TagState state;
- do {
- state = getTagState(buffer);
- } while (!reportRemoved(state, size));
- state = null;
- do {
- state = getParentTagState(buffer);
- if (state == null) break;
- } while (!reportRemoved(state, size));
- }
-
- private boolean reportRemoved(TagState state, long size) {
+ TagState state = getTagState(buffer);
+ reportRemoved(state, size);
+ }
+
+ private void reportRemoved(TagState state, long size) {
synchronized (state) {
- if (state.isRemoved) return false;
--state.bufferCount;
assert state.bufferCount >= 0;
state.totalSize -= size;
@@ -132,21 +118,13 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
state.emptyTimeNs = System.nanoTime();
}
}
- return true;
}
private TagState getTagState(LlapCacheableBuffer buffer) {
return getTagState(buffer.getTag());
}
- private TagState getParentTagState(LlapCacheableBuffer buffer) {
- String tag = buffer.getTag();
- int ix = tag.indexOf(LlapUtil.DERIVED_ENTITY_PARTITION_SEPARATOR);
- if (ix <= 0) return null;
- return getTagState(tag.substring(0, ix));
- }
-
- private TagState getTagState(String tag) {
+ private TagState getTagState(CacheTag tag) {
TagState state = tagInfo.get(tag);
if (state == null) {
state = new TagState(tag);
@@ -191,14 +169,51 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
@Override
public void debugDumpShort(StringBuilder sb) {
- sb.append("\nCache state: ");
+ ArrayList<String> endResult = new ArrayList<>();
+ Map<CacheTag, TagState> summaries = new TreeMap<>();
+
for (TagState state : tagInfo.values()) {
synchronized (state) {
- sb.append("\n").append(state.name).append(": ").append(state.bufferCount).append("/")
- .append(state.maxCount).append(", ").append(state.totalSize).append("/")
- .append(state.maxSize);
+ endResult.add(unsafePrintTagState(state));
+
+ // Handle summary calculation
+ CacheTag parentTag = CacheTag.createParentCacheTag(state.cacheTag);
+ while (parentTag != null) {
+ if (!summaries.containsKey(parentTag)) {
+ summaries.put(parentTag, new TagState(parentTag));
+ }
+ TagState parentState = summaries.get(parentTag);
+ parentState.bufferCount += state.bufferCount;
+ parentState.maxCount += state.maxCount;
+ parentState.totalSize += state.totalSize;
+ parentState.maxSize += state.maxSize;
+ parentTag = CacheTag.createParentCacheTag(parentTag);
+ }
}
}
+ for (TagState state : summaries.values()) {
+ endResult.add(unsafePrintTagState(state));
+ }
+ sb.append("\nCache state: \n");
+ sb.append(endResult.stream().sorted().collect(joining("\n")));
+ }
+
+ /**
+ * Constructs a String by pretty printing a TagState instance - for Web UI consumption.
+ * Note: does not lock on TagState instance.
+ * @param state
+ * @return
+ */
+ private String unsafePrintTagState(TagState state) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(state.cacheTag.getTableName());
+ if (state.cacheTag instanceof CacheTag.PartitionCacheTag) {
+ sb.append("/").append(String.join("/",
+ ((CacheTag.PartitionCacheTag) state.cacheTag).partitionDescToString()));
+ }
+ sb.append(" : ").append(state.bufferCount).append("/").append(state.maxCount).append(", ")
+ .append(state.totalSize).append("/").append(state.maxSize);
+ return sb.toString();
}
@Override
@@ -206,4 +221,5 @@ public class CacheContentsTracker implements LowLevelCachePolicy, EvictionListen
evictionListener.notifyEvicted(buffer);
reportRemoved(buffer);
}
+
}
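
[Editor's note, not part of the patch] The rewritten debugDumpShort() above no longer keeps separate parent TagState entries in the live map; instead it rolls each leaf tag's counters up into ancestor-level summary rows at dump time and prints everything sorted as "tag : bufferCount/maxCount, totalSize/maxSize". A simplified, illustrative sketch of that roll-up is below; the TagRollupSketch class and the long[] counters are assumptions made only for this example, while CacheTag.createParentCacheTag and the TreeMap keying come from the patch.

import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hive.common.io.CacheTag;

final class TagRollupSketch {
  /** leafStats maps a (possibly partition-level) tag to {bufferCount, totalSize}. */
  static Map<CacheTag, long[]> summarize(Map<CacheTag, long[]> leafStats) {
    Map<CacheTag, long[]> summaries = new TreeMap<>();
    for (Map.Entry<CacheTag, long[]> e : leafStats.entrySet()) {
      // Add this leaf's counters into every ancestor tag (table, then db).
      CacheTag parent = CacheTag.createParentCacheTag(e.getKey());
      while (parent != null) {
        long[] agg = summaries.computeIfAbsent(parent, t -> new long[2]);
        agg[0] += e.getValue()[0];  // buffer count
        agg[1] += e.getValue()[1];  // total size
        parent = CacheTag.createParentCacheTag(parent);
      }
    }
    return summaries;
  }
}
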
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
index f91a5d9..0526033 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.llap.cache;
+import org.apache.hadoop.hive.common.io.CacheTag;
+
/**
* Buffer that can be managed by LowLevelEvictionPolicy.
* We want to have cacheable and non-allocator buffers, as well as allocator buffers with no
@@ -56,7 +58,7 @@ public abstract class LlapCacheableBuffer {
+ lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
}
- public abstract String getTag();
+ public abstract CacheTag getTag();
protected abstract boolean isLocked();
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index 405fca2..3d5e08e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -18,25 +18,27 @@
package org.apache.hadoop.hive.llap.cache;
+import org.apache.hadoop.hive.common.io.CacheTag;
+
public final class LlapDataBuffer extends LlapAllocatorBuffer {
public static final int UNKNOWN_CACHED_LENGTH = -1;
/** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
* the lookup is on compressed ranges, so we need to know this. */
public int declaredCachedLength = UNKNOWN_CACHED_LENGTH;
- private String tag;
+ private CacheTag tag;
@Override
public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
evictionDispatcher.notifyEvicted(this);
}
- public void setTag(String tag) {
+ public void setTag(CacheTag tag) {
this.tag = tag;
}
@Override
- public String getTag() {
+ public CacheTag getTag() {
return tag;
}
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 4dd3826..f780b29 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.llap.cache;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
@@ -58,7 +59,7 @@ public interface LowLevelCache {
* the replacement chunks from cache are updated directly in the array.
*/
long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
- long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag);
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag);
/** Notifies the cache that a particular buffer should be removed due to eviction. */
void notifyEvicted(MemoryBuffer buffer);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 62d7e55..5d0f2ab 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import org.apache.orc.impl.RecordReaderUtils;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
@@ -29,19 +27,20 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.common.io.Allocator;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hive.common.util.Ref;
+import org.apache.orc.impl.RecordReaderUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.Joiner;
public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapIoDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
@@ -290,7 +289,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
- long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) {
long[] result = null;
assert buffers.length == ranges.length;
FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 2a39d2d..7930fd9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -75,16 +76,16 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
public static final class LlapSerDeDataBuffer extends LlapAllocatorBuffer {
public boolean isCached = false;
- private String tag;
+ private CacheTag tag;
@Override
public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
evictionDispatcher.notifyEvicted(this);
}
- public void setTag(String tag) {
+ public void setTag(CacheTag tag) {
this.tag = tag;
}
@Override
- public String getTag() {
+ public CacheTag getTag() {
return tag;
}
}
@@ -523,7 +524,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
}
public void putFileData(final FileData data, Priority priority,
- LowLevelCacheCounters qfCounters, String tag) {
+ LowLevelCacheCounters qfCounters, CacheTag tag) {
// TODO: buffers are accounted for at allocation time, but ideally we should report the memory
// overhead from the java objects to memory manager and remove it when discarding file.
if (data.stripes == null || data.stripes.isEmpty()) {
@@ -598,7 +599,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapIoDebugDu
}
}
- private void lockAllBuffersForPut(StripeData si, Priority priority, String tag) {
+ private void lockAllBuffersForPut(StripeData si, Priority priority, CacheTag tag) {
for (int i = 0; i < si.data.length; ++i) {
LlapSerDeDataBuffer[][] colData = si.data[i];
if (colData == null) continue;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
index 41855e1..b2f606a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleBufferManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.cache;
import java.util.List;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRange;
@@ -85,7 +86,7 @@ public class SimpleBufferManager implements BufferUsageManager, LowLevelCache {
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
- long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, String tag) {
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters, CacheTag tag) {
for (int i = 0; i < chunks.length; ++i) {
LlapAllocatorBuffer buffer = (LlapAllocatorBuffer)chunks[i];
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index c63ee5f..0d9077c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import javax.management.ObjectName;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -299,7 +300,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
- MemoryBuffer[] data, long baseOffset, String tag) {
+ MemoryBuffer[] data, long baseOffset, CacheTag tag) {
return lowLevelCache.putFileData(
fileKey, ranges, data, baseOffset, Priority.NORMAL, null, tag);
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 1378a01..8cc81cc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -41,8 +41,6 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -55,7 +53,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -75,8 +72,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import com.google.common.collect.Lists;
-
class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
@@ -116,7 +111,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
List<Integer> tableIncludedCols, String hostName, ColumnVectorProducer cvp,
ExecutorService executor, InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe,
Reporter reporter, Configuration daemonConf) throws IOException, HiveException {
- MapWork mapWork = findMapWork(job);
+ MapWork mapWork = LlapHiveUtils.findMapWork(job);
if (mapWork == null) return null; // No compatible MapWork.
LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName,
cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
@@ -302,37 +297,6 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
return Math.max(bestEffortSize, queueLimitMin);
}
-
- private static MapWork findMapWork(JobConf job) throws HiveException {
- String inputName = job.get(Utilities.INPUT_NAME, null);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initializing for input " + inputName);
- }
- String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
- if (prefixes != null && !StringUtils.isBlank(prefixes)) {
- // Currently SMB is broken, so we cannot check if it's compatible with IO elevator.
- // So, we don't use the below code that would get the correct MapWork. See HIVE-16985.
- return null;
- }
-
- BaseWork work = null;
- // HIVE-16985: try to find the fake merge work for SMB join, that is really another MapWork.
- if (inputName != null) {
- if (prefixes == null ||
- !Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
- inputName = null;
- }
- }
- if (inputName != null) {
- work = Utilities.getMergeWork(job, inputName);
- }
-
- if (!(work instanceof MapWork)) {
- work = Utilities.getMapWork(job);
- }
- return (MapWork) work;
- }
-
/**
* Starts the data read pipeline
*/
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 2a0c5ca..17c4821 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -88,12 +88,12 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
Consumer<ColumnVectorBatch> consumer, FileSplit split, Includes includes,
SearchArgument sarg, QueryFragmentCounters counters, SchemaEvolutionFactory sef,
InputFormat<?, ?> unused0, Deserializer unused1, Reporter reporter, JobConf job,
- Map<Path, PartitionDesc> unused2) throws IOException {
+ Map<Path, PartitionDesc> parts) throws IOException {
cacheMetrics.incrCacheReadRequests();
OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
consumer, includes, _skipCorrupt, counters, ioMetrics);
OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
- metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool);
+ metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool, parts);
edc.init(reader, reader, reader.getTrace());
return edc;
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 85a42f9..3fcf0dc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -41,7 +43,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
@@ -70,6 +72,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.FixedSizedObjectPool;
@@ -167,7 +170,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private EncodedReader stripeReader;
private CompressionCodec codec;
private Object fileKey;
- private final String cacheTag;
+ private final CacheTag cacheTag;
+ private final Map<Path, PartitionDesc> parts;
private Utilities.SupplierWithCheckedException<FileSystem, IOException> fsSupplier;
@@ -187,7 +191,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf,
FileSplit split, Includes includes, SearchArgument sarg, OrcEncodedDataConsumer consumer,
- QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool<IoTrace> tracePool)
+ QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool<IoTrace> tracePool,
+ Map<Path, PartitionDesc> parts)
throws IOException {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
@@ -199,6 +204,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
this.counters = counters;
this.trace = tracePool.take();
this.tracePool = tracePool;
+ this.parts = parts;
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
@@ -210,7 +216,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// LlapInputFormat needs to know the file schema to decide if schema evolution is supported.
orcReader = null;
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
- ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
+ ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
fsSupplier = Utilities.getFsSupplier(split.getPath(), jobConf);
@@ -278,7 +284,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return null;
}
counters.setDesc(QueryFragmentCounters.Desc.TABLE,
- LlapUtil.getDbAndTableNameForMetrics(split.getPath(), false));
+ LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts));
counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
+ (fileKey == null ? "" : " (" + fileKey + ")"));
try {
@@ -927,7 +933,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
- MemoryBuffer[] data, long baseOffset, String tag) {
+ MemoryBuffer[] data, long baseOffset, CacheTag tag) {
if (data != null) {
return lowLevelCache.putFileData(
fileKey, ranges, data, baseOffset, Priority.NORMAL, counters, tag);
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index d414b14..c73ba2c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.llap.io.encoded;
-import org.apache.orc.impl.MemoryManager;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -37,16 +35,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
@@ -89,14 +88,15 @@ import org.apache.hive.common.util.Ref;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
import org.apache.orc.OrcFile.EncodingStrategy;
import org.apache.orc.OrcFile.Version;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcProto.ColumnEncoding;
-import org.apache.orc.TypeDescription;
+import org.apache.orc.OrcUtils;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.PhysicalWriter.OutputReceiver;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.MemoryManager;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.impl.StreamName;
import org.apache.tez.common.CallableWithNdc;
@@ -149,7 +149,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final Map<Path, PartitionDesc> parts;
private final Object fileKey;
- private final String cacheTag;
+ private final CacheTag cacheTag;
private final FileSystem fs;
private AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -219,7 +219,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
- ? LlapUtil.getDbAndTableNameForMetrics(split.getPath(), true) : null;
+ ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, parts) : null;
this.sourceInputFormat = sourceInputFormat;
this.sourceSerDe = sourceSerDe;
this.reporter = reporter;
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
index 8400fe9..10bd736 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/MetadataCache.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
-import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
import org.apache.hadoop.hive.llap.cache.LlapIoDebugDump;
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.StoppableAllocator;
public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
@@ -173,7 +172,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
@Override
public MemoryBufferOrBuffers putFileMetadata(Object fileKey,
- ByteBuffer tailBuffer, String tag) {
+ ByteBuffer tailBuffer, CacheTag tag) {
return putInternal(fileKey, tailBuffer, tag, null);
}
@@ -184,26 +183,26 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
}
public LlapBufferOrBuffers putStripeTail(
- OrcBatchKey stripeKey, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ OrcBatchKey stripeKey, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
return putInternal(new StripeKey(stripeKey.fileKey, stripeKey.stripeIx), tailBuffer, tag, isStopped);
}
@Override
public MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
- InputStream is, String tag) throws IOException {
+ InputStream is, CacheTag tag) throws IOException {
return putFileMetadata(fileKey, length, is, tag, null);
}
@Override
public LlapBufferOrBuffers putFileMetadata(Object fileKey,
- ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
return putInternal(fileKey, tailBuffer, tag, isStopped);
}
@Override
public LlapBufferOrBuffers putFileMetadata(Object fileKey, int length, InputStream is,
- String tag, AtomicBoolean isStopped) throws IOException {
+ CacheTag tag, AtomicBoolean isStopped) throws IOException {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(fileKey);
@@ -229,7 +228,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
@SuppressWarnings({ "rawtypes", "unchecked" })
private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
- Object fileKey, int length, InputStream stream, String tag, AtomicBoolean isStopped) throws IOException {
+ Object fileKey, int length, InputStream stream, CacheTag tag, AtomicBoolean isStopped) throws IOException {
if (result != null) return result;
int maxAlloc = allocator.getMaxAllocation();
LlapMetadataBuffer<Object>[] largeBuffers = null;
@@ -274,7 +273,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
bb.position(pos);
}
- private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ private <T> LlapBufferOrBuffers putInternal(T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
LlapBufferOrBuffers result = null;
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapBufferOrBuffers oldVal = metadata.get(key);
@@ -337,7 +336,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
}
private <T> LlapBufferOrBuffers wrapBb(
- LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, String tag, AtomicBoolean isStopped) {
+ LlapBufferOrBuffers result, T key, ByteBuffer tailBuffer, CacheTag tag, AtomicBoolean isStopped) {
if (result != null) return result;
if (tailBuffer.remaining() <= allocator.getMaxAllocation()) {
// The common case by far.
@@ -507,9 +506,9 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
public final static class LlapMetadataBuffer<T>
extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
private final T key;
- private String tag;
+ private CacheTag tag;
- public LlapMetadataBuffer(T key, String tag) {
+ public LlapMetadataBuffer(T key, CacheTag tag) {
this.key = key;
this.tag = tag;
}
@@ -545,7 +544,7 @@ public class MetadataCache implements LlapIoDebugDump, FileMetadataCache {
}
@Override
- public String getTag() {
+ public CacheTag getTag() {
return tag;
}
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
index 30dc1b9..33e1680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
@@ -123,8 +124,8 @@ public class OrcFileEstimateErrors extends LlapCacheableBuffer {
}
@Override
- public String getTag() {
+ public CacheTag getTag() {
// We don't care about these.
- return "OrcEstimates";
+ return CacheTag.build("OrcEstimates");
}
}
\ No newline at end of file
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
new file mode 100644
index 0000000..1d242e0
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import org.apache.hadoop.hive.common.io.CacheTag;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static java.util.stream.Collectors.toCollection;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for TestCacheContentsTracker functions.
+ */
+public class TestCacheContentsTracker {
+
+ private static CacheContentsTracker tracker;
+
+ @BeforeClass
+ public static void setup() {
+ LowLevelCachePolicy lowLevelCachePolicyMock = mock(LowLevelCachePolicy.class);
+ EvictionListener evictionListenerMock = mock(EvictionListener.class);
+ tracker = new CacheContentsTracker(lowLevelCachePolicyMock);
+ tracker.setEvictionListener(evictionListenerMock);
+ }
+
+ /**
+ * Tests parent CacheTag generation by checking each step when traversing from 3rd level
+ * partition to DB level.
+ */
+ @Test
+ public void testParentCacheTagGeneration() {
+ CacheTag db = cacheTagBuilder("dbname");
+ CacheTag table = cacheTagBuilder("dbname.tablename");
+ CacheTag p = cacheTagBuilder("dbname.tablename", "p=v1");
+ CacheTag pp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1");
+ CacheTag ppp = cacheTagBuilder("dbname.tablename", "p=v1", "pp=vv1", "ppp=vvv1");
+
+ assertTrue(pp.compareTo(CacheTag.createParentCacheTag(ppp)) == 0);
+ assertTrue(p.compareTo(CacheTag.createParentCacheTag(pp)) == 0);
+ assertTrue(table.compareTo(CacheTag.createParentCacheTag(p)) == 0);
+ assertTrue(db.compareTo(CacheTag.createParentCacheTag(table)) == 0);
+ assertNull(CacheTag.createParentCacheTag(db));
+ }
+
+ /**
+ * Caches some mock buffers and checks summary produced by CacheContentsTracker. Later this is
+ * done again after some mock buffers were evicted.
+ */
+ @Test
+ public void testAggregatedStatsGeneration() {
+ cacheTestBuffers();
+ StringBuilder sb = new StringBuilder();
+ tracker.debugDumpShort(sb);
+ assertEquals(EXPECTED_CACHE_STATE_WHEN_FULL, sb.toString());
+
+ evictSomeTestBuffers();
+ sb = new StringBuilder();
+ tracker.debugDumpShort(sb);
+ assertEquals(EXPECTED_CACHE_STATE_AFTER_EVICTION, sb.toString());
+ }
+
+ private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag) {
+ LlapCacheableBuffer llapCacheableBufferMock = mock(LlapCacheableBuffer.class);
+
+ doAnswer(invocationOnMock -> {
+ return size;
+ }).when(llapCacheableBufferMock).getMemoryUsage();
+
+ doAnswer(invocationOnMock -> {
+ return cacheTag;
+ }).when(llapCacheableBufferMock).getTag();
+
+ return llapCacheableBufferMock;
+ }
+
+ private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
+ if (partitions != null && partitions.length > 0) {
+ LinkedList<String> parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new));
+ return CacheTag.build(dbAndTable, parts);
+ } else {
+ return CacheTag.build(dbAndTable);
+ }
+ }
+
+ private static void cacheTestBuffers() {
+ tracker.cache(createMockBuffer(4 * 1024L,
+ cacheTagBuilder("default.testtable")), null);
+ tracker.cache(createMockBuffer(2 * 1024L,
+ cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null);
+ tracker.cache(createMockBuffer(32 * 1024L,
+ cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")), null);
+ tracker.cache(createMockBuffer(64 * 1024L,
+ cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv2")), null);
+ tracker.cache(createMockBuffer(128 * 1024L,
+ cacheTagBuilder("otherdb.testtable", "p=v2", "pp=vv1")), null);
+ tracker.cache(createMockBuffer(256 * 1024L,
+ cacheTagBuilder("otherdb.testtable2", "p=v3")), null);
+ tracker.cache(createMockBuffer(512 * 1024 * 1024L,
+ cacheTagBuilder("otherdb.testtable2", "p=v3")), null);
+ tracker.cache(createMockBuffer(1024 * 1024 * 1024L,
+ cacheTagBuilder("otherdb.testtable3")), null);
+ tracker.cache(createMockBuffer(2 * 1024 * 1024L,
+ cacheTagBuilder("default.testtable")), null);
+ }
+
+ private static void evictSomeTestBuffers() {
+ tracker.notifyEvicted(createMockBuffer(32 * 1024L,
+ cacheTagBuilder("otherdb.testtable", "p=v1", "pp=vv1")));
+ tracker.notifyEvicted(createMockBuffer(512 * 1024 * 1024L,
+ cacheTagBuilder("otherdb.testtable2", "p=v3")));
+ tracker.notifyEvicted(createMockBuffer(2 * 1024 * 1024L,
+ cacheTagBuilder("default.testtable")));
+ tracker.notifyEvicted(createMockBuffer(4 * 1024L,
+ cacheTagBuilder("default.testtable")));
+ }
+
+ private static final String EXPECTED_CACHE_STATE_WHEN_FULL =
+ "\n" +
+ "Cache state: \n" +
+ "default : 2/2, 2101248/2101248\n" +
+ "default.testtable : 2/2, 2101248/2101248\n" +
+ "otherdb : 7/7, 1611106304/1611106304\n" +
+ "otherdb.testtable : 4/4, 231424/231424\n" +
+ "otherdb.testtable/p=v1 : 3/3, 100352/100352\n" +
+ "otherdb.testtable/p=v1/pp=vv1 : 2/2, 34816/34816\n" +
+ "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+ "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+ "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+ "otherdb.testtable2 : 2/2, 537133056/537133056\n" +
+ "otherdb.testtable2/p=v3 : 2/2, 537133056/537133056\n" +
+ "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+
+ private static final String EXPECTED_CACHE_STATE_AFTER_EVICTION =
+ "\n" +
+ "Cache state: \n" +
+ "default : 0/2, 0/2101248\n" +
+ "default.testtable : 0/2, 0/2101248\n" +
+ "otherdb : 5/7, 1074202624/1611106304\n" +
+ "otherdb.testtable : 3/4, 198656/231424\n" +
+ "otherdb.testtable/p=v1 : 2/3, 67584/100352\n" +
+ "otherdb.testtable/p=v1/pp=vv1 : 1/2, 2048/34816\n" +
+ "otherdb.testtable/p=v1/pp=vv2 : 1/1, 65536/65536\n" +
+ "otherdb.testtable/p=v2 : 1/1, 131072/131072\n" +
+ "otherdb.testtable/p=v2/pp=vv1 : 1/1, 131072/131072\n" +
+ "otherdb.testtable2 : 1/2, 262144/537133056\n" +
+ "otherdb.testtable2/p=v3 : 1/2, 262144/537133056\n" +
+ "otherdb.testtable3 : 1/1, 1073741824/1073741824";
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
index af04a51..c887394 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -61,7 +62,7 @@ public class LlapCacheAwareFs extends FileSystem {
new ConcurrentHashMap<>();
public static Path registerFile(DataCache cache, Path path, Object fileKey,
- TreeMap<Long, Long> index, Configuration conf, String tag) throws IOException {
+ TreeMap<Long, Long> index, Configuration conf, CacheTag tag) throws IOException {
long splitId = currentSplitId.incrementAndGet();
CacheAwareInputStream stream = new CacheAwareInputStream(
cache, conf, index, path, fileKey, -1, tag);
@@ -170,14 +171,14 @@ public class LlapCacheAwareFs extends FileSystem {
private final TreeMap<Long, Long> chunkIndex;
private final Path path;
private final Object fileKey;
- private final String tag;
+ private final CacheTag tag;
private final Configuration conf;
private final DataCache cache;
private final int bufferSize;
private long position = 0;
public CacheAwareInputStream(DataCache cache, Configuration conf,
- TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, String tag) {
+ TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize, CacheTag tag) {
this.cache = cache;
this.fileKey = fileKey;
this.chunkIndex = chunkIndex;
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
new file mode 100644
index 0000000..a041426
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.google.common.collect.Lists;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Covers utility functions that are used by LLAP code and depend on Hive constructs e.g. ql code.
+ */
+public final class LlapHiveUtils {
+
+ public static final Logger LOG = LoggerFactory.getLogger(LlapHiveUtils.class);
+
+ private LlapHiveUtils() {
+ // Not to be used;
+ }
+
+ public static CacheTag getDbAndTableNameForMetrics(Path path, boolean includeParts,
+ Map<Path, PartitionDesc> parts) {
+
+ assert(parts != null);
+
+ // Look for PartitionDesc instance matching our Path
+ Path parentPath = path;
+ PartitionDesc part = parts.get(parentPath);
+ while (!parentPath.isRoot() && part == null) {
+ parentPath = parentPath.getParent();
+ part = parts.get(parentPath);
+ }
+
+ // Fallback to legacy cache tag creation logic.
+ if (part == null) {
+ return CacheTag.build(LlapUtil.getDbAndTableNameForMetrics(path, includeParts));
+ }
+
+ if (!includeParts || !part.isPartitioned()) {
+ return CacheTag.build(part.getTableName());
+ } else {
+ return CacheTag.build(part.getTableName(), part.getPartSpec());
+ }
+ }
+
+ /**
+ * Returns MapWork based what is serialized in the JobConf instance provided.
+ * @param job
+ * @return the MapWork instance. Might be null if missing.
+ * @throws HiveException
+ */
+ public static MapWork findMapWork(JobConf job) throws HiveException {
+ String inputName = job.get(Utilities.INPUT_NAME, null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initializing for input " + inputName);
+ }
+ String prefixes = job.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes != null && !StringUtils.isBlank(prefixes)) {
+ // Currently SMB is broken, so we cannot check if it's compatible with IO elevator.
+ // So, we don't use the below code that would get the correct MapWork. See HIVE-16985.
+ return null;
+ }
+
+ BaseWork work = null;
+ // HIVE-16985: try to find the fake merge work for SMB join, that is really another MapWork.
+ if (inputName != null) {
+ if (prefixes == null ||
+ !Lists.newArrayList(prefixes.split(",")).contains(inputName)) {
+ inputName = null;
+ }
+ }
+ if (inputName != null) {
+ work = Utilities.getMergeWork(job, inputName);
+ }
+
+ if (!(work instanceof MapWork)) {
+ work = Utilities.getMapWork(job);
+ }
+ return (MapWork) work;
+ }
+
+}
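
[Editor's note, not part of the patch] LlapHiveUtils.getDbAndTableNameForMetrics() above resolves the PartitionDesc for a split path by walking up the parent directories of the path, and only falls back to the legacy LlapUtil path-parsing when no PartitionDesc matches. A hedged usage sketch follows, mirroring how OrcEncodedDataReader and VectorizedParquetRecordReader obtain the partition map from MapWork; the CacheTagLookupSketch class and tagForSplit method are illustrative names only.

import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.JobConf;

final class CacheTagLookupSketch {
  static CacheTag tagForSplit(JobConf job, Path splitPath) throws HiveException {
    MapWork mapWork = LlapHiveUtils.findMapWork(job);
    if (mapWork == null) {
      return null;  // no compatible MapWork serialized in this JobConf
    }
    Map<Path, PartitionDesc> parts = mapWork.getPathToPartitionInfo();
    // includeParts = true yields partition-level tags for partitioned tables.
    return LlapHiveUtils.getDbAndTableNameForMetrics(splitPath, true, parts);
  }
}
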
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 241a300..117e4e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
@@ -148,14 +149,14 @@ class EncodedReaderImpl implements EncodedReader {
private final IoTrace trace;
private final TypeDescription fileSchema;
private final WriterVersion version;
- private final String tag;
+ private final CacheTag tag;
private AtomicBoolean isStopped;
private StoppableAllocator allocator;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
this.fileKey = fileKey;
this.compressionKind = kind;
this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 210c987..8d3336f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
@@ -46,7 +47,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
* @return The reader.
*/
EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException;
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException;
/** The factory that can create (or return) the pools used by encoded reader. */
public interface PoolFactory {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index a9a9f10..e137c24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
@@ -35,7 +36,7 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
@Override
public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, String tag) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 61e2556..ea6dfb8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -22,13 +22,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
-import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -78,6 +82,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -104,6 +109,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private Object[] partitionValues;
private Path cacheFsPath;
private static final int MAP_DEFINITION_LEVEL_MAX = 3;
+ private Map<Path, PartitionDesc> parts;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -170,13 +176,19 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
@SuppressWarnings("deprecation")
public void initialize(
InputSplit oldSplit,
- JobConf configuration) throws IOException, InterruptedException {
+ JobConf configuration) throws IOException, InterruptedException, HiveException {
// the oldSplit may be null during the split phase
if (oldSplit == null) {
return;
}
ParquetMetadata footer;
List<BlockMetaData> blocks;
+
+ MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
+ if (mapWork != null) {
+ parts = mapWork.getPathToPartitionInfo();
+ }
+
ParquetInputSplit split = (ParquetInputSplit) oldSplit;
boolean indexAccess =
configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
@@ -190,7 +202,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
// if task.side.metadata is set, rowGroupOffsets is null
Object cacheKey = null;
- String cacheTag = null;
+ CacheTag cacheTag = null;
// TODO: also support fileKey in splits, like OrcSplit does
if (metadataCache != null) {
cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
@@ -200,7 +212,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
if (cacheKey != null) {
if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
- cacheTag = LlapUtil.getDbAndTableNameForMetrics(file, true);
+ cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, parts);
}
// If we are going to use cache, change the path to depend on file ID for extra consistency.
FileSystem fs = file.getFileSystem(configuration);
@@ -265,7 +277,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
- List<BlockMetaData> blocks, String tag) throws IOException {
+ List<BlockMetaData> blocks, CacheTag tag) throws IOException {
if (fileKey == null || cache == null) {
return path;
}
@@ -292,7 +304,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
- Object cacheKey, MetadataFilter filter, String tag) throws IOException {
+ Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException {
MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
: metadataCache.getFileMetadata(cacheKey);
if (footerData != null) {
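The Parquet reader above now derives its cache tag from the query plan (MapWork partition info) instead of parsing the file path string. Below is a minimal sketch of that resolution flow, assuming only the LlapHiveUtils and HiveConf calls shown in this hunk; the helper class, method and variable names are illustrative and not part of the patch.

import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.JobConf;

/** Illustrative helper only; not part of the commit. */
final class CacheTagResolutionSketch {
  static CacheTag resolveTag(JobConf jobConf, HiveConf cacheConf, Path file) throws HiveException {
    if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
      return null; // tracking disabled: no tag is attached to cached buffers
    }
    // Partition info comes from the MapWork plan instead of re-parsing the path string.
    MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
    Map<Path, PartitionDesc> parts = (mapWork == null) ? null : mapWork.getPathToPartitionInfo();
    return LlapHiveUtils.getDbAndTableNameForMetrics(file, true, parts);
  }
}
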
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
new file mode 100644
index 0000000..623c181
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io;
+
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Identifies which Hive object (table or partition) a buffer stored in the cache belongs to.
+ * Comes in 3 flavours to optimize for minimal memory overhead:
+ * - TableCacheTag for tables without partitions: DB/table level
+ * - SinglePartitionCacheTag for tables with 1 partition level: DB/table/1st_partition
+ * - MultiPartitionCacheTag for tables with >1 partition levels:
+ * DB/table/1st_partition/.../nth_partition .
+ */
+public abstract class CacheTag implements Comparable<CacheTag> {
+ /**
+ * Table name, prefixed with the DB name and a '.' separator.
+ */
+ protected final String tableName;
+
+ private CacheTag(String tableName) {
+ this.tableName = tableName.intern();
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public int compareTo(CacheTag o) {
+ return tableName.compareTo(o.tableName);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof CacheTag)) {
+ return false;
+ } else {
+ return this.compareTo((CacheTag) obj) == 0;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int res = tableName.hashCode();
+ return res;
+ }
+
+ public static final CacheTag build(String tableName) {
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException();
+ }
+ return new TableCacheTag(tableName);
+ }
+
+ public static final CacheTag build(String tableName, Map<String, String> partDescMap) {
+ if (StringUtils.isEmpty(tableName) || partDescMap == null || partDescMap.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+
+ LinkedList<String> partDescList = new LinkedList<>();
+
+ for (Map.Entry<String, String> entry : partDescMap.entrySet()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(entry.getKey()).append("=").append(entry.getValue());
+ partDescList.add(sb.toString());
+ }
+
+ if (partDescList.size() == 1) {
+ return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+ } else {
+ // In this case it must be >1
+ return new MultiPartitionCacheTag(tableName, partDescList);
+ }
+ }
+
+ // Assumes elements of partDescList are already in p1=v1 format
+ public static final CacheTag build(String tableName, LinkedList<String> partDescList) {
+ if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+
+ if (partDescList.size() == 1) {
+ return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+ } else {
+ // In this case it must be >1
+ return new MultiPartitionCacheTag(tableName, partDescList);
+ }
+ }
+
+ /**
+ * Constructs a (fake) parent CacheTag instance by walking back up the hierarchy, i.e. stepping
+ * from inner to outer partition levels, then producing a CacheTag for the table and finally
+ * for the DB.
+ */
+ public static final CacheTag createParentCacheTag(CacheTag tag) {
+ if (tag == null) {
+ throw new IllegalArgumentException();
+ }
+
+ if (tag instanceof MultiPartitionCacheTag) {
+ MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag;
+ if (multiPartitionCacheTag.partitionDesc.size() > 2) {
+ LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc);
+ subList.removeLast();
+ return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList);
+ } else {
+ return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
+ multiPartitionCacheTag.partitionDesc.get(0));
+ }
+ }
+
+ if (tag instanceof SinglePartitionCacheTag) {
+ return new TableCacheTag(tag.tableName);
+ } else {
+ // DB level
+ int ix = tag.tableName.indexOf(".");
+ if (ix <= 0) {
+ return null;
+ }
+ return new TableCacheTag(tag.tableName.substring(0, ix));
+ }
+
+ }
+
+ /**
+ * CacheTag for tables without partitions.
+ */
+ public static final class TableCacheTag extends CacheTag {
+
+ private TableCacheTag(String tableName) {
+ super(tableName);
+ }
+
+ @Override
+ public int compareTo(CacheTag o) {
+ if (o instanceof SinglePartitionCacheTag || o instanceof MultiPartitionCacheTag) {
+ return -1;
+ } else {
+ return super.compareTo(o);
+ }
+ }
+
+ }
+
+ /**
+ * CacheTag for tables with partitions.
+ */
+ public abstract static class PartitionCacheTag extends CacheTag {
+
+ private PartitionCacheTag(String tableName) {
+ super(tableName);
+ }
+
+ /**
+ * Returns a pretty printed String version of the partitionDesc in the format of p1=v1/p2=v2...
+ * @return the pretty printed String
+ */
+ public abstract String partitionDescToString();
+
+ }
+
+ /**
+ * CacheTag for tables with exactly one partition level.
+ */
+ public static final class SinglePartitionCacheTag extends PartitionCacheTag {
+
+ private final String partitionDesc;
+
+ private SinglePartitionCacheTag(String tableName, String partitionDesc) {
+ super(tableName);
+ if (StringUtils.isEmpty(partitionDesc)) {
+ throw new IllegalArgumentException();
+ }
+ this.partitionDesc = partitionDesc.intern();
+ }
+
+ @Override
+ public String partitionDescToString() {
+ return this.partitionDesc;
+ }
+
+ @Override
+ public int compareTo(CacheTag o) {
+ if (o instanceof TableCacheTag) {
+ return 1;
+ } else if (o instanceof MultiPartitionCacheTag) {
+ return -1;
+ }
+ SinglePartitionCacheTag other = (SinglePartitionCacheTag) o;
+ return super.compareTo(o) +
+ partitionDesc.toString().compareTo(other.partitionDesc.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() + partitionDesc.hashCode();
+ }
+ }
+
+ /**
+ * CacheTag for tables with more than one partition level.
+ */
+ public static final class MultiPartitionCacheTag extends PartitionCacheTag {
+
+ private final LinkedList<String> partitionDesc;
+
+ private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) {
+ super(tableName);
+ this.partitionDesc = partitionDesc;
+ if (this.partitionDesc != null && this.partitionDesc.size() > 1) {
+ this.partitionDesc.stream().forEach(p -> p.intern());
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ @Override
+ public int compareTo(CacheTag o) {
+ if (o instanceof TableCacheTag || o instanceof SinglePartitionCacheTag) {
+ return 1;
+ }
+ MultiPartitionCacheTag other = (MultiPartitionCacheTag) o;
+ return super.compareTo(o) +
+ partitionDesc.toString().compareTo(other.partitionDesc.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ int res = super.hashCode();
+ for (String p : partitionDesc) {
+ res += p.hashCode();
+ }
+ return res;
+ }
+
+ @Override
+ public String partitionDescToString() {
+ return String.join("/", this.partitionDesc);
+ }
+
+ }
+
+}
+
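Since all CacheTag constructors are private, instances are created only through the static factory methods added above. A short usage sketch follows; the db.tab table and the ds/hr partition values are made-up examples, not taken from the patch.

import java.util.LinkedList;

import org.apache.hadoop.hive.common.io.CacheTag;

public class CacheTagUsageExample {
  public static void main(String[] args) {
    // Unpartitioned table: yields a TableCacheTag keyed as "db.tab".
    CacheTag tableTag = CacheTag.build("db.tab");

    // Two partition levels, already rendered as p=v strings: yields a MultiPartitionCacheTag.
    LinkedList<String> partDescs = new LinkedList<>();
    partDescs.add("ds=2019-10-15");
    partDescs.add("hr=09");
    CacheTag multiTag = CacheTag.build("db.tab", partDescs);

    // Walking back up the hierarchy: multi-partition -> single-partition -> table -> DB.
    CacheTag singleTag = CacheTag.createParentCacheTag(multiTag);
    CacheTag tableLevel = CacheTag.createParentCacheTag(singleTag);
    CacheTag dbLevel = CacheTag.createParentCacheTag(tableLevel);

    // Prints "ds=2019-10-15/hr=09" and then "ds=2019-10-15".
    System.out.println(((CacheTag.PartitionCacheTag) multiTag).partitionDescToString());
    System.out.println(((CacheTag.PartitionCacheTag) singleTag).partitionDescToString());

    // tableTag and tableLevel compare as equal: both represent "db.tab".
    System.out.println(tableTag.equals(tableLevel)); // true
    System.out.println(dbLevel.getTableName());      // "db"
  }
}
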
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
index 2ac0a18..9b23a71 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/DataCache.java
@@ -110,5 +110,5 @@ public interface DataCache {
* the replacement chunks from cache are updated directly in the array.
*/
long[] putFileData(Object fileKey, DiskRange[] ranges,
- MemoryBuffer[] data, long baseOffset, String tag);
+ MemoryBuffer[] data, long baseOffset, CacheTag tag);
}
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
index d7de361..4691722 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/FileMetadataCache.java
@@ -42,10 +42,10 @@ public interface FileMetadataCache {
@Deprecated
MemoryBufferOrBuffers putFileMetadata(
- Object fileKey, int length, InputStream is, String tag) throws IOException;
+ Object fileKey, int length, InputStream is, CacheTag tag) throws IOException;
@Deprecated
- MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, String tag);
+ MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer, CacheTag tag);
/**
* Releases the buffer returned from getFileMetadata or putFileMetadata method.
@@ -61,8 +61,8 @@ public interface FileMetadataCache {
* The caller must decref this buffer when done.
*/
MemoryBufferOrBuffers putFileMetadata(Object fileKey, ByteBuffer tailBuffer,
- String tag, AtomicBoolean isStopped);
+ CacheTag tag, AtomicBoolean isStopped);
MemoryBufferOrBuffers putFileMetadata(Object fileKey, int length,
- InputStream is, String tag, AtomicBoolean isStopped) throws IOException;
+ InputStream is, CacheTag tag, AtomicBoolean isStopped) throws IOException;
}
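
With these interface changes, the non-deprecated metadata-cache overloads carry both the CacheTag and the isStopped flag. A minimal caller sketch, assuming the footer ByteBuffer has already been read and using an unpartitioned db.tab tag; the class and method names are illustrative, not part of the commit.

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;

/** Illustrative caller only; not part of the commit. */
final class FooterCachePutSketch {
  static MemoryBufferOrBuffers cacheFooter(FileMetadataCache metadataCache, Object fileKey,
      ByteBuffer tailBuffer, AtomicBoolean isStopped) {
    // Cached footers are now tagged with their owning table/partition instead of a plain String.
    CacheTag tag = CacheTag.build("db.tab");
    // The caller must decref the returned buffer(s) when done, per the interface javadoc above.
    return metadataCache.putFileMetadata(fileKey, tailBuffer, tag, isStopped);
  }
}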