Posted to commits@impala.apache.org by mi...@apache.org on 2023/09/28 17:36:08 UTC

[impala] branch master updated (0063ccc83 -> 2d3289027)

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 0063ccc83 IMPALA-12371: Add better cardinality estimation for Iceberg V2 tables with deletes
     new eb0e2bbf9 IMPALA-12313: (part 1) Refactor modify statements
     new 1f82106af IMPALA-12089: Be able to skip pushing down a subset of the predicates in Iceberg
     new 2d3289027 IMPALA-12406: OPTIMIZE statement as an alias for INSERT OVERWRITE

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/query-options.cc                    |   3 +
 be/src/service/query-options.h                     |   5 +-
 common/thrift/ImpalaService.thrift                 |   4 +
 common/thrift/Query.thrift                         |   3 +
 fe/src/main/cup/sql-parser.cup                     |  17 +-
 .../apache/impala/analysis/AnalysisContext.java    |  14 +-
 .../org/apache/impala/analysis/DeleteStmt.java     |  25 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   2 +
 .../{AlterDbStmt.java => IcebergDeleteImpl.java}   |  40 ++-
 .../analysis/IcebergExpressionCollector.java       |  84 +++++
 .../apache/impala/analysis/IcebergModifyImpl.java  | 101 ++++++
 .../org/apache/impala/analysis/InsertStmt.java     |   3 +-
 .../org/apache/impala/analysis/KuduDeleteImpl.java |  46 +++
 .../org/apache/impala/analysis/KuduModifyImpl.java |  63 ++++
 .../org/apache/impala/analysis/KuduUpdateImpl.java |  45 +++
 .../org/apache/impala/analysis/ModifyImpl.java     | 268 +++++++++++++++
 .../org/apache/impala/analysis/ModifyStmt.java     | 361 +++------------------
 .../org/apache/impala/analysis/OptimizeStmt.java   | 109 +++++++
 .../org/apache/impala/analysis/UpdateStmt.java     |  23 +-
 .../apache/impala/planner/IcebergScanPlanner.java  |  88 +++--
 .../java/org/apache/impala/planner/Planner.java    |   2 +-
 .../org/apache/impala/planner/PlannerContext.java  |   4 +-
 .../java/org/apache/impala/service/Frontend.java   |   5 +-
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/ParserTest.java     |   4 +-
 .../org/apache/impala/planner/PlannerTest.java     |  12 +
 .../iceberg-predicates-disabled-subsetting.test    |  32 ++
 .../queries/PlannerTest/iceberg-predicates.test    | 108 +++++-
 .../PlannerTest/iceberg-v2-tables-hash-join.test   |  48 ++-
 .../queries/PlannerTest/iceberg-v2-tables.test     |  48 ++-
 .../queries/PlannerTest/insert-sort-by-zorder.test |  27 ++
 .../queries/PlannerTest/tablesample.test           |   7 +-
 .../queries/QueryTest/iceberg-negative.test        |  18 +-
 .../queries/QueryTest/iceberg-optimize.test        | 123 +++++++
 .../queries/QueryTest/ranger_column_masking.test   |   6 +
 tests/authorization/test_ranger.py                 |   5 +
 tests/query_test/test_iceberg.py                   |  31 ++
 37 files changed, 1340 insertions(+), 445 deletions(-)
 copy fe/src/main/java/org/apache/impala/analysis/{AlterDbStmt.java => IcebergDeleteImpl.java} (50%)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergExpressionCollector.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates-disabled-subsetting.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test


[impala] 03/03: IMPALA-12406: OPTIMIZE statement as an alias for INSERT OVERWRITE

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d3289027c2ffdd245d13b60e6fa3f9b3e7bf833
Author: Noemi Pap-Takacs <np...@cloudera.com>
AuthorDate: Mon Jul 10 11:04:27 2023 +0200

    IMPALA-12406: OPTIMIZE statement as an alias for INSERT OVERWRITE
    
    If an Iceberg table is frequently updated/written to in small batches,
    a lot of small files are created. This decreases read performance.
    Similarly, frequent row-level deletes contribute to this problem
    by creating delete files, which have to be merged on read.
    
    So far, INSERT OVERWRITE (rewriting the table with itself) has been used
    to compact Iceberg tables.
    However, it comes with some restrictions:
    - The table should not have multiple partition specs/partition evolution.
    - The table should not contain complex types.
    
    The OPTIMIZE statement offers new syntax and an Iceberg-only solution to
    enhance read performance for subsequent operations.
    See IMPALA-12293 for details.
    
    Syntax: OPTIMIZE TABLE <table_name>;
    
    This first patch introduces the new syntax, temporarily as an alias
    for INSERT OVERWRITE.
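    For example, with a hypothetical Iceberg table 'ice_tbl', the statement

      OPTIMIZE TABLE ice_tbl;

    is currently executed as

      INSERT OVERWRITE TABLE ice_tbl SELECT * FROM ice_tbl;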
    
    Note that executing OPTIMIZE TABLE requires ALL privileges.
    
    Testing:
     - negative tests
     - FE planner test
     - Ranger test
     - E2E tests
    
    Change-Id: Ief42537499ffe64fafdefe25c8d175539234c4e7
    Reviewed-on: http://gerrit.cloudera.org:8080/20405
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup                     |  17 ++-
 .../apache/impala/analysis/AnalysisContext.java    |  14 ++-
 .../org/apache/impala/analysis/InsertStmt.java     |   3 +-
 .../org/apache/impala/analysis/OptimizeStmt.java   | 109 ++++++++++++++++++
 .../org/apache/impala/planner/PlannerContext.java  |   4 +-
 .../java/org/apache/impala/service/Frontend.java   |   5 +-
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/ParserTest.java     |   4 +-
 .../queries/PlannerTest/insert-sort-by-zorder.test |  27 +++++
 .../queries/QueryTest/iceberg-negative.test        |  18 ++-
 .../queries/QueryTest/iceberg-optimize.test        | 123 +++++++++++++++++++++
 .../queries/QueryTest/ranger_column_masking.test   |   6 +
 tests/authorization/test_ranger.py                 |   5 +
 tests/query_test/test_iceberg.py                   |  31 ++++++
 14 files changed, 356 insertions(+), 11 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index eca1ee3b8..e46a57307 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -318,7 +318,7 @@ terminal
   KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
   KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, KW_MANAGED_LOCATION, KW_MAP, KW_MERGE_FN,
   KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, KW_NOT,
-  KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OR,
+  KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OPTIMIZE, KW_OR,
   KW_ORC, KW_ORDER, KW_OUTER,
   KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED,
   KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, KW_PURGE,
@@ -462,6 +462,7 @@ nonterminal Expr sign_chain_expr;
 nonterminal InsertStmt insert_stmt, upsert_stmt;
 nonterminal UpdateStmt update_stmt;
 nonterminal DeleteStmt delete_stmt;
+nonterminal OptimizeStmt optimize_stmt;
 nonterminal List<Pair<SlotRef, Expr>> update_set_expr_list;
 nonterminal StatementBase explain_stmt;
 // Optional partition spec
@@ -673,6 +674,8 @@ stmt ::=
   {: RESULT = upsert; :}
   | delete_stmt:delete
   {: RESULT = delete; :}
+  | optimize_stmt:optimize
+  {: RESULT = optimize; :}
   | use_stmt:use
   {: RESULT = use; :}
   | show_tables_stmt:show_tables
@@ -837,6 +840,11 @@ explain_stmt ::=
      delete.setIsExplain();
      RESULT = delete;
   :}
+  | KW_EXPLAIN optimize_stmt:optimize
+  {:
+     optimize.setIsExplain();
+     RESULT = optimize;
+  :}
   ;
 
 copy_testcase_stmt ::=
@@ -986,6 +994,11 @@ delete_stmt ::=
   {: RESULT = new DeleteStmt(target_table, from, where); :}
   ;
 
+optimize_stmt ::=
+  KW_OPTIMIZE KW_TABLE table_name:table
+  {: RESULT = new OptimizeStmt(table); :}
+  ;
+
 opt_query_stmt ::=
   query_stmt:query
   {: RESULT = query; :}
@@ -4422,6 +4435,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_ON:r
   {: RESULT = r.toString(); :}
+  | KW_OPTIMIZE:r
+  {: RESULT = r.toString(); :}
   | KW_OR:r
   {: RESULT = r.toString(); :}
   | KW_ORC:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index a712cbaa8..c2776d04b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -95,6 +95,7 @@ public class AnalysisContext {
     public boolean isQueryStmt() { return stmt_ instanceof QueryStmt; }
     public boolean isSetOperationStmt() { return stmt_ instanceof SetOperationStmt; }
     public boolean isInsertStmt() { return stmt_ instanceof InsertStmt; }
+    public boolean isOptimizeStmt() { return stmt_ instanceof OptimizeStmt; }
     public boolean isDropDbStmt() { return stmt_ instanceof DropDbStmt; }
     public boolean isDropTableOrViewStmt() {
       return stmt_ instanceof DropTableOrViewStmt;
@@ -187,7 +188,7 @@ public class AnalysisContext {
     }
 
     public boolean isDmlStmt() {
-      return isInsertStmt() || isUpdateStmt() || isDeleteStmt();
+      return isInsertStmt() || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt();
     }
 
     /**
@@ -197,7 +198,7 @@ public class AnalysisContext {
     public boolean isHierarchicalAuthStmt() {
       return isQueryStmt() || isInsertStmt() || isUpdateStmt() || isDeleteStmt()
           || isCreateTableAsSelectStmt() || isCreateViewStmt() || isAlterViewStmt()
-          || isTestCaseStmt();
+          || isOptimizeStmt() || isTestCaseStmt();
     }
 
     /**
@@ -296,9 +297,16 @@ public class AnalysisContext {
       return (QueryStmt) stmt_;
     }
 
+    public OptimizeStmt getOptimizeStmt() {
+      Preconditions.checkState(isOptimizeStmt());
+      return (OptimizeStmt) stmt_;
+    }
+
     public InsertStmt getInsertStmt() {
       if (isCreateTableAsSelectStmt()) {
         return getCreateTableAsSelectStmt().getInsertStmt();
+      } else if (isOptimizeStmt()) {
+        return getOptimizeStmt().getInsertStmt();
       } else {
         Preconditions.checkState(isInsertStmt());
         return (InsertStmt) stmt_;
@@ -407,7 +415,7 @@ public class AnalysisContext {
     }
     public boolean requiresExprRewrite() {
       return isQueryStmt() || isInsertStmt() || isCreateTableAsSelectStmt()
-          || isUpdateStmt() || isDeleteStmt();
+          || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt();
     }
     public boolean requiresSetOperationRewrite() {
       return analyzer_.containsSetOperation() && !isCreateViewStmt() && !isAlterViewStmt()
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index ec9dc8946..433164ddf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -602,7 +602,8 @@ public class InsertStmt extends DmlStatementBase {
         if (iceTable.getPartitionSpecs().size() > 1) {
           throw new AnalysisException("The Iceberg table has multiple partition specs. " +
               "This means the outcome of dynamic partition overwrite is unforeseeable. " +
-              "Consider using TRUNCATE and INSERT INTO to overwrite your table.");
+              "Consider using TRUNCATE then INSERT INTO from the previous snapshot " +
+              "to overwrite your table.");
         }
         validateBucketTransformForOverwrite(iceTable);
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
new file mode 100644
index 000000000..519823e5a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.rewrite.ExprRewriter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Representation of an OPTIMIZE statement used to execute table maintenance tasks in
+ * Iceberg tables, such as:
+ * 1. compacting small files,
+ * 2. merging delete deltas.
+ * Currently, it executes these tasks as an alias for INSERT OVERWRITE:
+ * OPTIMIZE TABLE tbl; -->> INSERT OVERWRITE TABLE tbl SELECT * FROM tbl;
+ */
+public class OptimizeStmt extends DmlStatementBase {
+
+  // INSERT OVERWRITE statement that this OPTIMIZE statement is translated to.
+  private InsertStmt insertStmt_;
+  // Target table name as seen by the parser.
+  private final TableName originalTableName_;
+  // Target table that should be compacted. May be qualified by analyze().
+  private TableName tableName_;
+
+  public OptimizeStmt(TableName tableName) {
+    tableName_ = tableName;
+    originalTableName_ = tableName_;
+    List<SelectListItem> selectListItems = new ArrayList<>();
+    selectListItems.add(SelectListItem.createStarItem(null));
+    SelectList selectList = new SelectList(selectListItems);
+    List<TableRef> tableRefs = new ArrayList<>();
+    tableRefs.add(new TableRef(tableName.toPath(), null));
+    QueryStmt queryStmt = new SelectStmt(selectList, new FromClause(tableRefs), null,
+        null, null, null, null);
+    insertStmt_ = new InsertStmt(null, tableName, true, null, null, null, queryStmt,
+        null, false);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed()) return;
+    super.analyze(analyzer);
+    insertStmt_.analyze(analyzer);
+
+    Preconditions.checkState(table_ == null);
+    if (!tableName_.isFullyQualified()) {
+      tableName_ = new TableName(analyzer.getDefaultDb(), tableName_.getTbl());
+    }
+    table_ = analyzer.getTable(tableName_, Privilege.ALL);
+    Preconditions.checkState(table_ == insertStmt_.getTargetTable());
+    if (!(table_ instanceof FeIcebergTable)) {
+      throw new AnalysisException("OPTIMIZE is only supported for Iceberg tables.");
+    }
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    tableName_ = originalTableName_;
+    insertStmt_.reset();
+  }
+
+
+  public InsertStmt getInsertStmt() { return insertStmt_; }
+
+  @Override
+  public List<Expr> getPartitionKeyExprs() {
+    return insertStmt_.getPartitionKeyExprs();
+  }
+
+  @Override
+  public List<Expr> getSortExprs() {
+    return insertStmt_.getSortExprs();
+  }
+
+  @Override
+  public void collectTableRefs(List<TableRef> tblRefs) {
+    insertStmt_.collectTableRefs(tblRefs);
+  }
+
+  @Override
+  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+    Preconditions.checkState(isAnalyzed());
+    insertStmt_.rewriteExprs(rewriter);
+  }
+
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 802b73893..03418e578 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -103,7 +103,9 @@ public class PlannerContext {
   public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); }
   public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); }
   public boolean isInsertOrCtas() {
-    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
+    //TODO: IMPALA-12412: remove isOptimizeStmt().
+    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt()
+        || analysisResult_.isOptimizeStmt();
   }
   public boolean isInsert() { return analysisResult_.isInsertStmt(); }
   public boolean isCtas() { return analysisResult_.isCreateTableAsSelectStmt(); }
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 398a14854..2821dfc5e 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2381,7 +2381,8 @@ public class Frontend {
         }
       }
       if (!analysisResult.isExplainStmt() &&
-          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) {
+          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt()
+          || analysisResult.isOptimizeStmt())) {
         InsertStmt insertStmt = analysisResult.getInsertStmt();
         FeTable targetTable = insertStmt.getTargetTable();
         if (AcidUtils.isTransactionalTable(
@@ -2508,7 +2509,7 @@ public class Frontend {
         result.query_exec_request.stmt_type = result.stmt_type;
         // fill in the metadata
         result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
-      } else if (analysisResult.isInsertStmt() ||
+      } else if (analysisResult.isInsertStmt() || analysisResult.isOptimizeStmt() ||
           analysisResult.isCreateTableAsSelectStmt()) {
         // For CTAS the overall TExecRequest statement type is DDL, but the
         // query_exec_request should be DML
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 45cf79da9..481a7bd2f 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -198,6 +198,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("of", SqlParserSymbols.KW_OF);
     keywordMap.put("offset", SqlParserSymbols.KW_OFFSET);
     keywordMap.put("on", SqlParserSymbols.KW_ON);
+    keywordMap.put("optimize", SqlParserSymbols.KW_OPTIMIZE);
     keywordMap.put("or", SqlParserSymbols.KW_OR);
     keywordMap.put("||", SqlParserSymbols.KW_LOGICAL_OR);
     keywordMap.put("orc", SqlParserSymbols.KW_ORC);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index e7c5214ff..d8972bc7a 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3579,8 +3579,8 @@ public class ParserTest extends FrontendTestBase {
         "^\n" +
         "Encountered: IDENTIFIER\n" +
         "Expected: ALTER, COMMENT, COMPUTE, COPY, CREATE, DELETE, DESCRIBE, DROP, " +
-            "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, REFRESH, REVOKE, SELECT, SET, " +
-            "SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES, WITH\n");
+            "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, OPTIMIZE, REFRESH, REVOKE, " +
+            "SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES, WITH\n");
 
     // missing select list
     ParserError("select from t",
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
index 76c8a81c4..a6dbe3f93 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
@@ -389,3 +389,30 @@ WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`,
    HDFS partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.id
    row-size=12B cardinality=7.30K
+====
+# IMPALA-12293: Verify that OPTIMIZE respects the sorting properties.
+optimize table functional_parquet.iceberg_partition_transforms_zorder
+---- PLAN
+WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|  partitions=unavailable
+|
+01:SORT
+|  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
+|  row-size=44B cardinality=0
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size=36B cardinality=0
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|  partitions=unavailable
+|
+02:SORT
+|  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
+|  row-size=44B cardinality=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size=36B cardinality=0
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 9d9cc820e..7dfc2c287 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -779,9 +779,25 @@ delete from ice_delete where i = 1;
 AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete
 ====
 ---- QUERY
-# Cannot delete from Iceberg table is write mode is not 'merge-on-read'
+# Cannot delete from Iceberg table if write mode is not 'merge-on-read'
 alter table ice_delete set tblproperties ('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write');
 delete from ice_delete where i = 1;
 ---- CATCH
 AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete
 ====
+---- QUERY
+optimize table non_iceberg_table;
+---- CATCH
+AnalysisException: OPTIMIZE is only supported for Iceberg tables.
+====
+---- QUERY
+optimize table iceberg_overwrite_bucket;
+---- CATCH
+AnalysisException: The Iceberg table has multiple partition specs. This means the outcome of dynamic partition overwrite is unforeseeable. Consider using TRUNCATE then INSERT INTO from the previous snapshot to overwrite your table.
+====
+---- QUERY
+CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS ICEBERG;
+optimize table ice_complex;
+---- CATCH
+AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
new file mode 100644
index 000000000..3912ce67b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
@@ -0,0 +1,123 @@
+====
+---- QUERY
+CREATE TABLE ice_optimize (i int, s string)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# Insert rows one by one to write multiple small files.
+INSERT INTO ice_optimize VALUES(1, 'one');
+INSERT INTO ice_optimize VALUES(2, 'two');
+INSERT INTO ice_optimize VALUES(3, 'three');
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# OPTIMIZE TABLE should create 1 data file.
+OPTIMIZE TABLE ice_optimize;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,'one'
+2,'two'
+3,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_optimize WHERE i = 2;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/delete-.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Checking that the delete file was merged and there is no delete file in the table.
+OPTIMIZE TABLE ice_optimize;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,'one'
+3,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Schema evolution should work and return correct results according to the latest schema.
+ALTER TABLE ice_optimize DROP COLUMN s;
+ALTER TABLE ice_optimize ADD COLUMN b BOOLEAN;
+INSERT INTO ice_optimize VALUES(4, true);
+OPTIMIZE TABLE ice_optimize;
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,NULL
+3,NULL
+4,true
+---- TYPES
+INT,BOOLEAN
+====
+---- QUERY
+CREATE TABLE ice_optimize_part
+PARTITIONED BY(i int)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='1');
+====
+---- QUERY
+# Insert values into each partition to write multiple small files in each.
+INSERT INTO ice_optimize_part VALUES(1), (2), (3);
+INSERT INTO ice_optimize_part VALUES(2), (3);
+INSERT INTO ice_optimize_part VALUES(1), (3);
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# OPTIMIZE TABLE should create 1 data file per partition.
+OPTIMIZE TABLE ice_optimize_part;
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 50297ae76..f7e5d9dfc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -733,6 +733,12 @@ compute stats functional.alltypestiny
 AuthorizationException: User '$USER' does not have privileges to execute 'ALTER' on: functional.alltypestiny
 ====
 ---- QUERY
+# Optimize statement on masked tables should be blocked, because reading and inserting masked data would result in data loss.
+optimize table functional_parquet.iceberg_partitioned
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to execute 'INSERT' on: functional_parquet.iceberg_partitioned
+====
+---- QUERY
 # Select masked INPUT_FILE__NAME plus all cols
 select input__file__name, * from alltypestiny order by id;
 ---- RESULTS
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 5b089bf88..936e9627c 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1481,6 +1481,11 @@ class TestRanger(CustomClusterTestSuite):
         "input__file__name",
         "CUSTOM", "mask_show_last_n({col}, 10, 'x', 'x', 'x', -1, '1')")
       policy_cnt += 1
+      # Add column masking policy to an Iceberg table.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional_parquet", "iceberg_partitioned",
+        "id", "MASK_NULL")
+      policy_cnt += 1
       self.execute_query_expect_success(admin_client, "refresh authorization",
                                         user=ADMIN)
       self.run_test_case("QueryTest/ranger_column_masking", vector,
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 02cf985d9..d6317d268 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1301,3 +1301,34 @@ class TestIcebergV2Table(IcebergTestSuite):
     hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t))
     # Test that Hive sees the same rows deleted.
     assert hive_output == "id\n4\n5\n6\n7\n8\n"
+
+  def test_optimize(self, vector, unique_database):
+    tbl_name = unique_database + ".optimize_iceberg"
+    self.execute_query("""create table {0} (i int)
+        stored as iceberg""".format(tbl_name))
+    self.execute_query("insert into {0} values (1);".format(tbl_name))
+    self.execute_query("insert into {0} values (2);".format(tbl_name))
+    self.execute_query("insert into {0} values (8);".format(tbl_name))
+    result_before_opt = self.execute_query("SELECT * FROM {}".format(tbl_name))
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=3)
+    snapshot_before = snapshots[2]
+
+    # Check that a new snapshot is created after Iceberg table optimization.
+    self.execute_query("optimize table {0};".format(tbl_name))
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4)
+    snapshot_after = snapshots[3]
+    assert(snapshot_before.get_creation_time() < snapshot_after.get_creation_time())
+    # Check that the last snapshot's parent ID is the snapshot ID before 'OPTIMIZE TABLE'.
+    assert(snapshot_before.get_snapshot_id() == snapshot_after.get_parent_id())
+
+    result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name))
+    # Check that we get the same result from the table before and after 'OPTIMIZE TABLE'.
+    assert result_after_opt.data.sort() == result_before_opt.data.sort()
+
+    result_time_travel = self.execute_query(
+        "select * from {0} for system_version as of {1};".format(
+            tbl_name, snapshot_before.get_snapshot_id()))
+    # Check that time travel to the previous snapshot returns all results correctly.
+    assert result_after_opt.data.sort() == result_time_travel.data.sort()
+
+    self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)


[impala] 02/03: IMPALA-12089: Be able to skip pushing down a subset of the predicates in Iceberg

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1f82106aff6fc2f0afb5a2c8ed754463955813a4
Author: Peter Rozsa <pr...@cloudera.com>
AuthorDate: Wed May 24 15:11:58 2023 +0200

    IMPALA-12089: Be able to skip pushing down a subset of the predicates in Iceberg
    
    This change adds a predicate filtering mechanism at planning time that
    locates Impala's predicates in the residual expressions from Iceberg
    planning. By locating all residual expressions, the remainder
    expression set can be calculated.
    
    The current implementation is an all-or-nothing filter: if 'planFiles()'
    (Iceberg API) returns no residual expression, then all Impala predicates
    can be skipped; if there is any residual expression, every Impala
    predicate is pushed down to the Impala scanner.
    
    Residual expressions are the remaining filter expressions after the
    pushdown of predicates into the Iceberg table scan. By locating the
    remainder expression, we can reduce the number of predicates that will
    be pushed down to the Impala scanner.
    
    After this change, the Iceberg residual expression handling is improved
    by locating the simple conjuncts in the residual expression and mapping
    them back to Impala conjuncts. For example, if the list of Impala
    conjuncts consists of two predicates, 'col_i != 100' and 'col_s = "a"',
    and 'col_i' happens to be a partition column in the Iceberg table
    definition whose predicate the Iceberg table scan can eliminate, the
    residual expression will be 'col_s = "a"'. This expression can be mapped
    back to an Impala predicate, and every other expression can be removed
    from the effective Impala conjunct list that is pushed down to the
    scanner, skipping the unnecessary filtering of 'col_i'.
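    A minimal sketch of that scenario (assuming 'tbl' is an Iceberg table
    identity-partitioned on 'col_i'; the names are illustrative):

      SELECT * FROM tbl WHERE col_i != 100 AND col_s = "a";
      -- planFiles() can evaluate col_i != 100 via partition pruning, so the
      -- residual expression is col_s = "a"; only that predicate is pushed
      -- down to Impala's scan node.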
    
    If there is no residual expression, the behavior is the same as before:
    all predicate pushdown is skipped.
    If Impala is unable to match every residual expression to an Impala
    conjunct, then all the conjuncts are pushed down to the Impala scanner.
    
    This change offers the advantage of not pushing down already evaluated
    filters to the Impala scanner nodes, resulting in enhanced scanning
    performance. Additionally, if the filter expression affects columns that
    are unnecessary for the final result and can be filtered out during
    Iceberg's table scan, it leads to a reduced row size, thereby optimizing
    data retrieval and improving overall query efficiency.
    
    This solution is limited to cases where Impala's expression list
    contains only conjuncts; compound expressions are not supported, because
    partial elimination of a compound would require rewriting the Impala
    expression.
    
    A new query option is added: iceberg_predicate_pushdown_subsetting. The
    query option's default value is true. It can be turned off by setting it
    to false.
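    For example, the subsetting can be disabled for a session with Impala's
    SET statement:

      SET ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING=false;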
    
    Performance of the predicate location was measured on two edge cases:
     - 1000 expressions, 999 skipped: on average 2 ms
     - 1000 expressions, 1 skipped: on average 25 ms
    
    Tests:
     - planner test cases added for disabled mode
     - existing planner test cases adjusted
     - core tests passed
    
    Change-Id: I597f69ad03ecaf9e304613ef934654e3d9614ae8
    Reviewed-on: http://gerrit.cloudera.org:8080/20133
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options.cc                    |   3 +
 be/src/service/query-options.h                     |   5 +-
 common/thrift/ImpalaService.thrift                 |   4 +
 common/thrift/Query.thrift                         |   3 +
 .../analysis/IcebergExpressionCollector.java       |  84 ++++++++++++++++
 .../apache/impala/planner/IcebergScanPlanner.java  |  88 +++++++++++------
 .../org/apache/impala/planner/PlannerTest.java     |  12 +++
 .../iceberg-predicates-disabled-subsetting.test    |  32 ++++++
 .../queries/PlannerTest/iceberg-predicates.test    | 108 +++++++++++++++++++--
 .../PlannerTest/iceberg-v2-tables-hash-join.test   |  48 ++++++---
 .../queries/PlannerTest/iceberg-v2-tables.test     |  48 ++++++---
 .../queries/PlannerTest/tablesample.test           |   7 +-
 12 files changed, 370 insertions(+), 72 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 2db91041d..901acddbb 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1144,6 +1144,9 @@ Status impala::SetQueryOption(const string& key, const string& value,
         MemSpec mem_spec_val{};
         RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
         query_options->__set_mem_limit_coordinators(mem_spec_val.value);
+      }
+      case TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING: {
+        query_options->__set_iceberg_predicate_pushdown_subsetting(IsTrue(value));
         break;
       }
       default:
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 74de1b972..654d5199e 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::MEM_LIMIT_COORDINATORS + 1);                              \
+      TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING + 1);                   \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -306,7 +306,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)                                                       \
   QUERY_OPT_FN(mem_limit_coordinators, MEM_LIMIT_COORDINATORS,                           \
       TQueryOptionLevel::ADVANCED)                                                       \
-  ;
+  QUERY_OPT_FN(iceberg_predicate_pushdown_subsetting,                                    \
+      ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING, TQueryOptionLevel::DEVELOPMENT);
 
 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0ee327216..660501513 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -841,6 +841,10 @@ enum TImpalaQueryOptions {
   // a) an int (= number of bytes);
   // b) a float followed by "M" (MB) or "G" (GB)
   MEM_LIMIT_COORDINATORS = 164
+
+  // Enables predicate subsetting for Iceberg plan nodes. If enabled, expressions
+  // evaluated by Iceberg are not pushed down the scanner node.
+  ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING = 165;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 033e756ed..edd827765 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -660,6 +660,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   165: optional i64 mem_limit_coordinators = 0;
+
+  // See comment in ImpalaService.thrift
+  166: optional bool iceberg_predicate_pushdown_subsetting = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergExpressionCollector.java b/fe/src/main/java/org/apache/impala/analysis/IcebergExpressionCollector.java
new file mode 100644
index 000000000..742388a24
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergExpressionCollector.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.expressions.BoundAggregate;
+import org.apache.iceberg.expressions.BoundPredicate;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
+import org.apache.iceberg.expressions.UnboundAggregate;
+import org.apache.iceberg.expressions.UnboundPredicate;
+
+/**
+ * Visitor implementation to traverse Iceberg expression tree and locate predicates.
+ */
+public class IcebergExpressionCollector extends ExpressionVisitor<List<Expression>> {
+  @Override
+  public List<Expression> alwaysTrue() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Expression> alwaysFalse() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Expression> not(List<Expression> result) {
+    return result;
+  }
+
+  @Override
+  public List<Expression> and(List<Expression> leftResult, List<Expression> rightResult) {
+    leftResult.addAll(rightResult);
+    return leftResult;
+  }
+
+  @Override
+  public List<Expression> or(List<Expression> leftResult, List<Expression> rightResult) {
+    leftResult.addAll(rightResult);
+    return leftResult;
+  }
+
+  @Override
+  public <T> List<Expression> predicate(BoundPredicate<T> pred) {
+    List<Expression> expressions = new ArrayList<>();
+    expressions.add(pred);
+    return expressions;
+  }
+
+  @Override
+  public <T> List<Expression> predicate(UnboundPredicate<T> pred) {
+    List<Expression> expressions = new ArrayList<>();
+    expressions.add(pred);
+    return expressions;
+  }
+
+  @Override
+  public <T, C> List<Expression> aggregate(BoundAggregate<T, C> agg) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <T> List<Expression> aggregate(UnboundAggregate<T> agg) {
+    return Collections.emptyList();
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index d336f446c..0d4d37ca2 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -22,11 +22,14 @@ import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -37,6 +40,8 @@ import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Expression.Operation;
 import org.apache.iceberg.expressions.True;
@@ -47,6 +52,7 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.CompoundPredicate;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.IcebergExpressionCollector;
 import org.apache.impala.analysis.InPredicate;
 import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.JoinOperator;
@@ -80,7 +86,6 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.planner.IcebergDeleteNode;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TColumnStats;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
@@ -101,29 +106,31 @@ import org.slf4j.LoggerFactory;
  * class deals with such complexities.
  */
 public class IcebergScanPlanner {
-  private final static Logger LOG = LoggerFactory.getLogger(IcebergScanPlanner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergScanPlanner.class);
 
   private Analyzer analyzer_;
   private PlannerContext ctx_;
   private TableRef tblRef_;
   private List<Expr> conjuncts_;
   private MultiAggregateInfo aggInfo_;
-
-  // Iceberg compatible expressions that are pushed down to Iceberg for query planning.
-  private final List<Expression> icebergPredicates_ = new ArrayList<>();
-  // The Impala representation of the expressions in 'icebergPredicates_'
-  private final List<Expr> impalaIcebergPredicates_ = new ArrayList<>();
-  // Indicates whether we have to push down 'impalaIcebergPredicates' to Impala's scan
-  // node or has Iceberg already done the partition pruning and no further rows could be
-  // skipped using these filters.
-  private boolean canSkipPushingDownIcebergPredicates_ = false;
-
+  // Mapping for translated Impala expressions
+  private final Map<Expression, Expr> impalaIcebergPredicateMapping_ =
+      new LinkedHashMap<>();
+  // Residual expressions after Iceberg planning
+  private final Set<Expression> residualExpressions_ =
+      new TreeSet<>(Comparator.comparing(ExpressionUtil::toSanitizedString));
+  // Expressions filtered by Iceberg's planFiles, subset of 'translatedExpressions_''s
+  // values
+  private final Set<Expr> skippedExpressions_ =
+      new TreeSet<>(Comparator.comparingInt(System::identityHashCode));
+  // Impala expressions that can't be translated into Iceberg expressions
+  private final List<Expr> untranslatedExpressions_ = new ArrayList<>();
+  // Conjuncts on columns not involved in IDENTITY-partitioning.
+  private List<Expr> nonIdentityConjuncts_ = new ArrayList<>();
   private List<FileDescriptor> dataFilesWithoutDeletes_ = new ArrayList<>();
   private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>();
   private Set<FileDescriptor> deleteFiles_ = new HashSet<>();
 
-  // Conjuncts on columns not involved in IDENTITY-partitioning.
-  private List<Expr> nonIdentityConjuncts_ = new ArrayList<>();
 
   // Statistics about the data and delete files. Useful for memory estimates of the
   // ANTI JOIN
@@ -163,9 +170,8 @@ public class IcebergScanPlanner {
    *  - no time travel
    */
   private boolean needIcebergForPlanning() {
-    return
-        !icebergPredicates_.isEmpty() ||
-        tblRef_.getTimeTravelSpec() != null;
+    return !impalaIcebergPredicateMapping_.isEmpty()
+        || tblRef_.getTimeTravelSpec() != null;
   }
 
   private void setFileDescriptorsBasedOnFileStore() throws ImpalaException {
@@ -366,14 +372,14 @@ public class IcebergScanPlanner {
   private void filterFileDescriptors() throws ImpalaException {
     TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
 
-    canSkipPushingDownIcebergPredicates_ = true;
-    try (CloseableIterable<FileScanTask> fileScanTasks = IcebergUtil.planFiles(
-        getIceTable(), new ArrayList<Expression>(icebergPredicates_), timeTravelSpec)) {
+    try (CloseableIterable<FileScanTask> fileScanTasks =
+        IcebergUtil.planFiles(getIceTable(),
+            new ArrayList<>(impalaIcebergPredicateMapping_.keySet()), timeTravelSpec)) {
       long dataFilesCacheMisses = 0;
       for (FileScanTask fileScanTask : fileScanTasks) {
         Expression residualExpr = fileScanTask.residual();
         if (residualExpr != null && !(residualExpr instanceof True)) {
-          canSkipPushingDownIcebergPredicates_ = false;
+          residualExpressions_.add(residualExpr);
         }
         Pair<FileDescriptor, Boolean> fileDesc = getFileDescriptor(fileScanTask.file());
         if (!fileDesc.second) ++dataFilesCacheMisses;
@@ -409,14 +415,42 @@ public class IcebergScanPlanner {
   }
 
   private void filterConjuncts() {
-    if (canSkipPushingDownIcebergPredicates_) {
-      conjuncts_.removeAll(impalaIcebergPredicates_);
+    if (residualExpressions_.isEmpty()) {
+      conjuncts_.removeAll(impalaIcebergPredicateMapping_.values());
+      return;
     }
+    if (!analyzer_.getQueryOptions().iceberg_predicate_pushdown_subsetting) return;
+    trySubsettingPredicatesBeingPushedDown();
+  }
+
+  private boolean trySubsettingPredicatesBeingPushedDown() {
+    long startTime = System.currentTimeMillis();
+    List<Expr> expressionsToRetain = new ArrayList<>(untranslatedExpressions_);
+    for (Expression expression : residualExpressions_) {
+      List<Expression> locatedExpressions = ExpressionVisitors.visit(expression,
+          new IcebergExpressionCollector());
+      for (Expression located : locatedExpressions) {
+        Expr expr = impalaIcebergPredicateMapping_.get(located);
+        /* If we fail to locate any of the Iceberg residual expressions then we skip
+         filtering the predicates to be pushed down to Impala scanner.*/
+        if (expr == null) return false;
+        expressionsToRetain.add(expr);
+      }
+    }
+    skippedExpressions_.addAll(
+        conjuncts_.stream().filter(expr -> !expressionsToRetain.contains(expr)).collect(
+            Collectors.toSet()));
+    conjuncts_ = expressionsToRetain;
+    LOG.debug("Iceberg predicate pushdown subsetting took {} ms",
+        (System.currentTimeMillis() - startTime));
+    return true;
   }
 
   private List<Expr> getSkippedConjuncts() {
-    if (!canSkipPushingDownIcebergPredicates_) return Collections.emptyList();
-    return impalaIcebergPredicates_;
+    if (!residualExpressions_.isEmpty()) {
+      return new ArrayList<>(skippedExpressions_);
+    }
+    return new ArrayList<>(impalaIcebergPredicateMapping_.values());
   }
 
   private void updateDeleteStatistics() {
@@ -628,11 +662,11 @@ public class IcebergScanPlanner {
       throws ImpalaException {
     Expression predicate = convertIcebergPredicate(expr);
     if (predicate != null) {
-      icebergPredicates_.add(predicate);
-      impalaIcebergPredicates_.add(expr);
+      impalaIcebergPredicateMapping_.put(predicate, expr);
       LOG.debug("Push down the predicate: " + predicate + " to iceberg");
       return true;
     }
+    untranslatedExpressions_.add(expr);
     return false;
   }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 497dcb8a7..54c753eca 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1274,6 +1274,18 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+  /**
+   * Checks exercising predicate pushdown with Iceberg tables, without predicate
+   * subsetting.
+   */
+  @Test
+  public void testDisabledIcebergPredicateSubsetting() {
+    TQueryOptions queryOptions = new TQueryOptions();
+    queryOptions.setIceberg_predicate_pushdown_subsetting(false);
+    runPlannerTestFile("iceberg-predicates-disabled-subsetting", "functional_parquet",
+        queryOptions, ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   /**
    * Check that Iceberg V2 table scans work as expected.
    */
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates-disabled-subsetting.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates-disabled-subsetting.test
new file mode 100644
index 000000000..5421100c1
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates-disabled-subsetting.test
@@ -0,0 +1,32 @@
+# If one or more non-partition predicates are present, then all predicates are pushed down
+# to the scan node. ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING query option is disabled for
+# these tests.
+select user from iceberg_partitioned where action = "download" and user = "Lisa";
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.97KB
+   predicates: `user` = 'Lisa', action = 'download'
+   row-size=24B cardinality=1
+====
+# If no residual expression remain after Iceberg's planning, all partition-based
+# predicates can be skipped
+select * from iceberg_partitioned where action = "download";
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.97KB
+   skipped Iceberg predicates: action = 'download'
+   row-size=44B cardinality=6
+====
+select * from iceberg_partitioned where action = "download" and event_time < "2022-01-01";
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.97KB
+   skipped Iceberg predicates: action = 'download', event_time < TIMESTAMP '2022-01-01 00:00:00'
+   row-size=44B cardinality=1
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
index a682813a0..f3892e23f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test
@@ -34,22 +34,21 @@ PLAN-ROOT SINK
    skipped Iceberg predicates: `year` = 2010
    row-size=20B cardinality=730
 ====
-# Here both predicates are pushed to Iceberg and also to Impala's scan node. However,
-# here is a room for optimisation as we could skip pushing down 'year' to the scan node
-# as it won't filter further rows.
+# Here both predicates are pushed to Iceberg and only one is pushed to Impala's scan node,
+# 'year' predicate is filtered out, as it won't filter further rows
 SELECT id, int_col, string_col from iceberg_partition_evolution where year = 2010 and id > 1000;
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
    HDFS partitions=1/1 files=730 size=1.25MB
-   predicates: `year` = 2010, id > 1000
-   row-size=24B cardinality=730
+   predicates: id > 1000
+   skipped Iceberg predicates: `year` = 2010
+   row-size=20B cardinality=730
 ====
 # If we have predicates on partition columns with non-identity transform that could not
 # be pushed to Iceberg then all the predicates are also pushed to Impala's scan node.
-# However, here is a room for optimisation as we could skip pushing down 'year' to the
-# scan node as it won't filter further rows.
+# The 'year' predicate is dropped because it won't filter any further rows.
 SELECT * FROM iceberg_partition_evolution
 WHERE year = 2010 AND date_string_col='061610';
 ---- PLAN
@@ -57,7 +56,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
    HDFS partitions=1/1 files=2 size=3.49KB
-   predicates: `year` = 2010, date_string_col = '061610'
+   predicates: date_string_col = '061610'
+   skipped Iceberg predicates: `year` = 2010
    row-size=40B cardinality=2
 ====
 # Checks when all the predicates are skipped in a count(*) query then the relevant
@@ -75,3 +75,95 @@ PLAN-ROOT SINK
    skipped Iceberg predicates: action = 'click'
    row-size=8B cardinality=6
 ====
+# The list of predicates contains an untranslated expression: user = NULL.
+# (Iceberg predicate conversion can't handle ref = null expressions; this results in an
+# untranslated expression which must be pushed down to Impala's scanner.)
+select user from iceberg_partitioned where action = 'click' and user = null
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: `user` = NULL
+   skipped Iceberg predicates: action = 'click'
+   row-size=12B cardinality=1
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: `user` = NULL
+   skipped Iceberg predicates: action = 'click'
+   row-size=12B cardinality=1
+====
+# The list of predicates contains an untranslated expression (action = NULL).
+select * from iceberg_partitioned where action = NULL and event_time < "2022-01-01";
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   predicates: action = NULL
+   skipped Iceberg predicates: event_time < TIMESTAMP '2022-01-01 00:00:00'
+   row-size=44B cardinality=2
+====
+# The list of predicates contains an untranslated expression (action LIKE "d%") and a
+# residual expression left over after Iceberg's filtering.
+select * from iceberg_partitioned where action like "d%" and event_time < "2022-01-01" and id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=9 size=10.33KB
+   predicates: id < 10, action LIKE 'd%'
+   skipped Iceberg predicates: event_time < TIMESTAMP '2022-01-01 00:00:00'
+   row-size=44B cardinality=1
+====
+# Compound expression partially evaluated by Iceberg that cannot be mapped back to an Impala expression.
+select * from iceberg_partitioned where action like "d%" and (event_time < "2020-01-01" or id > 10)
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=10 size=11.42KB
+   predicates: (event_time < TIMESTAMP '2020-01-01 00:00:00' OR id > 10), action LIKE 'd%'
+   row-size=44B cardinality=1
+====
+# A predicate on a partition column introduced by partition evolution is pushed down to the scan
+# node, while a predicate on a partition column that existed before partition evolution is skipped.
+select * from iceberg_partition_evolution where month = 12 and year = 2010
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_evolution]
+   HDFS partitions=1/1 files=62 size=108.30KB
+   predicates: `month` = 12
+   skipped Iceberg predicates: `year` = 2010
+   row-size=40B cardinality=620
+====
+# The compound expression "(id > 5 or (id < 2))" is returned as a residual expression but is
+# split up by the expression collector (IcebergExpressionCollector); the mapping lookup fails,
+# so we fall back to pushing everything down.
+select * from iceberg_partitioned where action in ('click', 'view') and (id > 5 or (id < 2))
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=12 size=13.65KB
+   predicates: action IN ('click', 'view'), (id > 5 OR (id < 2))
+   row-size=44B cardinality=1
+====
+# The IS NOT NULL predicate on partition column 'action' is skipped, while the predicate
+# on the non-partitioned 'id' column is pushed down.
+select * from iceberg_partitioned where action is not null and id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=9 size=10.33KB
+   predicates: id < 10
+   skipped Iceberg predicates: action IS NOT NULL
+   row-size=44B cardinality=1
+====
\ No newline at end of file
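
The comments in the planner tests above describe when a predicate that Iceberg's own
planning already handled can be skipped at Impala's scan node. As a rough illustration
only, the sketch below models that per-conjunct decision in plain Java; Conjunct, its
two boolean fields and scanNodeConjuncts() are invented names for this sketch, not
Impala classes; the real decision is made in IcebergScanPlanner with the help of
IcebergExpressionCollector.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PredicateSubsettingSketch {
  /** One WHERE-clause conjunct with the two properties the decision cares about. */
  static class Conjunct {
    final String sql;
    final boolean translatedToIceberg;   // could Iceberg understand the predicate at all?
    final boolean residualAfterPruning;  // did Iceberg report it back as a residual?

    Conjunct(String sql, boolean translatedToIceberg, boolean residualAfterPruning) {
      this.sql = sql;
      this.translatedToIceberg = translatedToIceberg;
      this.residualAfterPruning = residualAfterPruning;
    }
  }

  /**
   * Returns the conjuncts Impala's scan node still has to evaluate. A conjunct that
   * Iceberg both translated and fully applied during file pruning is skipped;
   * untranslated or residual conjuncts stay on the scan node. With subsetting disabled
   * everything is pushed to the scan node (the fallback for residuals that cannot be
   * mapped back to an Impala conjunct is not modelled here).
   */
  static List<Conjunct> scanNodeConjuncts(List<Conjunct> conjuncts,
      boolean subsettingEnabled) {
    if (!subsettingEnabled) return conjuncts;
    List<Conjunct> result = new ArrayList<>();
    for (Conjunct c : conjuncts) {
      if (!c.translatedToIceberg || c.residualAfterPruning) result.add(c);
    }
    return result;
  }

  public static void main(String[] args) {
    // Mirrors "action = 'download' and user = 'Lisa'": the identity-partition predicate
    // is fully handled by Iceberg, the non-partition predicate comes back as a residual.
    List<Conjunct> conjuncts = Arrays.asList(
        new Conjunct("action = 'download'", true, false),
        new Conjunct("`user` = 'Lisa'", true, true));
    for (Conjunct c : scanNodeConjuncts(conjuncts, true)) {
      System.out.println("scan node predicate: " + c.sql);   // only `user` = 'Lisa'
    }
  }
}
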
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
index 1ee7848d7..51b7b8c46 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
@@ -822,7 +822,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: id > 0, action = 'download'
+   predicates: id > 0
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -840,7 +841,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: id > 0, action = 'download'
+   predicates: id > 0
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ====
 select * from iceberg_v2_partitioned_position_deletes
@@ -857,7 +859,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: `user` = 'Lisa', action = 'download'
+   predicates: `user` = 'Lisa'
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -875,7 +878,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: `user` = 'Lisa', action = 'download'
+   predicates: `user` = 'Lisa'
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (action = 'click' or action = 'view') and id > 0;
@@ -884,7 +888,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -893,7 +898,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where action in ('click', 'view') and id > 0;
@@ -902,7 +908,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -911,7 +918,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click') and id > 0;
@@ -920,7 +928,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -929,7 +938,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view') and id > 0;
@@ -938,7 +948,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -947,7 +958,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action in ('click', 'view')) and id > 0;
@@ -956,7 +968,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -965,7 +978,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action > 'a') and id > 0;
@@ -974,7 +988,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
    row-size=32B cardinality=2
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -983,7 +998,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
    row-size=32B cardinality=2
 ====
 # All predicates are pushed down to Iceberg and won't filter any further rows. Skip pushing it to Scan node.
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index c0f31b79a..00360ecdb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -822,7 +822,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: id > 0, action = 'download'
+   predicates: id > 0
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -840,7 +841,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: id > 0, action = 'download'
+   predicates: id > 0
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ====
 select * from iceberg_v2_partitioned_position_deletes
@@ -857,7 +859,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: `user` = 'Lisa', action = 'download'
+   predicates: `user` = 'Lisa'
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -875,7 +878,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=1 size=1.17KB
-   predicates: `user` = 'Lisa', action = 'download'
+   predicates: `user` = 'Lisa'
+   skipped Iceberg predicates: action = 'download'
    row-size=64B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (action = 'click' or action = 'view') and id > 0;
@@ -884,7 +888,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -893,7 +898,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where action in ('click', 'view') and id > 0;
@@ -902,7 +908,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -911,7 +918,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, action IN ('click', 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: action IN ('click', 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click') and id > 0;
@@ -920,7 +928,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -929,7 +938,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=6 size=6.85KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view') and id > 0;
@@ -938,7 +948,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -947,7 +958,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view')
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action in ('click', 'view')) and id > 0;
@@ -956,7 +968,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
    row-size=32B cardinality=1
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -965,7 +978,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=14 size=15.93KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view'))
    row-size=32B cardinality=1
 ====
 select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action > 'a') and id > 0;
@@ -974,7 +988,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
    row-size=32B cardinality=2
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
@@ -983,7 +998,8 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=20 size=22.90KB
-   predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
+   predicates: id > 0
+   skipped Iceberg predicates: (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a')
    row-size=32B cardinality=2
 ====
 # All predicates are pushed down to Iceberg and won't filter any further rows. Skip pushing it to Scan node.
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 3193f0254..795e7ff79 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -323,13 +323,14 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.iceberg_partitioned]
    HDFS partitions=1/1 files=4 size=4.57KB
-   predicates: id > CAST(0 AS INT), action = 'click'
+   predicates: id > CAST(0 AS INT)
+   skipped Iceberg predicates: action = 'click'
    stored statistics:
      table: rows=20 size=22.90KB
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=5
-   parquet statistics predicates: id > CAST(0 AS INT), action = 'click'
-   parquet dictionary predicates: id > CAST(0 AS INT), action = 'click'
+   parquet statistics predicates: id > CAST(0 AS INT)
+   parquet dictionary predicates: id > CAST(0 AS INT)
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=44B cardinality=1
    in pipelines: 00(GETNEXT)


[impala] 01/03: IMPALA-12313: (part 1) Refactor modify statements

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb0e2bbf9020af68c8cbce8baa87f37a66071653
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Sep 12 17:07:41 2023 +0200

    IMPALA-12313: (part 1) Refactor modify statements
    
    This change refactors the classes and methods that implement
    modify statements like DELETE and UPDATE. ModifyStmt, DeleteStmt,
    UpdateStmt are created during parsing and contain information about
    the statement: FROM clause, WHERE clause, target table, etc.
    
    The logic that actually implements these operations is dependent
    on the type of the target table. Therefore during analysis, after
    the target table is resolved, we create the *Impl object (e.g.
    IcebergDeleteImpl, KuduUpdateImpl) that implements the logic. The
    impl object is in charge of creating the source statement of the
    operation, doing the necessary rewrites/masking, and also creating
    the data sink.
    
    Testing:
     * N/A: no new functionality / bug fix
    
    Change-Id: If15f64944f2e23064b7112ad5930abc775dd65ec
    Reviewed-on: http://gerrit.cloudera.org:8080/20477
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/DeleteStmt.java     |  25 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   2 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  |  44 +++
 .../apache/impala/analysis/IcebergModifyImpl.java  | 101 ++++++
 .../org/apache/impala/analysis/KuduDeleteImpl.java |  46 +++
 .../org/apache/impala/analysis/KuduModifyImpl.java |  63 ++++
 .../org/apache/impala/analysis/KuduUpdateImpl.java |  45 +++
 .../org/apache/impala/analysis/ModifyImpl.java     | 268 +++++++++++++++
 .../org/apache/impala/analysis/ModifyStmt.java     | 361 +++------------------
 .../org/apache/impala/analysis/UpdateStmt.java     |  23 +-
 .../java/org/apache/impala/planner/Planner.java    |   2 +-
 11 files changed, 639 insertions(+), 341 deletions(-)
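
For readers skimming the diff, the snippet below is a compressed, illustrative sketch of
the delegation pattern the commit message describes; the Sketch* names and the boolean
standing in for the table-type check are invented, while the real classes (ModifyStmt,
DeleteStmt, KuduDeleteImpl, IcebergDeleteImpl, ModifyImpl) in the diff below carry far
more state and validation.

// Illustrative placeholders only; the point is the shape of the delegation.
interface SketchDataSink {}

abstract class SketchModifyImpl {
  abstract void analyze();                 // table-type specific validation
  abstract SketchDataSink createDataSink();
}

class SketchKuduDeleteImpl extends SketchModifyImpl {
  @Override void analyze() { /* Kudu-specific checks */ }
  @Override SketchDataSink createDataSink() { return new SketchDataSink() {}; }
}

class SketchIcebergDeleteImpl extends SketchModifyImpl {
  @Override void analyze() { /* Iceberg-specific checks: format version, delete mode, ... */ }
  @Override SketchDataSink createDataSink() { return new SketchDataSink() {}; }
}

/** Created by the parser; picks its impl only once the target table type is known. */
class SketchDeleteStmt {
  private final boolean targetIsKudu;      // stands in for "table_ instanceof FeKuduTable"
  private SketchModifyImpl modifyImpl;

  SketchDeleteStmt(boolean targetIsKudu) { this.targetIsKudu = targetIsKudu; }

  void analyze() {
    if (modifyImpl == null) {
      modifyImpl = targetIsKudu ? new SketchKuduDeleteImpl()
                                : new SketchIcebergDeleteImpl();
    }
    modifyImpl.analyze();                  // the real impl also builds the source SELECT here
  }

  SketchDataSink createDataSink() { return modifyImpl.createDataSink(); }
}

public class ModifyRefactorSketch {
  public static void main(String[] args) {
    SketchDeleteStmt stmt = new SketchDeleteStmt(false);  // e.g. an Iceberg target table
    stmt.analyze();
    System.out.println("sink created: " + stmt.createDataSink());
  }
}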

diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 47164fb43..a02170fb6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -20,6 +20,9 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
@@ -52,21 +55,21 @@ public class DeleteStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_.clone());
   }
 
+  @Override
+  protected void createModifyImpl() {
+    if (table_ instanceof FeKuduTable) {
+      modifyImpl_ = new KuduDeleteImpl(this);
+    } else if (table_ instanceof FeIcebergTable) {
+      modifyImpl_ = new IcebergDeleteImpl(this);
+    }
+  }
+
   public DataSink createDataSink() {
-    // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        partitionKeyExprs_, resultExprs_, referencedColumns_, false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        maxTableSinks_);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return tableSink;
+    return modifyImpl_.createDataSink();
   }
 
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
-    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
-    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
+    modifyImpl_.substituteResultExprs(smap, analyzer);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
index 1afcd98b8..cf3dfd63d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
@@ -53,6 +53,8 @@ public abstract class DmlStatementBase extends StatementBase {
   }
 
   public FeTable getTargetTable() { return table_; }
+
+  protected void setTargetTable(FeTable tbl) { table_ = tbl; }
   public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = maxTableSinks; }
 
   public boolean hasShuffleHint() { return false; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
new file mode 100644
index 000000000..f32ce3ff2
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class IcebergDeleteImpl extends IcebergModifyImpl {
+  public IcebergDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, null,
+        modifyStmt_.maxTableSinks_);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
new file mode 100644
index 000000000..30b4c375e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergPositionDeleteTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TIcebergFileFormat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class IcebergModifyImpl extends ModifyImpl {
+  FeIcebergTable originalTargetTable_;
+  IcebergPositionDeleteTable icePosDelTable_;
+
+  public IcebergModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    originalTargetTable_ = (FeIcebergTable)modifyStmt_.getTargetTable();
+    icePosDelTable_ = new IcebergPositionDeleteTable(originalTargetTable_);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    // Make the virtual position delete table the new target table.
+    modifyStmt_.setTargetTable(icePosDelTable_);
+    modifyStmt_.setMaxTableSinks(analyzer.getQueryOptions().getMax_fs_writers());
+    if (modifyStmt_ instanceof UpdateStmt) {
+      throw new AnalysisException("UPDATE is not supported for Iceberg table " +
+          originalTargetTable_.getFullName());
+    }
+
+    if (icePosDelTable_.getFormatVersion() == 1) {
+      throw new AnalysisException("Iceberg V1 tables do not support DELETE/UPDATE " +
+          "operations: " + originalTargetTable_.getFullName());
+    }
+
+    String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
+        org.apache.iceberg.TableProperties.DELETE_MODE);
+    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
+      throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
+          "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
+    }
+
+    if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
+      throw new AnalysisException("Impala can only write delete files in PARQUET, " +
+          "but the given table uses a different file format: " +
+          originalTargetTable_.getFullName());
+    }
+
+    Expr wherePredicate = modifyStmt_.getWherePredicate();
+    if (wherePredicate == null ||
+        org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) {
+      // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
+      throw new AnalysisException("For deleting every row, please use TRUNCATE.");
+    }
+  }
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    if (originalTargetTable_.isPartitioned()) {
+      String[] partitionCols = new String[] {
+          "PARTITION__SPEC__ID",
+          "ICEBERG__PARTITION__SERIALIZED"};
+      for (String k : partitionCols) {
+        addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
+            keySlots, colIndexMap, k);
+      }
+    }
+    String[] deleteCols =
+        new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
+    // Add the key columns as slot refs
+    for (String k : deleteCols) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
+          colIndexMap, k, true);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
new file mode 100644
index 000000000..54625fb03
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduDeleteImpl extends KuduModifyImpl {
+  public KuduDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(),
+        modifyStmt_.maxTableSinks_);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
new file mode 100644
index 000000000..76290e977
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class KuduModifyImpl extends ModifyImpl {
+  // Target Kudu table.
+  FeKuduTable kuduTable_;
+
+  public KuduModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    kuduTable_ = (FeKuduTable)modifyStmt.getTargetTable();
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {}
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    // cast result expressions to the correct type of the referenced slot of the
+    // target table
+    List<Pair<SlotRef, Expr>> assignments = modifyStmt_.getAssignments();
+    int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
+    for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
+      sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
+          assignments.get(i - keyColumnsOffset).first.getType()));
+    }
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    // Add the key columns as slot refs
+    for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
+          colIndexMap, k, false);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
new file mode 100644
index 000000000..57e8df262
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduUpdateImpl extends KuduModifyImpl {
+  public KuduUpdateImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    DataSink dataSink = TableSink.create(modifyStmt_.table_, TableSink.Op.UPDATE,
+        ImmutableList.<Expr>of(), sourceStmt_.getResultExprs(), getReferencedColumns(),
+        false, false, new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(), 0);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return dataSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
new file mode 100644
index 000000000..08b97c764
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import static java.lang.String.format;
+
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.KuduColumn;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.rewrite.ExprRewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract class for implementing a Modify statement such as DELETE or UPDATE. Child
+ * classes implement logic specific to target table types.
+ */
+abstract class ModifyImpl {
+  abstract void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException;
+
+  abstract void addKeyColumns(Analyzer analyzer,
+      List<SelectListItem> selectList, List<Integer> referencedColumns,
+      Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
+      throws AnalysisException;
+
+  abstract void analyze(Analyzer analyzer) throws AnalysisException;
+
+  abstract DataSink createDataSink();
+
+  // The Modify statement for this modify impl. The ModifyStmt class holds information
+  // about the statement (e.g. target table type, FROM, WHERE clause, etc.)
+  ModifyStmt modifyStmt_;
+  /////////////////////////////////////////
+  // START: Members that are set in createSourceStmt().
+
+  // Result of the analysis of the internal SelectStmt that produces the rows that
+  // will be modified.
+  protected SelectStmt sourceStmt_;
+
+  // Output expressions that produce the final results to write to the target table. May
+  // include casts.
+  //
+  // In case of DELETE statements it contains the columns that identify the deleted
+  // rows (Kudu primary keys, Iceberg file_path / position).
+  protected List<Expr> resultExprs_ = new ArrayList<>();
+
+  // Exprs corresponding to the partitionKeyValues, if specified, or to the partition
+  // columns for tables.
+  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
+
+  // For every column of the target table that is referenced in the optional
+  // 'sort.columns' table property, this list will contain the corresponding result expr
+  // from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
+  // list is empty, no additional sorting by non-partitioning columns will be performed.
+  // The column list must not contain partition columns and must be empty for non-Hdfs
+  // tables.
+  protected List<Expr> sortExprs_ = new ArrayList<>();
+
+  // Position mapping of output expressions of the sourceStmt_ to column indices in the
+  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
+  // position in the target table.
+  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  // END: Members that are set in createSourceStmt().
+  /////////////////////////////////////////
+
+  public ModifyImpl(ModifyStmt modifyStmt) {
+    modifyStmt_ = modifyStmt;
+  }
+
+  public void reset() {
+    if (sourceStmt_ != null) sourceStmt_.reset();
+  }
+
+  /**
+   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ contains
+   * first the SlotRefs for the key Columns, followed by the expressions representing the
+   * assignments. This method sets the member variables for the sourceStmt_ and the
+   * referencedColumns_.
+   *
+   * This only creates the sourceStmt_ once; subsequent invocations reuse the
+   * previously created statement.
+   */
+  protected void createSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    if (sourceStmt_ == null) {
+      // Builds the select list and column position mapping for the target table.
+      ArrayList<SelectListItem> selectList = new ArrayList<>();
+      buildAndValidateAssignmentExprs(analyzer, selectList);
+
+      // Analyze the generated select statement.
+      sourceStmt_ = new SelectStmt(new SelectList(selectList), modifyStmt_.fromClause_,
+          modifyStmt_.wherePredicate_,
+          null, null, null, null);
+
+      addCastsToAssignmentsInSourceStmt(analyzer);
+    }
+    sourceStmt_.analyze(analyzer);
+  }
+
+  /**
+   * Validates the list of value assignments that should be used to modify the target
+   * table. It verifies that only those columns are referenced that belong to the target
+   * table, no key columns are modified, and that a single column is not modified multiple
+   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
+   * SelectListItems to the out parameter selectList that is used to build the select list
+   * for sourceStmt_. A list of integers indicating the column position of an entry in the
+   * select list in the target table is written to the field referencedColumns_.
+   *
+   * In addition to the expressions that are generated for each assignment, the
+   * expression list contains an expression for each key column. The key columns
+   * are always prepended to the list of expression representing the assignments.
+   */
+  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
+      List<SelectListItem> selectList)
+      throws AnalysisException {
+    // The order of the referenced columns equals the order of the result expressions
+    Set<SlotId> uniqueSlots = new HashSet<>();
+    Set<SlotId> keySlots = new HashSet<>();
+
+    // Mapping from column name to index
+    List<Column> cols = modifyStmt_.table_.getColumnsInHiveOrder();
+    Map<String, Integer> colIndexMap = new HashMap<>();
+    for (int i = 0; i < cols.size(); i++) {
+      colIndexMap.put(cols.get(i).getName(), i);
+    }
+
+    addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
+        keySlots, colIndexMap);
+
+    // Assignments are only used in the context of updates.
+    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
+      SlotRef lhsSlotRef = valueAssignment.first;
+      lhsSlotRef.analyze(analyzer);
+
+      Expr rhsExpr = valueAssignment.second;
+      // No subqueries for rhs expression
+      if (rhsExpr.contains(Subquery.class)) {
+        throw new AnalysisException(
+            format("Subqueries are not supported as update expressions for column '%s'",
+                lhsSlotRef.toSql()));
+      }
+      rhsExpr.analyze(analyzer);
+
+      // Correct target table
+      if (!lhsSlotRef.isBoundByTupleIds(modifyStmt_.targetTableRef_.getId().asList())) {
+        throw new AnalysisException(
+            format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
+                + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
+                rhsExpr.toSql(),
+                modifyStmt_.targetTableRef_.getDesc().getTable().getFullName()));
+      }
+
+      Column c = lhsSlotRef.getResolvedPath().destColumn();
+      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
+      if (c == null) {
+        throw new AnalysisException(
+            format("Left-hand side in assignment expression '%s=%s' must be a column " +
+                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
+      }
+
+      if (keySlots.contains(lhsSlotRef.getSlotId())) {
+        boolean isSystemGeneratedColumn =
+            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
+        throw new AnalysisException(format("%s column '%s' cannot be updated.",
+            isSystemGeneratedColumn ? "System generated key" : "Key",
+            lhsSlotRef.toSql()));
+      }
+
+      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
+        throw new AnalysisException(
+            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
+      }
+
+      rhsExpr = StatementBase.checkTypeCompatibility(
+          modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
+          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
+      uniqueSlots.add(lhsSlotRef.getSlotId());
+      selectList.add(new SelectListItem(rhsExpr, null));
+      referencedColumns_.add(colIndexMap.get(c.getName()));
+    }
+  }
+
+  protected void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
+      throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
+        keySlots, colIndexMap, colName);
+    resultExprs_.add(ref);
+    if (isSortingColumn) sortExprs_.add(ref);
+  }
+
+  protected void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
+        keySlots, colIndexMap, colName);
+    partitionKeyExprs_.add(ref);
+    sortExprs_.add(ref);
+  }
+
+  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    List<String> path = Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
+        colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    selectList.add(new SelectListItem(ref, null));
+    uniqueSlots.add(ref.getSlotId());
+    keySlots.add(ref.getSlotId());
+    referencedColumns.add(colIndexMap.get(colName));
+    return ref;
+  }
+
+  public List<Expr> getPartitionKeyExprs() {
+    return partitionKeyExprs_;
+  }
+
+  public List<Expr> getSortExprs() {
+    return sortExprs_;
+  }
+
+  public QueryStmt getQueryStmt() {
+    return sourceStmt_;
+  }
+
+  public List<Integer> getReferencedColumns() {
+    return referencedColumns_;
+  }
+
+  public void castResultExprs(List<Type> types) throws AnalysisException {
+    sourceStmt_.castResultExprs(types);
+  }
+
+  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+    sourceStmt_.rewriteExprs(rewriter);
+  }
+
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
+    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 197f72bad..b3a798918 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -20,25 +20,16 @@ package org.apache.impala.analysis;
 import static java.lang.String.format;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.IcebergPositionDeleteTable;
-import org.apache.impala.catalog.KuduColumn;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.rewrite.ExprRewriter;
-import org.apache.impala.thrift.TIcebergFileFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -53,12 +44,21 @@ import com.google.common.base.Preconditions;
  *   - assignmentExprs (not null, can be empty)
  *   - wherePredicate (nullable)
  *
- * In the analysis phase, a SelectStmt is created with the result expressions set to
- * match the right-hand side of the assignments in addition to projecting the key columns
- * of the underlying table. During query execution, the plan that
- * is generated from this SelectStmt produces all rows that need to be modified.
+ * This class holds information from parsing and semantic analysis. Then it delegates
+ * implementation logic to the *Impl classes, e.g. KuduDeleteImpl, IcebergDeleteImpl,
+ * etc.
+ * In the analysis phase, the impl object creates a SelectStmt with the result expressions
+ * which hold information about the modified records (e.g. primary keys of Kudu tables,
+ * file_path / pos information of Iceberg data records).
+ * During query execution, the plan that is generated from this SelectStmt produces
+ * all rows that need to be modified.
+ *
+ * UPDATEs:
+ * The result of the SelectStmt contains the right-hand side of the assignments in addition
+ * to projecting the key columns of the underlying table.
  */
 public abstract class ModifyStmt extends DmlStatementBase {
+
   // List of explicitly mentioned assignment expressions in the UPDATE's SET clause
   protected final List<Pair<SlotRef, Expr>> assignments_;
 
@@ -71,40 +71,14 @@ public abstract class ModifyStmt extends DmlStatementBase {
   // TableRef identifying the target table, set during analysis.
   protected TableRef targetTableRef_;
 
+  // FROM clause of the statement
   protected FromClause fromClause_;
 
   /////////////////////////////////////////
   // START: Members that are set in first run of analyze().
 
-  // Exprs correspond to the partitionKeyValues, if specified, or to the partition columns
-  // for tables.
-  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
-
-  // For every column of the target table that is referenced in the optional
-  // 'sort.columns' table property, this list will contain the corresponding result expr
-  // from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
-  // list is empty, no additional sorting by non-partitioning columns will be performed.
-  // The column list must not contain partition columns and must be empty for non-Hdfs
-  // tables.
-  protected List<Expr> sortExprs_ = new ArrayList<>();
-
-  // Output expressions that produce the final results to write to the target table. May
-  // include casts. Set in first run of analyze().
-  //
-  // In case of DELETE statements it contains the columns that identify the deleted rows.
-  protected List<Expr> resultExprs_ = new ArrayList<>();
-
-  // Result of the analysis of the internal SelectStmt that produces the rows that
-  // will be modified.
-  protected SelectStmt sourceStmt_;
-
   // Implementation of the modify statement. Depends on the target table type.
-  private ModifyImpl modifyImpl_;
-
-  // Position mapping of output expressions of the sourceStmt_ to column indices in the
-  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
-  // position in the target table. Set in createSourceStmt() during analysis.
-  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  protected ModifyImpl modifyImpl_;
 
   // END: Members that are set in first run of analyze
   /////////////////////////////////////////
@@ -137,10 +111,8 @@ public abstract class ModifyStmt extends DmlStatementBase {
   /**
    * The analysis of the ModifyStmt proceeds as follows: First, the FROM clause is
    * analyzed and the targetTablePath is verified to be a valid alias into the FROM
-   * clause. When the target table is identified, the assignment expressions are
-   * validated and as a last step the internal SelectStmt is produced and analyzed.
-   * Potential query rewrites for the select statement are implemented here and are not
-   * triggered externally by the statement rewriter.
+   * clause. It also identifies the target table and raises errors for unsupported
+   * table types.
    */
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
@@ -172,7 +144,6 @@ public abstract class ModifyStmt extends DmlStatementBase {
 
     Preconditions.checkNotNull(targetTableRef_);
     FeTable dstTbl = targetTableRef_.getTable();
-    table_ = dstTbl;
     // Only Kudu and Iceberg tables can be updated.
     if (!(dstTbl instanceof FeKuduTable) && !(dstTbl instanceof FeIcebergTable)) {
       throw new AnalysisException(
@@ -180,154 +151,59 @@ public abstract class ModifyStmt extends DmlStatementBase {
               "but the following table is neither: %s",
               dstTbl.getFullName()));
     }
-    if (dstTbl instanceof FeKuduTable) {
-      modifyImpl_ = this.new ModifyKudu();
-    } else if (dstTbl instanceof FeIcebergTable) {
-      modifyImpl_ = this.new ModifyIceberg();
-    }
-
-    modifyImpl_.analyze(analyzer);
-
     // Make sure that the user is allowed to modify the target table. Use ALL because no
     // UPDATE / DELETE privilege exists yet (IMPALA-3840).
     analyzer.registerAuthAndAuditEvent(dstTbl, Privilege.ALL);
-
-    // Validates the assignments_ and creates the sourceStmt_.
-    if (sourceStmt_ == null) createSourceStmt(analyzer);
-    sourceStmt_.analyze(analyzer);
+    table_ = dstTbl;
+    if (modifyImpl_ == null) createModifyImpl();
+    modifyImpl_.analyze(analyzer);
+    // Create and analyze the source statement.
+    modifyImpl_.createSourceStmt(analyzer);
     // Add target table to descriptor table.
     analyzer.getDescTbl().setTargetTable(table_);
 
     sqlString_ = toSql();
   }
 
+  /**
+   * Creates the implementation class for this statement. Only called once during the
+   * first run of analyze().
+   */
+  protected abstract void createModifyImpl();
+
   @Override
   public void reset() {
     super.reset();
     fromClause_.reset();
-    if (sourceStmt_ != null) sourceStmt_.reset();
-    modifyImpl_ = null;
+    modifyImpl_.reset();
   }
 
   @Override
-  public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
+  public List<Expr> getPartitionKeyExprs() { return modifyImpl_.getPartitionKeyExprs(); }
   @Override
-  public List<Expr> getSortExprs() { return sortExprs_; }
+  public List<Expr> getSortExprs() { return modifyImpl_.getSortExprs(); }
 
-  @Override
-  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
-    return sourceStmt_.resolveTableMask(analyzer);
+  public List<Integer> getReferencedColumns() {
+    return modifyImpl_.getReferencedColumns();
   }
 
-  /**
-   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ contains
-   * first the SlotRefs for the key Columns, followed by the expressions representing the
-   * assignments. This method sets the member variables for the sourceStmt_ and the
-   * referencedColumns_.
-   *
-   * This is only run once, on the first analysis. Following analysis will reset() and
-   * reuse previously created statements.
-   */
-  private void createSourceStmt(Analyzer analyzer)
-      throws AnalysisException {
-    // Builds the select list and column position mapping for the target table.
-    ArrayList<SelectListItem> selectList = new ArrayList<>();
-    buildAndValidateAssignmentExprs(analyzer, selectList);
-
-    // Analyze the generated select statement.
-    sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, wherePredicate_,
-        null, null, null, null);
-
-    modifyImpl_.addCastsToAssignmentsInSourceStmt(analyzer);
-  }
-
-  /**
-   * Validates the list of value assignments that should be used to modify the target
-   * table. It verifies that only those columns are referenced that belong to the target
-   * table, no key columns are modified, and that a single column is not modified multiple
-   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
-   * SelectListItems to the out parameter selectList that is used to build the select list
-   * for sourceStmt_. A list of integers indicating the column position of an entry in the
-   * select list in the target table is written to the out parameter referencedColumns.
-   *
-   * In addition to the expressions that are generated for each assignment, the
-   * expression list contains an expression for each key column. The key columns
-   * are always prepended to the list of expression representing the assignments.
-   */
-  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
-      List<SelectListItem> selectList)
-      throws AnalysisException {
-    // The order of the referenced columns equals the order of the result expressions
-    Set<SlotId> uniqueSlots = new HashSet<>();
-    Set<SlotId> keySlots = new HashSet<>();
-
-    // Mapping from column name to index
-    List<Column> cols = table_.getColumnsInHiveOrder();
-    Map<String, Integer> colIndexMap = new HashMap<>();
-    for (int i = 0; i < cols.size(); i++) {
-      colIndexMap.put(cols.get(i).getName(), i);
-    }
-
-    modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
-        keySlots, colIndexMap);
-
-    // Assignments are only used in the context of updates.
-    for (Pair<SlotRef, Expr> valueAssignment : assignments_) {
-      SlotRef lhsSlotRef = valueAssignment.first;
-      lhsSlotRef.analyze(analyzer);
+  public Expr getWherePredicate() { return wherePredicate_; }
 
-      Expr rhsExpr = valueAssignment.second;
-      // No subqueries for rhs expression
-      if (rhsExpr.contains(Subquery.class)) {
-        throw new AnalysisException(
-            format("Subqueries are not supported as update expressions for column '%s'",
-                lhsSlotRef.toSql()));
-      }
-      rhsExpr.analyze(analyzer);
-
-      // Correct target table
-      if (!lhsSlotRef.isBoundByTupleIds(targetTableRef_.getId().asList())) {
-        throw new AnalysisException(
-            format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
-                + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
-                rhsExpr.toSql(), targetTableRef_.getDesc().getTable().getFullName()));
-      }
-
-      Column c = lhsSlotRef.getResolvedPath().destColumn();
-      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
-      if (c == null) {
-        throw new AnalysisException(
-            format("Left-hand side in assignment expression '%s=%s' must be a column " +
-                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
-      }
-
-      if (keySlots.contains(lhsSlotRef.getSlotId())) {
-        boolean isSystemGeneratedColumn =
-            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
-        throw new AnalysisException(format("%s column '%s' cannot be updated.",
-            isSystemGeneratedColumn ? "System generated key" : "Key",
-            lhsSlotRef.toSql()));
-      }
-
-      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
-        throw new AnalysisException(
-            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
-      }
+  public List<Pair<SlotRef, Expr>> getAssignments() { return assignments_; }
 
-      rhsExpr = checkTypeCompatibility(targetTableRef_.getDesc().getTable().getFullName(),
-          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
-      uniqueSlots.add(lhsSlotRef.getSlotId());
-      selectList.add(new SelectListItem(rhsExpr, null));
-      referencedColumns_.add(colIndexMap.get(c.getName()));
-    }
+  @Override
+  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
+    return getQueryStmt().resolveTableMask(analyzer);
   }
 
   @Override
-  public List<Expr> getResultExprs() { return sourceStmt_.getResultExprs(); }
+  public List<Expr> getResultExprs() {
+    return modifyImpl_.getQueryStmt().getResultExprs();
+  }
 
   @Override
   public void castResultExprs(List<Type> types) throws AnalysisException {
-    sourceStmt_.castResultExprs(types);
+    modifyImpl_.castResultExprs(types);
   }
 
   @Override
@@ -336,165 +212,16 @@ public abstract class ModifyStmt extends DmlStatementBase {
     for (Pair<SlotRef, Expr> assignment: assignments_) {
       assignment.second = rewriter.rewrite(assignment.second, analyzer_);
     }
-    sourceStmt_.rewriteExprs(rewriter);
+    modifyImpl_.rewriteExprs(rewriter);
   }
 
-  public QueryStmt getQueryStmt() { return sourceStmt_; }
+  public QueryStmt getQueryStmt() { return modifyImpl_.getQueryStmt(); }
 
   /**
    * Return true if the target table is Kudu table.
-   * Since only Kudu tables can be updated, it must be true.
    */
   public boolean isTargetTableKuduTable() { return (table_ instanceof FeKuduTable); }
 
-  private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
-      throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    resultExprs_.add(ref);
-    if (isSortingColumn) sortExprs_.add(ref);
-  }
-
-  private void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
-  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    partitionKeyExprs_.add(ref);
-    sortExprs_.add(ref);
-  }
-
-  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), colName);
-    SlotRef ref = new SlotRef(path);
-    ref.analyze(analyzer);
-    selectList.add(new SelectListItem(ref, null));
-    uniqueSlots.add(ref.getSlotId());
-    keySlots.add(ref.getSlotId());
-    referencedColumns.add(colIndexMap.get(colName));
-    return ref;
-  }
-
   @Override
   public abstract String toSql(ToSqlOptions options);
-
-  private interface ModifyImpl {
-    void analyze(Analyzer analyzer) throws AnalysisException;
-
-    void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-      throws AnalysisException;
-
-    void addKeyColumns(Analyzer analyzer,
-        List<SelectListItem> selectList, List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
-        throws AnalysisException;
-  }
-
-  private class ModifyKudu implements ModifyImpl {
-    // Target Kudu table. Result of analysis.
-    FeKuduTable kuduTable_ = (FeKuduTable)table_;
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {}
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-      // cast result expressions to the correct type of the referenced slot of the
-      // target table
-      int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
-      for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
-        sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
-            assignments_.get(i - keyColumnsOffset).first.getType()));
-      }
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
-        List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-        Map<String, Integer> colIndexMap) throws AnalysisException {
-      // Add the key columns as slot refs
-      for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-            colIndexMap, k, false);
-      }
-    }
-  }
-
-  private class ModifyIceberg implements ModifyImpl {
-    FeIcebergTable originalTargetTable_;
-    IcebergPositionDeleteTable icePosDelTable_;
-
-    public ModifyIceberg() {
-      originalTargetTable_ = (FeIcebergTable)table_;
-      icePosDelTable_ = new IcebergPositionDeleteTable((FeIcebergTable)table_);
-      // Make the virtual position delete table the new target table.
-      table_ = icePosDelTable_;
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {
-      setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
-      if (ModifyStmt.this instanceof UpdateStmt) {
-        throw new AnalysisException("UPDATE is not supported for Iceberg table " +
-            originalTargetTable_.getFullName());
-      }
-
-      if (icePosDelTable_.getFormatVersion() == 1) {
-        throw new AnalysisException("Iceberg V1 table do not support DELETE/UPDATE " +
-            "operations: " + originalTargetTable_.getFullName());
-      }
-
-      String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
-          org.apache.iceberg.TableProperties.DELETE_MODE);
-      if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
-        throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
-            "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
-      }
-
-      if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
-        throw new AnalysisException("Impala can only write delete files in PARQUET, " +
-            "but the given table uses a different file format: " +
-            originalTargetTable_.getFullName());
-      }
-
-      if (wherePredicate_ == null ||
-          org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate_)) {
-        // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
-        throw new AnalysisException("For deleting every row, please use TRUNCATE.");
-      }
-    }
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
-        List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
-        throws AnalysisException {
-      if (originalTargetTable_.isPartitioned()) {
-        String[] partitionCols;
-        partitionCols = new String[] {"PARTITION__SPEC__ID",
-            "ICEBERG__PARTITION__SERIALIZED"};
-        for (String k : partitionCols) {
-          addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
-              keySlots, colIndexMap, k);
-        }
-      }
-      String[] deleteCols;
-      deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
-      // Add the key columns as slot refs
-      for (String k : deleteCols) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-            colIndexMap, k, true);
-      }
-    }
-  }
 }
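
For readers following the refactor: each concrete statement now supplies its table-specific
logic through the new createModifyImpl() hook, so ModifyStmt.analyze() can simply call
modifyImpl_.analyze() and modifyImpl_.createSourceStmt() regardless of the table type. Below
is a minimal illustrative sketch (not part of this patch) of how a DELETE statement could
select its impl, assuming the new KuduDeleteImpl/IcebergDeleteImpl constructors take the
owning statement the same way KuduUpdateImpl does in the next hunk:

    // Illustrative sketch only -- not part of this patch. A DELETE statement
    // would pick its table-specific implementation roughly like this:
    @Override
    protected void createModifyImpl() {
      if (table_ instanceof FeKuduTable) {
        modifyImpl_ = new KuduDeleteImpl(this);
      } else if (table_ instanceof FeIcebergTable) {
        // Iceberg deletes are written as position delete files.
        modifyImpl_ = new IcebergDeleteImpl(this);
      }
      // analyze() has already rejected any other target table type.
    }
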
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 8f8dfee71..9e36ed1db 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -22,13 +22,12 @@ import static java.lang.String.format;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.TableSink;
-import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 /**
  * Representation of an Update statement.
@@ -58,19 +57,19 @@ public class UpdateStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_);
   }
 
+  @Override
+  protected void createModifyImpl() {
+    // Currently only Kudu tables are supported.
+    Preconditions.checkState(table_ instanceof FeKuduTable);
+    modifyImpl_ = new KuduUpdateImpl(this);
+  }
+
   /**
    * Return an instance of a KuduTableSink specialized as an Update operation.
    */
-  public DataSink createDataSink(List<Expr> resultExprs) {
+  public DataSink createDataSink() {
     // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        0);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return dataSink;
+    return modifyImpl_.createDataSink();
   }
 
   @Override
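
The sink construction that previously lived in UpdateStmt.createDataSink(resultExprs) is now
expected to come from the impl object. A hedged sketch of what KuduUpdateImpl.createDataSink()
plausibly does, reusing the arguments of the removed TableSink.create() call above; the
accessors used to reach the statement's state from inside the impl class are assumptions,
since that class is not shown in this email:

    // Illustrative sketch only; mirrors the TableSink.create() call removed above.
    // The result exprs and the column-position mapping are now owned by the impl.
    public DataSink createDataSink() {
      // Hypothetical accessors: the exact names inside the impl class are assumed.
      List<Expr> resultExprs = getQueryStmt().getResultExprs();
      List<Integer> referencedColumns = getReferencedColumns();
      Preconditions.checkState(!referencedColumns.isEmpty());
      return TableSink.create(modifyStmt_.getTargetTable(), TableSink.Op.UPDATE,
          ImmutableList.<Expr>of(), resultExprs, referencedColumns, false, false,
          new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
          modifyStmt_.getKuduTransactionToken(), 0);
    }
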
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 9a35bef7e..c33761a8d 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -175,7 +175,7 @@ public class Planner {
       if (ctx_.isUpdate()) {
         // Set up update sink for root fragment
         rootFragment.setSink(
-            ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs));
+            ctx_.getAnalysisResult().getUpdateStmt().createDataSink());
       } else if (ctx_.isDelete()) {
         // Set up delete sink for root fragment
         DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();