You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kr...@apache.org on 2023/05/08 11:21:38 UTC

[hive] branch master updated: HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9376eb679bc HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)
9376eb679bc is described below

commit 9376eb679bc5aefb0d79e96a2855cd583e686f44
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Mon May 8 13:21:31 2023 +0200

    HIVE-27187: Incremental rebuild of materialized view having aggregate and stored by iceberg - ADDENDUM (Krisztian Kasa, reviewed by Denys Kuzmenko)
---
 .../src/test/queries/positive/mv_iceberg_orc6.q    |   5 +
 .../src/test/queries/positive/mv_iceberg_orc7.q    |   5 +
 .../test/results/positive/mv_iceberg_orc7.q.out    | 213 ++++++++++++++++++---
 .../AlterMaterializedViewRebuildAnalyzer.java      |  19 +-
 .../NonNativeAcidMaterializedViewASTBuilder.java   |   7 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   2 +
 6 files changed, 209 insertions(+), 42 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
index 52da0cac75a..70fdc943575 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
@@ -2,6 +2,11 @@
 -- SORT_QUERY_RESULTS
 --! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
 
+set hive.explain.user=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+
 drop table if exists tbl_ice;
 
 create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
index 28f06875596..c619e1a1639 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc7.q
@@ -1,6 +1,11 @@
 -- MV source tables are iceberg and MV has aggregate.
 -- SORT_QUERY_RESULTS
 --! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+--! qt:replace:/(\s+Version\sinterval\sfrom\:\s+)\d+(\s*)/$1#Masked#/
+
+set hive.explain.user=false;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
 set hive.stats.column.autogather=false;
 
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
index 2ad75a5b08a..76567afbb83 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc7.q.out
@@ -60,65 +60,220 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: explain cbo
 alter materialized view mat1 rebuild
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
 POSTHOOK: query: explain cbo
 alter materialized view mat1 rebuild
 POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@mat1
+POSTHOOK: Output: default@mat1
 CBO PLAN:
-HiveAggregate(group=[{0}], agg#0=[count($2)])
-  HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+HiveProject($f0=[$3], $f1=[CASE(IS NULL($1), $4, IS NULL($4), $1, +($4, $1))])
+  HiveFilter(condition=[OR($2, IS NULL($2))])
+    HiveJoin(condition=[IS NOT DISTINCT FROM($0, $3)], joinType=[right], algorithm=[none], cost=[not available])
+      HiveProject(a=[$0], _c1=[$1], $f2=[true])
+        HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
+      HiveProject(a=[$0], $f1=[$1])
+        HiveAggregate(group=[{0}], agg#0=[count($2)])
+          HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
 
 PREHOOK: query: explain
 alter materialized view mat1 rebuild
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
 POSTHOOK: query: explain
 alter materialized view mat1 rebuild
 POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@mat1
-Plan optimized by CBO.
+POSTHOOK: Output: default@mat1
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-4 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-4
+  Stage-5 depends on stages: Stage-0
+  Stage-6 depends on stages: Stage-5
 
-Vertex dependency in root stage
-Reducer 2 <- Map 1 (SIMPLE_EDGE)
+STAGE PLANS:
+  Stage: Stage-3
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: default.mat1
+                  Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: a (type: int), _c1 (type: bigint), true (type: boolean), PARTITION__SPEC__ID (type: int), PARTITION__HASH (type: bigint), FILE__PATH (type: string), ROW__POSITION (type: bigint)
+                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                    Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int)
+                      null sort order: z
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: int)
+                      Statistics: Num rows: 3 Data size: 36 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col1 (type: bigint), _col2 (type: boolean), _col3 (type: int), _col4 (type: bigint), _col5 (type: string), _col6 (type: bigint)
+            Execution mode: vectorized
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: tbl_ice
+                  Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+                  Version interval from: #Masked#
+                  Select Operator
+                    expressions: a (type: int), c (type: int)
+                    outputColumnNames: a, c
+                    Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: count(c)
+                      keys: a (type: int)
+                      minReductionHashAggr: 0.99
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: bigint)
+            Execution mode: vectorized
+        Reducer 2 
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Right Outer Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                nullSafes: [true]
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7, _col8
+                Statistics: Num rows: 3 Data size: 39 Basic stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: _col2 (type: boolean)
+                  Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col3 (type: int), _col4 (type: bigint), _col5 (type: string), _col6 (type: bigint), _col0 (type: int), _col1 (type: bigint)
+                    outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                    Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: bigint), _col2 (type: string), _col3 (type: bigint)
+                      null sort order: aaaa
+                      sort order: ++++
+                      Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col4 (type: int), _col5 (type: bigint)
+                Filter Operator
+                  predicate: _col2 (type: boolean)
+                  Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col7 (type: int), CASE WHEN (_col1 is null) THEN (_col8) WHEN (_col8 is null) THEN (_col1) ELSE ((_col8 + _col1)) END (type: bigint)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                          output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                          serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                          name: default.mat1
+                Filter Operator
+                  predicate: _col2 is null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col7 (type: int), CASE WHEN (_col1 is null) THEN (_col8) WHEN (_col8 is null) THEN (_col1) ELSE ((_col8 + _col1)) END (type: bigint)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                          output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                          serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                          name: default.mat1
+        Reducer 3 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: bigint), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: bigint), VALUE._col0 (type: int), VALUE._col1 (type: bigint)
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 13 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+                      output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+                      serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+                      name: default.mat1
+        Reducer 5 
+            Execution mode: vectorized
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: int)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  null sort order: z
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 2 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
 
-Stage-4
-  Materialized View Update{"name:":"default.mat1","update creation metadata:":"true"}
-    Stage-3
-      Stats Work{}
-        Stage-0
-          Move Operator
-            table:{"name:":"default.mat1"}
-            Stage-2
-              Dependency Collection{}
-                Stage-1
-                  Reducer 2 vectorized
-                  File Output Operator [FS_11]
-                    table:{"name:":"default.mat1"}
-                    Group By Operator [GBY_10] (rows=2 width=8)
-                      Output:["_col0","_col1"],aggregations:["count(VALUE._col0)"],keys:KEY._col0
-                    <-Map 1 [SIMPLE_EDGE] vectorized
-                      SHUFFLE [RS_9]
-                        PartitionCols:_col0
-                        Group By Operator [GBY_8] (rows=4 width=8)
-                          Output:["_col0","_col1"],aggregations:["count(c)"],keys:a
-                          Select Operator [SEL_7] (rows=4 width=8)
-                            Output:["a","c"]
-                            TableScan [TS_0] (rows=4 width=8)
-                              default@tbl_ice,tbl_ice,Tbl:COMPLETE,Col:NONE,Output:["a","c"]
+  Stage: Stage-4
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: false
+          table:
+              input format: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+              output format: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+              serde: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+              name: default.mat1
+
+  Stage: Stage-5
+    Stats Work
+      Basic Stats Work:
+
+  Stage: Stage-6
+    Materialized View Update
+      name: default.mat1
+      update creation metadata: true
 
 PREHOOK: query: alter materialized view mat1 rebuild
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@mat1
+PREHOOK: Output: default@mat1
 POSTHOOK: query: alter materialized view mat1 rebuild
 POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@mat1
+POSTHOOK: Output: default@mat1
 PREHOOK: query: select * from mat1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@mat1
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
index 9f0b99fe33e..c7ab47cc637 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
@@ -536,16 +536,17 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
 
   private void fixUpASTAggregateIncrementalRebuild(
           ASTNode subqueryNodeInputROJ,
-          ASTNode updateNode,
+          ASTNode updateInsertNode,
           Map<Context.DestClausePrefix, ASTNode> disjuncts,
           MaterializedViewASTBuilder astBuilder)
           throws SemanticException {
-    // Replace INSERT OVERWRITE by MERGE equivalent rewriting.
+    // Replace INSERT OVERWRITE by MERGE equivalent rewriting. The update branch is
+    // split to a delete (updateDeleteNode) and an insert (updateInsertNode) branch
     // Here we need to do this complex AST rewriting that generates the same plan
     // that a MERGE clause would generate because CBO does not support MERGE yet.
     // TODO: Support MERGE as first class member in CBO to simplify this logic.
     // 1) Replace INSERT OVERWRITE by INSERT
-    ASTNode destinationNode = (ASTNode) updateNode.getChild(0);
+    ASTNode destinationNode = (ASTNode) updateInsertNode.getChild(0);
     ASTNode newInsertInto = (ASTNode) ParseDriver.adaptor.create(
             HiveParser.TOK_INSERT_INTO, "TOK_INSERT_INTO");
     newInsertInto.addChildren(destinationNode.getChildren());
@@ -560,10 +561,10 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
     // 2) Copy INSERT branch and duplicate it, the first branch will be the UPDATE
     // for the MERGE statement while the new branch will be the INSERT for the
     // MERGE statement
-    ASTNode updateParent = (ASTNode) updateNode.getParent();
-    ASTNode insertNode = (ASTNode) ParseDriver.adaptor.dupTree(updateNode);
+    ASTNode updateParent = (ASTNode) updateInsertNode.getParent();
+    ASTNode insertNode = (ASTNode) ParseDriver.adaptor.dupTree(updateInsertNode);
     insertNode.setParent(updateParent);
-    // 3) Create ROW_ID column in select clause from left input for the RIGHT OUTER JOIN.
+    // 3) Add sort columns (ROW_ID in case of native) to select clause from left input for the RIGHT OUTER JOIN.
     // This is needed for the UPDATE clause. Hence, we find the following node:
     // TOK_QUERY
     //   TOK_FROM
@@ -589,14 +590,14 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
             .forEach(astNode -> ParseDriver.adaptor.addChild(selectNodeInputROJ, astNode));
     // 4) Transform first INSERT branch into an UPDATE
     // 4.1) Modifying filter condition.
-    ASTNode whereClauseInUpdate = findWhereClause(updateNode);
+    ASTNode whereClauseInUpdate = findWhereClause(updateInsertNode);
     if (whereClauseInUpdate.getChild(0).getType() != HiveParser.KW_OR) {
       throw new SemanticException("OR clause expected below TOK_WHERE in incremental rewriting");
     }
     // We bypass the OR clause and select the first disjunct for the Update branch
     ParseDriver.adaptor.setChild(whereClauseInUpdate, 0, disjuncts.get(Context.DestClausePrefix.UPDATE));
-    ASTNode updateDeleteNode = (ASTNode) ParseDriver.adaptor.dupTree(updateNode);
-    // 4.2) Adding ROW__ID field
+    ASTNode updateDeleteNode = (ASTNode) ParseDriver.adaptor.dupTree(updateInsertNode);
+    // 4.2) Adding acid sort columns (ROW__ID in case of native acid query StorageHandler otherwise)
     ASTNode selectNodeInUpdateDelete = (ASTNode) updateDeleteNode.getChild(1);
     if (selectNodeInUpdateDelete.getType() != HiveParser.TOK_SELECT) {
       throw new SemanticException("TOK_SELECT expected in incremental rewriting got "
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
index e29548d9a12..f9d50a809d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/NonNativeAcidMaterializedViewASTBuilder.java
@@ -20,14 +20,11 @@ package org.apache.hadoop.hive.ql.ddl.view.materialized.alter.rebuild;
 
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
-
 public class NonNativeAcidMaterializedViewASTBuilder extends MaterializedViewASTBuilder {
   private final Table mvTable;
 
@@ -44,6 +41,8 @@ public class NonNativeAcidMaterializedViewASTBuilder extends MaterializedViewAST
 
   @Override
   protected List<ASTNode> createAcidSortNodesInternal(String tableName) {
-    return singletonList(createQualifiedColumnNode(tableName, VirtualColumn.ROWID.getName()));
+    return mvTable.getStorageHandler().acidSortColumns(mvTable, Context.Operation.DELETE).stream()
+            .map(fieldSchema -> createQualifiedColumnNode(tableName, fieldSchema.getName()))
+            .collect(Collectors.toList());
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5174ef13721..772947c25d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -3096,6 +3096,8 @@ public class AcidUtils {
         assert t != null;
         if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
           compBuilder.setSharedWrite();
+        } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
+          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
         } else {
           compBuilder.setExclWrite();
         }