Posted to commits@hive.apache.org by sz...@apache.org on 2020/02/24 10:21:17 UTC

[hive] branch master updated: HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bc4cc9b  HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)
bc4cc9b is described below

commit bc4cc9b448913d07148307c836c543212e866659
Author: Adam Szita <sz...@cloudera.com>
AuthorDate: Thu Jan 9 16:32:23 2020 +0100

    HIVE-22721: Add option for queries to only read from LLAP cache (Adam Szita, reviewed by Peter Vary)
---
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java        | 2 ++
 .../apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java | 8 +++++++-
 .../hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java      | 7 +++++++
 ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java       | 9 +++++++++
 .../apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java  | 9 ++++++++-
 ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java | 2 +-
 .../org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java     | 4 ++--
 .../hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java | 5 +++++
 8 files changed, 41 insertions(+), 5 deletions(-)
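
Usage note (not part of the patch): the new flag would typically be enabled per
query, e.g. via the session configuration or the job configuration, after a first
warm-up run has populated the LLAP cache. A minimal sketch, assuming an existing
HiveConf for the LLAP-backed query; the class name below is illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class CacheOnlyReadSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Warm-up pass: a normal LLAP read that is allowed to populate the cache.
        conf.setBoolVar(ConfVars.LLAP_IO_CACHE_ONLY, false);
        // Verification pass: with the flag on, any cache miss during the read
        // surfaces as an IOException instead of falling back to the file system.
        conf.setBoolVar(ConfVars.LLAP_IO_CACHE_ONLY, true);
      }
    }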

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c51b367..583603c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4120,6 +4120,8 @@ public class HiveConf extends Configuration {
         "Value is an integer. Default value is -1, which means that we will estimate this value from operators in the plan."),
     // The default is different on the client and server, so it's null here.
     LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."),
+    LLAP_IO_CACHE_ONLY("hive.llap.io.cache.only", false, "Whether the query should read from cache only. If set to " +
+        "true and a cache miss happens during the read an exception will occur. Primarily used for testing."),
     LLAP_IO_ROW_WRAPPER_ENABLED("hive.llap.io.row.wrapper.enabled", true, "Whether the LLAP IO row wrapper is enabled for non-vectorized queries."),
     LLAP_IO_ACID_ENABLED("hive.llap.io.acid", true, "Whether the LLAP IO layer is enabled for ACID."),
     LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 92df717..c90ff52 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -106,6 +106,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 /**
  * This produces EncodedColumnBatch via ORC EncodedDataImpl.
  * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
@@ -173,6 +175,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private Object fileKey;
   private final CacheTag cacheTag;
   private final Map<Path, PartitionDesc> parts;
+  private final boolean isReadCacheOnly;
 
   private Supplier<FileSystem> fsSupplier;
 
@@ -247,6 +250,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED).equalsIgnoreCase("decimal_64"));
     consumer.setFileMetadata(fileMetadata);
     consumer.setSchemaEvolution(evolution);
+    isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
   }
 
   @Override
@@ -463,7 +467,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // Reader creation updates HDFS counters, don't do it here.
     DataWrapperForOrc dw = new DataWrapperForOrc();
     stripeReader = orcReader.encodedReader(
-        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag);
+        fileKey, dw, dw, useObjectPools ? POOL_FACTORY : null, trace, useCodecPool, cacheTag, isReadCacheOnly);
     stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
     stripeReader.setStopped(isStopped);
   }
@@ -611,6 +615,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         }
       }
       counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     ensureOrcReader();
     ByteBuffer tailBufferBb = orcReader.getSerializedFileFooter();
@@ -692,6 +697,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         }
       }
       counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     long offset = si.getOffset() + si.getIndexLength() + si.getDataLength();
     long startTime = counters.startTimeCounter();
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index c73ba2c..c9e9c02 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -105,6 +105,8 @@ import org.apache.tez.common.counters.TezCounters;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     implements ConsumerFeedback<OrcEncodedColumnBatch>, TezCounterSource {
 
@@ -165,6 +167,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
   private final boolean[] writerIncludes;
   private FileReaderYieldReturn currentFileRead = null;
+  private final boolean isReadCacheOnly;
 
   /**
    * Data from cache currently being processed. We store it here so that we could decref
@@ -232,6 +235,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     SchemaEvolution evolution = new SchemaEvolution(schema, null,
         new Reader.Options(jobConf).include(writerIncludes));
     consumer.setSchemaEvolution(evolution);
+    isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
   }
 
   private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) {
@@ -810,6 +814,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
     long endOfSplit = split.getStart() + split.getLength();
     this.cachedData = cache.getFileData(fileKey, split.getStart(),
         endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
+    if (!gotAllData.value) {
+      throwIfCacheOnlyRead(isReadCacheOnly);
+    }
     if (cachedData == null) {
       if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
         LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index a041426..9ad1486 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -107,4 +109,11 @@ public final class LlapHiveUtils {
     return (MapWork) work;
   }
 
+  public static void throwIfCacheOnlyRead(boolean isCacheOnlyRead) throws IOException {
+    if (isCacheOnlyRead) {
+      throw new IOException("LLAP cache miss happened while reading. Aborting query as cache only reading is set. " +
+          "Set " + HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname + " to false and repeat query if this was unintended.");
+    }
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 117e4e6..1f5a9d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -75,6 +75,8 @@ import com.google.protobuf.CodedInputStream;
 
 import sun.misc.Cleaner;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 
 /**
  * Encoded reader implementation.
@@ -152,11 +154,12 @@ class EncodedReaderImpl implements EncodedReader {
   private final CacheTag tag;
   private AtomicBoolean isStopped;
   private StoppableAllocator allocator;
+  private final boolean isReadCacheOnly;
 
   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
       TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
       int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
     this.fileKey = fileKey;
     this.compressionKind = kind;
     this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
@@ -173,6 +176,7 @@ class EncodedReaderImpl implements EncodedReader {
     this.dataReader = dataReader;
     this.trace = trace;
     this.tag = tag;
+    this.isReadCacheOnly = isReadCacheOnly;
     if (POOLS != null) return;
     if (pf == null) {
       pf = new NoopPoolFactory();
@@ -600,6 +604,9 @@ class EncodedReaderImpl implements EncodedReader {
     BooleanRef isAllInCache = new BooleanRef();
     if (hasFileId) {
       cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
+      if (!isAllInCache.value) {
+        throwIfCacheOnlyRead(isReadCacheOnly);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Disk ranges after cache (found everything " + isAllInCache.value + "; file "
             + fileKey + ", base offset " + stripeOffset  + "): "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 8d3336f..d571248 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -47,7 +47,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
    * @return The reader.
    */
   EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException;
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException;
 
   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index e137c24..ac34585 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -36,8 +36,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
 
   @Override
   public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag) throws IOException {
+      PoolFactory pf, IoTrace trace, boolean useCodecPool, CacheTag tag, boolean isReadCacheOnly) throws IOException {
     return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
-        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag);
+        bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool, tag, isReadCacheOnly);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 2104746..f4a26ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -86,6 +86,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
@@ -110,6 +111,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
   private Map<Path, PartitionDesc> parts;
+  private final boolean isReadCacheOnly;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -151,6 +153,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
       jobConf = conf;
+      isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
       ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
       if (inputSplit != null) {
@@ -316,6 +319,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       } finally {
         metadataCache.decRefBuffer(footerData);
       }
+    } else {
+      throwIfCacheOnlyRead(isReadCacheOnly);
     }
     final FileSystem fs = file.getFileSystem(configuration);
     final FileStatus stat = fs.getFileStatus(file);
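
For reference, the readers changed above all follow the same pattern: probe the
LLAP cache, and on a miss call LlapHiveUtils.throwIfCacheOnlyRead so that the query
aborts when hive.llap.io.cache.only=true. A minimal sketch of that call pattern
(illustrative class and method names, not part of the patch):

    import java.io.IOException;

    import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;

    /** Illustrative only: mirrors the miss-handling pattern used by the readers above. */
    class CacheMissHandlingSketch {
      private final boolean isReadCacheOnly;

      CacheMissHandlingSketch(boolean isReadCacheOnly) {
        this.isReadCacheOnly = isReadCacheOnly;
      }

      void afterCacheProbe(boolean gotAllData) throws IOException {
        if (!gotAllData) {
          // Throws an IOException (aborting the read) when cache-only reading is set;
          // otherwise it is a no-op and the reader falls back to reading the file.
          throwIfCacheOnlyRead(isReadCacheOnly);
        }
      }
    }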