You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/04/21 23:55:38 UTC
[hive] branch master updated: HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8b9fadb HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
8b9fadb is described below
commit 8b9fadb5515aace73db5068cc81317b6f10e0f32
Author: Ashutosh Chauhan <ha...@apache.org>
AuthorDate: Tue Apr 21 16:54:58 2020 -0700
HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/llap/io/api/impl/LlapRecordReader.java | 51 ++++++++++++++++++++++
.../hive/llap/io/decode/ColumnVectorProducer.java | 6 +++
.../llap/io/decode/OrcEncodedDataConsumer.java | 4 ++
3 files changed, 61 insertions(+)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index acb6b2d..417a42a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -82,6 +85,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
private final SearchArgument sarg;
private final VectorizedRowBatchCtx rbCtx;
private final boolean isVectorized;
+ private final boolean probeDecodeEnabled;
private VectorizedOrcAcidRowBatchReader acidReader;
private final Object[] partitionValues;
@@ -196,6 +200,12 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
schema, job, isAcidScan && acidReader.includeAcidColumns());
+ this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE);
+ if (this.probeDecodeEnabled) {
+ includes.setProbeDecodeContext(mapWork.getProbeDecodeContext());
+ LOG.info("LlapRecordReader ProbeDecode is enabled");
+ }
+
// Create the consumer of encoded data; it will coordinate decoding to CVBs.
feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
@@ -629,6 +639,9 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
private TypeDescription readerSchema;
private JobConf jobConf;
+ // ProbeDecode Context for row-level filtering
+ private TableScanOperator.ProbeDecodeContext probeDecodeContext = null;
+
public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema,
JobConf jobConf, boolean includeAcidColumns) {
@@ -710,6 +723,10 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
fileSchema, filePhysicalColumnIds, acidStructColumnId);
}
+ public void setProbeDecodeContext(TableScanOperator.ProbeDecodeContext currProbeDecodeContext) {
+ this.probeDecodeContext = currProbeDecodeContext;
+ }
+
@Override
public List<Integer> getPhysicalColumnIds() {
return filePhysicalColumnIds;
@@ -725,5 +742,39 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
return OrcInputFormat.genIncludedTypes(
fileSchema, filePhysicalColumnIds, acidStructColumnId);
}
+
+ @Override
+ public String getQueryId() {
+ return HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+ }
+
+ @Override
+ public boolean isProbeDecodeEnabled() {
+ return this.probeDecodeContext != null;
+ }
+
+ @Override
+ public byte getProbeMjSmallTablePos() {
+ return this.probeDecodeContext.getMjSmallTablePos();
+ }
+
+ @Override
+ public int getProbeColIdx() {
+ // TODO: is this the best way to get the ColId?
+ Pattern pattern = Pattern.compile("_col([0-9]+)");
+ Matcher matcher = pattern.matcher(this.probeDecodeContext.getMjBigTableKeyColName());
+ return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1;
+ }
+
+ @Override
+ public String getProbeColName() {
+ return this.probeDecodeContext.getMjBigTableKeyColName();
+ }
+
+ @Override
+ public String getProbeCacheKey() {
+ return this.probeDecodeContext.getMjSmallTableCacheKey();
+ }
+
}
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index a830c07..e37379b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -49,6 +49,12 @@ public interface ColumnVectorProducer {
List<Integer> getPhysicalColumnIds();
List<Integer> getReaderLogicalColumnIds();
TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+ String getQueryId();
+ boolean isProbeDecodeEnabled();
+ byte getProbeMjSmallTablePos();
+ String getProbeCacheKey();
+ String getProbeColName();
+ int getProbeColIdx();
}
ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 1b41d4e..b697a0d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -84,6 +84,10 @@ public class OrcEncodedDataConsumer
this.includes = includes;
// TODO: get rid of this
this.skipCorrupt = skipCorrupt;
+ if (includes.isProbeDecodeEnabled()) {
+ LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
+ this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
+ }
}
public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {