You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/04/21 23:55:38 UTC

[hive] branch master updated: HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9fadb  HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
8b9fadb is described below

commit 8b9fadb5515aace73db5068cc81317b6f10e0f32
Author: Ashutosh Chauhan <ha...@apache.org>
AuthorDate: Tue Apr 21 16:54:58 2020 -0700

    HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/io/api/impl/LlapRecordReader.java    | 51 ++++++++++++++++++++++
 .../hive/llap/io/decode/ColumnVectorProducer.java  |  6 +++
 .../llap/io/decode/OrcEncodedDataConsumer.java     |  4 ++
 3 files changed, 61 insertions(+)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index acb6b2d..417a42a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -82,6 +85,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
   private final SearchArgument sarg;
   private final VectorizedRowBatchCtx rbCtx;
   private final boolean isVectorized;
+  private final boolean probeDecodeEnabled;
   private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;
 
@@ -196,6 +200,12 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
         schema, job, isAcidScan && acidReader.includeAcidColumns());
 
+    this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE);
+    if (this.probeDecodeEnabled) {
+      includes.setProbeDecodeContext(mapWork.getProbeDecodeContext());
+      LOG.info("LlapRecordReader ProbeDecode is enabled");
+    }
+
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
         sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
@@ -629,6 +639,9 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     private TypeDescription readerSchema;
     private JobConf jobConf;
 
+    // ProbeDecode Context for row-level filtering
+    private TableScanOperator.ProbeDecodeContext probeDecodeContext = null;
+
     public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
         VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema,
         JobConf jobConf, boolean includeAcidColumns) {
@@ -710,6 +723,10 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
 
+    public void setProbeDecodeContext(TableScanOperator.ProbeDecodeContext currProbeDecodeContext) {
+      this.probeDecodeContext = currProbeDecodeContext; // isProbeDecodeEnabled() reports true once this is non-null
+    }
+
     @Override
     public List<Integer> getPhysicalColumnIds() {
       return filePhysicalColumnIds;
@@ -725,5 +742,39 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
       return OrcInputFormat.genIncludedTypes(
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
+
+    @Override
+    public String getQueryId() { // Query id read from the job configuration (ConfVars.HIVEQUERYID).
+      return HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+    }
+
+    @Override
+    public boolean isProbeDecodeEnabled() { // True only while the probeDecodeContext field is non-null.
+      return this.probeDecodeContext != null;
+    }
+
+    @Override
+    public byte getProbeMjSmallTablePos() { // Context is dereferenced unguarded: NPE if none was set.
+      return this.probeDecodeContext.getMjSmallTablePos();
+    }
+
+    @Override
+    public int getProbeColIdx() { // Context is dereferenced unguarded: NPE if none was set.
+      // TODO: is this the best way to get the ColId? (parses N out of the first "_col<N>" in the key column name)
+      Pattern pattern = Pattern.compile("_col([0-9]+)");
+      Matcher matcher = pattern.matcher(this.probeDecodeContext.getMjBigTableKeyColName());
+      return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1; // -1 when the name has no "_col<N>"
+    }
+
+    @Override
+    public String getProbeColName() { // Raw key column name from the context; NPE if none was set.
+      return this.probeDecodeContext.getMjBigTableKeyColName();
+    }
+
+    @Override
+    public String getProbeCacheKey() { // Small-table cache key from the context; NPE if none was set.
+      return this.probeDecodeContext.getMjSmallTableCacheKey();
+    }
+
   }
 } 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index a830c07..e37379b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -49,6 +49,12 @@ public interface ColumnVectorProducer {
     List<Integer> getPhysicalColumnIds();
     List<Integer> getReaderLogicalColumnIds();
     TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+    String getQueryId();
+    boolean isProbeDecodeEnabled();
+    byte getProbeMjSmallTablePos();
+    String getProbeCacheKey();
+    String getProbeColName();
+    int getProbeColIdx();
   }
 
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 1b41d4e..b697a0d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -84,6 +84,10 @@ public class OrcEncodedDataConsumer
     this.includes = includes;
     // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
+    if (includes.isProbeDecodeEnabled()) {
+      LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
+              this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
+    }
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {