You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/09/28 17:36:09 UTC

[impala] 01/03: IMPALA-12313: (part 1) Refactor modify statements

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit eb0e2bbf9020af68c8cbce8baa87f37a66071653
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Sep 12 17:07:41 2023 +0200

    IMPALA-12313: (part 1) Refactor modify statements
    
    This change refactors the classes and methods that implement
    modify statements like DELETE and UPDATE. ModifyStmt, DeleteStmt,
    UpdateStmt are created during parsing and contain information about
    the statement: FROM clause, WHERE clause, target table, etc.
    
    The logic that actually implements these operations is dependent
    on the type of the target table. Therefore during analysis, after
    the target table is resolved, we create the *Impl object (e.g.
    IcebergDeleteImpl, KuduUpdateImpl) that implements the logic. The
    impl object is in charge of creating the source statement of the
    operation, doing the necessary rewrites/masking, and also creating
    the data sink.
    
    Testing:
     * N/A: no new functionality / bug fix
    
    Change-Id: If15f64944f2e23064b7112ad5930abc775dd65ec
    Reviewed-on: http://gerrit.cloudera.org:8080/20477
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/DeleteStmt.java     |  25 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   2 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  |  44 +++
 .../apache/impala/analysis/IcebergModifyImpl.java  | 101 ++++++
 .../org/apache/impala/analysis/KuduDeleteImpl.java |  46 +++
 .../org/apache/impala/analysis/KuduModifyImpl.java |  63 ++++
 .../org/apache/impala/analysis/KuduUpdateImpl.java |  45 +++
 .../org/apache/impala/analysis/ModifyImpl.java     | 268 +++++++++++++++
 .../org/apache/impala/analysis/ModifyStmt.java     | 361 +++------------------
 .../org/apache/impala/analysis/UpdateStmt.java     |  23 +-
 .../java/org/apache/impala/planner/Planner.java    |   2 +-
 11 files changed, 639 insertions(+), 341 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 47164fb43..a02170fb6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -20,6 +20,9 @@ package org.apache.impala.analysis;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.planner.TableSink;
@@ -52,21 +55,21 @@ public class DeleteStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_.clone());
   }
 
+  @Override
+  protected void createModifyImpl() {
+    if (table_ instanceof FeKuduTable) {
+      modifyImpl_ = new KuduDeleteImpl(this);
+    } else if (table_ instanceof FeIcebergTable) {
+      modifyImpl_ = new IcebergDeleteImpl(this);
+    }
+  }
+
   public DataSink createDataSink() {
-    // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        partitionKeyExprs_, resultExprs_, referencedColumns_, false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        maxTableSinks_);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return tableSink;
+    return modifyImpl_.createDataSink();
   }
 
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
-    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
-    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
+    modifyImpl_.substituteResultExprs(smap, analyzer);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
index 1afcd98b8..cf3dfd63d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
@@ -53,6 +53,8 @@ public abstract class DmlStatementBase extends StatementBase {
   }
 
   public FeTable getTargetTable() { return table_; }
+
+  protected void setTargetTable(FeTable tbl) { table_ = tbl; }
   public void setMaxTableSinks(int maxTableSinks) { this.maxTableSinks_ = maxTableSinks; }
 
   public boolean hasShuffleHint() { return false; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
new file mode 100644
index 000000000..f32ce3ff2
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class IcebergDeleteImpl extends IcebergModifyImpl {
+  public IcebergDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, null,
+        modifyStmt_.maxTableSinks_);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
new file mode 100644
index 000000000..30b4c375e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergPositionDeleteTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TIcebergFileFormat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class IcebergModifyImpl extends ModifyImpl {
+  FeIcebergTable originalTargetTable_;
+  IcebergPositionDeleteTable icePosDelTable_;
+
+  public IcebergModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    originalTargetTable_ = (FeIcebergTable)modifyStmt_.getTargetTable();
+    icePosDelTable_ = new IcebergPositionDeleteTable(originalTargetTable_);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    // Make the virtual position delete table the new target table.
+    modifyStmt_.setTargetTable(icePosDelTable_);
+    modifyStmt_.setMaxTableSinks(analyzer.getQueryOptions().getMax_fs_writers());
+    if (modifyStmt_ instanceof UpdateStmt) {
+      throw new AnalysisException("UPDATE is not supported for Iceberg table " +
+          originalTargetTable_.getFullName());
+    }
+
+    if (icePosDelTable_.getFormatVersion() == 1) {
+      throw new AnalysisException("Iceberg V1 table do not support DELETE/UPDATE " +
+          "operations: " + originalTargetTable_.getFullName());
+    }
+
+    String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
+        org.apache.iceberg.TableProperties.DELETE_MODE);
+    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
+      throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
+          "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
+    }
+
+    if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
+      throw new AnalysisException("Impala can only write delete files in PARQUET, " +
+          "but the given table uses a different file format: " +
+          originalTargetTable_.getFullName());
+    }
+
+    Expr wherePredicate = modifyStmt_.getWherePredicate();
+    if (wherePredicate == null ||
+        org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) {
+      // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
+      throw new AnalysisException("For deleting every row, please use TRUNCATE.");
+    }
+  }
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    if (originalTargetTable_.isPartitioned()) {
+      String[] partitionCols;
+      partitionCols = new String[] {"PARTITION__SPEC__ID",
+          "ICEBERG__PARTITION__SERIALIZED"};
+      for (String k : partitionCols) {
+        addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
+            keySlots, colIndexMap, k);
+      }
+    }
+    String[] deleteCols;
+    deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
+    // Add the key columns as slot refs
+    for (String k : deleteCols) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
+          colIndexMap, k, true);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
new file mode 100644
index 000000000..54625fb03
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduDeleteImpl extends KuduModifyImpl {
+  public KuduDeleteImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
+        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(),
+        modifyStmt_.maxTableSinks_);
+        Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return tableSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
new file mode 100644
index 000000000..76290e977
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+abstract class KuduModifyImpl extends ModifyImpl {
+  // Target Kudu table.
+  FeKuduTable kuduTable_;
+
+  public KuduModifyImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+    kuduTable_ = (FeKuduTable)modifyStmt.getTargetTable();
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {}
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    // cast result expressions to the correct type of the referenced slot of the
+    // target table
+    List<Pair<SlotRef, Expr>> assignments = modifyStmt_.getAssignments();
+    int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
+    for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
+      sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
+          assignments.get(i - keyColumnsOffset).first.getType()));
+    }
+  }
+
+  @Override
+  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap) throws AnalysisException {
+    // Add the key columns as slot refs
+    for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
+      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
+          colIndexMap, k, false);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
new file mode 100644
index 000000000..57e8df262
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduUpdateImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class KuduUpdateImpl extends KuduModifyImpl {
+  public KuduUpdateImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
+    DataSink dataSink = TableSink.create(modifyStmt_.table_, TableSink.Op.UPDATE,
+        ImmutableList.<Expr>of(), sourceStmt_.getResultExprs(), getReferencedColumns(),
+        false, false, new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
+        modifyStmt_.getKuduTransactionToken(), 0);
+    Preconditions.checkState(!getReferencedColumns().isEmpty());
+    return dataSink;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
new file mode 100644
index 000000000..08b97c764
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import static java.lang.String.format;
+
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.KuduColumn;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.rewrite.ExprRewriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract class for implementing a Modify statement such as DELETE or UPDATE. Child
+ * classes implement logic specific to target table types.
+ */
+abstract class ModifyImpl {
+  abstract void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException;
+
+  abstract void addKeyColumns(Analyzer analyzer,
+      List<SelectListItem> selectList, List<Integer> referencedColumns,
+      Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
+      throws AnalysisException;
+
+  abstract void analyze(Analyzer analyzer) throws AnalysisException;
+
+  abstract DataSink createDataSink();
+
+  // The Modify statement for this modify impl. The ModifyStmt class holds information
+  // about the statement (e.g. target table type, FROM, WHERE clause, etc.)
+  ModifyStmt modifyStmt_;
+  /////////////////////////////////////////
+  // START: Members that are set in createSourceStmt().
+
+  // Result of the analysis of the internal SelectStmt that produces the rows that
+  // will be modified.
+  protected SelectStmt sourceStmt_;
+
+  // Output expressions that produce the final results to write to the target table. May
+  // include casts.
+  //
+  // In case of DELETE statements it contains the columns that identify the deleted
+  // rows (Kudu primary keys, Iceberg file_path / position).
+  protected List<Expr> resultExprs_ = new ArrayList<>();
+
+  // Exprs corresponding to the partitionKeyValues, if specified, or to the partition
+  // columns for tables.
+  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
+
+  // For every column of the target table that is referenced in the optional
+  // 'sort.columns' table property, this list will contain the corresponding result expr
+  // from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
+  // list is empty, no additional sorting by non-partitioning columns will be performed.
+  // The column list must not contain partition columns and must be empty for non-Hdfs
+  // tables.
+  protected List<Expr> sortExprs_ = new ArrayList<>();
+
+  // Position mapping of output expressions of the sourceStmt_ to column indices in the
+  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
+  // position in the target table.
+  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  // END: Members that are set in createSourceStmt().
+  /////////////////////////////////////////
+
+  public ModifyImpl(ModifyStmt modifyStmt) {
+    modifyStmt_ = modifyStmt;
+  }
+
+  public void reset() {
+    if (sourceStmt_ != null) sourceStmt_.reset();
+  }
+
+  /**
+   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ contains
+   * first the SlotRefs for the key Columns, followed by the expressions representing the
+   * assignments. This method sets the member variables for the sourceStmt_ and the
+   * referencedColumns_.
+   *
+   * The sourceStmt_ is created only once; subsequent invocations reuse the
+   * previously created statement.
+   */
+  protected void createSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    if (sourceStmt_ == null) {
+      // Builds the select list and column position mapping for the target table.
+      ArrayList<SelectListItem> selectList = new ArrayList<>();
+      buildAndValidateAssignmentExprs(analyzer, selectList);
+
+      // Analyze the generated select statement.
+      sourceStmt_ = new SelectStmt(new SelectList(selectList), modifyStmt_.fromClause_,
+          modifyStmt_.wherePredicate_,
+          null, null, null, null);
+
+      addCastsToAssignmentsInSourceStmt(analyzer);
+    }
+    sourceStmt_.analyze(analyzer);
+  }
+
+  /**
+   * Validates the list of value assignments that should be used to modify the target
+   * table. It verifies that only those columns are referenced that belong to the target
+   * table, no key columns are modified, and that a single column is not modified multiple
+   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
+   * SelectListItems to the out parameter selectList that is used to build the select list
+   * for sourceStmt_. A list of integers indicating the column position of an entry in the
+   * select list in the target table is written to the field referencedColumns_.
+   *
+   * In addition to the expressions that are generated for each assignment, the
+   * expression list contains an expression for each key column. The key columns
+   * are always prepended to the list of expressions representing the assignments.
+   */
+  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
+      List<SelectListItem> selectList)
+      throws AnalysisException {
+    // The order of the referenced columns equals the order of the result expressions
+    Set<SlotId> uniqueSlots = new HashSet<>();
+    Set<SlotId> keySlots = new HashSet<>();
+
+    // Mapping from column name to index
+    List<Column> cols = modifyStmt_.table_.getColumnsInHiveOrder();
+    Map<String, Integer> colIndexMap = new HashMap<>();
+    for (int i = 0; i < cols.size(); i++) {
+      colIndexMap.put(cols.get(i).getName(), i);
+    }
+
+    addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
+        keySlots, colIndexMap);
+
+    // Assignments are only used in the context of updates.
+    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
+      SlotRef lhsSlotRef = valueAssignment.first;
+      lhsSlotRef.analyze(analyzer);
+
+      Expr rhsExpr = valueAssignment.second;
+      // No subqueries for rhs expression
+      if (rhsExpr.contains(Subquery.class)) {
+        throw new AnalysisException(
+            format("Subqueries are not supported as update expressions for column '%s'",
+                lhsSlotRef.toSql()));
+      }
+      rhsExpr.analyze(analyzer);
+
+      // Correct target table
+      if (!lhsSlotRef.isBoundByTupleIds(modifyStmt_.targetTableRef_.getId().asList())) {
+        throw new AnalysisException(
+            format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
+                + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
+                rhsExpr.toSql(),
+                modifyStmt_.targetTableRef_.getDesc().getTable().getFullName()));
+      }
+
+      Column c = lhsSlotRef.getResolvedPath().destColumn();
+      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
+      if (c == null) {
+        throw new AnalysisException(
+            format("Left-hand side in assignment expression '%s=%s' must be a column " +
+                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
+      }
+
+      if (keySlots.contains(lhsSlotRef.getSlotId())) {
+        boolean isSystemGeneratedColumn =
+            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
+        throw new AnalysisException(format("%s column '%s' cannot be updated.",
+            isSystemGeneratedColumn ? "System generated key" : "Key",
+            lhsSlotRef.toSql()));
+      }
+
+      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
+        throw new AnalysisException(
+            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
+      }
+
+      rhsExpr = StatementBase.checkTypeCompatibility(
+          modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
+          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
+      uniqueSlots.add(lhsSlotRef.getSlotId());
+      selectList.add(new SelectListItem(rhsExpr, null));
+      referencedColumns_.add(colIndexMap.get(c.getName()));
+    }
+  }
+
+  protected void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
+      throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
+        keySlots, colIndexMap, colName);
+    resultExprs_.add(ref);
+    if (isSortingColumn) sortExprs_.add(ref);
+  }
+
+  protected void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
+  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
+        keySlots, colIndexMap, colName);
+    partitionKeyExprs_.add(ref);
+    sortExprs_.add(ref);
+  }
+
+  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    List<String> path = Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
+        colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    selectList.add(new SelectListItem(ref, null));
+    uniqueSlots.add(ref.getSlotId());
+    keySlots.add(ref.getSlotId());
+    referencedColumns.add(colIndexMap.get(colName));
+    return ref;
+  }
+
+  public List<Expr> getPartitionKeyExprs() {
+     return partitionKeyExprs_;
+  }
+
+  public List<Expr> getSortExprs() {
+    return sortExprs_;
+  }
+
+  public QueryStmt getQueryStmt() {
+    return sourceStmt_;
+  }
+
+  public List<Integer> getReferencedColumns() {
+    return referencedColumns_;
+  }
+
+  public void castResultExprs(List<Type> types) throws AnalysisException {
+    sourceStmt_.castResultExprs(types);
+  }
+
+  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+    sourceStmt_.rewriteExprs(rewriter);
+  }
+
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
+    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 197f72bad..b3a798918 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -20,25 +20,16 @@ package org.apache.impala.analysis;
 import static java.lang.String.format;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
-import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
-import org.apache.impala.catalog.IcebergPositionDeleteTable;
-import org.apache.impala.catalog.KuduColumn;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.rewrite.ExprRewriter;
-import org.apache.impala.thrift.TIcebergFileFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -53,12 +44,21 @@ import com.google.common.base.Preconditions;
  *   - assignmentExprs (not null, can be empty)
  *   - wherePredicate (nullable)
  *
- * In the analysis phase, a SelectStmt is created with the result expressions set to
- * match the right-hand side of the assignments in addition to projecting the key columns
- * of the underlying table. During query execution, the plan that
- * is generated from this SelectStmt produces all rows that need to be modified.
+ * This class holds information from parsing and semantic analysis. Then it delegates
+ * implementation logic to the *Impl classes, e.g. KuduDeleteImpl, IcebergDeleteImpl,
+ * etc.
+ * In the analysis phase, the impl object creates a SelectStmt with the result expressions
+ * which hold information about the modified records (e.g. primary keys of Kudu tables,
+ * file_path / pos information of Iceberg data records).
+ * During query execution, the plan that is generated from this SelectStmt produces
+ * all rows that need to be modified.
+ *
+ * UPDATEs:
+ * The result of the SelectStmt contains the right-hand side of the assignments in
+ * addition to projecting the key columns of the underlying table.
  */
 public abstract class ModifyStmt extends DmlStatementBase {
+
   // List of explicitly mentioned assignment expressions in the UPDATE's SET clause
   protected final List<Pair<SlotRef, Expr>> assignments_;
 
@@ -71,40 +71,14 @@ public abstract class ModifyStmt extends DmlStatementBase {
   // TableRef identifying the target table, set during analysis.
   protected TableRef targetTableRef_;
 
+  // FROM clause of the statement
   protected FromClause fromClause_;
 
   /////////////////////////////////////////
   // START: Members that are set in first run of analyze().
 
-  // Exprs correspond to the partitionKeyValues, if specified, or to the partition columns
-  // for tables.
-  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
-
-  // For every column of the target table that is referenced in the optional
-  // 'sort.columns' table property, this list will contain the corresponding result expr
-  // from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
-  // list is empty, no additional sorting by non-partitioning columns will be performed.
-  // The column list must not contain partition columns and must be empty for non-Hdfs
-  // tables.
-  protected List<Expr> sortExprs_ = new ArrayList<>();
-
-  // Output expressions that produce the final results to write to the target table. May
-  // include casts. Set in first run of analyze().
-  //
-  // In case of DELETE statements it contains the columns that identify the deleted rows.
-  protected List<Expr> resultExprs_ = new ArrayList<>();
-
-  // Result of the analysis of the internal SelectStmt that produces the rows that
-  // will be modified.
-  protected SelectStmt sourceStmt_;
-
   // Implementation of the modify statement. Depends on the target table type.
-  private ModifyImpl modifyImpl_;
-
-  // Position mapping of output expressions of the sourceStmt_ to column indices in the
-  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
-  // position in the target table. Set in createSourceStmt() during analysis.
-  protected List<Integer> referencedColumns_ = new ArrayList<>();
+  protected ModifyImpl modifyImpl_;
 
   // END: Members that are set in first run of analyze
   /////////////////////////////////////////
@@ -137,10 +111,8 @@ public abstract class ModifyStmt extends DmlStatementBase {
   /**
    * The analysis of the ModifyStmt proceeds as follows: First, the FROM clause is
    * analyzed and the targetTablePath is verified to be a valid alias into the FROM
-   * clause. When the target table is identified, the assignment expressions are
-   * validated and as a last step the internal SelectStmt is produced and analyzed.
-   * Potential query rewrites for the select statement are implemented here and are not
-   * triggered externally by the statement rewriter.
+   * clause. It also identifies the target table. Raises errors for unsupported table
+   * types.
    */
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
@@ -172,7 +144,6 @@ public abstract class ModifyStmt extends DmlStatementBase {
 
     Preconditions.checkNotNull(targetTableRef_);
     FeTable dstTbl = targetTableRef_.getTable();
-    table_ = dstTbl;
     // Only Kudu and Iceberg tables can be updated.
     if (!(dstTbl instanceof FeKuduTable) && !(dstTbl instanceof FeIcebergTable)) {
       throw new AnalysisException(
@@ -180,154 +151,59 @@ public abstract class ModifyStmt extends DmlStatementBase {
               "but the following table is neither: %s",
               dstTbl.getFullName()));
     }
-    if (dstTbl instanceof FeKuduTable) {
-      modifyImpl_ = this.new ModifyKudu();
-    } else if (dstTbl instanceof FeIcebergTable) {
-      modifyImpl_ = this.new ModifyIceberg();
-    }
-
-    modifyImpl_.analyze(analyzer);
-
     // Make sure that the user is allowed to modify the target table. Use ALL because no
     // UPDATE / DELETE privilege exists yet (IMPALA-3840).
     analyzer.registerAuthAndAuditEvent(dstTbl, Privilege.ALL);
-
-    // Validates the assignments_ and creates the sourceStmt_.
-    if (sourceStmt_ == null) createSourceStmt(analyzer);
-    sourceStmt_.analyze(analyzer);
+    table_ = dstTbl;
+    if (modifyImpl_ == null) createModifyImpl();
+    modifyImpl_.analyze(analyzer);
+    // Create and analyze the source statement.
+    modifyImpl_.createSourceStmt(analyzer);
     // Add target table to descriptor table.
     analyzer.getDescTbl().setTargetTable(table_);
 
     sqlString_ = toSql();
   }
 
+  /**
+   * Creates the implementation class for this statement. Only called once during the
+   * first run of analyze().
+   */
+  abstract protected void createModifyImpl();
+
   @Override
   public void reset() {
     super.reset();
     fromClause_.reset();
-    if (sourceStmt_ != null) sourceStmt_.reset();
-    modifyImpl_ = null;
+    modifyImpl_.reset();
   }
 
   @Override
-  public List<Expr> getPartitionKeyExprs() { return partitionKeyExprs_; }
+  public List<Expr> getPartitionKeyExprs() { return modifyImpl_.getPartitionKeyExprs(); }
   @Override
-  public List<Expr> getSortExprs() { return sortExprs_; }
+  public List<Expr> getSortExprs() { return modifyImpl_.getSortExprs(); }
 
-  @Override
-  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
-    return sourceStmt_.resolveTableMask(analyzer);
+  public List<Integer> getReferencedColumns() {
+    return modifyImpl_.getReferencedColumns();
   }
 
-  /**
-   * Builds and validates the sourceStmt_. The select list of the sourceStmt_ contains
-   * first the SlotRefs for the key Columns, followed by the expressions representing the
-   * assignments. This method sets the member variables for the sourceStmt_ and the
-   * referencedColumns_.
-   *
-   * This is only run once, on the first analysis. Following analysis will reset() and
-   * reuse previously created statements.
-   */
-  private void createSourceStmt(Analyzer analyzer)
-      throws AnalysisException {
-    // Builds the select list and column position mapping for the target table.
-    ArrayList<SelectListItem> selectList = new ArrayList<>();
-    buildAndValidateAssignmentExprs(analyzer, selectList);
-
-    // Analyze the generated select statement.
-    sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, wherePredicate_,
-        null, null, null, null);
-
-    modifyImpl_.addCastsToAssignmentsInSourceStmt(analyzer);
-  }
-
-  /**
-   * Validates the list of value assignments that should be used to modify the target
-   * table. It verifies that only those columns are referenced that belong to the target
-   * table, no key columns are modified, and that a single column is not modified multiple
-   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
-   * SelectListItems to the out parameter selectList that is used to build the select list
-   * for sourceStmt_. A list of integers indicating the column position of an entry in the
-   * select list in the target table is written to the out parameter referencedColumns.
-   *
-   * In addition to the expressions that are generated for each assignment, the
-   * expression list contains an expression for each key column. The key columns
-   * are always prepended to the list of expression representing the assignments.
-   */
-  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
-      List<SelectListItem> selectList)
-      throws AnalysisException {
-    // The order of the referenced columns equals the order of the result expressions
-    Set<SlotId> uniqueSlots = new HashSet<>();
-    Set<SlotId> keySlots = new HashSet<>();
-
-    // Mapping from column name to index
-    List<Column> cols = table_.getColumnsInHiveOrder();
-    Map<String, Integer> colIndexMap = new HashMap<>();
-    for (int i = 0; i < cols.size(); i++) {
-      colIndexMap.put(cols.get(i).getName(), i);
-    }
-
-    modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
-        keySlots, colIndexMap);
-
-    // Assignments are only used in the context of updates.
-    for (Pair<SlotRef, Expr> valueAssignment : assignments_) {
-      SlotRef lhsSlotRef = valueAssignment.first;
-      lhsSlotRef.analyze(analyzer);
+  public Expr getWherePredicate() { return wherePredicate_; }
 
-      Expr rhsExpr = valueAssignment.second;
-      // No subqueries for rhs expression
-      if (rhsExpr.contains(Subquery.class)) {
-        throw new AnalysisException(
-            format("Subqueries are not supported as update expressions for column '%s'",
-                lhsSlotRef.toSql()));
-      }
-      rhsExpr.analyze(analyzer);
-
-      // Correct target table
-      if (!lhsSlotRef.isBoundByTupleIds(targetTableRef_.getId().asList())) {
-        throw new AnalysisException(
-            format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
-                + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
-                rhsExpr.toSql(), targetTableRef_.getDesc().getTable().getFullName()));
-      }
-
-      Column c = lhsSlotRef.getResolvedPath().destColumn();
-      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
-      if (c == null) {
-        throw new AnalysisException(
-            format("Left-hand side in assignment expression '%s=%s' must be a column " +
-                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
-      }
-
-      if (keySlots.contains(lhsSlotRef.getSlotId())) {
-        boolean isSystemGeneratedColumn =
-            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
-        throw new AnalysisException(format("%s column '%s' cannot be updated.",
-            isSystemGeneratedColumn ? "System generated key" : "Key",
-            lhsSlotRef.toSql()));
-      }
-
-      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
-        throw new AnalysisException(
-            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
-      }
+  public List<Pair<SlotRef, Expr>> getAssignments() { return assignments_; }
 
-      rhsExpr = checkTypeCompatibility(targetTableRef_.getDesc().getTable().getFullName(),
-          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
-      uniqueSlots.add(lhsSlotRef.getSlotId());
-      selectList.add(new SelectListItem(rhsExpr, null));
-      referencedColumns_.add(colIndexMap.get(c.getName()));
-    }
+  @Override
+  public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
+    return getQueryStmt().resolveTableMask(analyzer);
   }
 
   @Override
-  public List<Expr> getResultExprs() { return sourceStmt_.getResultExprs(); }
+  public List<Expr> getResultExprs() {
+    return modifyImpl_.getQueryStmt().getResultExprs();
+  }
 
   @Override
   public void castResultExprs(List<Type> types) throws AnalysisException {
-    sourceStmt_.castResultExprs(types);
+    modifyImpl_.castResultExprs(types);
   }
 
   @Override
@@ -336,165 +212,16 @@ public abstract class ModifyStmt extends DmlStatementBase {
     for (Pair<SlotRef, Expr> assignment: assignments_) {
       assignment.second = rewriter.rewrite(assignment.second, analyzer_);
     }
-    sourceStmt_.rewriteExprs(rewriter);
+    modifyImpl_.rewriteExprs(rewriter);
   }
 
-  public QueryStmt getQueryStmt() { return sourceStmt_; }
+  public QueryStmt getQueryStmt() { return modifyImpl_.getQueryStmt(); }
 
   /**
    * Return true if the target table is Kudu table.
-   * Since only Kudu tables can be updated, it must be true.
    */
   public boolean isTargetTableKuduTable() { return (table_ instanceof FeKuduTable); }
 
-  private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
-      throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    resultExprs_.add(ref);
-    if (isSortingColumn) sortExprs_.add(ref);
-  }
-
-  private void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
-  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    partitionKeyExprs_.add(ref);
-    sortExprs_.add(ref);
-  }
-
-  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), colName);
-    SlotRef ref = new SlotRef(path);
-    ref.analyze(analyzer);
-    selectList.add(new SelectListItem(ref, null));
-    uniqueSlots.add(ref.getSlotId());
-    keySlots.add(ref.getSlotId());
-    referencedColumns.add(colIndexMap.get(colName));
-    return ref;
-  }
-
   @Override
   public abstract String toSql(ToSqlOptions options);
-
-  private interface ModifyImpl {
-    void analyze(Analyzer analyzer) throws AnalysisException;
-
-    void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-      throws AnalysisException;
-
-    void addKeyColumns(Analyzer analyzer,
-        List<SelectListItem> selectList, List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
-        throws AnalysisException;
-  }
-
-  private class ModifyKudu implements ModifyImpl {
-    // Target Kudu table. Result of analysis.
-    FeKuduTable kuduTable_ = (FeKuduTable)table_;
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {}
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-      // cast result expressions to the correct type of the referenced slot of the
-      // target table
-      int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size();
-      for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) {
-        sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo(
-            assignments_.get(i - keyColumnsOffset).first.getType()));
-      }
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
-        List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-        Map<String, Integer> colIndexMap) throws AnalysisException {
-      // Add the key columns as slot refs
-      for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-            colIndexMap, k, false);
-      }
-    }
-  }
-
-  private class ModifyIceberg implements ModifyImpl {
-    FeIcebergTable originalTargetTable_;
-    IcebergPositionDeleteTable icePosDelTable_;
-
-    public ModifyIceberg() {
-      originalTargetTable_ = (FeIcebergTable)table_;
-      icePosDelTable_ = new IcebergPositionDeleteTable((FeIcebergTable)table_);
-      // Make the virtual position delete table the new target table.
-      table_ = icePosDelTable_;
-    }
-
-    @Override
-    public void analyze(Analyzer analyzer) throws AnalysisException {
-      setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
-      if (ModifyStmt.this instanceof UpdateStmt) {
-        throw new AnalysisException("UPDATE is not supported for Iceberg table " +
-            originalTargetTable_.getFullName());
-      }
-
-      if (icePosDelTable_.getFormatVersion() == 1) {
-        throw new AnalysisException("Iceberg V1 table do not support DELETE/UPDATE " +
-            "operations: " + originalTargetTable_.getFullName());
-      }
-
-      String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
-          org.apache.iceberg.TableProperties.DELETE_MODE);
-      if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
-        throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
-            "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
-      }
-
-      if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
-        throw new AnalysisException("Impala can only write delete files in PARQUET, " +
-            "but the given table uses a different file format: " +
-            originalTargetTable_.getFullName());
-      }
-
-      if (wherePredicate_ == null ||
-          org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate_)) {
-        // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
-        throw new AnalysisException("For deleting every row, please use TRUNCATE.");
-      }
-    }
-
-    @Override
-    public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-        throws AnalysisException {
-    }
-
-    @Override
-    public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
-        List<Integer> referencedColumns,
-        Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
-        throws AnalysisException {
-      if (originalTargetTable_.isPartitioned()) {
-        String[] partitionCols;
-        partitionCols = new String[] {"PARTITION__SPEC__ID",
-            "ICEBERG__PARTITION__SERIALIZED"};
-        for (String k : partitionCols) {
-          addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
-              keySlots, colIndexMap, k);
-        }
-      }
-      String[] deleteCols;
-      deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
-      // Add the key columns as slot refs
-      for (String k : deleteCols) {
-        addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-            colIndexMap, k, true);
-      }
-    }
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 8f8dfee71..9e36ed1db 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -22,13 +22,12 @@ import static java.lang.String.format;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.TableSink;
-import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 /**
  * Representation of an Update statement.
@@ -58,19 +57,19 @@ public class UpdateStmt extends ModifyStmt {
         new ArrayList<>(), other.wherePredicate_);
   }
 
+  @Override
+  protected void createModifyImpl() {
+    // Currently only Kudu tables are supported.
+    Preconditions.checkState(table_ instanceof FeKuduTable);
+    modifyImpl_ = new KuduUpdateImpl(this);
+  }
+
   /**
    * Return an instance of a KuduTableSink specialized as an Update operation.
    */
-  public DataSink createDataSink(List<Expr> resultExprs) {
+  public DataSink createDataSink() {
     // analyze() must have been called before.
-    Preconditions.checkState(table_ != null);
-    DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
-        getKuduTransactionToken(),
-        0);
-    Preconditions.checkState(!referencedColumns_.isEmpty());
-    return dataSink;
+    return modifyImpl_.createDataSink();
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 9a35bef7e..c33761a8d 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -175,7 +175,7 @@ public class Planner {
       if (ctx_.isUpdate()) {
         // Set up update sink for root fragment
         rootFragment.setSink(
-            ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs));
+            ctx_.getAnalysisResult().getUpdateStmt().createDataSink());
       } else if (ctx_.isDelete()) {
         // Set up delete sink for root fragment
         DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();