Posted to commits@hive.apache.org by sz...@apache.org on 2020/02/24 10:21:17 UTC
[hive] branch master updated: HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bc4cc9b HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)
bc4cc9b is described below
commit bc4cc9b448913d07148307c836c543212e866659
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Thu Jan 9 16:32:23 2020 +0100
HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)
---
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
.../apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java | 8 +++++++-
.../hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java | 7 +++++++
ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java | 9 +++++++++
.../apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java | 9 ++++++++-
ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java | 2 +-
.../org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java | 4 ++--
.../hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java | 5 +++++
8 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c51b367..583603c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4120,6 +4120,8 @@ public class HiveConf extends Configuration {
"Value is an integer. Default value is -1, which means that we will estimate this value from operators in the plan."),
// The default is different on the client and server, so it's null here.
LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
+ LLAP_IO_CACHE_ONLY("hive.llap.io.cache.only", false, "Whether the query should read from cache only. If set to " +
+ "true and a cache miss happens during the read, an exception will occur. Primarily used for testing."),
LLAP_IO_ROW_WRAPPER_ENABLED("hive.llap.io.row.wrapper.enabled", true, "Whether the LLAP IO row wrapper is enabled for non-vectorized queries."),
LLAP_IO_ACID_ENABLED("hive.llap.io.acid", true, "Whether the LLAP IO layer is enabled for ACID."),
LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb",
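The new flag defaults to false and behaves like any other HiveConf boolean, so it can also be toggled per session with SET hive.llap.io.cache.only=true;. A minimal sketch of setting it programmatically (the surrounding setup is illustrative, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    // Illustrative only: enable cache-only reads on a conf, then restore the
    // default so later queries may fall back to disk again.
    public class CacheOnlyToggleSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY, true);  // any cache miss now fails the read
        // ... run the query with this conf ...
        conf.setBoolVar(HiveConf.ConfVars.LLAP_IO_CACHE_ONLY, false); // back to the default behaviour
      }
    }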
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 92df717..c90ff52 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -106,6 +106,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
/**
* This produces EncodedColumnBatch via ORC EncodedDataImpl.
* It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
@@ -173,6 +175,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private Object fileKey;
private final CacheTag cacheTag;
private final Map<Path, PartitionDesc> parts;
+ private final boolean isReadCacheOnly;
private Supplier<FileSystem> fsSupplier;
@@ -247,6 +250,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64"));
consumer.setFileMetadata(fileMetadata);
consumer.setSchemaEvolution(evolution);
+ isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
}
@Override
@@ -463,7 +467,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// Reader creation updates HDFS counters, don't do it here.
DataWrapperForOrc dw = new DataWrapperForOrc();
stripeReader = orcReader.encodedReader(
- fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
+ fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag, isReadCacheOnly);
stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
stripeReader.setStopped(isStopped);
}
@@ -611,6 +615,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+ throwIfCacheOnlyRead(isReadCacheOnly);
}
ensureOrcReader();
ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
@@ -692,6 +697,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+ throwIfCacheOnlyRead(isReadCacheOnly);
}
long offset = si.getOffset() + si.getIndexLength() + si.getDataLength();
long startTime = counters.startTimeCounter();
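Both hunks above follow the same pattern: count the metadata cache miss, then bail out before the fallback file-system read. A condensed, self-contained sketch of that control flow (class, field and method names here are illustrative, not the patch's):

    import java.io.IOException;

    // Illustrative condensation of the ORC reader change: after a metadata cache
    // miss is counted, cache-only mode turns the usual fallback read into an error.
    final class MetadataMissSketch {
      private final boolean isReadCacheOnly; // from ConfVars.LLAP_IO_CACHE_ONLY

      MetadataMissSketch(boolean isReadCacheOnly) {
        this.isReadCacheOnly = isReadCacheOnly;
      }

      byte[] readFooter(byte[] cached) throws IOException {
        if (cached != null) {
          return cached;                    // cache hit: nothing changes
        }
        // cache miss: the real code increments METADATA_CACHE_MISS here
        if (isReadCacheOnly) {
          throw new IOException("cache-only read requested but the footer is not cached");
        }
        return loadFooterFromFile();        // the pre-existing fallback path
      }

      private byte[] loadFooterFromFile() {
        return new byte[0];                 // stand-in for the HDFS read
      }
    }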
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index c73ba2c..c9e9c02 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -105,6 +105,8 @@ import org.apache.tez.common.counters.TezCounters;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
public class SerDeEncodedDataReader extends CallableWithNdc<Void>
implements ConsumerFeedback<OrcEncodedColumnBatch>, TezCounterSource {
@@ -165,6 +167,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final boolean[] writerIncludes;
private FileReaderYieldReturn currentFileRead = null;
+ private final boolean isReadCacheOnly;
/**
* Data from cache currently being processed. We store it here so that we could decref
@@ -232,6 +235,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
SchemaEvolution evolution = new SchemaEvolution(schema, null,
new Reader.Options(jobConf).include(writerIncludes));
consumer.setSchemaEvolution(evolution);
+ isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
}
private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
@@ -810,6 +814,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
long endOfSplit = split.getStart() + split.getLength();
this.cachedData = cache.getFileData(fileKey, split.getStart(),
endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+ if (!gotAllData.value) {
+ throwIfCacheOnlyRead(isReadCacheOnly);
+ }
if (cachedData == null) {
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index a041426..9ad1486 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hive.llap;
+import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,4 +109,11 @@ public final class LlapHiveUtils {
return (MapWork) work;
}
+ public static void throwIfCacheOnlyRead(boolean isCacheOnlyRead) throws IOException {
+ if (isCacheOnlyRead) {
+ throw new IOException("LLAP cache miss happened while reading. Aborting query as cache-only reading is set. " +
+ "Set " + HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname + " to false and repeat the query if this was unintended.");
+ }
+ }
+
}
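The helper is a no-op while the flag is off, which is why call sites can invoke it unconditionally after any miss. A hypothetical caller, just to show the resulting failure mode:

    import java.io.IOException;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.llap.LlapHiveUtils;

    // Hypothetical call site: read the flag once, then guard a cache miss with it.
    public class ThrowIfCacheOnlySketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        boolean cacheOnly = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_CACHE_ONLY);
        try {
          LlapHiveUtils.throwIfCacheOnlyRead(cacheOnly); // no-op while the flag is false
        } catch (IOException e) {
          // Reached only with hive.llap.io.cache.only=true; the message names the
          // flag so the user knows how to switch the behaviour off again.
          System.err.println(e.getMessage());
        }
      }
    }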
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 117e4e6..1f5a9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -75,6 +75,8 @@ import com.google.protobuf.CodedInputStream;
import sun.misc.Cleaner;
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
/**
* Encoded reader implementation.
@@ -152,11 +154,12 @@ class EncodedReaderImpl implements EncodedReader {
private final CacheTag tag;
private AtomicBoolean isStopped;
private StoppableAllocator allocator;
+ private final boolean isReadCacheOnly;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
this.fileKey = fileKey;
this.compressionKind = kind;
this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
@@ -173,6 +176,7 @@ class EncodedReaderImpl implements EncodedReader {
this.dataReader = dataReader;
this.trace = trace;
this.tag = tag;
+ this.isReadCacheOnly = isReadCacheOnly;
if (POOLS != null) return;
if (pf == null) {
pf = new NoopPoolFactory();
@@ -600,6 +604,9 @@ class EncodedReaderImpl implements EncodedReader {
BooleanRef isAllInCache = new BooleanRef();
if (hasFileId) {
cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
+ if (!isAllInCache.value) {
+ throwIfCacheOnlyRead(isReadCacheOnly);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Disk ranges after cache (found everything " + isAllInCache.value + "; file "
+ fileKey + ", base offset " + stripeOffset + "): "
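On the data path the decision is driven by the isAllInCache flag that getFileData fills in, so a partial hit counts as a miss too. A generic sketch of that rule (BooleanRef here is a local stand-in for the storage-api type, not the real class):

    import java.io.IOException;

    // Generic sketch: in cache-only mode a range read may only proceed if every
    // requested range was served from the cache.
    final class PartialHitSketch {
      static final class BooleanRef { boolean value; } // stand-in for DataCache.BooleanRef

      static void checkRangeRead(BooleanRef isAllInCache, boolean isReadCacheOnly)
          throws IOException {
        if (!isAllInCache.value && isReadCacheOnly) {
          // One missing range is enough: abort rather than touch disk.
          throw new IOException("cache-only read requested but some ranges are not cached");
        }
      }
    }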
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 8d3336f..d571248 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -47,7 +47,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
* @return The reader.
*/
EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException;
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException;
/** The factory that can create (or return) the pools used by encoded reader. */
public interface PoolFactory {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index e137c24..ac34585 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -36,8 +36,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
@Override
public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
- bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
+ bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag, isReadCacheOnly);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 2104746..f4a26ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -86,6 +86,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -110,6 +111,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private Path cacheFsPath;
private static final int MAP_DEFINITION_LEVEL_MAX = 3;
private Map<Path, PartitionDesc> parts;
+ private final boolean isReadCacheOnly;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -151,6 +153,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
//initialize the rowbatchContext
jobConf = conf;
+ isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
if (inputSplit != null) {
@@ -316,6 +319,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
} finally {
metadataCache.decRefBuffer(footerData);
}
+ } else {
+ throwIfCacheOnlyRead(isReadCacheOnly);
}
final FileSystem fs = file.getFileSystem(configuration);
final FileStatus stat = fs.getFileStatus(file);
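On the Parquet side the guard sits in the new else branch above: a footer served from the LLAP metadata cache proceeds as before, while a missing footer makes a cache-only query abort up front. An illustrative condensation (names and types simplified; only the branch structure mirrors the patch):

    import java.io.IOException;

    // Illustrative condensation of the Parquet change: with cache-only set, a
    // footer that is absent from the metadata cache fails the read instead of
    // being fetched from the file.
    final class ParquetFooterSketch {
      private final boolean isReadCacheOnly;

      ParquetFooterSketch(boolean isReadCacheOnly) {
        this.isReadCacheOnly = isReadCacheOnly;
      }

      Object footerFor(Object cachedFooter) throws IOException {
        if (cachedFooter != null) {
          return cachedFooter;            // served from the metadata cache
        }
        if (isReadCacheOnly) {            // mirrors the new else branch
          throw new IOException("Parquet footer not in LLAP cache while cache-only reading is set");
        }
        return readFooterFromFile();      // pre-existing fallback
      }

      private Object readFooterFromFile() {
        return new Object();              // stand-in for the real footer read
      }
    }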