You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/03/20 11:50:47 UTC
[hive] branch master updated: HIVE-27155: Iceberg: Vectorize virtual columns (Denys Kuzmenko, reviewed by Krisztian Kasa)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 48f56d5f2fa HIVE-27155: Iceberg: Vectorize virtual columns (Denys Kuzmenko, reviewed by Krisztian Kasa)
48f56d5f2fa is described below
commit 48f56d5f2facee1df9aad918d243e4e0ed2a7e37
Author: Denys Kuzmenko <de...@gmail.com>
AuthorDate: Mon Mar 20 13:50:38 2023 +0200
HIVE-27155: Iceberg: Vectorize virtual columns (Denys Kuzmenko, reviewed by Krisztian Kasa)
Closes #4113
---
.../apache/iceberg/mr/hive/IcebergAcidUtil.java | 2 +-
.../iceberg/mr/hive/vector/HiveBatchIterator.java | 48 ++-
.../mr/hive/vector/HiveVectorizedReader.java | 6 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 3 +-
.../apache/iceberg/mr/hive/TestHiveIcebergV2.java | 10 +-
.../hive/vector/TestHiveIcebergVectorization.java | 3 +-
.../test/queries/positive/iceberg_merge_schema.q | 41 +++
.../results/positive/iceberg_merge_schema.q.out | 348 +++++++++++++++++++++
.../test/results/positive/merge_iceberg_orc.q.out | 1 +
.../positive/merge_iceberg_partitioned_orc.q.out | 1 +
.../hive/ql/optimizer/physical/Vectorizer.java | 11 +-
11 files changed, 463 insertions(+), 11 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index 6b4bca01e23..f6dae643034 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -202,7 +202,7 @@ public class IcebergAcidUtil {
return rec.get(FILE_READ_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
}
- private static long computeHash(StructProjection struct) {
+ public static long computeHash(StructProjection struct) {
long partHash = -1;
if (struct != null) {
Object[] partFields = new Object[struct.size()];
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
index 3b543b22aca..c12a0d039c7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveBatchIterator.java
@@ -19,14 +19,23 @@
package org.apache.iceberg.mr.hive.vector;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.LongStream;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.RowPositionAwareVectorizedRecordReader;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
+import org.apache.iceberg.util.StructProjection;
/**
* Iterator wrapper around Hive's VectorizedRowBatch producer (MRv1 implementing) record readers.
@@ -41,15 +50,17 @@ public final class HiveBatchIterator implements CloseableIterator<HiveBatchConte
private final Object[] partitionValues;
private boolean advanced = false;
private long rowOffset = Long.MIN_VALUE;
+ private Map<Integer, ?> idToConstant;
HiveBatchIterator(RecordReader<NullWritable, VectorizedRowBatch> recordReader, JobConf job,
- int[] partitionColIndices, Object[] partitionValues) {
+ int[] partitionColIndices, Object[] partitionValues, Map<Integer, ?> idToConstant) {
this.recordReader = recordReader;
this.key = recordReader.createKey();
this.batch = recordReader.createValue();
this.vrbCtx = LlapHiveUtils.findMapWork(job).getVectorizedRowBatchCtx();
this.partitionColIndices = partitionColIndices;
this.partitionValues = partitionValues;
+ this.idToConstant = idToConstant;
}
@Override
@@ -79,6 +90,41 @@ public final class HiveBatchIterator implements CloseableIterator<HiveBatchConte
}
}
}
+ // Fill virtual columns
+ for (VirtualColumn vc : vrbCtx.getNeededVirtualColumns()) {
+ Object value;
+ int idx = vrbCtx.findVirtualColumnNum(vc);
+ switch (vc) {
+ case PARTITION_SPEC_ID:
+ value = idToConstant.get(MetadataColumns.SPEC_ID.fieldId());
+ vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
+ break;
+ case PARTITION_HASH:
+ value = IcebergAcidUtil.computeHash(
+ (StructProjection) idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
+ vrbCtx.addPartitionColsToBatch(batch.cols[idx], value, idx);
+ break;
+ case FILE_PATH:
+ value = idToConstant.get(MetadataColumns.FILE_PATH.fieldId());
+ BytesColumnVector bcv = (BytesColumnVector) batch.cols[idx];
+ if (value == null) {
+ bcv.noNulls = false;
+ bcv.isNull[0] = true;
+ bcv.isRepeating = true;
+ } else {
+ bcv.fill(((String) value).getBytes());
+ }
+ break;
+ case ROW_POSITION:
+ value = LongStream.range(rowOffset, rowOffset + batch.size).toArray();
+ LongColumnVector lcv = (LongColumnVector) batch.cols[idx];
+ lcv.noNulls = true;
+ Arrays.fill(lcv.isNull, false);
+ lcv.isRepeating = false;
+ System.arraycopy(value, 0, lcv.vector, 0, batch.size);
+ break;
+ }
+ }
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 02ba73a476e..9223f5b4c07 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -166,7 +166,7 @@ public class HiveVectorizedReader {
}
CloseableIterable<HiveBatchContext> vrbIterable =
- createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
+ createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues, idToConstant);
return deleteFilter != null ? deleteFilter.filterBatch(vrbIterable) : vrbIterable;
@@ -251,10 +251,10 @@ public class HiveVectorizedReader {
private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
- Object[] partitionValues) {
+ Object[] partitionValues, Map<Integer, ?> idToConstant) {
HiveBatchIterator iterator =
- new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
+ new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues, idToConstant);
return new CloseableIterable<HiveBatchContext>() {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index d01e01ab034..4ad91ea6858 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -33,6 +33,7 @@ import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -293,7 +294,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
private CloseableIterator<T> nextTask() {
CloseableIterator<T> closeableIterator = open(tasks.next(), expectedSchema).iterator();
- if (!fetchVirtualColumns) {
+ if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
return closeableIterator;
}
return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, expectedSchema, conf);
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index c2a84c12918..721e3d012ee 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -371,7 +371,10 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase {
Type type = SUPPORTED_TYPES.get(i);
// TODO: remove this filter when issue #1881 is resolved
- if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
+ if (type == Types.UUIDType.get() &&
+ (fileFormat == FileFormat.PARQUET || fileFormat == FileFormat.ORC && isVectorized) ||
+ type == Types.TimeType.get() &&
+ fileFormat == FileFormat.PARQUET && isVectorized) {
continue;
}
@@ -551,7 +554,10 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase {
Type type = SUPPORTED_TYPES.get(i);
// TODO: remove this filter when issue #1881 is resolved
- if (type == Types.UUIDType.get() && fileFormat == FileFormat.PARQUET) {
+ if (type == Types.UUIDType.get() &&
+ (fileFormat == FileFormat.PARQUET || fileFormat == FileFormat.ORC && isVectorized) ||
+ type == Types.TimeType.get() &&
+ fileFormat == FileFormat.PARQUET && isVectorized) {
continue;
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
index 18059d41838..d01e7edea32 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/vector/TestHiveIcebergVectorization.java
@@ -111,7 +111,8 @@ public class TestHiveIcebergVectorization extends HiveIcebergStorageHandlerWithE
RecordReader<NullWritable, VectorizedRowBatch> internalVectorizedRecordReader =
inputFormat.getRecordReader(new FileSplit(dataFilePath, 0L, Long.MAX_VALUE, new String[]{}), jobConf,
new MockReporter());
- HiveBatchIterator hiveBatchIterator = new HiveBatchIterator(internalVectorizedRecordReader, jobConf, null, null);
+ HiveBatchIterator hiveBatchIterator = new HiveBatchIterator(
+ internalVectorizedRecordReader, jobConf, null, null, null);
// Expected to be one batch exactly
HiveBatchContext hiveBatchContext = hiveBatchIterator.next();
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
index 8b5bb00dbad..641b373c0df 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_merge_schema.q
@@ -1,5 +1,10 @@
-- SORT_QUERY_RESULTS
+
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+
set hive.optimize.shared.work.merge.ts.schema=true;
+set hive.vectorized.execution.enabled=true;
CREATE EXTERNAL TABLE calls (
s_key bigint,
@@ -27,6 +32,42 @@ INSERT INTO display (skey, language_id, hierarchy_display) VALUES
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1');
+explain vectorization only detail MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'));
+
+
MERGE INTO display USING (
SELECT distinct display_skey, display, display as orig_display
FROM (
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
index 537c4472966..0a5525d8fdc 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_merge_schema.q.out
@@ -62,6 +62,354 @@ POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALU
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@display
+Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
+PREHOOK: query: explain vectorization only detail MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+PREHOOK: type: QUERY
+PREHOOK: Input: default@calls
+PREHOOK: Input: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@display
+PREHOOK: Output: default@merge_tmp_table
+POSTHOOK: query: explain vectorization only detail MERGE INTO display USING (
+ SELECT distinct display_skey, display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub1
+
+ UNION ALL
+
+ SELECT distinct display_skey, null as display, display as orig_display
+ FROM (
+ SELECT D.skey display_skey, D.hierarchy_display display
+ FROM (
+ SELECT s_key FROM calls WHERE s_key = 1090969
+ ) R
+ INNER JOIN display D
+ ON R.s_key = D.skey AND D.language_id = 3
+ GROUP BY D.skey,
+ D.hierarchy_display
+ ) sub2
+) sub
+ON display.skey = sub.display_skey
+ and display.hierarchy_display = sub.display
+
+WHEN MATCHED THEN
+ UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
+WHEN NOT MATCHED THEN
+ INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@calls
+POSTHOOK: Input: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@display
+POSTHOOK: Output: default@merge_tmp_table
+PLAN VECTORIZATION:
+ enabled: true
+ enabledConditionsMet: [hive.vectorized.execution.enabled IS true]
+
+STAGE DEPENDENCIES:
+ Stage-4 is a root stage
+ Stage-5 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-5
+ Stage-6 depends on stages: Stage-0
+ Stage-3 depends on stages: Stage-5
+ Stage-7 depends on stages: Stage-3
+
+STAGE PLANS:
+ Stage: Stage-4
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (XPROD_EDGE), Map 10 (XPROD_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Union 4 (CONTAINS)
+ Reducer 5 <- Map 10 (SIMPLE_EDGE), Union 4 (SIMPLE_EDGE)
+ Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
+ Reducer 7 <- Reducer 5 (SIMPLE_EDGE)
+ Reducer 8 <- Map 1 (XPROD_EDGE), Map 10 (XPROD_EDGE)
+ Reducer 9 <- Reducer 8 (SIMPLE_EDGE), Union 4 (CONTAINS)
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan Vectorization:
+ native: true
+ vectorizationSchemaColumns: [0:s_key:bigint, 1:year:int, 2:PARTITION__SPEC__ID:int, 3:PARTITION__HASH:bigint, 4:FILE__PATH:string, 5:ROW__POSITION:bigint]
+ Filter Vectorization:
+ className: VectorFilterOperator
+ native: true
+ predicateExpression: FilterLongColEqualLongScalar(col 0:bigint, val 1090969)
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: []
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkEmptyKeyOperator
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkEmptyKeyOperator
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ Execution mode: vectorized
+ Map Vectorization:
+ enabled: true
+ enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+ inputFormatFeatureSupport: []
+ featureSupportInUse: []
+ inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ allNative: true
+ usesVectorUDFAdaptor: false
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 2
+ includeColumns: [0]
+ dataColumns: s_key:bigint, year:int
+ partitionColumnCount: 0
+ scratchColumnTypeNames: []
+ Map 10
+ Map Operator Tree:
+ TableScan Vectorization:
+ native: true
+ vectorizationSchemaColumns: [0:skey:bigint, 1:hierarchy_number:string, 2:hierarchy_name:string, 3:language_id:int, 4:hierarchy_display:string, 5:orderby:string, 6:PARTITION__SPEC__ID:int, 7:PARTITION__HASH:bigint, 8:FILE__PATH:string, 9:ROW__POSITION:bigint]
+ Filter Vectorization:
+ className: VectorFilterOperator
+ native: true
+ predicateExpression: FilterExprAndExpr(children: FilterLongColEqualLongScalar(col 0:bigint, val 1090969), SelectColumnIsNotNull(col 4:string))
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [6, 7, 8, 9, 10, 1, 2, 3, 5, 11, 1, 2, 3, 4, 5]
+ selectExpressions: ConstantVectorExpression(val 1090969) -> 10:bigint, ConstantVectorExpression(val 1090969) -> 11:bigint
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkStringOperator
+ keyColumns: 4:string
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ valueColumns: 6:int, 7:bigint, 8:string, 9:bigint, 10:bigint, 1:string, 2:string, 3:int, 5:string, 11:bigint, 1:string, 2:string, 3:int, 5:string
+ Filter Vectorization:
+ className: VectorFilterOperator
+ native: true
+ predicateExpression: FilterExprAndExpr(children: FilterLongColEqualLongScalar(col 3:int, val 3), FilterLongColEqualLongScalar(col 0:bigint, val 1090969))
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [4]
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkEmptyKeyOperator
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ valueColumns: 4:string
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkEmptyKeyOperator
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ valueColumns: 4:string
+ Execution mode: vectorized
+ Map Vectorization:
+ enabled: true
+ enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+ inputFormatFeatureSupport: []
+ featureSupportInUse: []
+ inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ allNative: true
+ usesVectorUDFAdaptor: false
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 6
+ includeColumns: [0, 1, 2, 3, 4, 5]
+ dataColumns: skey:bigint, hierarchy_number:string, hierarchy_name:string, language_id:int, hierarchy_display:string, orderby:string
+ neededVirtualColumns: #Masked#
+ partitionColumnCount: 0
+ scratchColumnTypeNames: [bigint, bigint]
+ Reducer 2
+ MergeJoin Vectorization:
+ enabled: false
+ enableConditionsNotMet: Vectorizing MergeJoin Supported IS false
+ Reducer 3
+ Execution mode: vectorized
+ Reduce Vectorization:
+ enabled: true
+ enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez] IS true
+ reduceColumnNullOrder: z
+ reduceColumnSortOrder: +
+ allNative: false
+ usesVectorUDFAdaptor: false
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 1
+ dataColumns: KEY._col0:string
+ partitionColumnCount: 0
+ scratchColumnTypeNames: []
+ Reduce Operator Tree:
+ Group By Vectorization:
+ className: VectorGroupByOperator
+ groupByMode: MERGEPARTIAL
+ keyExpressions: col 0:string
+ native: false
+ vectorProcessingMode: MERGE_PARTIAL
+ projectedOutputColumnNums: []
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [0, 0]
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkStringOperator
+ keyColumns: 0:string
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ valueColumns: 0:string
+ Reducer 5
+ MergeJoin Vectorization:
+ enabled: false
+ enableConditionsNotMet: Vectorizing MergeJoin Supported IS false
+ Reducer 6
+ Execution mode: vectorized
+ Reduce Vectorization:
+ enabled: true
+ enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez] IS true
+ reduceColumnNullOrder: zzzz
+ reduceColumnSortOrder: ++++
+ allNative: false
+ usesVectorUDFAdaptor: false
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 10
+ dataColumns: KEY.reducesinkkey0:int, KEY.reducesinkkey1:bigint, KEY.reducesinkkey2:string, KEY.reducesinkkey3:bigint, VALUE._col0:bigint, VALUE._col1:string, VALUE._col2:string, VALUE._col3:int, VALUE._col4:string, VALUE._col5:string
+ partitionColumnCount: 0
+ scratchColumnTypeNames: []
+ Reduce Operator Tree:
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+ File Sink Vectorization:
+ className: VectorFileSinkOperator
+ native: false
+ Reducer 7
+ Execution mode: vectorized
+ Reduce Vectorization:
+ enabled: true
+ enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez] IS true
+ reduceColumnNullOrder: zzzz
+ reduceColumnSortOrder: ++++
+ allNative: false
+ usesVectorUDFAdaptor: true
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 5
+ dataColumns: KEY._col0:int, KEY._col1:bigint, KEY._col2:string, KEY._col3:bigint, VALUE._col0:bigint
+ partitionColumnCount: 0
+ scratchColumnTypeNames: []
+ Reduce Operator Tree:
+ Group By Vectorization:
+ aggregators: VectorUDAFCountMerge(col 4:bigint) -> bigint
+ className: VectorGroupByOperator
+ groupByMode: MERGEPARTIAL
+ keyExpressions: col 0:int, col 1:bigint, col 2:string, col 3:bigint
+ native: false
+ vectorProcessingMode: MERGE_PARTIAL
+ projectedOutputColumnNums: [0]
+ Filter Vectorization:
+ className: VectorFilterOperator
+ native: true
+ predicateExpression: FilterLongColGreaterLongScalar(col 4:bigint, val 1)
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [5]
+ selectExpressions: VectorUDFAdaptor(cardinality_violation(_col0,_col1,_col2,_col3)) -> 5:int
+ File Sink Vectorization:
+ className: VectorFileSinkOperator
+ native: false
+ Reducer 8
+ MergeJoin Vectorization:
+ enabled: false
+ enableConditionsNotMet: Vectorizing MergeJoin Supported IS false
+ Reducer 9
+ Execution mode: vectorized
+ Reduce Vectorization:
+ enabled: true
+ enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez] IS true
+ reduceColumnNullOrder: z
+ reduceColumnSortOrder: +
+ allNative: false
+ usesVectorUDFAdaptor: false
+ vectorized: true
+ rowBatchContext:
+ dataColumnCount: 1
+ dataColumns: KEY._col0:string
+ partitionColumnCount: 0
+ scratchColumnTypeNames: []
+ Reduce Operator Tree:
+ Group By Vectorization:
+ className: VectorGroupByOperator
+ groupByMode: MERGEPARTIAL
+ keyExpressions: col 0:string
+ native: false
+ vectorProcessingMode: MERGE_PARTIAL
+ projectedOutputColumnNums: []
+ Select Vectorization:
+ className: VectorSelectOperator
+ native: true
+ projectedOutputColumnNums: [1, 0]
+ selectExpressions: ConstantVectorExpression(val null) -> 1:string
+ Reduce Sink Vectorization:
+ className: VectorReduceSinkStringOperator
+ keyColumns: 1:string
+ native: true
+ nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+ valueColumns: 0:string
+ Union 4
+
+ Stage: Stage-5
+
+ Stage: Stage-0
+
+ Stage: Stage-6
+
+ Stage: Stage-3
+
+ Stage: Stage-7
+
Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
PREHOOK: query: MERGE INTO display USING (
diff --git a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
index fe94c2b6bc1..513a23a304b 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_orc.q.out
@@ -111,6 +111,7 @@ STAGE PLANS:
Map-reduce partition columns: _col7 (type: int)
Statistics: Num rows: 4 Data size: 1212 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint), _col5 (type: string), _col6 (type: int), _col8 (type: int)
+ Execution mode: vectorized
Reducer 2
Reduce Operator Tree:
Merge Join Operator
diff --git a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
index d1778673d08..87de84056fe 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/merge_iceberg_partitioned_orc.q.out
@@ -113,6 +113,7 @@ STAGE PLANS:
Map-reduce partition columns: _col7 (type: int)
Statistics: Num rows: 4 Data size: 1212 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint), _col5 (type: string), _col6 (type: int), _col8 (type: int)
+ Execution mode: vectorized
Reducer 2
Reduce Operator Tree:
Merge Join Operator
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 3cf63e24c58..ce2e31d5880 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -297,8 +297,15 @@ public class Vectorizer implements PhysicalPlanResolver {
.collect(Collectors.toSet());
// The set of virtual columns that vectorized readers *MAY* support.
- public static final ImmutableSet<VirtualColumn> vectorizableVirtualColumns =
- ImmutableSet.of(VirtualColumn.ROWID, VirtualColumn.ROWISDELETED);
+ public static final ImmutableSet<VirtualColumn> vectorizableVirtualColumns =
+ ImmutableSet.of(
+ VirtualColumn.ROWID,
+ VirtualColumn.ROWISDELETED,
+ VirtualColumn.PARTITION_SPEC_ID,
+ VirtualColumn.PARTITION_HASH,
+ VirtualColumn.FILE_PATH,
+ VirtualColumn.ROW_POSITION
+ );
private HiveConf hiveConf;