You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/12/11 07:13:54 UTC

(impala) branch master updated (d75807a19 -> d9c067aa8)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from d75807a19 IMPALA-12385: Enable Periodic metrics by default
     new e326b3cc0 IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables
     new 3381fbf76 IMPALA-12595: Allow automatic removal of old logs from previous PID
     new d9c067aa8 IMPALA-12398: Fix Ranger role not exists when altering db/table/view owner to a role

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   6 +
 be/src/common/logging.cc                           |  14 +-
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/data-sink.cc                           |   5 +
 be/src/exec/iceberg-delete-sink.cc                 |  44 ++-
 be/src/exec/iceberg-delete-sink.h                  |  20 ++
 be/src/exec/multi-table-sink.cc                    |  98 ++++++
 be/src/exec/multi-table-sink.h                     |  75 ++++
 be/src/exec/table-sink-base.cc                     |  18 +-
 be/src/exec/table-sink-base.h                      |   4 +-
 be/src/runtime/dml-exec-state.cc                   |  62 +++-
 be/src/runtime/dml-exec-state.h                    |  21 +-
 be/src/service/client-request-state.cc             |   7 +-
 bin/start-impala-cluster.py                        |   9 +
 common/protobuf/control_service.proto              |  16 +-
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/ImpalaService.thrift                 |   1 +
 common/thrift/Types.thrift                         |   3 +-
 .../impala/analysis/AlterDbSetOwnerStmt.java       |  12 +-
 .../analysis/AlterTableOrViewSetOwnerStmt.java     |  12 +-
 .../org/apache/impala/analysis/DeleteStmt.java     |   2 +
 .../apache/impala/analysis/DescriptorTable.java    |  19 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   3 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  |  54 ++-
 .../apache/impala/analysis/IcebergModifyImpl.java  |  98 +++---
 .../apache/impala/analysis/IcebergUpdateImpl.java  | 196 +++++++++++
 .../org/apache/impala/analysis/InsertStmt.java     |  92 +----
 .../org/apache/impala/analysis/KuduDeleteImpl.java |   4 +-
 .../org/apache/impala/analysis/KuduModifyImpl.java | 132 ++++++-
 .../org/apache/impala/analysis/ModifyImpl.java     | 220 ++++--------
 .../org/apache/impala/analysis/ModifyStmt.java     |  12 +-
 .../org/apache/impala/analysis/OptimizeStmt.java   |  13 +-
 .../org/apache/impala/analysis/UpdateStmt.java     |  16 +-
 .../impala/authorization/AuthorizationChecker.java |   5 +
 .../authorization/NoopAuthorizationFactory.java    |   6 +
 .../ranger/RangerAuthorizationChecker.java         |   5 +
 .../impala/authorization/ranger/RangerUtil.java    |   9 +
 .../org/apache/impala/planner/HdfsTableSink.java   |  22 +-
 .../apache/impala/planner/IcebergDeleteSink.java   |  43 +--
 .../org/apache/impala/planner/MultiDataSink.java   | 127 +++++++
 .../java/org/apache/impala/planner/Planner.java    |  53 +--
 .../java/org/apache/impala/service/Frontend.java   |  33 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   4 +
 .../main/java/org/apache/impala/util/ExprUtil.java |   7 +
 .../java/org/apache/impala/util/IcebergUtil.java   | 113 ++++++
 .../authorization/AuthorizationStmtTest.java       |  12 +
 .../authorization/AuthorizationTestBase.java       |   7 +-
 .../CatalogServiceTestCatalogWithRanger.java       | 104 ++++++
 .../org/apache/impala/common/FrontendTestBase.java |   5 +
 .../org/apache/impala/planner/PlannerTest.java     |   6 +
 .../impala/testutil/CatalogServiceTestCatalog.java |  21 +-
 shell/impala_client.py                             |  14 +-
 shell/impala_shell.py                              |   9 +-
 .../functional/functional_schema_template.sql      |   6 +-
 .../queries/PlannerTest/iceberg-v2-update.test     | 344 ++++++++++++++++++
 .../queries/PlannerTest/insert-sort-by-zorder.test |   3 +-
 .../queries/PlannerTest/insert.test                |   4 -
 .../queries/QueryTest/iceberg-negative.test        |  78 ++++-
 .../queries/QueryTest/iceberg-update-basic.test    | 388 +++++++++++++++++++++
 .../queries/QueryTest/ranger_column_masking.test   |  20 ++
 .../queries/QueryTest/ranger_row_filtering.test    |  21 ++
 tests/authorization/test_ranger.py                 |  24 ++
 tests/common/custom_cluster_test_suite.py          |   8 +-
 tests/custom_cluster/test_breakpad.py              |  98 +++++-
 tests/query_test/test_iceberg.py                   |  68 +++-
 tests/stress/test_update_stress.py                 | 105 ++++++
 66 files changed, 2606 insertions(+), 459 deletions(-)
 create mode 100644 be/src/exec/multi-table-sink.cc
 create mode 100644 be/src/exec/multi-table-sink.h
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
 create mode 100644 fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
 create mode 100644 fe/src/test/java/org/apache/impala/authorization/CatalogServiceTestCatalogWithRanger.java
 create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
 create mode 100644 tests/stress/test_update_stress.py


(impala) 02/03: IMPALA-12595: Allow automatic removal of old logs from previous PID

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3381fbf761989ce46da4f83a952bd247dc652ff8
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Dec 4 14:25:35 2023 -0800

    IMPALA-12595: Allow automatic removal of old logs from previous PID
    
    IMPALA-11184 added code to target a specific PID for log rotation. This
    aligns with glog behavior and is safe: it strictly limits log rotation
    to only consider log files made by the currently running Impalad and
    excludes logs made by a previous PID or by other live, colocated
    Impalads. The downside of this limit is that logs can accumulate on a
    node when impalad is frequently restarted, which is only resolvable by
    an admin manually removing logs.
    
    To help avoid this manual removal, this patch adds a backend flag
    'log_rotation_match_pid' that relaxes the limit by dropping the PID
    from the glob pattern. The default value for this new flag is False.
    However, for testing purposes, start-impala-cluster.py overrides it to
    True since the test minicluster logs to a common log directory. Setting
    'log_rotation_match_pid' to True prevents one impalad from interfering
    with the log rotation of other impalads in the minicluster.
    
    As a minimum exercise for this new log rotation behavior,
    test_breakpad.py::TestLogging is modified to invoke
    start-impala-cluster.py with 'log_rotation_match_pid' set to False.
    
    Testing:
    - Add test_excessive_cerr_ignore_pid and test_excessive_cerr_match_pid.
    - Split TestLogging into two. One runs test_excessive_cerr_ignore_pid in
      core exploration, while the other runs the rest of the logging tests in
      exhaustive exploration.
    - Pass exhaustive tests.
    
    Change-Id: I599799e73f27f941a1d7f3dec0f40b4f05ea5ceb
    Reviewed-on: http://gerrit.cloudera.org:8080/20754
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc             |  6 ++
 be/src/common/logging.cc                  | 14 ++++-
 bin/start-impala-cluster.py               |  9 +++
 tests/common/custom_cluster_test_suite.py |  8 ++-
 tests/custom_cluster/test_breakpad.py     | 98 +++++++++++++++++++++++++------
 5 files changed, 112 insertions(+), 23 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index b7190ff48..906a99ab3 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -232,6 +232,12 @@ DEFINE_bool(redirect_stdout_stderr, true,
 DEFINE_int32(max_log_files, 10, "Maximum number of log files to retain per severity "
     "level. The most recent log files are retained. If set to 0, all log files are "
     "retained.");
+DEFINE_bool(log_rotation_match_pid, false,
+    "If set to True, Impala log rotation will only consider log files that match with "
+    "PID of currently running service. Otherwise, log rotation will ignore the PID in "
+    "log file names and may remove older log files from previous PID run. "
+    "Set to True if log files from prior run must be retained or when running multiple "
+    "instances of same service with common log directory. Default to False.");
 
 static const string re2_mem_limit_help_msg =
     "Maximum bytes of memory to be used by re2's regex engine "
diff --git a/be/src/common/logging.cc b/be/src/common/logging.cc
index 825767ffa..f05a7815c 100644
--- a/be/src/common/logging.cc
+++ b/be/src/common/logging.cc
@@ -44,6 +44,7 @@ DECLARE_string(redaction_rules_file);
 DECLARE_string(log_filename);
 DECLARE_bool(redirect_stdout_stderr);
 DECLARE_int32(max_log_size);
+DECLARE_bool(log_rotation_match_pid);
 
 using boost::uuids::random_generator;
 using impala::TUniqueId;
@@ -98,10 +99,17 @@ impala::Status ResolveLogSymlink(const string& symlink_path, string& canonical_p
 // We specifically target the base file name created by glog.
 // Glog's default base file name follow this pattern:
 // "<program name>.<hostname>.<user name>.log.<severity level>.<date>-<time>.<pid>"
+// IMPALA-12595: FLAGS_log_rotation_match_pid is added to control whether to match against
+// PID or not.
 inline string GlobPatternForLog(google::LogSeverity severity) {
-  return strings::Substitute("$0/$1*.log.$2.*.$3", FLAGS_log_dir,
-      google::ProgramInvocationShortName(), google::GetLogSeverityName(severity),
-      getpid());
+  if (FLAGS_log_rotation_match_pid) {
+    return strings::Substitute("$0/$1*.log.$2.*.$3", FLAGS_log_dir,
+        google::ProgramInvocationShortName(), google::GetLogSeverityName(severity),
+        getpid());
+  } else {
+    return strings::Substitute("$0/$1*.log.$2.*", FLAGS_log_dir,
+        google::ProgramInvocationShortName(), google::GetLogSeverityName(severity));
+  }
 }
 
 impala::Status GetLatestCanonicalLogPath(
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 4fb86f224..ea274dedd 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -105,6 +105,10 @@ parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
                   help="Max number of log files before rotation occurs.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
                    help="Set the impalad backend logging level")
+parser.add_option("--ignore_pid_on_log_rotation", dest="ignore_pid_on_log_rotation",
+                  action='store_true', default=False,
+                  help=("Determine if log rotation should ignore or match PID in "
+                        "log file name."))
 parser.add_option("--jvm_args", dest="jvm_args", default="",
                   help="Additional arguments to pass to the JVM(s) during startup.")
 parser.add_option("--env_vars", dest="env_vars", default="",
@@ -282,6 +286,11 @@ def build_logging_args(service_name):
   logging"""
   result = ["-logbufsecs=5", "-v={0}".format(options.log_level),
       "-max_log_files={0}".format(options.max_log_files)]
+  if not options.ignore_pid_on_log_rotation:
+    # IMPALA-12595: ignore_pid_on_log_rotation default to False in this script.
+    # This is because multiple impalads still logs to the same log_dir in minicluster
+    # and we want to keep all logs for debugging purpose.
+    result += ["-log_rotation_match_pid=true"]
   if options.docker_network is None:
     # Impala inside a docker container should always log to the same location.
     result += ["-log_filename={0}".format(service_name),
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 17be12ad3..104be6c09 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -292,7 +292,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             expected_subscribers=0,
                             default_query_options=None,
                             statestored_timeout_s=60,
-                            impalad_timeout_s=60):
+                            impalad_timeout_s=60,
+                            ignore_pid_on_log_rotation=False):
     cls.impala_log_dir = impala_log_dir
     # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that
     # certain custom startup arguments work and we want to keep them independent of dev
@@ -304,6 +305,11 @@ class CustomClusterTestSuite(ImpalaTestSuite):
            '--log_dir=%s' % impala_log_dir,
            '--log_level=%s' % log_level]
 
+    if ignore_pid_on_log_rotation:
+      # IMPALA-12595: Ignore PID on log rotation for all custom cluster tests.
+      # Most of test in custom_cluster need to match PID, except some test for logging.
+      cmd.append('--ignore_pid_on_log_rotation')
+
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 38c827588..81173e6a8 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -388,15 +388,12 @@ class TestBreakpadExhaustive(TestBreakpadBase):
     assert reduced_minidump_size < full_minidump_size
 
 
-class TestLogging(TestBreakpadBase):
-  """Exhaustive tests to check that impala log is rolled periodically, obeying
-  max_log_size and max_log_files, even in the presence of heavy stderr writing.
-  """
+class TestLoggingBase(TestBreakpadBase):
+  _default_max_log_files = 2
+
   @classmethod
   def setup_class(cls):
-    if cls.exploration_strategy() != 'exhaustive':
-      pytest.skip('These logging tests only run in exhaustive')
-    super(TestLogging, cls).setup_class()
+    super(TestLoggingBase, cls).setup_class()
 
   def start_cluster_with_args(self, cluster_size, log_dir, **kwargs):
     cluster_options = []
@@ -404,9 +401,11 @@ class TestLogging(TestBreakpadBase):
       daemon_options = " ".join("-{0}={1}".format(k, v) for k, v in kwargs.items())
       cluster_options.append("--{0}={1}".format(daemon_arg, daemon_options))
     self._start_impala_cluster(cluster_options, cluster_size=cluster_size,
-                               expected_num_impalads=cluster_size, impala_log_dir=log_dir)
+                               expected_num_impalads=cluster_size,
+                               impala_log_dir=log_dir,
+                               ignore_pid_on_log_rotation=True)
 
-  def assert_logs(self, daemon, max_count, max_bytes):
+  def assert_logs(self, daemon, max_count, max_bytes, match_pid=True):
     """Assert that there are at most 'max_count' of INFO + ERROR log files for the
     specified daemon and the individual file size does not exceed 'max_bytes'.
     Also assert that stdout/stderr are redirected to correct file on each rotation."""
@@ -415,11 +414,11 @@ class TestLogging(TestBreakpadBase):
                 + glob.glob("%s/%s*log.INFO.*" % (log_dir, daemon))
     assert len(log_paths) <= max_count
 
-    # group log_paths by pid and kind
+    # group log_paths by kind and pid (if match_pid).
     log_group = {}
     for path in sorted(log_paths):
       tok = path.split('.')
-      key = tok[-1] + '.' + tok[-3]  # pid + kind
+      key = tok[-1] + '.' + tok[-3] if match_pid else tok[-3]
       if key in log_group:
         log_group[key].append(path)
       else:
@@ -453,13 +452,15 @@ class TestLogging(TestBreakpadBase):
     except OSError:
       pass
 
-  def start_excessive_cerr_cluster(self, test_cluster_size=1, remove_symlink=False):
+  def start_excessive_cerr_cluster(self, test_cluster_size=1, remove_symlink=False,
+                                   match_pid=True, max_log_count_begin=0,
+                                   max_log_count_end=None):
     """Check that impalad log is kept being rotated when most writing activity is coming
     from stderr stream.
     Along with LogFaultInjectionThread in init.cc, this test will fill impalad error logs
     with approximately 128kb error messages per second."""
     test_logbufsecs = 3
-    test_max_log_files = 2
+    test_max_log_files = self._default_max_log_files
     test_max_log_size = 1  # 1 MB
     test_error_msg = ('123456789abcde_' * 64)  # 1 KB error message
     test_debug_actions = 'LOG_MAINTENANCE_STDERR:FAIL@1.0@' + test_error_msg
@@ -467,26 +468,69 @@ class TestLogging(TestBreakpadBase):
     os.chmod(self.tmp_dir, 0o744)
 
     expected_log_max_bytes = int(1.2 * 1024**2)  # 1.2 MB
-    self.assert_logs(daemon, 0, expected_log_max_bytes)
+    self.assert_logs(daemon, max_log_count_begin, expected_log_max_bytes)
     self.start_cluster_with_args(test_cluster_size, self.tmp_dir,
                                  logbufsecs=test_logbufsecs,
                                  max_log_files=test_max_log_files,
                                  max_log_size=test_max_log_size,
-                                 debug_actions=test_debug_actions)
+                                 debug_actions=test_debug_actions,
+                                 log_rotation_match_pid=('1' if match_pid else '0'))
     self.wait_for_num_processes(daemon, test_cluster_size, 30)
     # Count both INFO and ERROR logs
-    expected_log_max_count = test_max_log_files * test_cluster_size * 2
+    if max_log_count_end is None:
+      max_log_count_end = test_max_log_files * test_cluster_size * 2
     # Wait for log maintenance thread to flush and rotate the logs asynchronously.
+    duration = test_logbufsecs * 10
     start = time.time()
-    while (time.time() - start < 40):
+    while (time.time() - start < duration):
       time.sleep(1)
-      self.assert_logs(daemon, expected_log_max_count, expected_log_max_bytes)
+      self.assert_logs(daemon, max_log_count_end, expected_log_max_bytes)
       if (remove_symlink):
         pattern = self.tmp_dir + '/' + daemon + '*'
         symlinks = glob.glob(pattern + '.INFO') + glob.glob(pattern + '.ERROR')
         for symlink in symlinks:
           self.silent_remove(symlink)
 
+
+class TestLogging(TestLoggingBase):
+  """Core tests to check that impala log is rolled periodically, obeying
+  max_log_size, max_log_files, and log_rotation_match_pid even in the presence of heavy
+  stderr writing.
+  """
+
+  @classmethod
+  def setup_class(cls):
+    super(TestLogging, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  def test_excessive_cerr_ignore_pid(self):
+    """Test excessive cerr activity twice with restart in between and no PID matching."""
+    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
+                                      match_pid=False)
+    self.kill_cluster(SIGTERM)
+    # There should be no impalad/catalogd/statestored running.
+    assert self.get_num_processes('impalad') == 0
+    assert self.get_num_processes('catalogd') == 0
+    assert self.get_num_processes('statestored') == 0
+    max_count = self._default_max_log_files * 2
+    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
+                                      match_pid=False,
+                                      max_log_count_begin=max_count,
+                                      max_log_count_end=max_count)
+
+
+class TestLoggingExhaustive(TestLoggingBase):
+  """Exhaustive tests to check that impala log is rolled periodically, obeying
+  max_log_size, max_log_files, and log_rotation_match_pid even in the presence of heavy
+  stderr writing.
+  """
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('TestLoggingExhaustive only run in exhaustive')
+    super(TestLoggingExhaustive, cls).setup_class()
+
   @pytest.mark.execute_serially
   def test_excessive_cerr(self):
     """Test excessive cerr activity with single node cluster."""
@@ -495,4 +539,20 @@ class TestLogging(TestBreakpadBase):
   @pytest.mark.execute_serially
   def test_excessive_cerr_no_symlink(self):
     """Test excessive cerr activity with two node cluster and missing log symlinks."""
-    self.start_excessive_cerr_cluster(2, True)
+    self.start_excessive_cerr_cluster(test_cluster_size=2, remove_symlink=False)
+
+  @pytest.mark.execute_serially
+  def test_excessive_cerr_match_pid(self):
+    """Test excessive cerr activity twice with restart in between and PID matching."""
+    self.start_excessive_cerr_cluster(1, remove_symlink=False, match_pid=True)
+    self.kill_cluster(SIGTERM)
+    # There should be no impalad/catalogd/statestored running.
+    assert self.get_num_processes('impalad') == 0
+    assert self.get_num_processes('catalogd') == 0
+    assert self.get_num_processes('statestored') == 0
+    max_count_begin = self._default_max_log_files * 2
+    max_count_end = max_count_begin * 2
+    self.start_excessive_cerr_cluster(test_cluster_size=1, remove_symlink=False,
+                                      match_pid=True,
+                                      max_log_count_begin=max_count_begin,
+                                      max_log_count_end=max_count_end)


(impala) 01/03: IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e326b3cc0d9dbfe12ee93ea2b13a23a2e35a3450
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Oct 27 18:23:27 2023 +0200

    IMPALA-12313: (part 2) Limited UPDATE support for Iceberg tables
    
    This patch adds limited UPDATE support for Iceberg tables. The
    limitations mean users cannot update Iceberg tables if any of
    the following is true:
     * UPDATE value of partitioning column
     * UPDATE table that went through partition evolution
     * Table has SORT BY properties
    
    The above limitations will be resolved by part 3. The usual limitations
    like writing non-Parquet files, using copy-on-write, modifying V1 tables
    are out of scope of IMPALA-12313.
    
    This patch implements UPDATEs with the merge-on-read technique. This
    means the UPDATE statement writes both data files and delete files.
    Data files contain the updated records, delete files contain the
    position delete records of the old data records that have been
    touched.
    
    To achieve the above this patch introduces a new sink: MultiDataSink.
    We can configure multiple TableSinks for a single MultiDataSink object.
    During execution, the row batches sent to the MultiDataSink will be
    forwarded to all the TableSinks that have been registered.
    
    The UPDATE statement for an Iceberg table creates a source select
    statement with all table columns and virtual columns INPUT__FILE__NAME
    and FILE__POSITION. E.g. imagine we have a table 'tbl' with schema
    (i int, s string, k int), and we update the table with:
    
      UPDATE tbl SET k = 5 WHERE i % 100 = 11;
    
     The generated source statement will be ==>
    
      SELECT i, s, 5, INPUT__FILE__NAME, FILE__POSITION
      FROM tbl WHERE i % 100 = 11;
    
    Then we create two table sinks that refer to expressions from the above
    source statement:
    
      Insert sink (i, s, 5)
      Delete sink (INPUT__FILE__NAME, FILE__POSITION)
    
    The tuples in the rowbatch of MultiDataSink contain slots for all the
    above expressions (i, s, 5, INPUT__FILE__NAME, FILE__POSITION).
    MultiDataSink forwards each row batch to each registered TableSink.
    They will pick their relevant expressions from the tuple and write
    data/delete files. The tuples are sorted by INPUT__FILE__NAME and
    FILE__POSITION because we need to write the delete records in this
    order.
    
    For partitioned tables we need to shuffle and sort the input tuples.
    In this case we also add virtual columns "PARTITION__SPEC__ID" and
    "ICEBERG__PARTITION__SERIALIZED" to the source statement and shuffle
    and sort the rows based on them.
    
    Data files and delete files are now separated in the DmlExecState, so
    at the end of the operation we'll have two sets of files. We use these
    two sets to create a new Iceberg snapshot.
    
    Why does this patch have the limitations?
     - Because we are shuffling and sorting rows based on the delete
       records and their partitions. This means that the new data files
       might not get written in an efficient way, e.g. there will be
       too many of them, or we will need to keep too many open file
       handles during writing.
       Also, if the table has SORT BY properties, we cannot respect
       it as the input rows are ordered in a way to favor the position
       deletes.
       Part 3 will introduce a buffering writer for position delete
       files. This means we will shuffle and sort records based on
       the data records' partitions and SORT BY properties while
       delete records get buffered and written out at the end (sorted
       by file_path and position). In some edge cases the delete records
       might not get written efficiently, but it is a smaller problem
       than inefficient data files.
    
    Testing:
     * negative tests
     * planner tests
     * update all supported data types
     * partitioned tables
     * Impala/Hive interop tests
     * authz tests
     * concurrent tests
    
    Change-Id: Iff0ef6075a2b6ebe130d15daa389ac1a505a7a08
    Reviewed-on: http://gerrit.cloudera.org:8080/20677
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/data-sink.cc                           |   5 +
 be/src/exec/iceberg-delete-sink.cc                 |  44 ++-
 be/src/exec/iceberg-delete-sink.h                  |  20 ++
 be/src/exec/multi-table-sink.cc                    |  98 ++++++
 be/src/exec/multi-table-sink.h                     |  75 ++++
 be/src/exec/table-sink-base.cc                     |  18 +-
 be/src/exec/table-sink-base.h                      |   4 +-
 be/src/runtime/dml-exec-state.cc                   |  62 +++-
 be/src/runtime/dml-exec-state.h                    |  21 +-
 be/src/service/client-request-state.cc             |   7 +-
 common/protobuf/control_service.proto              |  16 +-
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/ImpalaService.thrift                 |   1 +
 common/thrift/Types.thrift                         |   3 +-
 .../org/apache/impala/analysis/DeleteStmt.java     |   2 +
 .../apache/impala/analysis/DescriptorTable.java    |  19 +-
 .../apache/impala/analysis/DmlStatementBase.java   |   3 +
 .../apache/impala/analysis/IcebergDeleteImpl.java  |  54 ++-
 .../apache/impala/analysis/IcebergModifyImpl.java  |  98 +++---
 .../apache/impala/analysis/IcebergUpdateImpl.java  | 196 +++++++++++
 .../org/apache/impala/analysis/InsertStmt.java     |  92 +----
 .../org/apache/impala/analysis/KuduDeleteImpl.java |   4 +-
 .../org/apache/impala/analysis/KuduModifyImpl.java | 132 ++++++-
 .../org/apache/impala/analysis/ModifyImpl.java     | 220 ++++--------
 .../org/apache/impala/analysis/ModifyStmt.java     |  12 +-
 .../org/apache/impala/analysis/OptimizeStmt.java   |  13 +-
 .../org/apache/impala/analysis/UpdateStmt.java     |  16 +-
 .../org/apache/impala/planner/HdfsTableSink.java   |  22 +-
 .../apache/impala/planner/IcebergDeleteSink.java   |  43 +--
 .../org/apache/impala/planner/MultiDataSink.java   | 127 +++++++
 .../java/org/apache/impala/planner/Planner.java    |  53 +--
 .../java/org/apache/impala/service/Frontend.java   |  33 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   4 +
 .../main/java/org/apache/impala/util/ExprUtil.java |   7 +
 .../java/org/apache/impala/util/IcebergUtil.java   | 113 ++++++
 .../org/apache/impala/planner/PlannerTest.java     |   6 +
 shell/impala_client.py                             |  14 +-
 shell/impala_shell.py                              |   9 +-
 .../functional/functional_schema_template.sql      |   6 +-
 .../queries/PlannerTest/iceberg-v2-update.test     | 344 ++++++++++++++++++
 .../queries/PlannerTest/insert-sort-by-zorder.test |   3 +-
 .../queries/PlannerTest/insert.test                |   4 -
 .../queries/QueryTest/iceberg-negative.test        |  78 ++++-
 .../queries/QueryTest/iceberg-update-basic.test    | 388 +++++++++++++++++++++
 .../queries/QueryTest/ranger_column_masking.test   |  20 ++
 .../queries/QueryTest/ranger_row_filtering.test    |  21 ++
 tests/authorization/test_ranger.py                 |  11 +
 tests/query_test/test_iceberg.py                   |  68 +++-
 tests/stress/test_update_stress.py                 | 105 ++++++
 50 files changed, 2297 insertions(+), 422 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3f4fb4c87..e7f27dc1a 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -83,6 +83,7 @@ add_library(Exec
   iceberg-delete-sink.cc
   incr-stats-util.cc
   join-builder.cc
+  multi-table-sink.cc
   nested-loop-join-builder.cc
   nested-loop-join-node.cc
   non-grouping-aggregator.cc
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index bb47b5837..dcf7d1894 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -28,6 +28,7 @@
 #include "exec/hdfs-table-sink.h"
 #include "exec/iceberg-delete-builder.h"
 #include "exec/iceberg-delete-sink.h"
+#include "exec/multi-table-sink.h"
 #include "exec/kudu/kudu-table-sink.h"
 #include "exec/kudu/kudu-util.h"
 #include "exec/nested-loop-join-builder.h"
@@ -125,6 +126,10 @@ Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
       *data_sink = pool->Add(new IcebergDeleteBuilderConfig());
       break;
     }
+    case TDataSinkType::MULTI_DATA_SINK: {
+      *data_sink = pool->Add(new MultiTableSinkConfig());
+      break;
+    }
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =
diff --git a/be/src/exec/iceberg-delete-sink.cc b/be/src/exec/iceberg-delete-sink.cc
index ca80218ca..1c6e21053 100644
--- a/be/src/exec/iceberg-delete-sink.cc
+++ b/be/src/exec/iceberg-delete-sink.cc
@@ -99,6 +99,8 @@ Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
   // We don't do any work for an empty batch.
   if (batch->num_rows() == 0) return Status::OK();
 
+  RETURN_IF_ERROR(VerifyRowsNotDuplicated(batch));
+
   // If there are no partition keys then just pass the whole batch to one partition.
   if (dynamic_partition_key_expr_evals_.empty()) {
     if (current_partition_.first == nullptr) {
@@ -111,10 +113,39 @@ Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
   return Status::OK();
 }
 
+Status IcebergDeleteSink::VerifyRowsNotDuplicated(RowBatch* batch) {
+  DCHECK_EQ(output_exprs_.size(), 2);
+  DCHECK_EQ(output_expr_evals_.size(), 2);
+
+  ScalarExpr* filepath_expr = output_exprs_[0];
+  ScalarExpr* position_expr = output_exprs_[1];
+  DCHECK(filepath_expr->type().IsStringType());
+  DCHECK(position_expr->type().IsIntegerType());
+  // Only consecutive rows are compared; assumes input sorted by (path, pos) - confirm.
+  ScalarExprEvaluator* filepath_eval = output_expr_evals_[0];
+  ScalarExprEvaluator* position_eval = output_expr_evals_[1];
+  for (int i = 0; i < batch->num_rows(); ++i) {
+    TupleRow* row = batch->GetRow(i);
+    StringVal filepath_sv = filepath_eval->GetStringVal(row);
+    DCHECK(!filepath_sv.is_null);
+    BigIntVal position_bi = position_eval->GetBigIntVal(row);
+    DCHECK(!position_bi.is_null);
+    string filepath(reinterpret_cast<char*>(filepath_sv.ptr), filepath_sv.len);
+    int64_t position = position_bi.val;
+    if (prev_file_path_ == filepath && prev_position_ == position) {
+      return Status(Substitute("Duplicated row in DELETE sink. file_path='$0', pos='$1'. "
+          "If this is coming from an UPDATE statement with a JOIN, please check if there "
+          "are multiple matches in the JOIN condition.", filepath, position));
+    }
+    prev_file_path_ = filepath;
+    prev_position_ = position;
+  }
+  return Status::OK();
+}
+
 inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
     const TupleRow* row, const string& key) {
   DCHECK(row != nullptr || key == ROOT_PARTITION_KEY);
-  PartitionMap::iterator existing_partition;
   if (current_partition_.first != nullptr &&
       key == current_clustered_partition_key_) {
     return Status::OK();
@@ -134,7 +165,7 @@ inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
 
   // Save the partition name so that the coordinator can create the partition
   // directory structure if needed.
-  state->dml_exec_state()->AddPartition(
+  dml_exec_state_.AddPartition(
       current_partition_.first->partition_name, prototype_partition_->id(),
       &table_desc_->hdfs_base_dir(),
       nullptr);
@@ -182,7 +213,7 @@ Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch*
         current_partition_.second.clear();
       }
       RETURN_IF_ERROR(FinalizePartitionFile(state,
-          current_partition_.first.get()));
+          current_partition_.first.get(), /*is_delete=*/true, &dml_exec_state_));
       if (current_partition_.first->writer.get() != nullptr) {
         current_partition_.first->writer->Close();
       }
@@ -207,7 +238,8 @@ Status IcebergDeleteSink::FlushFinal(RuntimeState* state) {
   SCOPED_TIMER(profile()->total_time_counter());
 
   if (current_partition_.first != nullptr) {
-    RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.first.get()));
+    RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.first.get(),
+        /*is_delete=*/true, &dml_exec_state_));
   }
   return Status::OK();
 }
@@ -216,6 +248,10 @@ void IcebergDeleteSink::Close(RuntimeState* state) {
   if (closed_) return;
   SCOPED_TIMER(profile()->total_time_counter());
 
+  DmlExecStatusPB dml_exec_proto;
+  dml_exec_state_.ToProto(&dml_exec_proto);
+  state->dml_exec_state()->Update(dml_exec_proto);
+
   if (current_partition_.first != nullptr) {
     if (current_partition_.first->writer != nullptr) {
       current_partition_.first->writer->Close();
diff --git a/be/src/exec/iceberg-delete-sink.h b/be/src/exec/iceberg-delete-sink.h
index 7206ae3b5..cb042cd50 100644
--- a/be/src/exec/iceberg-delete-sink.h
+++ b/be/src/exec/iceberg-delete-sink.h
@@ -78,6 +78,17 @@ class IcebergDeleteSink : public TableSinkBase {
       const TupleRow* row,
       OutputPartition* output_partition) override;
 
+  /// Verifies that the row batch does not contain duplicated rows. This can only happen
+  /// in the context of UPDATE FROM statements when we are updating a table based on
+  /// another table, e.g.:
+  /// UPDATE t SET t.x = s.x FROM ice_t t, source_tbl s where t.id = s.id;
+  /// Now, if 'source_tbl' has duplicate rows then the JOIN operator would produce
+  /// multiple matches for the same row, and we would insert them to the table.
+  /// Therefore, we should always raise an error if we find duplicated rows (i.e. rows
+  /// having the same filepath + position), because that would corrupt the table data
+  /// and the delete files as well.
+  Status VerifyRowsNotDuplicated(RowBatch* batch);
+
   /// Returns the human-readable representation of a partition transform value. It is used
   /// to create the file paths. IcebergUtil.partitionDataFromDataFile() also expects
   /// partition values in this representation.
@@ -98,6 +109,15 @@ class IcebergDeleteSink : public TableSinkBase {
 
   /// The sink writes partitions one-by-one.
   PartitionPair current_partition_;
+
+  /// This sink has its own DmlExecState object because in the context of UPDATEs we
+  /// cannot modify the same DmlExecState object simultaneously (from the INSERT and
+  /// DELETE sinks). It is merged into state->dml_exec_state() in Close().
+  DmlExecState dml_exec_state_;
+
+  /// Variables necessary for validating that row batches don't contain duplicates.
+  std::string prev_file_path_;
+  int64_t prev_position_ = -1;
 };
 
 }
diff --git a/be/src/exec/multi-table-sink.cc b/be/src/exec/multi-table-sink.cc
new file mode 100644
index 000000000..3dd4b44c0
--- /dev/null
+++ b/be/src/exec/multi-table-sink.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/object-pool.h"
+#include "exec/hdfs-table-sink.h"
+#include "exec/iceberg-delete-sink.h"
+#include "exec/multi-table-sink.h"
+#include "runtime/fragment-state.h"
+#include "runtime/runtime-state.h"
+
+namespace impala {
+
+Status MultiTableSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  for (const TDataSink& child_sink : tsink.child_data_sinks) {
+    // We only allow table sinks for now.
+    DCHECK(child_sink.__isset.table_sink);
+    DataSinkConfig* data_sink_config;
+    RETURN_IF_ERROR(DataSinkConfig::CreateConfig(child_sink, input_row_desc,
+        state, &data_sink_config));
+    DCHECK(data_sink_config != nullptr);
+    table_sink_configs_.push_back(static_cast<TableSinkBaseConfig*>(data_sink_config));
+  }
+  return Status::OK();
+}
+
+DataSink* MultiTableSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
+  return state->obj_pool()->Add(
+    new MultiTableSink(sink_id, *this, *tsink_, state));
+}
+
+MultiTableSink::MultiTableSink(TDataSinkId sink_id,
+    const MultiTableSinkConfig& sink_config, const TDataSink& dsink,
+    RuntimeState* state) : DataSink(sink_id, sink_config, "MultiTableSink", state) {
+  for (TableSinkBaseConfig* tbl_sink_config : sink_config.table_sink_configs()) {
+    TableSinkBase* tsink_base =
+        DCHECK_NOTNULL(dynamic_cast<TableSinkBase*>(tbl_sink_config->CreateSink(state)));
+    table_sinks_.push_back(tsink_base);
+    profile()->AddChild(tsink_base->profile());
+  }
+}
+
+Status MultiTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  for (TableSinkBase* tsink : table_sinks_) {
+    RETURN_IF_ERROR(tsink->Prepare(state, parent_mem_tracker));
+  }
+  return Status::OK();
+}
+
+Status MultiTableSink::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(DataSink::Open(state));
+  for (TableSinkBase* tsink : table_sinks_) {
+    RETURN_IF_ERROR(tsink->Open(state));
+  }
+  return Status::OK();
+}
+
+Status MultiTableSink::Send(RuntimeState* state, RowBatch* batch) {
+  for (TableSinkBase* tsink : table_sinks_) {
+    RETURN_IF_ERROR(tsink->Send(state, batch));
+  }
+  return Status::OK();
+}
+
+Status MultiTableSink::FlushFinal(RuntimeState* state) {
+  DCHECK(!closed_);
+  for (TableSinkBase* tsink : table_sinks_) {
+    RETURN_IF_ERROR(tsink->FlushFinal(state));
+  }
+  return Status::OK();
+}
+
+void MultiTableSink::Close(RuntimeState* state) {
+  for (TableSinkBase* tsink : table_sinks_) {
+    tsink->Close(state);
+  }
+  DataSink::Close(state);
+  DCHECK(closed_);
+}
+
+}
\ No newline at end of file
diff --git a/be/src/exec/multi-table-sink.h b/be/src/exec/multi-table-sink.h
new file mode 100644
index 000000000..05ca124f4
--- /dev/null
+++ b/be/src/exec/multi-table-sink.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "exec/data-sink.h"
+#include "exec/table-sink-base.h"
+
+namespace impala {
+
+class RowDescriptor;
+class RuntimeState;
+
+/// Configuration for creating multi table sink objects. A multi table sink has multiple
+/// child table sinks, so this holds the configurations of all the child sinks.
+class MultiTableSinkConfig : public DataSinkConfig {
+ public:
+  /// Creates a new MultiTableSink object.
+  DataSink* CreateSink(RuntimeState* state) const override;
+
+  /// Returns a reference (no copy) to the child sink configs, in Init() creation order.
+  const std::vector<TableSinkBaseConfig*>& table_sink_configs() const {
+    return table_sink_configs_;
+  }
+
+  ~MultiTableSinkConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      FragmentState* state) override;
+ private:
+  std::vector<TableSinkBaseConfig*> table_sink_configs_;
+};
+
+/// MultiTableSink has multiple child table sink objects. It sends the received row
+/// batches to all of its children.
+class MultiTableSink : public DataSink {
+ public:
+  MultiTableSink(TDataSinkId sink_id, const MultiTableSinkConfig& sink_config,
+      const TDataSink& dsink, RuntimeState* state);
+
+  //////////////////////////////////////
+  /// BEGIN: Following methods just delegate calls to the child sinks in 'table_sinks_'.
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
+  Status Open(RuntimeState* state) override;
+
+  Status Send(RuntimeState* state, RowBatch* batch) override;
+
+  Status FlushFinal(RuntimeState* state) override;
+
+  void Close(RuntimeState* state) override;
+  /// END: Methods above just delegate calls to the child sinks in 'table_sinks_'.
+  //////////////////////////////////////
+ private:
+  std::vector<TableSinkBase*> table_sinks_;
+};
+
+}
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
index 3ec0d3bde..73285eb8c 100644
--- a/be/src/exec/table-sink-base.cc
+++ b/be/src/exec/table-sink-base.cc
@@ -414,18 +414,26 @@ bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, OutputPartition* part
 }
 
 Status TableSinkBase::FinalizePartitionFile(
-    RuntimeState* state, OutputPartition* partition) {
+    RuntimeState* state, OutputPartition* partition, bool is_delete,
+    DmlExecState* dml_exec_state) {
+  if (dml_exec_state == nullptr) dml_exec_state = state->dml_exec_state();
   if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return Status::OK();
   SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
 
   // OutputPartition writer could be nullptr if there is no row to output.
   if (partition->writer.get() != nullptr) {
     RETURN_IF_ERROR(partition->writer->Finalize());
-    state->dml_exec_state()->UpdatePartition(
+    dml_exec_state->UpdatePartition(
         partition->partition_name, partition->current_file_rows,
-        &partition->writer->stats());
-    state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(),
-        partition->writer->iceberg_file_stats());
+        &partition->writer->stats(), is_delete);
+    if (is_delete) {
+      DCHECK(IsIceberg());
+      dml_exec_state->AddCreatedDeleteFile(*partition,
+          partition->writer->iceberg_file_stats());
+    } else {
+      dml_exec_state->AddCreatedFile(*partition, IsIceberg(),
+          partition->writer->iceberg_file_stats());
+    }
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));
diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h
index 14ce157da..85d4213c7 100644
--- a/be/src/exec/table-sink-base.h
+++ b/be/src/exec/table-sink-base.h
@@ -127,8 +127,8 @@ protected:
 
   /// Updates runtime stats of HDFS with rows written, then closes the file associated
   /// with the partition by calling ClosePartitionFile()
-  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
-      WARN_UNUSED_RESULT;
+  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
+      bool is_delete = false, DmlExecState* dml_exec_state = nullptr) WARN_UNUSED_RESULT;
 
   /// Writes all rows referenced by the row index vector in 'partition_pair' to the
   /// partition's writer and clears the row index vector afterwards.
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 861ab4210..acf88ccdf 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -73,6 +73,9 @@ string DmlExecState::OutputPartitionStats(const string& prefix) {
     if (val.second.has_num_modified_rows()) {
       ss << "NumModifiedRows: " << val.second.num_modified_rows() << endl;
     }
+    if (val.second.has_num_deleted_rows()) {
+      ss << "NumDeletedRows: " << val.second.num_deleted_rows() << endl;
+    }
 
     if (!val.second.has_stats()) continue;
     const DmlStatsPB& stats = val.second.stats();
@@ -102,6 +105,8 @@ void DmlExecState::Update(const DmlExecStatusPB& dml_exec_status) {
     DmlPartitionStatusPB* status = &(per_partition_status_[part.first]);
     status->set_num_modified_rows(
         status->num_modified_rows() + part.second.num_modified_rows());
+    status->set_num_deleted_rows(
+      status->num_deleted_rows() + part.second.num_deleted_rows());
     status->set_kudu_latest_observed_ts(max<uint64_t>(
         part.second.kudu_latest_observed_ts(), status->kudu_latest_observed_ts()));
     status->set_id(part.second.id());
@@ -120,6 +125,13 @@ void DmlExecState::Update(const DmlExecStatusPB& dml_exec_status) {
       DCHECK(!file.staging_path().empty());
       files_to_move_[file.staging_path()] = file.final_path();
     }
+    for (int i = 0; i < part.second.created_delete_files_size(); ++i) {
+      const DmlFileStatusPb& file = part.second.created_delete_files(i);
+      *status->add_created_delete_files() = file;
+      if (!file.has_staging_path()) continue;
+      DCHECK(!file.staging_path().empty());
+      files_to_move_[file.staging_path()] = file.final_path();
+    }
   }
 }
 
@@ -149,6 +161,10 @@ bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
       const DmlFileStatusPb& file = partition.second.created_files(i);
       updatedPartition.files.push_back(file.final_path());
     }
+    for (int i = 0; i < partition.second.created_delete_files_size(); ++i) {
+      const DmlFileStatusPb& file = partition.second.created_delete_files(i);
+      updatedPartition.files.push_back(file.final_path());
+    }
     catalog_update->updated_partitions[partition.first] = updatedPartition;
   }
   return catalog_update->updated_partitions.size() != 0;
@@ -428,6 +444,10 @@ void DmlExecState::ToTDmlResult(TDmlResult* dml_result) {
   bool has_kudu_stats = false;
   for (const PartitionStatusMap::value_type& v: per_partition_status_) {
     dml_result->rows_modified[v.first] = v.second.num_modified_rows();
+    if (v.second.has_num_deleted_rows()) {
+      dml_result->__isset.rows_deleted = true;
+      dml_result->rows_deleted[v.first] = v.second.num_deleted_rows();
+    }
     if (v.second.has_stats() && v.second.stats().has_kudu_stats()) {
       has_kudu_stats = true;
     }
@@ -455,12 +475,17 @@ void DmlExecState::AddPartition(
 }
 
 void DmlExecState::UpdatePartition(const string& partition_name,
-    int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats) {
+    int64_t num_rows_delta, const DmlStatsPB* insert_stats, bool is_delete) {
   lock_guard<mutex> l(lock_);
   PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
   DCHECK(entry != per_partition_status_.end());
-  entry->second.set_num_modified_rows(
-      entry->second.num_modified_rows() + num_modified_rows_delta);
+  if (is_delete) {
+    entry->second.set_num_deleted_rows(
+        entry->second.num_deleted_rows() + num_rows_delta);
+  } else {
+    entry->second.set_num_modified_rows(
+        entry->second.num_modified_rows() + num_rows_delta);
+  }
   if (insert_stats == nullptr) return;
   MergeDmlStats(*insert_stats, entry->second.mutable_stats());
 }
@@ -533,11 +558,26 @@ string createIcebergDataFileString(
 
 void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
     const IcebergFileStats& insert_stats) {
+  AddFileAux(partition, is_iceberg, insert_stats, /*is_delete=*/false);
+}
+
+void DmlExecState::AddCreatedDeleteFile(const OutputPartition& partition,
+    const IcebergFileStats& insert_stats) {
+  AddFileAux(partition, /*is_iceberg=*/true, insert_stats, /*is_delete=*/true);
+}
+
+void DmlExecState::AddFileAux(const OutputPartition& partition, bool is_iceberg,
+    const IcebergFileStats& insert_stats, bool is_delete) {
   lock_guard<mutex> l(lock_);
   PartitionStatusMap::iterator entry =
       per_partition_status_.find(partition.partition_name);
   DCHECK(entry != per_partition_status_.end());
-  DmlFileStatusPb* file = entry->second.add_created_files();
+  DmlFileStatusPb* file;
+  if (is_delete) {
+    file = entry->second.add_created_delete_files();
+  } else {
+    file = entry->second.add_created_files();
+  }
   if (partition.current_file_final_name.empty()) {
     file->set_final_path(partition.current_file_name);
   } else {
@@ -567,6 +607,20 @@ vector<string> DmlExecState::CreateIcebergDataFilesVector() {
   return ret;
 }
 
+vector<string> DmlExecState::CreateIcebergDeleteFilesVector() {
+  vector<string> ret;
+  ret.reserve(per_partition_status_.size()); // min 1 file per partition
+  for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
+    for (int i = 0; i < partition.second.created_delete_files_size(); ++i) {
+      const DmlFileStatusPb& file = partition.second.created_delete_files(i);
+      if (file.has_iceberg_data_file_fb()) {
+        ret.push_back(file.iceberg_data_file_fb());
+      }
+    }
+  }
+  return ret;
+}
+
 void DmlExecState::MergeDmlStats(const DmlStatsPB& src, DmlStatsPB* dst) {
   dst->set_bytes_written(dst->bytes_written() + src.bytes_written());
   if (src.has_kudu_stats()) {
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index ecb150f3c..a2901b61a 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -69,14 +69,20 @@ class DmlExecState {
   /// Merge given values into stats for partition with name 'partition_name'.
   /// Ignores 'insert_stats' if nullptr.
   /// Requires that the partition already exist.
-  void UpdatePartition(const std::string& partition_name,
-      int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats);
+  void UpdatePartition(const std::string& partition_name, int64_t num_rows_delta,
+      const DmlStatsPB* insert_stats, bool is_delete = false);
 
-  /// Extract information from 'partition', and add a new Iceberg data file.
+  /// Extract information from 'partition', and add a new data file.
   /// 'insert_stats' contains stats for the Iceberg data file.
   void AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
       const IcebergFileStats& insert_stats);
 
+  /// Extract information from 'partition', and add a new delete file. This function
+  /// can only be called for Iceberg tables.
+  /// 'insert_stats' contains stats for the Iceberg delete file.
+  void AddCreatedDeleteFile(const OutputPartition& partition,
+      const IcebergFileStats& insert_stats);
+
   /// Used to initialize this state when execute Kudu DML. Must be called before
   /// SetKuduDmlStats().
   void InitForKuduDml();
@@ -114,10 +120,17 @@ class DmlExecState {
   /// Beeswax.
   void ToTDmlResult(TDmlResult* dml_result);
 
-  // Encodes file list info in flatbuffer format expected by Iceberg API.
+  // Encodes data file list info in flatbuffer format expected by Iceberg API.
   std::vector<std::string> CreateIcebergDataFilesVector();
 
+  // Encodes delete file list info in flatbuffer format expected by Iceberg API.
+  std::vector<std::string> CreateIcebergDeleteFilesVector();
+
  private:
+  /// Auxiliary function used by 'AddCreatedFile' and 'AddCreatedDeleteFile'.
+  void AddFileAux(const OutputPartition& partition, bool is_iceberg,
+    const IcebergFileStats& insert_stats, bool is_delete);
+
   /// protects all fields below
   std::mutex lock_;
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 0f24c0717..80ebe0326 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1514,7 +1514,7 @@ Status ClientRequestState::UpdateCatalog() {
   query_events_->MarkEvent("DML data written");
   SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
 
-  TQueryExecRequest query_exec_request = exec_request_->query_exec_request;
+  const TQueryExecRequest& query_exec_request = exec_request_->query_exec_request;
   if (query_exec_request.__isset.finalize_params) {
     const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
@@ -1562,7 +1562,12 @@ Status ClientRequestState::UpdateCatalog() {
           cat_ice_op.__set_is_overwrite(finalize_params.is_overwrite);
         } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) {
           cat_ice_op.__set_iceberg_delete_files_fb(
+              dml_exec_state->CreateIcebergDeleteFilesVector());
+        } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) {
+          cat_ice_op.__set_iceberg_data_files_fb(
               dml_exec_state->CreateIcebergDataFilesVector());
+          cat_ice_op.__set_iceberg_delete_files_fb(
+              dml_exec_state->CreateIcebergDeleteFilesVector());
         }
       }
 
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 40c329f8f..78d4222ab 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -78,21 +78,27 @@ message DmlPartitionStatusPB {
   // The number of rows modified in this partition
   optional int64 num_modified_rows = 2;
 
+  // The number of rows deleted in this partition
+  optional int64 num_deleted_rows = 3;
+
   // Detailed statistics gathered by table writers for this partition
-  optional DmlStatsPB stats = 3;
+  optional DmlStatsPB stats = 4;
 
   // Fully qualified URI to the base directory for this partition.
-  optional string partition_base_dir = 4;
+  optional string partition_base_dir = 5;
 
   // The latest observed Kudu timestamp reported by the local KuduSession.
   // This value is an unsigned int64.
-  optional int64 kudu_latest_observed_ts = 5;
+  optional int64 kudu_latest_observed_ts = 6;
 
   // List of files created during the DML statement in this partition.
-  repeated DmlFileStatusPb created_files = 6;
+  repeated DmlFileStatusPb created_files = 7;
+
+  // List of delete files created during the DML statement in this partition.
+  repeated DmlFileStatusPb created_delete_files = 8;
 
   // Fully qualified URI to the staging directory for this partition.
-  optional string staging_dir_to_clean_up = 7;
+  optional string staging_dir_to_clean_up = 9;
 }
 
 // The results of a DML statement, sent to the coordinator as part of
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 325eae58a..ba071d9dd 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -33,6 +33,7 @@ enum TDataSinkType {
   PLAN_ROOT_SINK = 3
   NESTED_LOOP_JOIN_BUILDER = 4
   ICEBERG_DELETE_BUILDER = 5
+  MULTI_DATA_SINK = 6
 }
 
 enum TSinkAction {
@@ -183,4 +184,7 @@ struct TDataSink {
 
   // Resource profile for this data sink. Always set.
   9: optional ResourceProfile.TBackendResourceProfile resource_profile
+
+  // Child data sinks if this is a MULTI_DATA_SINK.
+  10: optional list<TDataSink> child_data_sinks
 }
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 336e6ebed..13c6c4e34 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -881,6 +881,7 @@ struct TDmlResult {
   // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with
   // the root in an unpartitioned table being the empty string.
   1: required map<string, i64> rows_modified
+  3: optional map<string, i64> rows_deleted
 
   // Number of row operations attempted but not completed due to non-fatal errors
   // reported by the storage engine that Impala treats as warnings. Only applies to Kudu
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index f0fc2626c..aaecb0602 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -113,8 +113,9 @@ enum TStmtType {
 }
 
 enum TIcebergOperation {
-  INSERT = 0,
+  INSERT = 0
   DELETE = 1
+  UPDATE = 2
 }
 
 // Level of verboseness for "explain" output.
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index a02170fb6..0967109a5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -64,10 +64,12 @@ public class DeleteStmt extends ModifyStmt {
     }
   }
 
+  @Override
   public DataSink createDataSink() {
     return modifyImpl_.createDataSink();
   }
 
+  @Override
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
     modifyImpl_.substituteResultExprs(smap, analyzer);
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
index 95ba0d084..4127fa794 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DescriptorTable.java
@@ -56,6 +56,9 @@ public class DescriptorTable {
   // Table id 0 is reserved for it. Set in QueryStmt.analyze() that produces a table sink,
   // e.g. InsertStmt.analyze(), ModifyStmt.analyze().
   private FeTable targetTable_;
+  // Sometimes we have multiple target tables (e.g. Iceberg UPDATEs), in which case
+  // we can reserve multiple ids for the target tables.
+  private final Map<FeTable, Integer> additionalTargetTableIds_ = new HashMap<>();
   // For each table, the set of partitions that are referenced by at least one
   // scan range.
   private final Map<FeTable, Set<Long>> referencedPartitionsPerTable_ = new HashMap<>();
@@ -112,6 +115,12 @@ public class DescriptorTable {
 
   public void setTargetTable(FeTable table) { targetTable_ = table; }
 
+  public int addTargetTable(FeTable table) {
+    int id = nextTableId_++;
+    additionalTargetTableIds_.put(table, id);
+    return id;
+  }
+
   /**
    * Find the set of referenced partitions for the given table.  Allocates a set if
    * none has been allocated for the table yet.
@@ -183,6 +192,12 @@ public class DescriptorTable {
       tableIdMap.put(targetTable_, TABLE_SINK_ID);
       referencedTables.put(targetTable_.getTableName(), targetTable_);
     }
+    for (Map.Entry<FeTable, Integer> tableIdEntry :
+        additionalTargetTableIds_.entrySet()) {
+      FeTable targetTable = tableIdEntry.getKey();
+      tableIdMap.put(targetTable, tableIdEntry.getValue());
+      referencedTables.put(targetTable.getTableName(), targetTable);
+    }
     for (TupleDescriptor tupleDesc: tupleDescs_.values()) {
       // inline view of a non-constant select has a non-materialized tuple descriptor
       // in the descriptor table just for type checking, which we need to skip
@@ -214,7 +229,9 @@ public class DescriptorTable {
     for (FeTable tbl: tableIdMap.keySet()) {
       Set<Long> referencedPartitions = null; // null means include all partitions.
       // We don't know which partitions are needed for INSERT, so do not prune partitions.
-      if (tbl != targetTable_) referencedPartitions = getReferencedPartitions(tbl);
+      if (tbl != targetTable_ && !additionalTargetTableIds_.containsKey(tbl)) {
+        referencedPartitions = getReferencedPartitions(tbl);
+      }
       result.addToTableDescriptors(
           tbl.toThriftDescriptor(tableIdMap.get(tbl), referencedPartitions));
     }
diff --git a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
index cf3dfd63d..d2be81dd4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
@@ -21,6 +21,7 @@ import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.planner.DataSink;
 
 import java.util.List;
 
@@ -62,6 +63,8 @@ public abstract class DmlStatementBase extends StatementBase {
   public boolean hasClusteredHint() { return false; }
   public boolean hasNoClusteredHint() { return false; }
 
+  abstract public DataSink createDataSink();
+  abstract public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer);
   abstract public List<Expr> getPartitionKeyExprs();
   abstract public List<Expr> getSortExprs();
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
index f32ce3ff2..6bc51d66f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -17,28 +17,70 @@
 
 package org.apache.impala.analysis;
 
+import java.util.List;
+
 import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.IcebergDeleteSink;
 import org.apache.impala.planner.TableSink;
 import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.impala.util.ExprUtil;
 
 public class IcebergDeleteImpl extends IcebergModifyImpl {
   public IcebergDeleteImpl(ModifyStmt modifyStmt) {
     super(modifyStmt);
   }
 
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    // Make the virtual position delete table the new target table.
+    modifyStmt_.setTargetTable(icePosDelTable_);
+
+    String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
+      org.apache.iceberg.TableProperties.DELETE_MODE);
+    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
+      throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
+          "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
+    }
+
+    Expr wherePredicate = modifyStmt_.getWherePredicate();
+    if (wherePredicate == null ||
+        org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) {
+      // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
+      throw new AnalysisException("For deleting every row, please use TRUNCATE.");
+    }
+  }
+
+  @Override
+  protected void buildAndValidateSelectExprs(Analyzer analyzer,
+      List<SelectListItem> selectList)
+      throws AnalysisException {
+    deletePartitionKeyExprs_ = getDeletePartitionExprs(analyzer);
+    deleteResultExprs_ = getDeleteResultExprs(analyzer);
+    selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
+    selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_));
+    sortExprs_.addAll(deleteResultExprs_);
+  }
+
+  @Override
+  public List<Expr> getPartitionKeyExprs() { return deletePartitionKeyExprs_; }
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    // No-op
+  }
+
   @Override
   public DataSink createDataSink() {
     Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);
-    TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
-        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
-        new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, null,
-        modifyStmt_.maxTableSinks_);
-    Preconditions.checkState(!getReferencedColumns().isEmpty());
-    return tableSink;
+    return new IcebergDeleteSink(icePosDelTable_, deletePartitionKeyExprs_,
+        deleteResultExprs_);
   }
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
index 30b4c375e..5a816c6c0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergModifyImpl.java
@@ -22,14 +22,33 @@ import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TIcebergFileFormat;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import com.google.common.collect.Lists;
 
 abstract class IcebergModifyImpl extends ModifyImpl {
   FeIcebergTable originalTargetTable_;
   IcebergPositionDeleteTable icePosDelTable_;
 
+  /////////////////////////////////////////
+  // START: Members that are set in buildAndValidateSelectExprs().
+
+  // All Iceberg modify statements (DELETE, UPDATE) need delete exprs.
+  protected List<Expr> deleteResultExprs_ = new ArrayList<>();
+  protected List<Expr> deletePartitionKeyExprs_ = new ArrayList<>();
+
+  // Exprs returned by getSortExprs(). Note that this list is not derived from the
+  // 'sort.columns' table property: IcebergUpdateImpl rejects tables that have sort
+  // columns, and both IcebergDeleteImpl and IcebergUpdateImpl fill this list with
+  // 'deleteResultExprs_' (the INPUT__FILE__NAME and FILE__POSITION slot refs) in
+  // buildAndValidateSelectExprs(). The list is empty until that method has run and is
+  // rewritten by substituteResultExprs().
+  protected List<Expr> sortExprs_ = new ArrayList<>();
+  // END: Members that are set in buildAndValidateSelectExprs().
+  /////////////////////////////////////////
+
   public IcebergModifyImpl(ModifyStmt modifyStmt) {
     super(modifyStmt);
     originalTargetTable_ = (FeIcebergTable)modifyStmt_.getTargetTable();
@@ -38,64 +57,59 @@ abstract class IcebergModifyImpl extends ModifyImpl {
 
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
-    // Make the virtual position delete table the new target table.
-    modifyStmt_.setTargetTable(icePosDelTable_);
     modifyStmt_.setMaxTableSinks(analyzer.getQueryOptions().getMax_fs_writers());
-    if (modifyStmt_ instanceof UpdateStmt) {
-      throw new AnalysisException("UPDATE is not supported for Iceberg table " +
-          originalTargetTable_.getFullName());
-    }
 
     if (icePosDelTable_.getFormatVersion() == 1) {
       throw new AnalysisException("Iceberg V1 table do not support DELETE/UPDATE " +
           "operations: " + originalTargetTable_.getFullName());
     }
 
-    String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get(
-        org.apache.iceberg.TableProperties.DELETE_MODE);
-    if (deleteMode != null && !deleteMode.equals("merge-on-read")) {
-      throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " +
-          "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName()));
-    }
-
     if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) {
       throw new AnalysisException("Impala can only write delete files in PARQUET, " +
           "but the given table uses a different file format: " +
           originalTargetTable_.getFullName());
     }
-
-    Expr wherePredicate = modifyStmt_.getWherePredicate();
-    if (wherePredicate == null ||
-        org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate)) {
-      // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t;
-      throw new AnalysisException("For deleting every row, please use TRUNCATE.");
-    }
   }
 
   @Override
-  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
-      throws AnalysisException {
+  public List<Expr> getSortExprs() {
+    return sortExprs_;
   }
 
   @Override
-  public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap) throws AnalysisException {
-    if (originalTargetTable_.isPartitioned()) {
-      String[] partitionCols;
-      partitionCols = new String[] {"PARTITION__SPEC__ID",
-          "ICEBERG__PARTITION__SERIALIZED"};
-      for (String k : partitionCols) {
-        addPartitioningColumn(analyzer, selectList, referencedColumns, uniqueSlots,
-            keySlots, colIndexMap, k);
-      }
-    }
-    String[] deleteCols;
-    deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"};
-    // Add the key columns as slot refs
-    for (String k : deleteCols) {
-      addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-          colIndexMap, k, true);
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    sortExprs_ = Expr.substituteList(sortExprs_, smap, analyzer, true);
+    deleteResultExprs_ = Expr.substituteList(deleteResultExprs_, smap, analyzer, true);
+    deletePartitionKeyExprs_ = Expr.substituteList(
+        deletePartitionKeyExprs_, smap, analyzer, true);
+  }
+
+  public List<Expr> getDeletePartitionExprs(Analyzer analyzer) throws AnalysisException {
+    if (!originalTargetTable_.isPartitioned()) return Collections.emptyList();
+    return getSlotRefs(analyzer, Lists.newArrayList(
+        "PARTITION__SPEC__ID", "ICEBERG__PARTITION__SERIALIZED"));
+  }
+
+  public List<Expr> getDeleteResultExprs(Analyzer analyzer) throws AnalysisException {
+    return getSlotRefs(analyzer, Lists.newArrayList(
+        "INPUT__FILE__NAME", "FILE__POSITION"));
+  }
+
+  private List<Expr> getSlotRefs(Analyzer analyzer, List<String> cols)
+      throws AnalysisException {
+    List<Expr> ret = new ArrayList<>();
+    for (String col : cols) {
+      ret.add(createSlotRef(analyzer, col));
     }
+    return ret;
+  }
+
+  protected SlotRef createSlotRef(Analyzer analyzer, String colName)
+      throws AnalysisException {
+    List<String> path = Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
+        colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    return ref;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
new file mode 100644
index 000000000..fc287313b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import static java.lang.String.format;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.iceberg.TableProperties;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergColumn;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.IcebergDeleteSink;
+import org.apache.impala.planner.MultiDataSink;
+import org.apache.impala.planner.TableSink;
+import org.apache.impala.thrift.TIcebergFileFormat;
+import org.apache.impala.thrift.TSortingOrder;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.impala.util.ExprUtil;
+import org.apache.impala.util.IcebergUtil;
+
+public class IcebergUpdateImpl extends IcebergModifyImpl {
+  // Id of the delete table in the descriptor table. Set in analyze().
+  private int deleteTableId_ = -1;
+
+  /////////////////////////////////////////
+  // START: Members that are set in buildAndValidateSelectExprs().
+  private List<Expr> insertResultExprs_ = new ArrayList<>();
+  private List<Expr> insertPartitionKeyExprs_ = new ArrayList<>();
+
+  // END: Members that are set in buildAndValidateSelectExprs().
+  /////////////////////////////////////////
+
+  public IcebergUpdateImpl(ModifyStmt modifyStmt) {
+    super(modifyStmt);
+  }
+
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_);
+    IcebergUtil.validateIcebergColumnsForInsert(originalTargetTable_);
+    if (originalTargetTable_.getPartitionSpecs().size() > 1) {
+      throw new AnalysisException(
+          String.format("Table '%s' has multiple partition specs, therefore " +
+              "cannot be used as a target table in an UPDATE statement",
+              originalTargetTable_.getFullName()));
+    }
+    String updateMode = originalTargetTable_.getIcebergApiTable().properties().get(
+        TableProperties.UPDATE_MODE);
+    if (updateMode != null && !updateMode.equals("merge-on-read")) {
+      throw new AnalysisException(String.format("Unsupported update mode: '%s' for " +
+          "Iceberg table: %s", updateMode, originalTargetTable_.getFullName()));
+    }
+    for (Column c : originalTargetTable_.getColumns()) {
+      if (c.getType().isComplexType()) {
+        throw new AnalysisException(String.format("Impala does not support updating " +
+            "tables with complex types. Table '%s' has column '%s' " +
+            "with type: %s", originalTargetTable_.getFullName(), c.getName(),
+            c.getType().toSql()));
+      }
+    }
+    Pair<List<Integer>, TSortingOrder> sortProperties =
+        AlterTableSetTblProperties.analyzeSortColumns(originalTargetTable_,
+            originalTargetTable_.getMetaStoreTable().getParameters());
+    if (!sortProperties.first.isEmpty()) {
+      throw new AnalysisException(String.format("Impala does not support updating " +
+              "sorted tables. Data files in table '%s' are sorted by the " +
+              "following column(s): %s", originalTargetTable_.getFullName(),
+          originalTargetTable_.getMetaStoreTable().getParameters().get(
+              AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)));
+    }
+    if (originalTargetTable_.getIcebergFileFormat() != TIcebergFileFormat.PARQUET) {
+      throw new AnalysisException(String.format("Impala can only write Parquet data " +
+          "files, while table '%s' expects '%s' data files.",
+          originalTargetTable_.getFullName(),
+          originalTargetTable_.getIcebergFileFormat().toString()));
+    }
+  }
+
+  @Override
+  protected void buildAndValidateSelectExprs(Analyzer analyzer,
+      List<SelectListItem> selectList) throws AnalysisException {
+    Map<Integer, Expr> colToExprs = new HashMap<>();
+
+    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
+      SlotRef lhsSlotRef = valueAssignment.first;
+      lhsSlotRef.analyze(analyzer);
+
+      Expr rhsExpr = valueAssignment.second;
+      checkSubQuery(lhsSlotRef, rhsExpr);
+      rhsExpr.analyze(analyzer);
+
+      checkCorrectTargetTable(lhsSlotRef, rhsExpr);
+      checkLhsIsColumnRef(lhsSlotRef, rhsExpr);
+
+      IcebergColumn c = (IcebergColumn)lhsSlotRef.getResolvedPath().destColumn();
+      rhsExpr = checkTypeCompatiblity(analyzer, c, rhsExpr);
+      if (IcebergUtil.isPartitionColumn(
+          c, originalTargetTable_.getDefaultPartitionSpec())) {
+        throw new AnalysisException(
+            String.format("Left-hand side in assignment '%s = %s' refers to a " +
+                "partitioning column", lhsSlotRef.toSql(), rhsExpr.toSql()));
+      }
+
+      checkLhsOnlyAppearsOnce(colToExprs, c, lhsSlotRef, rhsExpr);
+      colToExprs.put(c.getPosition(), rhsExpr);
+    }
+
+    List<Column> columns = modifyStmt_.table_.getColumns();
+    for (Column col : columns) {
+      Expr expr = colToExprs.get(col.getPosition());
+      if (expr == null) expr = createSlotRef(analyzer, col.getName());
+      insertResultExprs_.add(expr);
+    }
+    selectList.addAll(ExprUtil.exprsAsSelectList(insertResultExprs_));
+    IcebergUtil.populatePartitionExprs(analyzer, null, columns,
+        insertResultExprs_, originalTargetTable_, insertPartitionKeyExprs_, null);
+
+    deletePartitionKeyExprs_ = getDeletePartitionExprs(analyzer);
+    deleteResultExprs_ = getDeleteResultExprs(analyzer);
+    selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
+    selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_));
+
+    sortExprs_.addAll(deleteResultExprs_);
+  }
+
+  @Override
+  public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
+      throws AnalysisException {
+    // Cast result expressions to the correct type of the referenced slot of the
+    // target table.
+    List<Column> columns = modifyStmt_.table_.getColumns();
+    for (int i = 0; i < insertResultExprs_.size(); ++i) {
+      Column col = columns.get(i);
+      Expr resultExpr = sourceStmt_.resultExprs_.get(i);
+      if (!col.getType().equals(resultExpr.getType())) {
+        Expr castTo = resultExpr.castTo(col.getType());
+        sourceStmt_.resultExprs_.set(i, castTo);
+      }
+    }
+  }
+
+  @Override
+  public List<Expr> getPartitionKeyExprs() {
+    return deletePartitionKeyExprs_;
+  }
+
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    super.substituteResultExprs(smap, analyzer);
+    insertResultExprs_ = Expr.substituteList(insertResultExprs_, smap, analyzer, true);
+    insertPartitionKeyExprs_ = Expr.substituteList(
+        insertPartitionKeyExprs_, smap, analyzer, true);
+  }
+
+  @Override
+  public DataSink createDataSink() {
+    // analyze() must have been called before.
+    Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);
+
+    TableSink insertSink = TableSink.create(modifyStmt_.table_, TableSink.Op.INSERT,
+        insertPartitionKeyExprs_, insertResultExprs_, Collections.emptyList(), false,
+        false, new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, null,
+        modifyStmt_.maxTableSinks_);
+    TableSink deleteSink = new IcebergDeleteSink(
+        icePosDelTable_, deletePartitionKeyExprs_, deleteResultExprs_, deleteTableId_);
+
+    MultiDataSink ret = new MultiDataSink();
+    ret.addDataSink(insertSink);
+    ret.addDataSink(deleteSink);
+    return ret;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 433164ddf..d4e8d6579 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -607,7 +607,7 @@ public class InsertStmt extends DmlStatementBase {
         }
         validateBucketTransformForOverwrite(iceTable);
       }
-      validateIcebergColumnsForInsert(iceTable);
+      IcebergUtil.validateIcebergColumnsForInsert(iceTable);
     }
 
     if (isHBaseTable && overwrite_) {
@@ -615,20 +615,6 @@ public class InsertStmt extends DmlStatementBase {
     }
   }
 
-  private void validateIcebergColumnsForInsert(FeIcebergTable iceTable)
-      throws AnalysisException {
-    for (Types.NestedField field : iceTable.getIcebergSchema().columns()) {
-      org.apache.iceberg.types.Type iceType = field.type();
-      if (iceType.isPrimitiveType() && iceType instanceof Types.TimestampType) {
-        Types.TimestampType tsType = (Types.TimestampType)iceType;
-        if (tsType.shouldAdjustToUTC()) {
-          throw new AnalysisException("The Iceberg table has a TIMESTAMPTZ " +
-              "column that Impala cannot write.");
-        }
-      }
-    }
-  }
-
   /**
    * Validate if INSERT OVERWRITE could be allowed when the table has bucket partition
    * transform. 'INSERT OVERWRITE tbl SELECT * FROM tbl' can be allowed because the source
@@ -901,8 +887,9 @@ public class InsertStmt extends DmlStatementBase {
 
     if (isIcebergTarget()) {
       // Add partition key expressions in the order of the Iceberg partition fields.
-      addIcebergPartExprs(analyzer, widestTypeExprList, selectExprTargetColumns,
-          selectListExprs, icebergPartSpec);
+      IcebergUtil.populatePartitionExprs(
+          analyzer, widestTypeExprList, selectExprTargetColumns, selectListExprs,
+          (FeIcebergTable)table_, partitionKeyExprs_, partitionColPos_);
     } else {
       // Reorder the partition key exprs and names to be consistent with the target table
       // declaration, and store their column positions.  We need those exprs in the
@@ -1032,76 +1019,6 @@ public class InsertStmt extends DmlStatementBase {
     }
   }
 
-  /**
-   * Adds partition key expressions in the order of Iceberg partition spec fields.
-   */
-  private void addIcebergPartExprs(Analyzer analyzer, List<Expr> widestTypeExprList,
-      List<Column> selectExprTargetColumns, List<Expr> selectListExprs,
-      IcebergPartitionSpec icebergPartSpec) throws AnalysisException {
-    if (!icebergPartSpec.hasPartitionFields()) return;
-    for (IcebergPartitionField partField : icebergPartSpec.getIcebergPartitionFields()) {
-      if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
-      for (int i = 0; i < selectListExprs.size(); ++i) {
-        IcebergColumn targetColumn = (IcebergColumn)selectExprTargetColumns.get(i);
-        if (targetColumn.getFieldId() != partField.getSourceId()) continue;
-        // widestTypeExpr is widest type expression for column i
-        Expr widestTypeExpr =
-            (widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
-        Expr compatibleExpr = checkTypeCompatibility(targetTableName_.toString(),
-            targetColumn, selectListExprs.get(i), analyzer, widestTypeExpr);
-        Expr icebergPartitionTransformExpr =
-            getIcebergPartitionTransformExpr(partField, compatibleExpr);
-        partitionKeyExprs_.add(icebergPartitionTransformExpr);
-        partitionColPos_.add(targetColumn.getPosition());
-        break;
-      }
-    }
-  }
-
-  /**
-   * Returns the partition transform expression. E.g. if the partition transform is DAY,
-   * it returns 'to_date(compatibleExpr)'. If the partition transform is BUCKET,
-   * it returns 'iceberg_bucket_transform(compatibleExpr, transformParam)'.
-   */
-  private Expr getIcebergPartitionTransformExpr(IcebergPartitionField partField,
-      Expr compatibleExpr) {
-    String funcNameStr = transformTypeToFunctionName(partField.getTransformType());
-    if (funcNameStr == null || funcNameStr.equals("")) return compatibleExpr;
-    FunctionName fnName = new FunctionName(BuiltinsDb.NAME, funcNameStr);
-    List<Expr> paramList = new ArrayList<>();
-    paramList.add(compatibleExpr);
-    Integer transformParam = partField.getTransformParam();
-    if (transformParam != null) {
-      paramList.add(NumericLiteral.create(transformParam));
-    }
-    if (partField.getTransformType() == TIcebergPartitionTransformType.MONTH) {
-      paramList.add(new StringLiteral("yyyy-MM"));
-    }
-    else if (partField.getTransformType() == TIcebergPartitionTransformType.HOUR) {
-      paramList.add(new StringLiteral("yyyy-MM-dd-HH"));
-    }
-    FunctionCallExpr fnCall = new FunctionCallExpr(fnName, new FunctionParams(paramList));
-    fnCall.setIsInternalFnCall(true);
-    return fnCall;
-  }
-
-  /**
-   * Returns the builtin function to use for the given partition transform.
-   */
-  private static String transformTypeToFunctionName(
-      TIcebergPartitionTransformType transformType) {
-    switch (transformType) {
-      case IDENTITY: return "";
-      case HOUR:
-      case MONTH: return "from_timestamp";
-      case DAY: return "to_date";
-      case YEAR: return "year";
-      case BUCKET: return "iceberg_bucket_transform";
-      case TRUNCATE: return "iceberg_truncate_transform";
-    }
-    return "";
-  }
-
   private static Set<String> getKuduPartitionColumnNames(FeKuduTable table) {
     Set<String> ret = new HashSet<>();
     for (KuduPartitionParam partitionParam : table.getPartitionBy()) {
@@ -1246,6 +1163,7 @@ public class InsertStmt extends DmlStatementBase {
    * key expressions with smap. Preserves the original types of those expressions during
    * the substitution.
    */
+  @Override
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
     partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
index 54625fb03..f1249ccfd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduDeleteImpl.java
@@ -26,6 +26,8 @@ import org.apache.impala.thrift.TSortingOrder;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.Collections;
+
 public class KuduDeleteImpl extends KuduModifyImpl {
   public KuduDeleteImpl(ModifyStmt modifyStmt) {
     super(modifyStmt);
@@ -36,7 +38,7 @@ public class KuduDeleteImpl extends KuduModifyImpl {
     // analyze() must have been called before.
     Preconditions.checkState(modifyStmt_.table_ instanceof FeKuduTable);
     TableSink tableSink = TableSink.create(modifyStmt_.table_, TableSink.Op.DELETE,
-        partitionKeyExprs_, resultExprs_, getReferencedColumns(), false, false,
+        Collections.emptyList(), resultExprs_, getReferencedColumns(), false, false,
         new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1,
         modifyStmt_.getKuduTransactionToken(),
         modifyStmt_.maxTableSinks_);
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
index 76290e977..8191d4725 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduModifyImpl.java
@@ -17,10 +17,18 @@
 
 package org.apache.impala.analysis;
 
+import static java.lang.String.format;
+
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeKuduTable;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +37,24 @@ abstract class KuduModifyImpl extends ModifyImpl {
   // Target Kudu table.
   FeKuduTable kuduTable_;
 
+  /////////////////////////////////////////
+  // START: Members that are set in buildAndValidateSelectExprs().
+
+  // Output expressions that produce the final results to write to the target table. May
+  // include casts.
+  //
+  // In case of DELETE statements it contains the slot refs of the Kudu primary key
+  // columns (added by addKeyColumn()), which identify the deleted rows.
+  protected List<Expr> resultExprs_ = new ArrayList<>();
+
+  // Position mapping of output expressions of the sourceStmt_ to column indices in the
+  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
+  // position in the target table.
+  protected List<Integer> referencedColumns_ = new ArrayList<>();
+
+  // END: Members that are set in buildAndValidateSelectExprs().
+  /////////////////////////////////////////
+
   public KuduModifyImpl(ModifyStmt modifyStmt) {
     super(modifyStmt);
     kuduTable_ = (FeKuduTable)modifyStmt.getTargetTable();
@@ -37,6 +63,109 @@ abstract class KuduModifyImpl extends ModifyImpl {
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {}
 
+  /**
+   * Validates the list of value assignments that should be used to modify the target
+   * table. It verifies that only those columns are referenced that belong to the target
+   * table, no key columns are modified, and that a single column is not modified multiple
+   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
+   * SelectListItems to the out parameter selectList that is used to build the select list
+   * for sourceStmt_. A list of integers indicating the column position of an entry in the
+   * select list in the target table is written to the field referencedColumns_.
+   *
+   * In addition to the expressions that are generated for each assignment, the
+   * expression list contains an expression for each key column. The key columns
+   * are always prepended to the list of expressions representing the assignments.
+   */
+  @Override
+  protected void buildAndValidateSelectExprs(Analyzer analyzer,
+      List<SelectListItem> selectList)
+      throws AnalysisException {
+    // The order of the referenced columns equals the order of the result expressions
+    Set<SlotId> uniqueSlots = new HashSet<>();
+    Set<SlotId> keySlots = new HashSet<>();
+
+    // Mapping from column name to index
+    List<Column> cols = modifyStmt_.table_.getColumnsInHiveOrder();
+    Map<String, Integer> colIndexMap = new HashMap<>();
+    for (int i = 0; i < cols.size(); i++) {
+      colIndexMap.put(cols.get(i).getName(), i);
+    }
+
+    addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
+        keySlots, colIndexMap);
+
+    // Assignments are only used in the context of updates.
+    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
+      SlotRef lhsSlotRef = valueAssignment.first;
+      lhsSlotRef.analyze(analyzer);
+
+      Expr rhsExpr = valueAssignment.second;
+      checkSubQuery(lhsSlotRef, rhsExpr);
+      rhsExpr.analyze(analyzer);
+
+      checkCorrectTargetTable(lhsSlotRef, rhsExpr);
+      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
+      checkLhsIsColumnRef(lhsSlotRef, rhsExpr);
+
+      Column c = lhsSlotRef.getResolvedPath().destColumn();
+
+      if (keySlots.contains(lhsSlotRef.getSlotId())) {
+        boolean isSystemGeneratedColumn =
+            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
+        throw new AnalysisException(format("%s column '%s' cannot be updated.",
+            isSystemGeneratedColumn ? "System generated key" : "Key",
+            lhsSlotRef.toSql()));
+      }
+
+      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
+        throw new AnalysisException(
+            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
+      }
+
+      rhsExpr = StatementBase.checkTypeCompatibility(
+          modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
+          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
+      uniqueSlots.add(lhsSlotRef.getSlotId());
+      selectList.add(new SelectListItem(rhsExpr, null));
+      referencedColumns_.add(colIndexMap.get(c.getName()));
+    }
+  }
+
+  @Override
+  public List<Expr> getPartitionKeyExprs() { return Collections.emptyList(); }
+
+  public List<Integer> getReferencedColumns() {
+    return referencedColumns_;
+  }
+
+  @Override
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
+  }
+
+  protected void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName)
+      throws AnalysisException {
+    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
+        keySlots, colIndexMap, colName);
+    resultExprs_.add(ref);
+  }
+
+  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
+      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
+      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
+    List<String> path = Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
+        colName);
+    SlotRef ref = new SlotRef(path);
+    ref.analyze(analyzer);
+    selectList.add(new SelectListItem(ref, null));
+    uniqueSlots.add(ref.getSlotId());
+    keySlots.add(ref.getSlotId());
+    referencedColumns.add(colIndexMap.get(colName));
+    return ref;
+  }
+
   @Override
   public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
       throws AnalysisException {
@@ -50,14 +179,13 @@ abstract class KuduModifyImpl extends ModifyImpl {
     }
   }
 
-  @Override
   public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList,
       List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
       Map<String, Integer> colIndexMap) throws AnalysisException {
     // Add the key columns as slot refs
     for (String k : kuduTable_.getPrimaryKeyColumnNames()) {
       addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots,
-          colIndexMap, k, false);
+          colIndexMap, k);
     }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
index 08b97c764..5832e1c40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -17,22 +17,18 @@
 
 package org.apache.impala.analysis;
 
-import static java.lang.String.format;
-
 import org.apache.impala.catalog.Column;
-import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.rewrite.ExprRewriter;
 
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
+import static java.lang.String.format;
 
 /**
  * Abstract class for implementing a Modify statement such as DELETE or UPDATE. Child
@@ -42,15 +38,29 @@ abstract class ModifyImpl {
   abstract void addCastsToAssignmentsInSourceStmt(Analyzer analyzer)
       throws AnalysisException;
 
-  abstract void addKeyColumns(Analyzer analyzer,
-      List<SelectListItem> selectList, List<Integer> referencedColumns,
-      Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap)
-      throws AnalysisException;
-
   abstract void analyze(Analyzer analyzer) throws AnalysisException;
 
+  /**
+   * This method is invoked during the creation of the source SELECT statement.
+   * It populates 'selectList' with expressions we want to write during the operation.
+   * (E.g. primary keys for Kudu, or file path and row position for Iceberg DELETEs, etc.)
+   * This also sets the partition expressions and sort expressions if required.
+   */
+  abstract void buildAndValidateSelectExprs(Analyzer analyzer,
+      List<SelectListItem> selectList) throws AnalysisException;
+
   abstract DataSink createDataSink();
 
+  /**
+   * Substitutes the result expressions and partition key expressions using 'smap'.
+   * Preserves the original types of those expressions during the substitution.
+   * It is usually invoked when a SORT node is added to the plan because the
+   * SORT node materializes sort expressions into the sort tuple, so after the
+   * SORT node we only need to have slot refs to the materialized exprs. 'smap'
+   * contains the mapping from the original exprs to the materialized slot refs.
+   */
+  abstract void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer);
+
   // The Modify statement for this modify impl. The ModifyStmt class holds information
   // about the statement (e.g. target table type, FROM, WHERE clause, etc.)
   ModifyStmt modifyStmt_;
@@ -60,31 +70,7 @@ abstract class ModifyImpl {
   // Result of the analysis of the internal SelectStmt that produces the rows that
   // will be modified.
   protected SelectStmt sourceStmt_;
-
-  // Output expressions that produce the final results to write to the target table. May
-  // include casts.
-  //
-  // In case of DELETE statements it contains the columns that identify the deleted
-  // rows (Kudu primary keys, Iceberg file_path / position).
-  protected List<Expr> resultExprs_ = new ArrayList<>();
-
-  // Exprs corresponding to the partitionKeyValues, if specified, or to the partition
-  // columns for tables.
-  protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
-
-  // For every column of the target table that is referenced in the optional
-  // 'sort.columns' table property, this list will contain the corresponding result expr
-  // from 'resultExprs_'. Before insertion, all rows will be sorted by these exprs. If the
-  // list is empty, no additional sorting by non-partitioning columns will be performed.
-  // The column list must not contain partition columns and must be empty for non-Hdfs
-  // tables.
-  protected List<Expr> sortExprs_ = new ArrayList<>();
-
-  // Position mapping of output expressions of the sourceStmt_ to column indices in the
-  // target table. The i'th position in this list maps to the referencedColumns_[i]'th
-  // position in the target table.
-  protected List<Integer> referencedColumns_ = new ArrayList<>();
-  // END: Members that are set in first run of analyze
+  // END: Members that are set in createSourceStmt().
   /////////////////////////////////////////
 
   public ModifyImpl(ModifyStmt modifyStmt) {
@@ -109,150 +95,75 @@ abstract class ModifyImpl {
     if (sourceStmt_ == null) {
       // Builds the select list and column position mapping for the target table.
       ArrayList<SelectListItem> selectList = new ArrayList<>();
-      buildAndValidateAssignmentExprs(analyzer, selectList);
+      buildAndValidateSelectExprs(analyzer, selectList);
 
       // Analyze the generated select statement.
       sourceStmt_ = new SelectStmt(new SelectList(selectList), modifyStmt_.fromClause_,
           modifyStmt_.wherePredicate_,
           null, null, null, null);
-
+      sourceStmt_.analyze(analyzer);
       addCastsToAssignmentsInSourceStmt(analyzer);
+    } else {
+      sourceStmt_.analyze(analyzer);
     }
-    sourceStmt_.analyze(analyzer);
   }
 
-  /**
-   * Validates the list of value assignments that should be used to modify the target
-   * table. It verifies that only those columns are referenced that belong to the target
-   * table, no key columns are modified, and that a single column is not modified multiple
-   * times. Analyzes the Exprs and SlotRefs of assignments_ and writes a list of
-   * SelectListItems to the out parameter selectList that is used to build the select list
-   * for sourceStmt_. A list of integers indicating the column position of an entry in the
-   * select list in the target table is written to the field referencedColumns_.
-   *
-   * In addition to the expressions that are generated for each assignment, the
-   * expression list contains an expression for each key column. The key columns
-   * are always prepended to the list of expression representing the assignments.
-   */
-  private void buildAndValidateAssignmentExprs(Analyzer analyzer,
-      List<SelectListItem> selectList)
-      throws AnalysisException {
-    // The order of the referenced columns equals the order of the result expressions
-    Set<SlotId> uniqueSlots = new HashSet<>();
-    Set<SlotId> keySlots = new HashSet<>();
-
-    // Mapping from column name to index
-    List<Column> cols = modifyStmt_.table_.getColumnsInHiveOrder();
-    Map<String, Integer> colIndexMap = new HashMap<>();
-    for (int i = 0; i < cols.size(); i++) {
-      colIndexMap.put(cols.get(i).getName(), i);
-    }
-
-    addKeyColumns(analyzer, selectList, referencedColumns_, uniqueSlots,
-        keySlots, colIndexMap);
-
-    // Assignments are only used in the context of updates.
-    for (Pair<SlotRef, Expr> valueAssignment : modifyStmt_.assignments_) {
-      SlotRef lhsSlotRef = valueAssignment.first;
-      lhsSlotRef.analyze(analyzer);
-
-      Expr rhsExpr = valueAssignment.second;
-      // No subqueries for rhs expression
-      if (rhsExpr.contains(Subquery.class)) {
-        throw new AnalysisException(
-            format("Subqueries are not supported as update expressions for column '%s'",
-                lhsSlotRef.toSql()));
-      }
-      rhsExpr.analyze(analyzer);
-
-      // Correct target table
-      if (!lhsSlotRef.isBoundByTupleIds(modifyStmt_.targetTableRef_.getId().asList())) {
-        throw new AnalysisException(
-            format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
-                + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
-                rhsExpr.toSql(),
-                modifyStmt_.targetTableRef_.getDesc().getTable().getFullName()));
-      }
-
-      Column c = lhsSlotRef.getResolvedPath().destColumn();
-      // TODO(Kudu) Add test for this code-path when Kudu supports nested types
-      if (c == null) {
-        throw new AnalysisException(
-            format("Left-hand side in assignment expression '%s=%s' must be a column " +
-                "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
-      }
-
-      if (keySlots.contains(lhsSlotRef.getSlotId())) {
-        boolean isSystemGeneratedColumn =
-            c instanceof KuduColumn && ((KuduColumn)c).isAutoIncrementing();
-        throw new AnalysisException(format("%s column '%s' cannot be updated.",
-            isSystemGeneratedColumn ? "System generated key" : "Key",
-            lhsSlotRef.toSql()));
-      }
-
-      if (uniqueSlots.contains(lhsSlotRef.getSlotId())) {
-        throw new AnalysisException(
-            format("Duplicate value assignment to column: '%s'", lhsSlotRef.toSql()));
-      }
+  abstract public List<Expr> getPartitionKeyExprs();
 
-      rhsExpr = StatementBase.checkTypeCompatibility(
-          modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
-          c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
-      uniqueSlots.add(lhsSlotRef.getSlotId());
-      selectList.add(new SelectListItem(rhsExpr, null));
-      referencedColumns_.add(colIndexMap.get(c.getName()));
+  protected void checkSubQuery(SlotRef lhsSlotRef, Expr rhsExpr)
+      throws AnalysisException {
+    if (rhsExpr.contains(Subquery.class)) {
+      throw new AnalysisException(
+          format("Subqueries are not supported as update expressions for column '%s'",
+              lhsSlotRef.toSql()));
     }
   }
 
-  protected void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName, boolean isSortingColumn)
+  protected void checkCorrectTargetTable(SlotRef lhsSlotRef, Expr rhsExpr)
       throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    resultExprs_.add(ref);
-    if (isSortingColumn) sortExprs_.add(ref);
+    if (!lhsSlotRef.isBoundByTupleIds(modifyStmt_.targetTableRef_.getId().asList())) {
+      throw new AnalysisException(
+          format("Left-hand side column '%s' in assignment expression '%s=%s' does not "
+              + "belong to target table '%s'", lhsSlotRef.toSql(), lhsSlotRef.toSql(),
+              rhsExpr.toSql(),
+              modifyStmt_.targetTableRef_.getDesc().getTable().getFullName()));
+    }
   }
 
-  protected void addPartitioningColumn(Analyzer analyzer, List<SelectListItem> selectList,
-  List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-  Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    SlotRef ref = addSlotRef(analyzer, selectList, referencedColumns, uniqueSlots,
-        keySlots, colIndexMap, colName);
-    partitionKeyExprs_.add(ref);
-    sortExprs_.add(ref);
+  protected void checkLhsIsColumnRef(SlotRef lhsSlotRef, Expr rhsExpr)
+      throws AnalysisException {
+    Column c = lhsSlotRef.getResolvedPath().destColumn();
+    if (c == null) {
+      throw new AnalysisException(
+          format("Left-hand side in assignment expression '%s=%s' must be a column " +
+              "reference", lhsSlotRef.toSql(), rhsExpr.toSql()));
+    }
   }
 
-  private SlotRef addSlotRef(Analyzer analyzer, List<SelectListItem> selectList,
-      List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots,
-      Map<String, Integer> colIndexMap, String colName) throws AnalysisException {
-    List<String> path = Path.createRawPath(modifyStmt_.targetTableRef_.getUniqueAlias(),
-        colName);
-    SlotRef ref = new SlotRef(path);
-    ref.analyze(analyzer);
-    selectList.add(new SelectListItem(ref, null));
-    uniqueSlots.add(ref.getSlotId());
-    keySlots.add(ref.getSlotId());
-    referencedColumns.add(colIndexMap.get(colName));
-    return ref;
+  protected Expr checkTypeCompatiblity(Analyzer analyzer, Column c, Expr rhsExpr)
+      throws AnalysisException {
+    return StatementBase.checkTypeCompatibility(
+        modifyStmt_.targetTableRef_.getDesc().getTable().getFullName(),
+        c, rhsExpr, analyzer, null /*widestTypeSrcExpr*/);
   }
 
-  public List<Expr> getPartitionKeyExprs() {
-     return partitionKeyExprs_;
+  protected void checkLhsOnlyAppearsOnce(Map<Integer, Expr> colToExprs, Column c,
+      SlotRef lhsSlotRef, Expr rhsExpr) throws AnalysisException {
+    if (colToExprs.containsKey(c.getPosition())) {
+      throw new AnalysisException(
+          format("Left-hand side in assignment appears multiple times '%s=%s'",
+              lhsSlotRef.toSql(), rhsExpr.toSql()));
+    }
   }
 
   public List<Expr> getSortExprs() {
-    return sortExprs_;
+    return Collections.emptyList();
   }
 
   public QueryStmt getQueryStmt() {
     return sourceStmt_;
   }
 
-  public List<Integer> getReferencedColumns() {
-    return referencedColumns_;
-  }
-
   public void castResultExprs(List<Type> types) throws AnalysisException {
     sourceStmt_.castResultExprs(types);
   }
@@ -260,9 +171,4 @@ abstract class ModifyImpl {
   public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
     sourceStmt_.rewriteExprs(rewriter);
   }
-
-  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
-    resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
-    partitionKeyExprs_ = Expr.substituteList(partitionKeyExprs_, smap, analyzer, true);
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index b3a798918..0ee571646 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -129,6 +129,11 @@ public abstract class ModifyStmt extends DmlStatementBase {
     Path path = candidates.get(0);
     path.resolve();
 
+    if (!path.isResolved()) {
+      throw new AnalysisException(format("Cannot resolve path '%s' for DML statement.",
+          path.toString()));
+    }
+
     if (path.destTupleDesc() == null) {
       throw new AnalysisException(format(
           "'%s' is not a table alias. Using the FROM clause requires the target table " +
@@ -151,6 +156,9 @@ public abstract class ModifyStmt extends DmlStatementBase {
               "but the following table is neither: %s",
               dstTbl.getFullName()));
     }
+    if (dstTbl instanceof FeIcebergTable) {
+      setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
+    }
     // Make sure that the user is allowed to modify the target table. Use ALL because no
     // UPDATE / DELETE privilege exists yet (IMPALA-3840).
     analyzer.registerAuthAndAuditEvent(dstTbl, Privilege.ALL);
@@ -183,10 +191,6 @@ public abstract class ModifyStmt extends DmlStatementBase {
   @Override
   public List<Expr> getSortExprs() { return modifyImpl_.getSortExprs(); }
 
-  public List<Integer> getReferencedColumns() {
-    return modifyImpl_.getReferencedColumns();
-  }
-
   public Expr getWherePredicate() { return wherePredicate_; }
 
   public List<Pair<SlotRef, Expr>> getAssignments() { return assignments_; }
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index 519823e5a..8e381f8fd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -18,10 +18,12 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.planner.DataSink;
 import org.apache.impala.rewrite.ExprRewriter;
 
 import java.util.ArrayList;
@@ -82,9 +84,18 @@ public class OptimizeStmt extends DmlStatementBase {
     insertStmt_.reset();
   }
 
-
   public InsertStmt getInsertStmt() { return insertStmt_; }
 
+  @Override
+  public DataSink createDataSink() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    throw new NotImplementedException();
+  }
+
   @Override
   public List<Expr> getPartitionKeyExprs() {
     return insertStmt_.getPartitionKeyExprs();
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index 9e36ed1db..0779cf808 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
@@ -59,14 +60,23 @@ public class UpdateStmt extends ModifyStmt {
 
   @Override
   protected void createModifyImpl() {
-    // Currently only Kudu tables are supported.
-    Preconditions.checkState(table_ instanceof FeKuduTable);
-    modifyImpl_ = new KuduUpdateImpl(this);
+    // Currently Kudu and Iceberg tables are supported.
+    if (table_ instanceof FeKuduTable) {
+      modifyImpl_ = new KuduUpdateImpl(this);
+    } else if (table_ instanceof FeIcebergTable) {
+      modifyImpl_ = new IcebergUpdateImpl(this);
+    }
+  }
+
+  @Override
+  public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
+    modifyImpl_.substituteResultExprs(smap, analyzer);
   }
 
   /**
    * Return an instance of a KuduTableSink specialized as an Update operation.
    */
+  @Override
   public DataSink createDataSink() {
     // analyze() must have been called before.
     return modifyImpl_.createDataSink();
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 397c81b74..d631c050c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -229,15 +229,17 @@ public class HdfsTableSink extends TableSink {
         targetTable_.getFullName(), overwriteStr, partitionKeyStr));
     // Report the total number of partitions, independent of the number of nodes
     // and the data partition of the fragment executing this sink.
-    if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_);
-      if (totalNumPartitions == -1) {
-        output.append(detailPrefix + "partitions=unavailable");
-      } else {
-        output.append(detailPrefix + "partitions="
-            + (totalNumPartitions == 0 ? 1 : totalNumPartitions));
+    if (!(targetTable_ instanceof FeIcebergTable)) {
+      if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+        long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_);
+        if (totalNumPartitions == -1) {
+          output.append(detailPrefix + "partitions=unavailable");
+        } else {
+          output.append(detailPrefix + "partitions="
+              + (totalNumPartitions == 0 ? 1 : totalNumPartitions));
+        }
+        output.append("\n");
       }
-      output.append("\n");
     }
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(detailPrefix + "output exprs: ")
@@ -359,7 +361,7 @@ public class HdfsTableSink extends TableSink {
   /**
    * Return an estimate of the number of nodes the fragment with this sink will
    * run on. This is based on the number of nodes set for the plan root and has an
-   * upper limit set by the MAX_HDFS_WRITER query option.
+   * upper limit set by the MAX_FS_WRITERS query option.
    */
   public int getNumNodes() {
     int num_nodes = getFragment().getPlanRoot().getNumNodes();
@@ -374,7 +376,7 @@ public class HdfsTableSink extends TableSink {
   /**
    * Return an estimate of the number of instances the fragment with this sink
    * will run on. This is based on the number of instances set for the plan root
-   * and has an upper limit set by the MAX_HDFS_WRITER query option.
+   * and has an upper limit set by the MAX_FS_WRITERS query option.
    */
   public int getNumInstances() {
     int num_instances = getFragment().getPlanRoot().getNumInstances();
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
index 6a1fdd0e1..69386b169 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
@@ -39,19 +39,21 @@ import org.apache.impala.thrift.TTableSinkType;
  * Impala does this by doing an ANTI JOIN between data files and delete files.
  */
 public class IcebergDeleteSink extends TableSink {
-
-  // Set the limit on the maximum number of hdfs table sink instances.
-  // A value of 0 means no limit.
-  private int maxHdfsSinks_;
+  final private int deleteTableId_;
 
   // Exprs for computing the output partition(s).
   protected final List<Expr> partitionKeyExprs_;
 
   public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs,
-      List<Expr> outputExprs, int maxTableSinks) {
+      List<Expr> outputExprs) {
+    this(targetTable, partitionKeyExprs, outputExprs, 0);
+  }
+
+  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs,
+      List<Expr> outputExprs, int deleteTableId) {
     super(targetTable, Op.DELETE, outputExprs);
     partitionKeyExprs_ = partitionKeyExprs;
-    maxHdfsSinks_ = maxTableSinks;
+    deleteTableId_ = deleteTableId;
   }
 
   @Override
@@ -119,6 +121,7 @@ public class IcebergDeleteSink extends TableSink {
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
             TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.iceberg_delete_sink = icebergDeleteSink;
+    tTableSink.setTarget_table_id(deleteTableId_);
     tsink.table_sink = tTableSink;
     tsink.output_exprs = Expr.treesToThrift(outputExprs_);
   }
@@ -134,34 +137,6 @@ public class IcebergDeleteSink extends TableSink {
     exprs.addAll(outputExprs_);
   }
 
-  /**
-   * Return an estimate of the number of nodes the fragment with this sink will
-   * run on. This is based on the number of nodes set for the plan root and has an
-   * upper limit set by the MAX_HDFS_WRITER query option.
-   */
-  public int getNumNodes() {
-    int num_nodes = getFragment().getPlanRoot().getNumNodes();
-    if (maxHdfsSinks_ > 0) {
-      // If there are more nodes than instances where the fragment was initially
-      // planned to run then, then the instances will be distributed evenly across them.
-      num_nodes = Math.min(num_nodes, getNumInstances());
-    }
-    return num_nodes;
-  }
-
-  /**
-   * Return an estimate of the number of instances the fragment with this sink
-   * will run on. This is based on the number of instances set for the plan root
-   * and has an upper limit set by the MAX_HDFS_WRITER query option.
-   */
-  public int getNumInstances() {
-    int num_instances = getFragment().getPlanRoot().getNumInstances();
-    if (maxHdfsSinks_ > 0) {
-      num_instances =  Math.min(num_instances, maxHdfsSinks_);
-    }
-    return num_instances;
-  }
-
   @Override
   public void computeRowConsumptionAndProductionToCost() {
     super.computeRowConsumptionAndProductionToCost();
diff --git a/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java b/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
new file mode 100644
index 000000000..ee357b559
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/MultiDataSink.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.impala.analysis.Expr;
+
+import org.apache.impala.thrift.TDataSink;
+import org.apache.impala.thrift.TDataSinkType;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TQueryOptions;
+
+/**
+ * MultiDataSink can aggregate multiple DataSinks that operate on the same source, e.g.
+ * on the same SELECT statement of a modify statement. The backend operator will send
+ * each row batch to each child data sink. Child data sinks might use different
+ * expressions from the same source tuples. E.g. some data sink might write a different
+ * subset of columns than other data sinks. One example is Iceberg UPDATEs, which
+ * insert records into both data files and delete files simultaneously.
+ */
+public class MultiDataSink extends DataSink {
+  private List<DataSink> dataSinks_ = new ArrayList<>();
+
+  public MultiDataSink() {}
+
+  /**
+   * This must be called after all child sinks have been added.
+   */
+  @Override
+  public void setFragment(PlanFragment fragment) {
+    fragment_ = fragment;
+    for (DataSink tsink : dataSinks_) {
+      tsink.setFragment(fragment);
+    }
+  }
+
+  public void addDataSink(DataSink tsink) {
+    dataSinks_.add(tsink);
+  }
+
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    for (int i = 0; i < dataSinks_.size(); ++i) {
+      DataSink dsink = dataSinks_.get(i);
+      dsink.computeProcessingCost(queryOptions);
+      ProcessingCost dsinkCost = dsink.getProcessingCost();
+      if (i == 0) {
+        processingCost_ = dsinkCost;
+      } else {
+        processingCost_ = ProcessingCost.sumCost(processingCost_, dsinkCost);
+      }
+    }
+  }
+
+  @Override
+  public void computeResourceProfile(TQueryOptions queryOptions) {
+    for (int i = 0; i < dataSinks_.size(); ++i) {  // Aggregate over all child sinks.
+      DataSink dsink = dataSinks_.get(i);
+      dsink.computeResourceProfile(queryOptions);
+      ResourceProfile dsinkProfile = dsink.getResourceProfile();
+      if (i == 0) {  // The first child's profile seeds the aggregate.
+        resourceProfile_ = dsinkProfile;
+      } else {  // Keep combine()'s result (it presumably returns a new profile).
+        resourceProfile_ = resourceProfile_.combine(dsinkProfile);
+      }
+    }
+  }
+
+  @Override
+  public void appendSinkExplainString(String prefix, String detailPrefix,
+      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
+    output.append(String.format("%sMULTI DATA SINK\n", prefix));
+    for (DataSink dsink : dataSinks_) {
+      dsink.appendSinkExplainString(prefix + "|->", detailPrefix + "  ",
+          queryOptions, explainLevel, output);
+    }
+  }
+
+  @Override
+  protected String getLabel() {
+    return "MULTI DATA SINK";
+  }
+
+  @Override
+  protected void toThriftImpl(TDataSink tdsink) {
+    for (int i = 0; i < dataSinks_.size(); ++i) {
+      DataSink dsink = dataSinks_.get(i);
+      tdsink.addToChild_data_sinks(dsink.toThrift());
+    }
+  }
+
+  @Override
+  protected TDataSinkType getSinkType() {
+    return TDataSinkType.MULTI_DATA_SINK;
+  }
+
+  @Override
+  public void collectExprs(List<Expr> exprs) {
+    for (int i = 0; i < dataSinks_.size(); ++i) {
+      DataSink dsink = dataSinks_.get(i);
+      dsink.collectExprs(exprs);
+    }
+  }
+
+  @Override
+  public void computeRowConsumptionAndProductionToCost() {
+    super.computeRowConsumptionAndProductionToCost();
+    fragment_.setFixedInstanceCount(fragment_.getNumInstances());
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index c33761a8d..eb944f8f6 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -27,8 +27,8 @@ import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.ColumnLineageGraph;
-import org.apache.impala.analysis.DeleteStmt;
 import org.apache.impala.analysis.ColumnLineageGraph.ColumnLabel;
+import org.apache.impala.analysis.DmlStatementBase;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprSubstitutionMap;
 import org.apache.impala.analysis.InsertStmt;
@@ -160,7 +160,7 @@ public class Planner {
       InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
       insertStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
       if (!ctx_.isSingleNodeExec()) {
-        // repartition on partition keys
+        // Repartition on partition keys
         rootFragment = distributedPlanner.createDmlFragment(
             rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
       }
@@ -172,22 +172,20 @@ public class Planner {
       QueryStmt queryStmt = ctx_.getQueryStmt();
       queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
       List<Expr> resultExprs = queryStmt.getResultExprs();
-      if (ctx_.isUpdate()) {
-        // Set up update sink for root fragment
-        rootFragment.setSink(
-            ctx_.getAnalysisResult().getUpdateStmt().createDataSink());
-      } else if (ctx_.isDelete()) {
-        // Set up delete sink for root fragment
-        DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt();
-        if (deleteStmt.getTargetTable() instanceof FeIcebergTable) {
-          if (!ctx_.isSingleNodeExec()) {
-            // repartition on partition keys
-            rootFragment = distributedPlanner.createDmlFragment(
-                rootFragment, deleteStmt, ctx_.getRootAnalyzer(), fragments);
-          }
-          createPreDeleteSort(deleteStmt, rootFragment, ctx_.getRootAnalyzer());
+      if (ctx_.isUpdate() || ctx_.isDelete()) {
+        DmlStatementBase stmt;
+        if (ctx_.isUpdate()) {
+          stmt = ctx_.getAnalysisResult().getUpdateStmt();
+        } else {
+          stmt = ctx_.getAnalysisResult().getDeleteStmt();
+        }
+        Preconditions.checkNotNull(stmt);
+        if (stmt.getTargetTable() instanceof FeIcebergTable) {
+          rootFragment = createIcebergDmlPlanFragment(
+              rootFragment, distributedPlanner, stmt, fragments);
         }
-        rootFragment.setSink(deleteStmt.createDataSink());
+        // Set up the update / delete sink for root fragment
+        rootFragment.setSink(stmt.createDataSink());
       } else if (ctx_.isQuery()) {
         rootFragment.setSink(
             ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs));
@@ -271,6 +269,18 @@ public class Planner {
     return fragments;
   }
 
+  public PlanFragment createIcebergDmlPlanFragment(PlanFragment rootFragment,
+      DistributedPlanner distributedPlanner, DmlStatementBase stmt,
+      List<PlanFragment> fragments) throws ImpalaException {
+    if (!ctx_.isSingleNodeExec()) {
+      // Repartition on partition keys
+      rootFragment = distributedPlanner.createDmlFragment(
+          rootFragment, stmt, ctx_.getRootAnalyzer(), fragments);
+    }
+    createPreDmlSort(stmt, rootFragment, ctx_.getRootAnalyzer());
+    return rootFragment;
+  }
+
   /**
    * Return a list of plans, each represented by the root of their fragment trees. May
    * return a single-node, distributed, or parallel plan depending on the query and
@@ -909,20 +919,21 @@ public class Planner {
     inputFragment.setPlanRoot(node);
   }
 
-  public void createPreDeleteSort(DeleteStmt deleteStmt, PlanFragment inputFragment,
+  public void createPreDmlSort(DmlStatementBase dmlStmt, PlanFragment inputFragment,
       Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = new ArrayList<>();
 
-    orderingExprs.addAll(deleteStmt.getSortExprs());
+    orderingExprs.addAll(dmlStmt.getPartitionKeyExprs());
+    orderingExprs.addAll(dmlStmt.getSortExprs());
 
     // Build sortinfo to sort by the ordering exprs.
     List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
     List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
     SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams,
         TSortingOrder.LEXICAL);
-    sortInfo.createSortTupleInfo(deleteStmt.getResultExprs(), analyzer);
+    sortInfo.createSortTupleInfo(dmlStmt.getResultExprs(), analyzer);
     sortInfo.getSortTupleDescriptor().materializeSlots();
-    deleteStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);
+    dmlStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);
 
     PlanNode node = SortNode.createTotalSortNode(
         ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 88e975497..543c756a4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -95,6 +95,7 @@ import org.apache.impala.analysis.StmtMetadataLoader;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.analysis.TruncateStmt;
+import org.apache.impala.analysis.UpdateStmt;
 import org.apache.impala.authentication.saml.ImpalaSamlClient;
 import org.apache.impala.authorization.AuthorizationChecker;
 import org.apache.impala.authorization.AuthorizationConfig;
@@ -126,7 +127,6 @@ import org.apache.impala.catalog.ImpaladTableUsageTracker;
 import org.apache.impala.catalog.MaterializedViewHdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -2532,6 +2532,9 @@ public class Frontend {
         if (analysisResult.isDeleteStmt()) {
           addFinalizationParamsForDelete(queryCtx, queryExecRequest,
               analysisResult.getDeleteStmt());
+        } else if (analysisResult.isUpdateStmt()) {
+          addFinalizationParamsForUpdate(queryCtx, queryExecRequest,
+              analysisResult.getUpdateStmt());
         }
       }
       return result;
@@ -2603,15 +2606,33 @@ public class Frontend {
       targetTable = ((IcebergPositionDeleteTable)targetTable).getBaseTable();
       TFinalizeParams finalizeParams = addFinalizationParamsForDml(
           queryCtx, targetTable, false);
-      TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams();
-      iceFinalizeParams.operation = TIcebergOperation.DELETE;
-      FeIcebergTable iceTable = (FeIcebergTable)targetTable;
-      iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
-      iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
+      TIcebergDmlFinalizeParams iceFinalizeParams = addFinalizationParamsForIcebergDml(
+        (FeIcebergTable)targetTable, TIcebergOperation.DELETE);
       finalizeParams.setIceberg_params(iceFinalizeParams);
       queryExecRequest.setFinalize_params(finalizeParams);
   }
 
+  private static void addFinalizationParamsForUpdate(
+      TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, UpdateStmt updateStmt) {
+    FeTable targetTable = updateStmt.getTargetTable();
+    if (!(targetTable instanceof FeIcebergTable)) return;
+    TFinalizeParams finalizeParams = addFinalizationParamsForDml(
+        queryCtx, targetTable, false);
+    TIcebergDmlFinalizeParams iceFinalizeParams = addFinalizationParamsForIcebergDml(
+        (FeIcebergTable)targetTable, TIcebergOperation.UPDATE);
+    finalizeParams.setIceberg_params(iceFinalizeParams);
+    queryExecRequest.setFinalize_params(finalizeParams);
+  }
+
+  private static TIcebergDmlFinalizeParams addFinalizationParamsForIcebergDml(
+      FeIcebergTable iceTable, TIcebergOperation iceOperation) {
+    TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams();
+    iceFinalizeParams.operation = iceOperation;
+    iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId());
+    iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId());
+    return iceFinalizeParams;
+  }
+
   // This is public to allow external frontends to utilize this method to fill in the
   // finalization parameters for externally generated INSERTs.
   public static void addFinalizationParamsForInsert(
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index f98289311..eb385d09b 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -355,6 +355,10 @@ public class IcebergCatalogOpExecutor {
     switch (icebergOp.operation) {
       case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break;
       case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break;
+      case UPDATE: {
+        deleteRows(feIcebergTable, txn, icebergOp);
+        appendFiles(feIcebergTable, txn, icebergOp);
+      } break;
       default: throw new ImpalaRuntimeException(
           "Unknown Iceberg operation: " + icebergOp.operation);
     }
diff --git a/fe/src/main/java/org/apache/impala/util/ExprUtil.java b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
index 6a7e2cedc..0adb557b4 100644
--- a/fe/src/main/java/org/apache/impala/util/ExprUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/ExprUtil.java
@@ -23,6 +23,7 @@ import org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.SelectListItem;
 import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -32,6 +33,7 @@ import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumnValue;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class ExprUtil {
   /**
@@ -133,4 +135,9 @@ public class ExprUtil {
     // expressions and potentially LLVM translation in BE. The function must also
     // run fast.
   }
+
+  public static List<SelectListItem> exprsAsSelectList(List<Expr> exprs) {
+    return exprs.stream().map(
+        e -> new SelectListItem(e, null)).collect(Collectors.toList());
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index f7092d2fb..d86205a64 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -65,12 +65,22 @@ import org.apache.iceberg.transforms.PartitionSpecVisitor;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.analysis.FunctionName;
+import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.StatementBase;
+import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TimeTravelSpec;
 import org.apache.impala.analysis.TimeTravelSpec.Kind;
+import org.apache.impala.catalog.BuiltinsDb;
 import org.apache.impala.catalog.Catalog;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.IcebergColumn;
@@ -83,6 +93,7 @@ import org.apache.impala.catalog.iceberg.IcebergCatalogs;
 import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
@@ -1136,4 +1147,106 @@ public class IcebergUtil {
 
     return props;
   }
+
+  public static void validateIcebergColumnsForInsert(FeIcebergTable iceTable)
+      throws AnalysisException {
+    for (Types.NestedField field : iceTable.getIcebergSchema().columns()) {
+      org.apache.iceberg.types.Type iceType = field.type();
+      if (iceType.isPrimitiveType() && iceType instanceof Types.TimestampType) {
+        Types.TimestampType tsType = (Types.TimestampType)iceType;
+        if (tsType.shouldAdjustToUTC()) {
+          throw new AnalysisException("The Iceberg table has a TIMESTAMPTZ " +
+              "column that Impala cannot write.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates partition key expressions in the order of Iceberg partition spec fields.
+   */
+  public static void populatePartitionExprs(Analyzer analyzer,
+      List<Expr> widestTypeExprList, List<Column> selectExprTargetColumns,
+      List<Expr> selectListExprs, FeIcebergTable targetTable,
+      List<Expr> outPartitionExprs, List<Integer> outPartitionColPos)
+      throws AnalysisException {
+    Preconditions.checkNotNull(outPartitionExprs);
+    Preconditions.checkState(outPartitionExprs.isEmpty());
+    Preconditions.checkState(outPartitionColPos == null || outPartitionColPos.isEmpty());
+    IcebergPartitionSpec icebergPartSpec = targetTable.getDefaultPartitionSpec();
+    if (!icebergPartSpec.hasPartitionFields()) return;
+    String tableName = targetTable.getFullName();
+    for (IcebergPartitionField partField : icebergPartSpec.getIcebergPartitionFields()) {
+      if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
+      for (int i = 0; i < selectListExprs.size(); ++i) {
+        IcebergColumn targetColumn = (IcebergColumn)selectExprTargetColumns.get(i);
+        if (targetColumn.getFieldId() != partField.getSourceId()) continue;
+        // widestTypeExpr is widest type expression for column i
+        Expr widestTypeExpr =
+            (widestTypeExprList != null) ? widestTypeExprList.get(i) : null;
+        Expr icebergPartitionTransformExpr = getIcebergPartitionTransformExpr(analyzer,
+            tableName, partField, targetColumn, selectListExprs.get(i), widestTypeExpr);
+        outPartitionExprs.add(icebergPartitionTransformExpr);
+        if (outPartitionColPos != null) {
+          outPartitionColPos.add(targetColumn.getPosition());
+        }
+        break;
+      }
+    }
+  }
+
+  private static Expr getIcebergPartitionTransformExpr(Analyzer analyzer,
+      String targetTableName, IcebergPartitionField partField, IcebergColumn targetColumn,
+      Expr sourceExpr, Expr widestTypeExpr) throws AnalysisException {
+    Preconditions.checkState(targetColumn.getFieldId() == partField.getSourceId());
+    Expr compatibleExpr = StatementBase.checkTypeCompatibility(targetTableName,
+        targetColumn, sourceExpr, analyzer, widestTypeExpr);
+    Expr ret = IcebergUtil.getIcebergPartitionTransformExpr(partField, compatibleExpr);
+    ret.analyze(analyzer);
+    return ret;
+  }
+
+  /**
+   * Returns the partition transform expression. E.g. if the partition transform is DAY,
+   * it returns 'to_date(compatibleExpr)'. If the partition transform is BUCKET,
+   * it returns 'iceberg_bucket_transform(compatibleExpr, transformParam)'.
+   */
+  private static Expr getIcebergPartitionTransformExpr(IcebergPartitionField partField,
+                                                Expr compatibleExpr) {
+    String funcNameStr = transformTypeToFunctionName(partField.getTransformType());
+    if (funcNameStr == null || funcNameStr.equals("")) return compatibleExpr;
+    FunctionName fnName = new FunctionName(BuiltinsDb.NAME, funcNameStr);
+    List<Expr> paramList = new ArrayList<>();
+    paramList.add(compatibleExpr);
+    Integer transformParam = partField.getTransformParam();
+    if (transformParam != null) {
+      paramList.add(NumericLiteral.create(transformParam));
+    }
+    if (partField.getTransformType() == TIcebergPartitionTransformType.MONTH) {
+      paramList.add(new StringLiteral("yyyy-MM"));
+    }
+    else if (partField.getTransformType() == TIcebergPartitionTransformType.HOUR) {
+      paramList.add(new StringLiteral("yyyy-MM-dd-HH"));
+    }
+    FunctionCallExpr fnCall = new FunctionCallExpr(fnName, new FunctionParams(paramList));
+    fnCall.setIsInternalFnCall(true);
+    return fnCall;
+  }
+
+  /**
+   * Returns the builtin function to use for the given partition transform.
+   */
+  private static String transformTypeToFunctionName(
+      TIcebergPartitionTransformType transformType) {
+    switch (transformType) {
+      case IDENTITY: return "";
+      case HOUR:
+      case MONTH: return "from_timestamp";
+      case DAY: return "to_date";
+      case YEAR: return "year";
+      case BUCKET: return "iceberg_bucket_transform";
+      case TRUNCATE: return "iceberg_truncate_transform";
+    }
+    return "";
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 3267208c5..32a352752 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1315,6 +1315,12 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+  @Test
+  public void testIcebergV2Update() {
+    runPlannerTestFile("iceberg-v2-update", "functional_parquet",
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
+
   /**
    * Check that Iceberg metadata table scan plans are as expected.
    */
diff --git a/shell/impala_client.py b/shell/impala_client.py
index ff6af015c..faf9486c3 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -974,8 +974,12 @@ class ImpalaHS2Client(ImpalaClient):
         raise RPCException("Impala DML operation did not return DML statistics.")
 
       num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()])
+      if resp.dml_result.rows_deleted:
+        num_deleted_rows = sum([int(k) for k in resp.dml_result.rows_deleted.values()])
+      else:
+        num_deleted_rows = None
       last_query_handle.is_closed = True
-      return (num_rows, resp.dml_result.num_row_errors)
+      return (num_rows, num_deleted_rows, resp.dml_result.num_row_errors)
     finally:
       self._clear_current_query_handle()
 
@@ -1293,7 +1297,7 @@ class StrictHS2Client(ImpalaHS2Client):
 
   def close_dml(self, last_query_handle):
     self.close_query(last_query_handle)
-    return (None, None)
+    return (None, None, None)
 
   def close_query(self, last_query_handle):
     # Set a member in the handle to make sure that it is idempotent
@@ -1450,8 +1454,12 @@ class ImpalaBeeswaxClient(ImpalaClient):
     if rpc_status != RpcStatus.OK:
        raise RPCException()
     num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
+    if insert_result.rows_deleted:
+      num_deleted_rows = sum([int(k) for k in insert_result.rows_deleted.values()])
+    else:
+      num_deleted_rows = None
     last_query_handle.is_closed = True
-    return (num_rows, insert_result.num_row_errors)
+    return (num_rows, num_deleted_rows, insert_result.num_row_errors)
 
   def close_query(self, last_query_handle):
     # Set a member in the handle to make sure that it is idempotent
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index cadce7a89..cbc28f2a0 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1378,7 +1378,8 @@ class ImpalaShell(cmd.Cmd, object):
       if is_dml:
         # retrieve the error log
         warning_log = self.imp_client.get_warning_log(self.last_query_handle)
-        (num_rows, num_row_errors) = self.imp_client.close_dml(self.last_query_handle)
+        (num_rows, num_deleted_rows, num_row_errors) = self.imp_client.close_dml(
+            self.last_query_handle)
       else:
         # impalad does not support the fetching of metadata for certain types of queries.
         if not self.imp_client.expect_result_metadata(query_str, self.last_query_handle):
@@ -1421,7 +1422,11 @@ class ImpalaShell(cmd.Cmd, object):
       else:
         error_report = ""
 
-      if num_rows is not None:
+      if is_dml and num_rows == 0 and num_deleted_rows:  # None if no rows_deleted stats
+        verb = "Deleted"
+        self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
+            (verb, num_deleted_rows, error_report, time_elapsed))
+      elif num_rows is not None:
         self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
             (verb, num_rows, error_report, time_elapsed))
       else:
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index fc345e042..6ee4de13a 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3321,7 +3321,8 @@ iceberg_partition_evolution
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 (id int, int_col int, string_col string, date_string_col string, year int, month int)
 PARTITIONED BY SPEC (year, truncate(4, date_string_col))
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
 ---- DEPENDENT_LOAD
 # We can use 'date_string_col' as it is once IMPALA-11954 is done.
 INSERT INTO {db_name}{db_suffix}.iceberg_partition_evolution
@@ -3388,7 +3389,8 @@ iceberg_int_partitioned
 ---- CREATE
 CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i INT, j INT, k INT)
 PARTITIONED BY SPEC (i, j)
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
 ====
 ---- DATASET
 functional
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
new file mode 100644
index 000000000..dd7fee1f9
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
@@ -0,0 +1,344 @@
+UPDATE iceberg_v2_no_deletes set s = concat(s,s) where i = 3
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
+|
+01:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=36B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
+   HDFS partitions=1/1 files=1 size=625B
+   predicates: i = 3
+   row-size=36B cardinality=1
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
+|
+01:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=36B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
+   HDFS partitions=1/1 files=1 size=625B
+   predicates: i = 3
+   row-size=36B cardinality=1
+====
+UPDATE iceberg_v2_delete_positional SET `data` = concat(`data`,'a') where id = 15
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+03:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=40B cardinality=1
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=40B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: id = 15
+   row-size=40B cardinality=1
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+04:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=40B cardinality=1
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=40B cardinality=1
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: id = 15
+   row-size=40B cardinality=1
+====
+UPDATE iceberg_v2_delete_positional SET id = cast(id+1 as int)
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+03:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=40B cardinality=2
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=40B cardinality=2
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   row-size=40B cardinality=3
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+04:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=40B cardinality=2
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=40B cardinality=2
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   row-size=40B cardinality=3
+====
+UPDATE iceberg_v2_delete_positional SET id = 42 WHERE FILE__POSITION = id
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+03:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=32B cardinality=1
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=40B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: FILE__POSITION = id
+   row-size=40B cardinality=1
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
+|
+04:SORT
+|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=32B cardinality=1
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=40B cardinality=1
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
+|     HDFS partitions=1/1 files=1 size=1.54KB
+|     row-size=182B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional]
+   HDFS partitions=1/1 files=1 size=662B
+   predicates: FILE__POSITION = id
+   row-size=40B cardinality=1
+====
+UPDATE iceberg_v2_partitioned_position_deletes set id = length(action)
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+03:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=76B cardinality=10
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=76B cardinality=10
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   row-size=76B cardinality=20
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+05:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=76B cardinality=10
+|
+04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=76B cardinality=10
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   row-size=76B cardinality=20
+====
+UPDATE iceberg_v2_partitioned_position_deletes set id = length(action) where action = 'click'
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+03:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=76B cardinality=3
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=76B cardinality=3
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.15KB
+|     row-size=204B cardinality=3
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.15KB
+   skipped Iceberg predicates: action = 'click'
+   row-size=76B cardinality=6
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+04:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=76B cardinality=3
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=76B cardinality=3
+|
+|--03:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.15KB
+|     row-size=204B cardinality=3
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.15KB
+   skipped Iceberg predicates: action = 'click'
+   row-size=76B cardinality=6
+====
+UPDATE target set user = s from iceberg_v2_partitioned_position_deletes target, iceberg_v2_positional_update_all_rows source where target.id = source.i
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+09:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=80B cardinality=10
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: target.id = source.i
+|  runtime filters: RF000 <- source.i
+|  row-size=104B cardinality=10
+|
+|--07:UNION
+|  |  pass-through-operands: all
+|  |  row-size=36B cardinality=4
+|  |
+|  |--05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  |  |  row-size=36B cardinality=1
+|  |  |
+|  |  |--04:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-04 source-position-delete]
+|  |  |     HDFS partitions=1/1 files=1 size=2.60KB
+|  |  |     row-size=246B cardinality=3
+|  |  |
+|  |  03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows source]
+|  |     HDFS partitions=1/1 files=1 size=625B
+|  |     row-size=36B cardinality=3
+|  |
+|  06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows source]
+|     HDFS partitions=1/1 files=1 size=625B
+|     row-size=36B cardinality=3
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  row-size=68B cardinality=10
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 target-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes target]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   runtime filters: RF000 -> target.id
+   row-size=68B cardinality=20
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
+|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|
+13:SORT
+|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  row-size=80B cardinality=10
+|
+12:EXCHANGE [HASH(target.PARTITION__SPEC__ID,target.ICEBERG__PARTITION__SERIALIZED)]
+|
+08:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: target.id = source.i
+|  runtime filters: RF000 <- source.i
+|  row-size=104B cardinality=10
+|
+|--11:EXCHANGE [BROADCAST]
+|  |
+|  07:UNION
+|  |  pass-through-operands: all
+|  |  row-size=36B cardinality=4
+|  |
+|  |--05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  |  |  row-size=36B cardinality=1
+|  |  |
+|  |  |--10:EXCHANGE [DIRECTED]
+|  |  |  |
+|  |  |  04:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-04 source-position-delete]
+|  |  |     HDFS partitions=1/1 files=1 size=2.60KB
+|  |  |     row-size=246B cardinality=3
+|  |  |
+|  |  03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows source]
+|  |     HDFS partitions=1/1 files=1 size=625B
+|  |     row-size=36B cardinality=3
+|  |
+|  06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows source]
+|     HDFS partitions=1/1 files=1 size=625B
+|     row-size=36B cardinality=3
+|
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
+|  row-size=68B cardinality=10
+|
+|--09:EXCHANGE [DIRECTED]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 target-position-delete]
+|     HDFS partitions=1/1 files=3 size=9.47KB
+|     row-size=204B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes target]
+   HDFS partitions=1/1 files=3 size=3.48KB
+   runtime filters: RF000 -> target.id
+   row-size=68B cardinality=20
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
index a6dbe3f93..4c9026e0a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
@@ -394,7 +394,6 @@ WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`,
 optimize table functional_parquet.iceberg_partition_transforms_zorder
 ---- PLAN
 WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
-|  partitions=unavailable
 |
 01:SORT
 |  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
@@ -405,7 +404,6 @@ WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE
    row-size=36B cardinality=0
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
-|  partitions=unavailable
 |
 02:SORT
 |  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
@@ -416,3 +414,4 @@ WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE
 00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
    HDFS partitions=1/1 files=0 size=0B
    row-size=36B cardinality=0
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index c8ef45e67..783624c48 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -945,7 +945,6 @@ insert into functional_parquet.iceberg_int_partitioned
 select id % 3, id % 2, id from functional.alltypes
 ---- PLAN
 WRITE TO HDFS [functional_parquet.iceberg_int_partitioned, OVERWRITE=false, PARTITION-KEYS=(id % 3,id % 2)]
-|  partitions=53290000
 |
 01:SORT
 |  order by: id % 3 ASC NULLS LAST, id % 2 ASC NULLS LAST
@@ -956,7 +955,6 @@ WRITE TO HDFS [functional_parquet.iceberg_int_partitioned, OVERWRITE=false, PART
    row-size=4B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional_parquet.iceberg_int_partitioned, OVERWRITE=false, PARTITION-KEYS=(id % 3,id % 2)]
-|  partitions=53290000
 |
 02:SORT
 |  order by: id % 3 ASC NULLS LAST, id % 2 ASC NULLS LAST
@@ -978,7 +976,6 @@ select years_add(timestamp_col, id % 3),
 from functional.alltypes
 ---- PLAN
 WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(years_add(timestamp_col, id % 3)),iceberg_bucket_transform(concat(string_col, date_string_col), 5))]
-|  partitions=5372800
 |
 01:SORT
 |  order by: LEXICAL: year(years_add(timestamp_col, id % 3)) ASC NULLS LAST, iceberg_bucket_transform(concat(string_col, date_string_col), 5) ASC NULLS LAST, ZORDER: CAST(id * 3 AS INT), CAST(10000 - id AS INT)
@@ -989,7 +986,6 @@ WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE
       row-size=53B cardinality=7.30K
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(years_add(timestamp_col, id % 3)),iceberg_bucket_transform(concat(string_col, date_string_col), 5))]
-|  partitions=5372800
 |
 02:SORT
 |  order by: LEXICAL: year(years_add(timestamp_col, id % 3)) ASC NULLS LAST, iceberg_bucket_transform(concat(string_col, date_string_col), 5) ASC NULLS LAST, ZORDER: CAST(id * 3 AS INT), CAST(10000 - id AS INT)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 6f70d510c..cd22079b2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -791,7 +791,8 @@ optimize table iceberg_overwrite_bucket;
 AnalysisException: The Iceberg table has multiple partition specs. This means the outcome of dynamic partition overwrite is unforeseeable. Consider using TRUNCATE then INSERT INTO from the previous snapshot to overwrite your table.
 ====
 ---- QUERY
-CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS ICEBERG;
+CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
 optimize table ice_complex;
 ---- CATCH
 AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns
@@ -802,3 +803,78 @@ SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM functional_parquet.alltypes;
 ---- CATCH
 AnalysisException: Could not resolve column/field reference: 'iceberg__data__sequence__number'
 ====
+---- QUERY
+update functional_parquet.iceberg_partition_evolution set year = 2023 where id = 14393;
+---- CATCH
+AnalysisException: Table 'functional_parquet.iceberg_partition_evolution' has multiple partition specs, therefore cannot be used as a target table in an UPDATE statement
+====
+---- QUERY
+update functional_parquet.iceberg_int_partitioned set i = j;
+---- CATCH
+AnalysisException: Left-hand side in assignment 'i = j' refers to a partitioning column
+====
+---- QUERY
+update functional_parquet.iceberg_int_partitioned set k = 1, k = 2;
+---- CATCH
+AnalysisException: Left-hand side in assignment appears multiple times 'k=2'
+====
+---- QUERY
+create table cow (i int, j int) stored by iceberg tblproperties ('format-version'='2', 'write.update.mode'='copy-on-write');
+update cow set i = 2;
+---- CATCH
+AnalysisException: Unsupported update mode: 'copy-on-write' for Iceberg table: $DATABASE.cow
+====
+---- QUERY
+update ice_complex set id = id + 1;
+---- CATCH
+AnalysisException: Impala does not support updating tables with complex types. Table '$DATABASE.ice_complex' has column 'int_array' with type: ARRAY<INT>
+====
+---- QUERY
+# Cannot update virtual columns
+update functional_parquet.iceberg_int_partitioned set input__file__name = 'void';
+---- CATCH
+AnalysisException: Left-hand side in assignment expression 'input__file__name='void'' must be a column reference
+====
+---- QUERY
+update functional_parquet.iceberg_int_partitioned set file__position = 42;
+---- CATCH
+AnalysisException: Left-hand side in assignment expression 'file__position=42' must be a column reference
+====
+---- HIVE_QUERY
+CREATE TABLE $DATABASE.ice_v2_timestamptz (i int, ts TIMESTAMP WITH LOCAL TIME ZONE)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+invalidate metadata $DATABASE.ice_v2_timestamptz;
+update ice_v2_timestamptz set i = 3;
+---- CATCH
+AnalysisException: The Iceberg table has a TIMESTAMPTZ column that Impala cannot write.
+====
+---- QUERY
+create table sorted_ice (i int) sort by (i)
+stored by iceberg tblproperties ('format-version'='2');
+update sorted_ice set i = 4;
+---- CATCH
+AnalysisException: Impala does not support updating sorted tables. Data files in table '$DATABASE.sorted_ice' are sorted by the following column(s): i
+====
+---- QUERY
+# Metadata tables should raise proper error message.
+update $DATABASE.sorted_ice.history set parent_id = 0 where snapshot_id = 3;
+---- CATCH
+AnalysisException: Cannot resolve path '$DATABASE.sorted_ice.history' for DML statement.
+====
+---- QUERY
+create table update_orc (i int)
+stored by iceberg
+tblproperties ('write.delete.format.default'='parquet', 'write.format.default'='orc', 'format-version'='2');
+update update_orc set i = 3;
+---- CATCH
+AnalysisException: Impala can only write Parquet data files, while table '$DATABASE.update_orc' expects 'ORC' data files.
+====
+---- QUERY
+alter table update_orc set tblproperties ('write.delete.format.default'='orc', 'write.format.default'='parquet');
+update update_orc set i = 3;
+---- CATCH
+AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.update_orc
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
new file mode 100644
index 000000000..d44dd16ec
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
@@ -0,0 +1,388 @@
+====
+---- QUERY
+create table single_col (i int)
+stored by iceberg
+tblproperties ('format-version'='2');
+update single_col set i = 4;
+---- DML_RESULTS: single_col
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+NumModifiedRows: 0
+NumDeletedRows: 0
+====
+---- QUERY
+insert into single_col values (1), (2), (3);
+update single_col set i = cast(i + 1 as int);
+---- DML_RESULTS: single_col
+2
+3
+4
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+NumModifiedRows: 3
+NumDeletedRows: 3
+====
+---- QUERY
+update single_col set i = 1 where i = 2
+---- DML_RESULTS: single_col
+1
+3
+4
+---- TYPES
+INT
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+create table ice_alltypes (bool_col boolean, int_col int, bigint_col bigint, float_col float,
+  double_col double, dec_10_0_col decimal(10,0), dec_8_2_col decimal(8,2), date_col date,
+  timestamp_col timestamp, string_col string, binary_col binary)
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ice_alltypes values (false, 0, 0, 0, 0, 0, 0, '2000-01-01', '2000-01-01 00:00:00',
+ 'zero', cast('zerob' as binary));
+---- DML_RESULTS: ice_alltypes
+false,0,0,0,0,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 0
+====
+---- QUERY
+update ice_alltypes set bool_col = true;
+---- DML_RESULTS: ice_alltypes
+true,0,0,0,0,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set int_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,0,0,0,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set bigint_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,1,0,0,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set float_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,0,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set double_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,0,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set dec_10_0_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,0.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set dec_8_2_col = 1;
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,1.00,2000-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set date_col = '2001-01-01';
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,1.00,2001-01-01,2000-01-01 00:00:00,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set timestamp_col = '2001-01-01 01:01:01';
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'zero','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set string_col = 'one';
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'one','zerob'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set binary_col = cast('oneb' as binary);
+---- DML_RESULTS: ice_alltypes
+true,1,1,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'one','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set bigint_col = 33, int_col = 3, string_col = 'three';
+---- DML_RESULTS: ice_alltypes
+true,3,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set bigint_col = 33, int_col = 3, string_col = 'three';
+---- DML_RESULTS: ice_alltypes
+true,3,33,1,1,1,1.00,2001-01-01,2001-01-01 01:01:01,'three','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+update ice_alltypes set dec_10_0_col = 23, dec_8_2_col = cast(123.123 as decimal(8, 2));
+---- DML_RESULTS: ice_alltypes
+true,3,33,1,1,23,123.12,2001-01-01,2001-01-01 01:01:01,'three','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 1
+NumDeletedRows: 1
+====
+---- QUERY
+# UPDATE table based on a reference table.
+insert into ice_alltypes values (false, 0, 0, 0, 0, 0, 0, '2000-01-01', '2000-01-01 00:00:00',
+ 'zero', cast('zerob' as binary));
+create table ref_table (i int, bi bigint, s string, d date);
+insert into ref_table values (0, 111, 'IMPALA', '2023-11-07'), (3, 222, 'ICEBERG', '2023-11-08');
+update ice_alltypes set bigint_col=bi, string_col=s, date_col=d from ice_alltypes, ref_table where int_col = i;
+---- DML_RESULTS: ice_alltypes
+false,0,111,0,0,0,0.00,2023-11-07,2000-01-01 00:00:00,'IMPALA','zerob'
+true,3,222,1,1,23,123.12,2023-11-08,2001-01-01 01:01:01,'ICEBERG','oneb'
+---- TYPES
+BOOLEAN,INT,BIGINT,FLOAT,DOUBLE,DECIMAL,DECIMAL,DATE,TIMESTAMP,STRING,BINARY
+---- RUNTIME_PROFILE
+NumModifiedRows: 2
+NumDeletedRows: 2
+====
+---- QUERY
+# If the JOIN in UPDATE has multiple matches Impala should raise an error.
+insert into ref_table values (0, 1111, 'IIMMPPAALLAA', '2023-12-01');
+update ice_alltypes set bigint_col=bi, string_col=s, date_col=d from ice_alltypes, ref_table where int_col = i;
+---- CATCH
+Duplicated row in DELETE sink.
+====
+---- QUERY
+create table ice_id_partitioned (i int, p int, s string)
+partitioned by spec(p)
+stored by iceberg
+tblproperties('format-version'='2');
+insert into ice_id_partitioned values
+(1, 0, 'impala'), (2, 0, 'iceberg'), (3, 0, 'hive'), (4, 1, 'spark'),
+(5, 2, 'kudu');
+update ice_id_partitioned set s='Impala' where i = 1;
+update ice_id_partitioned set s='Kudu' where i = 5;
+---- DML_RESULTS: ice_id_partitioned
+1,0,'Impala'
+2,0,'iceberg'
+3,0,'hive'
+4,1,'spark'
+5,2,'Kudu'
+---- TYPES
+INT,INT,STRING
+====
+---- QUERY
+show files in ice_id_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=0/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=0/delete-.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=1/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=2/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=2/delete-.*parq','.*','','$ERASURECODE_POLICY'
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_id_partitioned/data/p=1/delete-.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Negative test for UPDATE part 3:
+# updating partition column AND right side is non-constant value AND we have a FROM clause
+# For such operations, if there are multiple matches in the JOIN, the duplicated records can
+# get shuffled independently to different sink operators, therefore they cannot check the
+# presence of duplicates. For statements like this we need to raise an error during analysis.
+UPDATE ice_id_partitioned set p = cast(ref_table.bi as int)
+FROM ice_id_partitioned, ref_table
+WHERE ice_id_partitioned.i = ref_table.i;
+---- CATCH
+AnalysisException: Left-hand side in assignment 'p = CAST(ref_table.bi AS INT)' refers to a partitioning column
+====
+---- QUERY
+create table ice_bucket_transform(i int, str string, bi bigint, ts timestamp)
+partitioned by spec(bucket(91, str), bucket(199, bi), bucket(403, ts))
+stored by iceberg
+tblproperties('format-version'='2');
+insert into ice_bucket_transform values (1, 'a fairly long string value', 1000, '1999-09-19 12:00:01'),
+  (2, 'bbb', 2030, '2001-01-01 00:00:00'), (3, 'cccccccccccccccccccccccccccccccccccccccc', -123, '2023-11-24 17:44:30');
+update ice_bucket_transform set i = cast(i * 2 as int);
+---- DML_RESULTS: ice_bucket_transform
+2,'a fairly long string value',1000,1999-09-19 12:00:01
+4,'bbb',2030,2001-01-01 00:00:00
+6,'cccccccccccccccccccccccccccccccccccccccc',-123,2023-11-24 17:44:30
+---- TYPES
+INT,STRING,BIGINT,TIMESTAMP
+====
+---- QUERY
+show files in ice_bucket_transform;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=38/bi_bucket=103/ts_bucket=204/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=38/bi_bucket=103/ts_bucket=204/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=54/bi_bucket=108/ts_bucket=277/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=54/bi_bucket=108/ts_bucket=277/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=58/bi_bucket=34/ts_bucket=104/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_bucket_transform/data/str_bucket=58/bi_bucket=34/ts_bucket=104/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+create table ice_time_transforms_timestamp(id decimal(8, 4), ts1 timestamp, ts2 timestamp, ts3 timestamp, ts4 timestamp)
+partitioned by spec(year(ts1), month(ts2), day(ts3), hour(ts4))
+stored by iceberg
+tblproperties('format-version'='2');
+insert into ice_time_transforms_timestamp values
+  (0.75, '2001-01-01 01:01:01', '2001-01-01 01:01:01', '2001-01-01 01:01:01', '2001-01-01 01:01:01'),
+  (1.2345, '2023-11-24 18:02:00', '2023-11-24 18:02:00', '2023-11-24 18:02:00', '2023-11-24 18:02:00'),
+  (999.9999, '2199-12-31 23:59:59', '2199-12-31 23:59:59', '2199-12-31 23:59:59', '2199-12-31 23:59:59');
+update ice_time_transforms_timestamp set id = cast(id * 2 as decimal(8, 4));
+---- DML_RESULTS: ice_time_transforms_timestamp
+1.5000,2001-01-01 01:01:01,2001-01-01 01:01:01,2001-01-01 01:01:01,2001-01-01 01:01:01
+2.4690,2023-11-24 18:02:00,2023-11-24 18:02:00,2023-11-24 18:02:00,2023-11-24 18:02:00
+1999.9998,2199-12-31 23:59:59,2199-12-31 23:59:59,2199-12-31 23:59:59,2199-12-31 23:59:59
+---- TYPES
+DECIMAL,TIMESTAMP,TIMESTAMP,TIMESTAMP,TIMESTAMP
+====
+---- QUERY
+show files in ice_time_transforms_timestamp;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2001/ts2_month=2001-01/ts3_day=2001-01-01/ts4_hour=2001-01-01-01/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2001/ts2_month=2001-01/ts3_day=2001-01-01/ts4_hour=2001-01-01-01/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2023/ts2_month=2023-11/ts3_day=2023-11-24/ts4_hour=2023-11-24-18/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2023/ts2_month=2023-11/ts3_day=2023-11-24/ts4_hour=2023-11-24-18/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2199/ts2_month=2199-12/ts3_day=2199-12-31/ts4_hour=2199-12-31-23/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_timestamp/data/ts1_year=2199/ts2_month=2199-12/ts3_day=2199-12-31/ts4_hour=2199-12-31-23/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+create table ice_time_transforms_date(id decimal(8, 4), ts1 DATE, ts2 DATE, ts3 DATE)
+partitioned by spec(year(ts1), month(ts2), day(ts3))
+stored by iceberg
+tblproperties('format-version'='2');
+insert into ice_time_transforms_date values
+  (0.75, '2001-01-01 01:01:01', '2001-01-01 01:01:01', '2001-01-01 01:01:01'),
+  (1.2345, '2023-11-24 18:02:00', '2023-11-24 18:02:00', '2023-11-24 18:02:00'),
+  (999.9999, '2199-12-31 23:59:59', '2199-12-31 23:59:59', '2199-12-31 23:59:59');
+update ice_time_transforms_date set id = cast(id * 2 as decimal(8, 4));
+---- DML_RESULTS: ice_time_transforms_date
+1.5000,2001-01-01,2001-01-01,2001-01-01
+2.4690,2023-11-24,2023-11-24,2023-11-24
+1999.9998,2199-12-31,2199-12-31,2199-12-31
+---- TYPES
+DECIMAL,DATE,DATE,DATE
+====
+---- QUERY
+show files in ice_time_transforms_date;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2001/ts2_month=2001-01/ts3_day=2001-01-01/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2001/ts2_month=2001-01/ts3_day=2001-01-01/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2023/ts2_month=2023-11/ts3_day=2023-11-24/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2023/ts2_month=2023-11/ts3_day=2023-11-24/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2199/ts2_month=2199-12/ts3_day=2199-12-31/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_time_transforms_date/data/ts1_year=2199/ts2_month=2199-12/ts3_day=2199-12-31/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+create table ice_part_transforms (i int, ts timestamp, s string, p bigint)
+partitioned by spec(day(ts), truncate(1, s), truncate(1000, p))
+stored by iceberg
+tblproperties('format-version'='2');
+insert into ice_part_transforms values (1, '2023-11-13 18:07:05', 'blue', 1234),
+ (2, '2023-11-13 18:07:23', 'gray', 2500), (3, '2023-11-14 19:07:05', 'green', 1700),
+ (4, '2023-11-01 00:11:11', 'black', 722);
+show files in ice_part_transforms;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=b/p_trunc=1000/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=g/p_trunc=2000/(?!delete-).*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-14/s_trunc=g/p_trunc=1000/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-01/s_trunc=b/p_trunc=0/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+update ice_part_transforms set i = cast(i*2 as int) where i % 2 = 0;
+---- DML_RESULTS: ice_part_transforms
+1,2023-11-13 18:07:05,'blue',1234
+4,2023-11-13 18:07:23,'gray',2500
+3,2023-11-14 19:07:05,'green',1700
+8,2023-11-01 00:11:11,'black',722
+---- TYPES
+INT,TIMESTAMP,STRING,BIGINT
+====
+---- QUERY
+show files in ice_part_transforms;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=b/p_trunc=1000/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=g/p_trunc=2000/(?!delete-).*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=g/p_trunc=2000/delete-.*parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-14/s_trunc=g/p_trunc=1000/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-01/s_trunc=b/p_trunc=0/(?!delete-).*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-01/s_trunc=b/p_trunc=0/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-13/s_trunc=b/p_trunc=1000/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transforms/data/ts_day=2023-11-14/s_trunc=g/p_trunc=1000/delete-.*.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index f7e5d9dfc..dfa9cbf10 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -715,6 +715,14 @@ select id, bigint_col from functional.alltypesagg order by id limit 10
 INT,BIGINT
 ====
 ---- QUERY
+select * from functional_parquet.iceberg_v2_delete_positional;
+---- RESULTS
+1,'NULL'
+3,'NULL'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
 # IMPALA-10554: Updates on masked tables should be blocked.
 insert into functional.alltypestiny partition(year, month) select * from functional.alltypes
 ---- CATCH
@@ -739,6 +747,18 @@ optimize table functional_parquet.iceberg_partitioned
 AuthorizationException: User '$USER' does not have privileges to execute 'INSERT' on: functional_parquet.iceberg_partitioned
 ====
 ---- QUERY
+# Deletes on masked tables should be blocked.
+delete from functional_parquet.iceberg_v2_delete_positional where id = 2;
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to access: functional_parquet.iceberg_v2_delete_positional
+====
+---- QUERY
+# Updates on masked tables should be blocked.
+update functional_parquet.iceberg_v2_delete_positional set `data` = concat(`data`, 'a');
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to access: functional_parquet.iceberg_v2_delete_positional
+====
+---- QUERY
 # Select masked INPUT_FILE__NAME plus all cols
 select input__file__name, * from alltypestiny order by id;
 ---- RESULTS
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
index f13992daf..0b23fecb8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_row_filtering.test
@@ -594,6 +594,15 @@ select id, b.item from functional_parquet.complextypestbl t, t.nested_struct.b
 BIGINT,INT
 ====
 ---- QUERY
+select * from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files;
+---- RESULTS
+1,'a'
+3,'c'
+5,'X'
+---- TYPES
+INT,STRING
+====
+---- QUERY
 # TODO: Row-filtering policy should keep rows with "nested_struct.a is not NULL" on base
 # table 'complextypestbl'. But now we can't apply it since the collection column 'b' is
 # non-relative. Fails these queries until IMPALA-10484 is resolved. See the next query
@@ -634,3 +643,15 @@ compute stats functional.alltypestiny
 ---- CATCH
 AuthorizationException: User '$USER' does not have privileges to execute 'ALTER' on: functional.alltypestiny
 ====
+---- QUERY
+# Deletes should be blocked if the user cannot see all the rows from the table.
+delete from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files where i = 2;
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to access: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files
+====
+---- QUERY
+# Updates should be blocked if the user cannot see all the rows from the table.
+update functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files set s = concat(s, 'a');
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to access: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files
+====
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index bdb14a0e0..33cf061b5 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1486,6 +1486,11 @@ class TestRanger(CustomClusterTestSuite):
         unique_name + str(policy_cnt), user, "functional_parquet", "iceberg_partitioned",
         "id", "MASK_NULL")
       policy_cnt += 1
+      # Add column masking policy to an Iceberg V2 table.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional_parquet",
+        "iceberg_v2_delete_positional", "data", "MASK_NULL")
+      policy_cnt += 1
       self.execute_query_expect_success(admin_client, "refresh authorization",
                                         user=ADMIN)
       self.run_test_case("QueryTest/ranger_column_masking", vector,
@@ -1656,6 +1661,12 @@ class TestRanger(CustomClusterTestSuite):
           unique_name + str(policy_cnt), user, "functional_parquet", "complextypestbl",
           "nested_struct.a is not NULL")
       policy_cnt += 1
+      # Row-filtering expr on Iceberg table
+      TestRanger._add_row_filtering_policy(
+          unique_name + str(policy_cnt), user, "functional_parquet",
+          "iceberg_v2_positional_not_all_data_files_have_delete_files",
+          "i % 2 = 1")
+      policy_cnt += 1
       admin_client.execute("refresh authorization")
       self.run_test_case("QueryTest/ranger_row_filtering", vector,
                          test_file_vars={'$UNIQUE_DB': unique_database})
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 2ae967d24..8b9c9fe5e 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1237,6 +1237,12 @@ class TestIcebergV2Table(IcebergTestSuite):
       lambda v: v.get_value('table_format').file_format == 'parquet')
     add_exec_option_dimension(cls, 'disable_optimized_iceberg_v2_read', [0, 1])
 
+  def should_run_for_hive(self, vector):
+    # Hive interop tests are very slow. Only run them for a subset of dimensions.
+    if vector.get_value('exec_option')['disable_optimized_iceberg_v2_read'] == 0:
+      return True
+    return False
+
   # The test uses pre-written Iceberg tables where the position delete files refer to
   # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the
   # dockerised environment the namenode is accessible on a different hostname/port.
@@ -1283,14 +1289,10 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_delete_partitioned(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-delete-partitioned', vector,
         unique_database)
-    if IS_HDFS:
-      self._partitioned_hive_tests(unique_database)
+    if IS_HDFS and self.should_run_for_hive(vector):
+      self._delete_partitioned_hive_tests(unique_database)
 
-  def _partitioned_hive_tests(self, db):
-    # Hive needs table property 'format-version' explicitly set
-    for tbl in ["id_part", "trunc_part", "multi_part", "evolve_part", "ice_store_sales"]:
-      self.run_stmt_in_hive(
-          "ALTER TABLE {}.{} SET TBLPROPERTIES('format-version'='2')".format(db, tbl))
+  def _delete_partitioned_hive_tests(self, db):
     hive_output = self.run_stmt_in_hive("SELECT * FROM {}.{} ORDER BY i".format(
         db, "id_part"))
     assert hive_output == "id_part.i,id_part.s\n"
@@ -1362,6 +1364,58 @@ class TestIcebergV2Table(IcebergTestSuite):
     # Test that Hive sees the same rows deleted.
     assert hive_output == "id\n4\n5\n6\n7\n8\n"
 
+  def test_update_basic(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-update-basic', vector,
+        unique_database)
+    if IS_HDFS and self.should_run_for_hive(vector):
+      self._update_basic_hive_tests(unique_database)
+
+  def _update_basic_hive_tests(self, db):
+    def get_hive_results(tbl, order_by_col):
+      stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
+      return self.run_stmt_in_hive(stmt).split("\n", 1)[1]
+
+    hive_results = get_hive_results("single_col", "i")
+    assert hive_results == "1\n3\n4\n"
+
+    hive_results = get_hive_results("ice_alltypes", "bool_col")
+    assert hive_results == \
+        "false,0,111,0.0,0.0,0,0.00,2023-11-07,2000-01-01 00:00:00.0,IMPALA,zerob\n" \
+        "true,3,222,1.0,1.0,23,123.12,2023-11-08,2001-01-01 01:01:01.0,ICEBERG,oneb\n"
+
+    hive_results = get_hive_results("ice_id_partitioned", "i")
+    assert hive_results == \
+        "1,0,Impala\n"     \
+        "2,0,iceberg\n"    \
+        "3,0,hive\n"       \
+        "4,1,spark\n"      \
+        "5,2,Kudu\n"
+
+    hive_results = get_hive_results("ice_bucket_transform", "i")
+    assert hive_results == \
+        "2,a fairly long string value,1000,1999-09-19 12:00:01.0\n" \
+        "4,bbb,2030,2001-01-01 00:00:00.0\n" \
+        "6,cccccccccccccccccccccccccccccccccccccccc,-123,2023-11-24 17:44:30.0\n"
+
+    hive_results = get_hive_results("ice_time_transforms_timestamp", "id")
+    assert hive_results == \
+        "1.5000,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0,2001-01-01 01:01:01.0\n" \
+        "2.4690,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0,2023-11-24 18:02:00.0\n" \
+        "1999.9998,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0,2199-12-31 23:59:59.0\n"  # noqa: E501
+
+    hive_results = get_hive_results("ice_time_transforms_date", "id")
+    assert hive_results == \
+        "1.5000,2001-01-01,2001-01-01,2001-01-01\n" \
+        "2.4690,2023-11-24,2023-11-24,2023-11-24\n" \
+        "1999.9998,2199-12-31,2199-12-31,2199-12-31\n"
+
+    hive_results = get_hive_results("ice_part_transforms", "i")
+    assert hive_results == \
+        "1,2023-11-13 18:07:05.0,blue,1234\n" \
+        "3,2023-11-14 19:07:05.0,green,1700\n" \
+        "4,2023-11-13 18:07:23.0,gray,2500\n" \
+        "8,2023-11-01 00:11:11.0,black,722\n"
+
   def test_optimize(self, vector, unique_database):
     tbl_name = unique_database + ".optimize_iceberg"
     self.execute_query("""create table {0} (i int)
diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
new file mode 100644
index 000000000..5510dc1db
--- /dev/null
+++ b/tests/stress/test_update_stress.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+from builtins import map, range
+import pytest
+import random
+import time
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.parametrize import UniqueDatabase
+from tests.stress.stress_util import run_tasks, Task
+
+
+# Stress test for concurrent UPDATE operations against Iceberg tables.
+class TestIcebergUpdateStress(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'targeted-stress'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIcebergUpdateStress, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: (v.get_value('table_format').file_format == 'parquet'
+            and v.get_value('table_format').compression_codec == 'snappy'))
+
+  def _impala_role_concurrent_writer(self, tbl_name, col, num_updates):
+    """Increments values in column 'total' and in the column which is passed in 'col'."""
+    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    update_cnt = 0
+    while update_cnt < num_updates:
+      try:
+        impalad_client.execute(
+            "update {0} set total = total + 1, {1} = {1} + 1".format(tbl_name, col))
+        update_cnt += 1
+        # Sleep after a successful operation.
+        time.sleep(random.random())
+      except Exception:
+        # Exceptions are expected due to concurrent operations.
+        pass
+    impalad_client.close()
+
+  def _impala_role_concurrent_checker(self, tbl_name, target_total):
+    """Checks if the table's invariant is true. The invariant is that the equation
+    'total == a + b + c' is true. Returns 'total'."""
+    def verify_result_set(result):
+      assert len(result.data) == 1
+      line = result.data[0]
+      [total, a, b, c] = list(map(int, (line.split('\t'))))
+      assert total == a + b + c
+      return total
+
+    target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    total = 0
+    while total < target_total:
+      result = impalad_client.execute("select * from %s" % tbl_name)
+      new_total = verify_result_set(result)
+      assert total <= new_total
+      total = new_total
+      time.sleep(random.random())
+    impalad_client.close()
+
+  @pytest.mark.stress
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_updates(self, unique_database):
+    """Issues UPDATE statements against multiple impalads in a way that some
+    invariants must be true when a spectator process inspects the table. E.g.
+    the value of a column should be equal to the sum of other columns."""
+    tbl_name = "%s.test_concurrent_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0}
+        (total bigint, a bigint, b bigint, c bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name,))
+    self.client.execute(
+        "insert into {} values (0, 0, 0, 0)".format(tbl_name))
+
+    num_checkers = 2
+    cols = 3
+    updates_per_col = 30
+    target_total = updates_per_col * cols
+
+    updater_a = Task(self._impala_role_concurrent_writer, tbl_name, "a", updates_per_col)
+    updater_b = Task(self._impala_role_concurrent_writer, tbl_name, "b", updates_per_col)
+    updater_c = Task(self._impala_role_concurrent_writer, tbl_name, "c", updates_per_col)
+    checkers = [Task(self._impala_role_concurrent_checker, tbl_name, target_total)
+                for i in range(0, num_checkers)]
+    run_tasks([updater_a, updater_b, updater_c] + checkers)


(impala) 03/03: IMPALA-12398: Fix Ranger role not exists when altering db/table/view owner to a role

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d9c067aa89313547c1d8dbf3840ebe308726f8c3
Author: jichen0919 <ji...@163.com>
AuthorDate: Mon Sep 25 23:02:36 2023 +0800

    IMPALA-12398: Fix Ranger role not exists when altering db/table/view owner to a role
    
    When a role '<ROLE_NAME>' is created with Ranger authorization enabled
    and an 'ALTER TABLE <TABLE_NAME> SET OWNER ROLE <ROLE_NAME>' statement
    is executed to make that role the owner of the table, Impala throws
    "AnalysisException: Role '<ROLE_NAME>' does not exist."
    
    Before this patch, given the ALTER DATABASE/TABLE/VIEW SET OWNER ROLE
    statement, Impala always checked the existence of the given role in
    its AuthorizationPolicy. However, when the support for role-related
    statements with Ranger was added in IMPALA-10211, we only added the
    roles in RangerImpalaPlugin instead of AuthorizationPolicy.
    Therefore, the statement above would fail even when an authorized
    user tries to set the owner to an existing role in RangerImpalaPlugin.
    
    This patch uses RangerImpalaPlugin directly to check the existence of
    the role, instead of using the AuthorizationPolicy object.
    
    Tests:
     - Pass unit tests. The test method testAlterView in AuthorizationStmtTest
       is updated accordingly.
     - Pass e2e tests. The test method _test_ownership in test_ranger.py is
       updated to cover the new implementation.
     - Pass core tests with Ranger enabled.
    
    Change-Id: I2b029bdb90111dbd0eab5189360cc81090225cda
    Reviewed-on: http://gerrit.cloudera.org:8080/20508
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/analysis/AlterDbSetOwnerStmt.java       |  12 ++-
 .../analysis/AlterTableOrViewSetOwnerStmt.java     |  12 ++-
 .../impala/authorization/AuthorizationChecker.java |   5 +
 .../authorization/NoopAuthorizationFactory.java    |   6 ++
 .../ranger/RangerAuthorizationChecker.java         |   5 +
 .../impala/authorization/ranger/RangerUtil.java    |   9 ++
 .../authorization/AuthorizationStmtTest.java       |  12 +++
 .../authorization/AuthorizationTestBase.java       |   7 +-
 .../CatalogServiceTestCatalogWithRanger.java       | 104 +++++++++++++++++++++
 .../org/apache/impala/common/FrontendTestBase.java |   5 +
 .../impala/testutil/CatalogServiceTestCatalog.java |  21 +++--
 tests/authorization/test_ranger.py                 |  13 +++
 12 files changed, 197 insertions(+), 14 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
index 84cd0b7c1..01c960c19 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterDbSetOwnerStmt.java
@@ -18,6 +18,8 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.AuthorizationChecker;
+import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterDbParams;
@@ -55,9 +57,13 @@ public class AlterDbSetOwnerStmt extends AlterDbStmt {
     // We don't allow assigning to a non-existent role because Ranger should know about
     // all roles. Ranger does not track all users so we allow assigning to a user
     // that Ranger doesn't know about yet.
-    if (analyzer.isAuthzEnabled() && owner_.getOwnerType() == TOwnerType.ROLE
-        && analyzer.getCatalog().getAuthPolicy().getRole(ownerName) == null) {
-      throw new AnalysisException(String.format("Role '%s' does not exist.", ownerName));
+    if (analyzer.isAuthzEnabled() && owner_.getOwnerType() == TOwnerType.ROLE) {
+      AuthorizationFactory authzFactory = analyzer.getAuthzFactory();
+      AuthorizationChecker authzChecker = authzFactory.newAuthorizationChecker();
+      if (!authzChecker.roleExists(ownerName)) {
+        throw new AnalysisException(
+            String.format("Role '%s' does not exist.", ownerName));
+      }
     }
     // Set the servername here if authorization is enabled because analyzer_ is not
     // available in the toThrift() method.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
index baed48408..25d9ecb12 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableOrViewSetOwnerStmt.java
@@ -18,6 +18,8 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.AuthorizationChecker;
+import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
@@ -52,9 +54,13 @@ public abstract class AlterTableOrViewSetOwnerStmt extends AlterTableStmt {
     // We don't allow assigning to a non-existent role because Ranger should know about
     // all roles. Ranger does not track all users so we allow assigning to a user
     // that Ranger doesn't know about yet.
-    if (analyzer.isAuthzEnabled() && owner_.getOwnerType() == TOwnerType.ROLE
-        && analyzer.getCatalog().getAuthPolicy().getRole(ownerName) == null) {
-      throw new AnalysisException(String.format("Role '%s' does not exist.", ownerName));
+    if (analyzer.isAuthzEnabled() && owner_.getOwnerType() == TOwnerType.ROLE) {
+      AuthorizationFactory authzFactory = analyzer.getAuthzFactory();
+      AuthorizationChecker authzChecker = authzFactory.newAuthorizationChecker();
+      if (!authzChecker.roleExists(ownerName)) {
+        throw new AnalysisException(
+            String.format("Role '%s' does not exist.", ownerName));
+      }
     }
 
     tableName_ = analyzer.getFqTableName(tableName_);
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index 10d698b2c..d64f1574f 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -112,4 +112,9 @@ public interface AuthorizationChecker {
    * This method is to be executed after AnalysisContext#analyze() is completed.
    */
   void postAnalyze(AuthorizationContext authzCtx);
+
+  /**
+   * Returns whether a role with the given name exists.
+   */
+  boolean roleExists(String roleName);
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index e3097f6a5..23733f680 100644
--- a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -239,6 +239,12 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
       @Override
       public void postAnalyze(AuthorizationContext authzCtx) {
       }
+
+      @Override
+      public boolean roleExists(String roleName) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", ClassUtil.getMethodName()));
+      }
     };
   }
 
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 046e3e99e..56f009f0c 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -754,4 +754,9 @@ public class RangerAuthorizationChecker extends BaseAuthorizationChecker {
 
   @VisibleForTesting
   public RangerImpalaPlugin getRangerImpalaPlugin() { return plugin_; }
+
+  @Override
+  public boolean roleExists(String roleName) {
+    return RangerUtil.roleExists(plugin_, roleName);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerUtil.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerUtil.java
index a7bf26d50..022236c11 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerUtil.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerUtil.java
@@ -19,13 +19,16 @@ package org.apache.impala.authorization.ranger;
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.impala.authorization.AuthorizationFactory;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TPrivilege;
+import org.apache.ranger.plugin.model.RangerRole;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 
 /**
  * Collection of static functions to support Apache Ranger implementation
@@ -119,4 +122,10 @@ public class RangerUtil {
       throws Exception {
     plugin.getAllRoles(user, null);
   }
+
+  public static boolean roleExists(RangerImpalaPlugin plugin, String roleName) {
+    Set<RangerRole> roleSet = plugin.getRoles().getRangerRoles();
+    if (roleSet == null) return false;
+    return roleSet.stream().anyMatch(r -> r.getName().equals(roleName));
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
index 5e224f318..7c98bec3b 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java
@@ -2561,6 +2561,18 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
       authzCatalog_.removeRole("foo_owner");
     }
 
+    // Check that ALTER VIEW SET OWNER ROLE throws an AnalysisException
+    // if the role has been removed.
+    boolean exceptionThrown = false;
+    try {
+      parseAndAnalyze("alter view functional.alltypes_view set owner role foo_owner",
+          authzCtx_, frontend_);
+    } catch (AnalysisException e) {
+      exceptionThrown = true;
+      assertEquals("Role 'foo_owner' does not exist.", e.getLocalizedMessage());
+    }
+    assertTrue(exceptionThrown);
+
     // Database does not exist.
     authorize("alter view nodb.alltypes_view as select 1")
         .error(alterError("nodb"))
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
index e0db29e3a..b9b34c6c0 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTestBase.java
@@ -27,13 +27,13 @@ import org.apache.impala.authorization.ranger.RangerAuthorizationFactory;
 import org.apache.impala.authorization.ranger.RangerCatalogdAuthorizationManager;
 import org.apache.impala.authorization.ranger.RangerImpalaPlugin;
 import org.apache.impala.authorization.ranger.RangerImpalaResourceBuilder;
-import org.apache.impala.catalog.Role;
 import org.apache.impala.catalog.ScalarFunction;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.Frontend;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TDescribeOutputStyle;
@@ -112,6 +112,8 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
   protected final RangerImpalaPlugin rangerImpalaPlugin_;
   protected final RangerRESTClient rangerRestClient_;
 
+  protected final CatalogServiceTestCatalog testCatalog_;
+
   public AuthorizationTestBase(AuthorizationProvider authzProvider)
       throws ImpalaException {
     authzProvider_ = authzProvider;
@@ -122,7 +124,8 @@ public abstract class AuthorizationTestBase extends FrontendTestBase {
             SERVER_NAME, null, null, null);
         authzFactory_ = createAuthorizationFactory(authzProvider);
         authzCtx_ = createAnalysisCtx(authzFactory_, user_.getName());
-        authzCatalog_ = new ImpaladTestCatalog(authzFactory_);
+        testCatalog_ = CatalogServiceTestCatalogWithRanger.createWithAuth(authzFactory_);
+        authzCatalog_ = new ImpaladTestCatalog(testCatalog_);
         authzFrontend_ = new Frontend(authzFactory_, authzCatalog_);
         rangerImpalaPlugin_ =
             ((RangerAuthorizationChecker) authzFrontend_.getAuthzChecker())
diff --git a/fe/src/test/java/org/apache/impala/authorization/CatalogServiceTestCatalogWithRanger.java b/fe/src/test/java/org/apache/impala/authorization/CatalogServiceTestCatalogWithRanger.java
new file mode 100644
index 000000000..bd3d28888
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/authorization/CatalogServiceTestCatalogWithRanger.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.authorization;
+
+import org.apache.impala.authorization.ranger.RangerAuthorizationChecker;
+import org.apache.impala.authorization.ranger.RangerImpalaPlugin;
+import org.apache.impala.authorization.ranger.RangerUtil;
+import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.Role;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.ranger.plugin.model.RangerRole;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class CatalogServiceTestCatalogWithRanger extends CatalogServiceTestCatalog {
+  private static final String RANGER_ADMIN_USER = "admin";
+  private RangerImpalaPlugin rangerImpalaPlugin_;
+  protected CatalogServiceTestCatalogWithRanger(boolean loadInBackground,
+      int numLoadingThreads, MetaStoreClientPool metaStoreClientPool)
+      throws ImpalaException {
+    super(loadInBackground, numLoadingThreads, metaStoreClientPool);
+  }
+
+  public static CatalogServiceTestCatalog create() {
+    return createWithAuth(new NoopAuthorizationFactory());
+  }
+
+  public static CatalogServiceTestCatalog createWithAuth(AuthorizationFactory factory) {
+    return createWithAuth(factory, new BaseTestCatalogSupplier() {
+      @Override
+      public CatalogServiceTestCatalog get() throws ImpalaException {
+        CatalogServiceTestCatalogWithRanger cs;
+        MetaStoreClientPool metaStoreClientPool = new MetaStoreClientPool(0, 0);
+        cs = new CatalogServiceTestCatalogWithRanger(false, 16, metaStoreClientPool);
+        RangerImpalaPlugin rangerImpalaPlugin =
+            ((RangerAuthorizationChecker) factory.newAuthorizationChecker())
+                .getRangerImpalaPlugin();
+        cs.setRangerImpalaPlugin(rangerImpalaPlugin);
+        return cs;
+      }
+    });
+  }
+
+  public void setRangerImpalaPlugin(RangerImpalaPlugin rangerImpalaPlugin_) {
+    this.rangerImpalaPlugin_ = rangerImpalaPlugin_;
+  }
+
+  @Override
+  public Role addRole(String roleName, Set<String> grantGroups) {
+    Role authRole = null;
+    RangerRole role = new RangerRole();
+    role.setName(roleName);
+    role.setCreatedByUser(RANGER_ADMIN_USER);
+    List<RangerRole.RoleMember> roleMemberList =
+        grantGroups.stream()
+            .map(s -> new RangerRole.RoleMember(s, s.equals(RANGER_ADMIN_USER)))
+            .collect(Collectors.toList());
+    role.setGroups(roleMemberList);
+    try {
+      rangerImpalaPlugin_.createRole(role, null);
+      rangerImpalaPlugin_.refreshPoliciesAndTags();
+      authRole = super.addRole(roleName, grantGroups);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+    return authRole;
+  }
+
+  @Override
+  public Role removeRole(String roleName) {
+    Role authRole = null;
+    try {
+      RangerUtil.validateRangerAdmin(rangerImpalaPlugin_, RANGER_ADMIN_USER);
+      rangerImpalaPlugin_.dropRole(RANGER_ADMIN_USER, roleName, null);
+      // Need to invoke the plugin to sync policies so that stale roles
+      // do not remain in the plugin cache.
+      rangerImpalaPlugin_.refreshPoliciesAndTags();
+      authRole = super.removeRole(roleName);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+    return authRole;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 9983fc644..a51aed0d8 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -440,6 +440,11 @@ public class FrontendTestBase extends AbstractFrontendTest {
           public void postAnalyze(AuthorizationContext authzCtx) {
           }
 
+          @Override
+          public boolean roleExists(String roleName) {
+            return catalog_.getAuthPolicy().getRole(roleName) != null;
+          }
+
           @Override
           public AuthorizationContext createAuthorizationContext(boolean doAudits,
               String sqlStmt, TSessionState sessionState,
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index cba20cbb7..e91678b3a 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -45,7 +45,7 @@ import java.util.UUID;
  */
 public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
   private CatalogOpExecutor opExecutor_;
-  private CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads,
+  protected CatalogServiceTestCatalog(boolean loadInBackground, int numLoadingThreads,
       MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
     super(loadInBackground, numLoadingThreads, System.getProperty("java.io.tmpdir"),
         metaStoreClientPool);
@@ -57,6 +57,10 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
     rd.run();
   }
 
+  public interface BaseTestCatalogSupplier {
+    public abstract CatalogServiceTestCatalog get() throws ImpalaException;
+  }
+
   public static CatalogServiceTestCatalog create() {
     return createWithAuth(new NoopAuthorizationFactory());
   }
@@ -66,13 +70,19 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
    * authorization config.
    */
   public static CatalogServiceTestCatalog createWithAuth(AuthorizationFactory factory) {
+    return createWithAuth(factory,
+        () -> new CatalogServiceTestCatalog(false, 16, new MetaStoreClientPool(0, 0)));
+  }
+
+  public static CatalogServiceTestCatalog createWithAuth(
+      AuthorizationFactory factory, BaseTestCatalogSupplier catalogSupplier) {
     FeSupport.loadLibrary();
     CatalogServiceTestCatalog cs;
     try {
       if (MetastoreShim.getMajorVersion() > 2) {
         MetastoreShim.setHiveClientCapabilities();
       }
-      cs = new CatalogServiceTestCatalog(false, 16, new MetaStoreClientPool(0, 0));
+      cs = catalogSupplier.get();
       cs.setAuthzManager(factory.newAuthorizationManager(cs));
       cs.setMetastoreEventProcessor(NoOpEventProcessor.getInstance());
       cs.setCatalogMetastoreServer(NoOpCatalogMetastoreServer.INSTANCE);
@@ -80,15 +90,14 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
           new NoopAuthorizationFactory().getAuthorizationConfig(),
           new NoopAuthorizationFactory.NoopAuthorizationManager(),
           new TestHiveJavaFunctionFactory()));
-      cs.setEventFactoryForSyncToLatestEvent(new EventFactoryForSyncToLatestEvent(
-          cs.getCatalogOpExecutor()));
+      cs.setEventFactoryForSyncToLatestEvent(
+          new EventFactoryForSyncToLatestEvent(cs.getCatalogOpExecutor()));
       cs.reset();
     } catch (ImpalaException e) {
       throw new IllegalStateException(e.getMessage(), e);
     }
     return cs;
   }
-
   /**
    * Creates a transient test catalog instance backed by an embedded HMS derby database on
    * the local filesystem. The derby database is created from scratch and has no table
@@ -119,7 +128,7 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
   @Override
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
 
-  private void setCatalogOpExecutor(CatalogOpExecutor opExecutor) {
+  protected void setCatalogOpExecutor(CatalogOpExecutor opExecutor) {
     opExecutor_ = opExecutor;
   }
 
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 33cf061b5..db117004f 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1290,6 +1290,7 @@ class TestRanger(CustomClusterTestSuite):
     """Tests ownership privileges for databases and tables with ranger along with
     some known quirks in the implementation."""
     test_user = getuser()
+    test_role = 'test_role'
     test_db = "test_ranger_ownership_" + get_random_id(5).lower()
     # Create a test database as "admin" user. Owner is set accordingly.
     self._run_query_as_user("create database {0}".format(test_db), ADMIN, True)
@@ -1335,6 +1336,18 @@ class TestRanger(CustomClusterTestSuite):
       # Change the table owner back to admin.
       self._run_query_as_user(
           "alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN, True)
+      # Create the role before altering the table owner to it.
+      self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True)
+      # Alter the table owner to the existing role; the statement is expected to succeed.
+      stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role)
+      self._run_query_as_user(stmt, ADMIN, True)
+      # drop the role.
+      self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True)
+      # Alter the table owner to a non-existent role; expect a "does not exist" error.
+      stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role)
+      result = self._run_query_as_user(stmt, ADMIN, False)
+      err = "Role '{0}' does not exist.".format(test_role)
+      assert err in str(result)
       # test_user should not be authorized to run the queries anymore.
       result = self._run_query_as_user(
           "select * from {0}.foo".format(test_db), test_user, False)