You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kr...@apache.org on 2023/05/08 11:21:38 UTC
[hive] branch master updated: HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9376eb679bc HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)
9376eb679bc is described below
commit 9376eb679bc5aefb0d79e96a2855cd583e686f44
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Mon May 8 13:21:31 2023 +0200
HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)
---
.../src/test/queries/positive/mv_iceberg_orc6.q | 5 +
.../src/test/queries/positive/mv_iceberg_orc7.q | 5 +
.../test/results/positive/mv_iceberg_orc7.q.out | 213 ++++++++++++++++++---
.../AlterMaterializedViewRebuildAnalyzer.java | 19 +-
.../NonNativeAcidMaterializedViewASTBuilder.java | 7 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 2 +
6 files changed, 209 insertions(+), 42 deletions(-)
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
index 52da0cac75a..70fdc943575 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
@@ -2,6 +2,11 @@
-- SORT_QUERY_RESULTS
--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+set hive.explain.user=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+
drop table if exists tbl_ice;
create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
index 28f06875596..c619e1a1639 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
@@ -1,6 +1,11 @@
-- MV source tables are iceberg and MV has aggregate.
-- SORT_QUERY_RESULTS
--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+--! qt:replace:/(\s+Version\sinterval\sfrom\:\s+)\d+(\s*)/$1#Masked#/
+
+set hive.explain.user=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.stats.column.autogather=false;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
index 2ad75a5b08a..76567afbb83 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
@@ -60,65 +60,220 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
PREHOOK: query: explain cbo
alter materialized view mat1 rebuild
PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
POSTHOOK: query: explain cbo
alter materialized view mat1 rebuild
POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@mat1
+POSTHOOK: Output: default@mat1
CBO PLAN:
-HiveAggregate(group=[{0}], agg#0=[count($2)])
- HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+HiveProject($f0=[$3], $f1=[CASE(IS NULL($1), $4, IS NULL($4), $1, +($4, $1))])
+ HiveFilter(condition=[OR($2, IS NULL($2))])
+ HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[none], cost=[not available])
+ HiveProject(a=[$0], _c1=[$1], $f2=[true])
+ HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
+ HiveProject(a=[$0], $f1=[$1])
+ HiveAggregate(group=[{0}], agg#0=[count($2)])
+ HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
PREHOOK: query: explain
alter materialized view mat1 rebuild
PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
POSTHOOK: query: explain
alter materialized view mat1 rebuild
POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@mat1
-Plan optimized by CBO.
+POSTHOOK: Output: default@mat1
+STAGE DEPENDENCIES:
+ Stage-3 is a root stage
+ Stage-4 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-4
+ Stage-5 depends on stages: Stage-0
+ Stage-6 depends on stages: Stage-5
-Vertex dependency in root stage
-Reducer 2 <- Map 1 (SIMPLE_EDGE)
+STAGE PLANS:
+ Stage: Stage-3
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+ Reducer 5 <- Map 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: default.mat1
+ Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: a (type: int), _c1 (type: bigint), true (type: boolean), PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+ Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint), _col2 (type: boolean), _col3 (type: int), _col4 (type: bigint), _col5 (type: string), _col6 (type: bigint)
+ Execution mode: vectorized
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: tbl_ice
+ Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+ Version interval from: #Masked#
+ Select Operator
+ expressions: a (type: int), c (type: int)
+ outputColumnNames: a, c
+ Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(c)
+ keys: a (type: int)
+ minReductionHashAggr: 0.99
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Execution mode: vectorized
+ Reducer 2
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Right Outer Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ nullSafes: [true]
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
+ Statistics: Num rows: 3 Data size: 39 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: _col2 (type: boolean)
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col3 (type: int), _col4 (type: bigint), _col5 (type: string), _col6 (type: bigint), _col0 (type: int), _col1 (type: bigint)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+ null sort order: aaaa
+ sort order: ++++
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col4 (type: int), _col5 (type: bigint)
+ Filter Operator
+ predicate: _col2 (type: boolean)
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col7 (type: int), CASE WHEN (_col1 is null) THEN (_col8) WHEN (_col8 is null) THEN (_col1) ELSE ((_col8 + _col1)) END (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.mat1
+ Filter Operator
+ predicate: _col2 is null (type: boolean)
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col7 (type: int), CASE WHEN (_col1 is null) THEN (_col8) WHEN (_col8 is null) THEN (_col1) ELSE ((_col8 + _col1)) END (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.mat1
+ Reducer 3
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type: bigint)
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.mat1
+ Reducer 5
+ Execution mode: vectorized
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: int)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ null sort order: z
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
-Stage-4
- Materialized View Update{"name:":"default.mat1","update creation metadata:":"true"}
- Stage-3
- Stats Work{}
- Stage-0
- Move Operator
- table:{"name:":"default.mat1"}
- Stage-2
- Dependency Collection{}
- Stage-1
- Reducer 2 vectorized
- File Output Operator [FS_11]
- table:{"name:":"default.mat1"}
- Group By Operator [GBY_10] (rows=2 width=8)
- Output:["_col0","_col1"],aggregations:["count(VALUE._col0)"],keys:KEY._col0
- <-Map 1 [SIMPLE_EDGE] vectorized
- SHUFFLE [RS_9]
- PartitionCols:_col0
- Group By Operator [GBY_8] (rows=4 width=8)
- Output:["_col0","_col1"],aggregations:["count(c)"],keys:a
- Select Operator [SEL_7] (rows=4 width=8)
- Output:["a","c"]
- TableScan [TS_0] (rows=4 width=8)
- default@tbl_ice,tbl_ice,Tbl:COMPLETE,Col:NONE,Output:["a","c"]
+ Stage: Stage-4
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: false
+ table:
+ input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+ output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+ serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+ name: default.mat1
+
+ Stage: Stage-5
+ Stats Work
+ Basic Stats Work:
+
+ Stage: Stage-6
+ Materialized View Update
+ name: default.mat1
+ update creation metadata: true
PREHOOK: query: alter materialized view mat1 rebuild
PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
POSTHOOK: query: alter materialized view mat1 rebuild
POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@mat1
+POSTHOOK: Output: default@mat1
PREHOOK: query: select * from mat1
PREHOOK: type: QUERY
PREHOOK: Input: default@mat1
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
index 9f0b99fe33e..c7ab47cc637 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
@@ -536,16 +536,17 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
private void fixUpASTAggregateIncrementalRebuild(
ASTNode subqueryNodeInputROJ,
- ASTNode updateNode,
+ ASTNode updateInsertNode,
Map<Context.DestClausePrefix, ASTNode> disjuncts,
MaterializedViewASTBuilder astBuilder)
throws SemanticException {
- // Replace INSERT OVERWRITE by MERGE equivalent rewriting.
+ // Replace INSERT OVERWRITE by MERGE equivalent rewriting. The update branch is
+ // split into a delete (updateDeleteNode) and an insert (updateInsertNode) branch.
// Here we need to do this complex AST rewriting that generates the same plan
// that a MERGE clause would generate because CBO does not support MERGE yet.
// TODO: Support MERGE as first class member in CBO to simplify this logic.
// 1) Replace INSERT OVERWRITE by INSERT
- ASTNode destinationNode = (ASTNode) updateNode.getChild(0);
+ ASTNode destinationNode = (ASTNode) updateInsertNode.getChild(0);
ASTNode newInsertInto = (ASTNode) ParseDriver.adaptor.create(
HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
newInsertInto.addChildren(destinationNode.getChildren());
@@ -560,10 +561,10 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
// 2) Copy INSERT branch and duplicate it, the first branch will be the UPDATE
// for the MERGE statement while the new branch will be the INSERT for the
// MERGE statement
- ASTNode updateParent = (ASTNode) updateNode.getParent();
- ASTNode insertNode = (ASTNode) ParseDriver.adaptor.dupTree(updateNode);
+ ASTNode updateParent = (ASTNode) updateInsertNode.getParent();
+ ASTNode insertNode = (ASTNode) ParseDriver.adaptor.dupTree(updateInsertNode);
insertNode.setParent(updateParent);
- // 3) Create ROW_ID column in select clause from left input for the RIGHT OUTER JOIN.
+ // 3) Add the acid sort columns (ROW__ID for native acid tables) to the select clause of the left input of the RIGHT OUTER JOIN.
// This is needed for the UPDATE clause. Hence, we find the following node:
// TOK_QUERY
// TOK_FROM
@@ -589,14 +590,14 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
.forEach(astNode -> ParseDriver.adaptor.addChild(selectNodeInputROJ, astNode));
// 4) Transform first INSERT branch into an UPDATE
// 4.1) Modifying filter condition.
- ASTNode whereClauseInUpdate = findWhereClause(updateNode);
+ ASTNode whereClauseInUpdate = findWhereClause(updateInsertNode);
if (whereClauseInUpdate.getChild(0).getType() != HiveParser.KW_OR) {
throw new SemanticException("OR clause expected below TOK_WHERE in incremental rewriting");
}
// We bypass the OR clause and select the first disjunct for the Update branch
ParseDriver.adaptor.setChild(whereClauseInUpdate, 0, disjuncts.get(Context.DestClausePrefix.UPDATE));
- ASTNode updateDeleteNode = (ASTNode) ParseDriver.adaptor.dupTree(updateNode);
- // 4.2) Adding ROW__ID field
+ ASTNode updateDeleteNode = (ASTNode) ParseDriver.adaptor.dupTree(updateInsertNode);
+ // 4.2) Adding acid sort columns (ROW__ID for native acid tables; otherwise the columns provided by the StorageHandler)
ASTNode selectNodeInUpdateDelete = (ASTNode) updateDeleteNode.getChild(1);
if (selectNodeInUpdateDelete.getType() != HiveParser.TOK_SELECT) {
throw new SemanticException("TOK_SELECT expected in incremental rewriting got "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
index e29548d9a12..f9d50a809d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
@@ -20,14 +20,11 @@ package org.apache.hadoop.hive.ql.ddl.view.materialized.alter.rebuild;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import java.util.List;
import java.util.stream.Collectors;
-import static java.util.Collections.singletonList;
-
public class NonNativeAcidMaterializedViewASTBuilder extends MaterializedViewASTBuilder {
private final Table mvTable;
@@ -44,6 +41,8 @@ public class NonNativeAcidMaterializedViewASTBuilder extends MaterializedViewAST
@Override
protected List<ASTNode> createAcidSortNodesInternal(String tableName) {
- return singletonList(createQualifiedColumnNode(tableName, VirtualColumn.ROWID.getName()));
+ return mvTable.getStorageHandler().acidSortColumns(mvTable, Context.Operation.DELETE).stream()
+ .map(fieldSchema -> createQualifiedColumnNode(tableName, fieldSchema.getName()))
+ .collect(Collectors.toList());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5174ef13721..772947c25d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -3096,6 +3096,8 @@ public class AcidUtils {
assert t != null;
if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
compBuilder.setSharedWrite();
+ } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
+ compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
} else {
compBuilder.setExclWrite();
}