You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2024/01/02 19:04:04 UTC

(impala) branch master updated: IMPALA-12658: UPDATE Iceberg table FROM view throws IllegalStateException

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d9393643 IMPALA-12658: UPDATE Iceberg table FROM view throws IllegalStateException
4d9393643 is described below

commit 4d9393643515a6a5edba6b1126e2e404f68ea539
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Dec 20 17:45:37 2023 +0100

    IMPALA-12658: UPDATE Iceberg table FROM view throws IllegalStateException
    
    UPDATE FROM statement throws exception when updating Iceberg target
    table based on a view. This happens because we didn't correctly
    substitute all the expressions used by the IcebergUpdateImpl
    class.
    
    This patch fixes expression substitions, so from now VIEWs can also
    be used to UPDATE Iceberg tables.
    
    Testing:
     * added e2e test
     * added Ranger Column Masking test
    
    Change-Id: I80ccdb61327a50082f792a6d51f946b11c467dab
    Reviewed-on: http://gerrit.cloudera.org:8080/20825
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../apache/impala/analysis/IcebergModifyImpl.java  |  1 +
 .../apache/impala/analysis/IcebergUpdateImpl.java  |  7 +++---
 .../org/apache/impala/analysis/KuduModifyImpl.java |  1 +
 .../org/apache/impala/analysis/ModifyImpl.java     |  8 +++++--
 .../java/org/apache/impala/planner/Planner.java    |  7 +++---
 .../queries/QueryTest/iceberg-update-basic.test    | 18 +++++++++++++++
 .../queries/QueryTest/ranger_column_masking.test   | 26 +++++++++++++++++++++-
 tests/query_test/test_iceberg.py                   |  4 ++--
 8 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
index 5a816c6c0..d1084ab60 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -78,6 +78,7 @@ abstract class IcebergModifyImpl extends ModifyImpl {
 
   @Override
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    super.substituteResultExprs(smap, analyzer);
     sortExprs_ = Expr.substituteList(sortExprs_, smap, analyzer, true);
     deleteResultExprs_ = Expr.substituteList(deleteResultExprs_, smap, analyzer, true);
     deletePartitionKeyExprs_ = Expr.substituteList(
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
index 29cbcb467..d4a5595ae 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
@@ -135,15 +135,14 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
       if (expr == null) expr = createSlotRef(analyzer, col.getName());
       insertResultExprs_.add(expr);
     }
-    selectList.addAll(ExprUtil.exprsAsSelectList(insertResultExprs_));
     IcebergUtil.populatePartitionExprs(analyzer, null, columns,
         insertResultExprs_, originalTargetTable_, insertPartitionKeyExprs_, null);
-
     deletePartitionKeyExprs_ = getDeletePartitionExprs(analyzer);
     deleteResultExprs_ = getDeleteResultExprs(analyzer);
-    selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
+    selectList.addAll(ExprUtil.exprsAsSelectList(insertResultExprs_));
+    selectList.addAll(ExprUtil.exprsAsSelectList(insertPartitionKeyExprs_));
     selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_));
-
+    selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
     addSortColumns();
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
index 8191d4725..d82919a7c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -140,6 +140,7 @@ abstract class KuduModifyImpl extends ModifyImpl {
 
   @Override
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    super.substituteResultExprs(smap, analyzer);
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
index 099d4483d..d6a43fe47 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -55,12 +55,16 @@ abstract class ModifyImpl {
   /**
    * Substitutes the result expressions, partition key expressions with smap.
    * Preserves the original types of those expressions during the substitution.
-   * It is usually invoked when a SORT node is added to the plan because the
+   * It is usually invoked when a SORT node or a VIEW is involved.
    * SORT node materializes sort expressions into the sort tuple, so after the
    * SORT node we only need to have slot refs to the materialized exprs. 'smap'
    * contains the mapping from the original exprs to the materialized slot refs.
+   * When VIEWs are involved, the slot references also need to be substituted with
+   * references to the actual base tables.
    */
-  abstract void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer);
+  void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    sourceStmt_.substituteResultExprs(smap, analyzer);
+  }
 
   // The Modify statement for this modify impl. The ModifyStmt class holds information
   // about the statement (e.g. target table type, FROM, WHERE clause, etc.)
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 48bf339e8..a43389f88 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -170,9 +170,6 @@ public class Planner {
       // set up table sink for root fragment
       rootFragment.setSink(insertStmt.createDataSink());
     } else {
-      QueryStmt queryStmt = ctx_.getQueryStmt();
-      queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
-      List<Expr> resultExprs = queryStmt.getResultExprs();
       if (ctx_.isUpdate() || ctx_.isDelete()) {
         DmlStatementBase stmt;
         if (ctx_.isUpdate()) {
@@ -181,6 +178,7 @@ public class Planner {
           stmt = ctx_.getAnalysisResult().getDeleteStmt();
         }
         Preconditions.checkNotNull(stmt);
+        stmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
         if (stmt.getTargetTable() instanceof FeIcebergTable) {
           rootFragment = createIcebergDmlPlanFragment(
               rootFragment, distributedPlanner, stmt, fragments);
@@ -188,6 +186,9 @@ public class Planner {
         // Set up update sink for root fragment
         rootFragment.setSink(stmt.createDataSink());
       } else if (ctx_.isQuery()) {
+        QueryStmt queryStmt = ctx_.getQueryStmt();
+        queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
+        List<Expr> resultExprs = queryStmt.getResultExprs();
         rootFragment.setSink(
             ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs));
       }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
index 183b9aa6c..f3e2c81c6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
@@ -248,6 +248,24 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=1/del
 STRING, STRING, STRING, STRING
 ====
 ---- QUERY
+# Update partitioned Iceberg table based on VIEW
+create table ref_table_1 (ri int, rs string);
+insert into ref_table_1 values (1, 'Apache Impala');
+create table ref_table_2 (ri int, rs string);
+insert into ref_table_2 values (4, 'Apache Spark');
+create view ref_view as select * from ref_table_1 union select * from ref_table_2;
+update ice_id_partitioned SET s = rs
+FROM ice_id_partitioned JOIN ref_view ON (i = ri);
+---- DML_RESULTS: ice_id_partitioned
+1,0,'Apache Impala'
+2,0,'iceberg'
+3,0,'hive'
+4,1,'Apache Spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
 # Negative test for UPDATE part 3:
 # updating partition column AND right side is non-constant value AND we have a FROM clause with multiple table refs.
 # For such operations, if there are multiple matches in the JOIN, the duplicated records can
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index e6a766ac2..e0cb3cff9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -662,7 +662,7 @@ select * from $UNIQUE_DB.masked_tbl;
 INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
 ====
 ---- QUERY
-# Test UPDATE stmt
+# Test UPDATE stmt for Kudu target when source is a masked table.
 create table $UNIQUE_DB.masked_kudu (id int primary key, string_col string) stored as kudu;
 insert into $UNIQUE_DB.masked_kudu select id, string_col from alltypestiny;
 insert into $UNIQUE_DB.masked_kudu values (1, NULL), (3, NULL), (5, NULL);
@@ -685,6 +685,30 @@ select * from $UNIQUE_DB.masked_kudu;
 INT, STRING
 ====
 ---- QUERY
+# Test UPDATE stmt for Iceberg target when source is a masked table.
+create table $UNIQUE_DB.masked_iceberg (id int, string_col string)
+stored as iceberg tblproperties ('format-version'='2');
+insert into $UNIQUE_DB.masked_iceberg select id, string_col from alltypestiny;
+insert into $UNIQUE_DB.masked_iceberg values (1, NULL), (3, NULL), (5, NULL);
+update k set k.string_col=a.string_col
+from $UNIQUE_DB.masked_iceberg k JOIN alltypestiny a ON (k.id=a.id);
+select * from $UNIQUE_DB.masked_iceberg;
+---- RESULTS
+0,'0aaa'
+1,'NULL'
+3,'NULL'
+5,'NULL'
+100,'1aaa'
+200,'0aaa'
+300,'1aaa'
+400,'0aaa'
+500,'1aaa'
+600,'0aaa'
+700,'1aaa'
+---- TYPES
+INT, STRING
+====
+---- QUERY
 # Test on CreateView. Should not mask the columns when used in sql generations.
 create view $UNIQUE_DB.masked_view as select * from alltypestiny;
 show create view $UNIQUE_DB.masked_view;
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 3524e68ea..a505dfea3 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1407,10 +1407,10 @@ class TestIcebergV2Table(IcebergTestSuite):
 
     hive_results = get_hive_results("ice_id_partitioned", "i")
     assert hive_results == \
-        "1,0,Impala\n"     \
+        "1,0,Apache Impala\n"     \
         "2,0,iceberg\n"    \
         "3,0,hive\n"       \
-        "4,1,spark\n"      \
+        "4,1,Apache Spark\n"      \
         "5,2,Kudu\n"
 
     hive_results = get_hive_results("ice_bucket_transform", "i")