Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/18 19:38:37 UTC

(impala) branch master updated: IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 68fe57ff8 IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
68fe57ff8 is described below

commit 68fe57ff8492a7afdf14a62cabd3e2b0fcade9d1
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Dec 1 18:31:47 2023 +0100

    IMPALA-12313: (part 3) Add UPDATE support for Iceberg tables.
    
    Part 2 had some limitations; most importantly, it could not update
    Iceberg tables if any of the following were true:
     * the UPDATE modifies the value of a partitioning column
     * the table went through partition evolution
     * the table has SORT BY properties
    
    The problem with partitions is that the delete record and the new
    data record might belong to different partitions. Because records
    are shuffled based on the partitions of the delete records, the
    data files might not get written efficiently.
    
    The problem with SORT BY properties is that we need to write the
    position delete files ordered by (file_path, position).
    
    To address the above problems, this patch introduces a new
    backend operator: IcebergBufferedDeleteSink. This new operator
    extracts and aggregates the delete record information from
    the incoming row batches, then in FlushFinal it orders the
    position delete records and writes them out to files. This
    mechanism is similar to Hive's approach:
    https://github.com/apache/hive/pull/3251
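
    For illustration, here is a minimal, self-contained sketch of the
    buffering scheme (simplified: std::string instead of StringValue, a
    plain std::map instead of the hash map used by the patch, and no
    MemPool/RowBatch handling):

      #include <algorithm>
      #include <cstdint>
      #include <iostream>
      #include <map>
      #include <string>
      #include <utility>
      #include <vector>

      // (spec_id, encoded partition values) -> file path -> delete positions
      using PartitionInfo = std::pair<int32_t, std::string>;
      using FilePositions = std::map<std::string, std::vector<int64_t>>;

      int main() {
        std::map<PartitionInfo, FilePositions> buffered;

        // Send(): buffer the incoming delete records in whatever order
        // they arrive.
        buffered[{0, "p=1"}]["f1.parq"].push_back(42);
        buffered[{0, "p=1"}]["f0.parq"].push_back(7);
        buffered[{0, "p=1"}]["f1.parq"].push_back(3);

        // FlushFinal(): sort the positions per file, then emit the records
        // ordered by (file_path, position), one partition at a time.
        for (auto& [part, files] : buffered) {
          for (auto& [file, positions] : files) {
            std::sort(positions.begin(), positions.end());
            for (int64_t pos : positions) {
              std::cout << part.second << " " << file << " " << pos << "\n";
            }
          }
        }
        return 0;
      }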
    
    IcebergBufferedDeleteSink cannot spill to disk, so it can only run
    if there is enough memory to store the delete records. Each file
    path is stored only once, and the int64_t positions are stored in
    a vector, so updating 100 million records per node should require
    around 800 MB for the positions plus the ~100K file paths, i.e.
    roughly 820 MB of memory per node. Spilling could be added later,
    but currently there is little realistic need for it.
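
    As a back-of-the-envelope check of that estimate (the ~200 bytes per
    file path below is an assumption made for illustration, not a number
    taken from this patch):

      #include <cstdint>
      #include <cstdio>

      int main() {
        const int64_t num_records = 100'000'000;  // deleted rows per node
        const int64_t pos_bytes = num_records * 8;  // 8 bytes per int64_t position
        const int64_t num_paths = 100'000;  // unique data files, each path stored once
        const int64_t path_bytes = num_paths * 200;  // assumed ~200 bytes per path
        std::printf("total ~= %lld MB\n",
            static_cast<long long>((pos_bytes + path_bytes) / (1000 * 1000)));
        return 0;  // prints "total ~= 820 MB"
      }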
    
    With this patch, records are shuffled based on the new data records'
    partition values, and the SORT operator sorts the records according
    to the SORT BY properties.
    
    There is only one case in which the UPDATE statement is not allowed:
     * it updates a partitioning column, AND
     * the right-hand side of the assignment is a non-constant expression, AND
     * the UPDATE statement has a JOIN
    
    When all of the above conditions are met, an incorrect JOIN
    condition could produce multiple matches for a data record. The
    duplicated records would then be shuffled independently (based on
    the new partition value) to different backend SINKs, and the
    individual backend SINKs would not be able to detect the
    duplicates.
    
    If any of the above conditions is false, the duplicated records are
    shuffled to the same SINK, which can then perform the duplicate
    check.
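
    A sketch of the duplicate check this enables once the per-file
    positions are sorted; it mirrors the adjacent-equality scan in
    VerifyBufferedRecords(), but the helper below is illustrative rather
    than the patch's code:

      #include <algorithm>
      #include <cstdint>
      #include <vector>

      // Returns true if the same (file, position) was buffered twice, i.e. an
      // UPDATE matched the same data row more than once.
      bool HasDuplicateDeletes(std::vector<int64_t>& positions_for_one_file) {
        std::sort(positions_for_one_file.begin(), positions_for_one_file.end());
        return std::adjacent_find(positions_for_one_file.begin(),
            positions_for_one_file.end()) != positions_for_one_file.end();
      }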
    
    This patch also moves some code from IcebergDeleteSink to the
    newly introduced IcebergDeleteSinkBase.
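
    For orientation, the resulting sink class hierarchy, sketched as
    bare C++ declarations (all details omitted; see the headers added
    below):

      class TableSinkBase { /* existing common table sink logic */ };

      // Shared Prepare()/Open() and Iceberg partition info construction.
      class IcebergDeleteSinkBase : public TableSinkBase {};

      // Streaming sink: assumes input sorted by (file_path, position).
      class IcebergDeleteSink : public IcebergDeleteSinkBase {};

      // Buffering sink: sorts the delete records itself in FlushFinal().
      class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase {};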
    
    Testing:
     * planner tests
     * e2e tests
     * Impala/Hive interop tests
    
    Change-Id: I2bb97b4454165a292975d88dc9c23adb22ff7315
    Reviewed-on: http://gerrit.cloudera.org:8080/20760
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   3 +
 be/src/exec/data-sink.cc                           |   2 +-
 be/src/exec/hdfs-table-sink.cc                     |  26 +-
 be/src/exec/hdfs-table-sink.h                      |   2 +-
 be/src/exec/iceberg-buffered-delete-sink.cc        | 418 +++++++++++++++++++
 be/src/exec/iceberg-buffered-delete-sink.h         | 132 ++++++
 be/src/exec/iceberg-delete-sink-base.cc            | 179 ++++++++
 be/src/exec/iceberg-delete-sink-base.h             |  65 +++
 be/src/exec/iceberg-delete-sink-config.cc          |  51 +++
 be/src/exec/iceberg-delete-sink-config.h           |  35 ++
 be/src/exec/iceberg-delete-sink.cc                 | 185 +--------
 be/src/exec/iceberg-delete-sink.h                  |  50 +--
 be/src/exec/table-sink-base.cc                     |  13 +-
 be/src/exec/table-sink-base.h                      |  20 +-
 be/src/exprs/slot-ref.h                            |   1 +
 be/src/runtime/dml-exec-state.h                    |   5 +
 common/thrift/DataSinks.thrift                     |   2 +
 .../apache/impala/analysis/DmlStatementBase.java   |   2 +
 .../apache/impala/analysis/IcebergUpdateImpl.java  |  65 +--
 .../org/apache/impala/analysis/InsertStmt.java     |   1 +
 .../org/apache/impala/analysis/ModifyImpl.java     |   3 +
 .../org/apache/impala/analysis/ModifyStmt.java     |   4 +
 .../org/apache/impala/analysis/OptimizeStmt.java   |   6 +
 ...eteSink.java => IcebergBufferedDeleteSink.java} |  56 ++-
 .../apache/impala/planner/IcebergDeleteSink.java   |   8 +-
 .../java/org/apache/impala/planner/Planner.java    |   9 +-
 .../functional/functional_schema_template.sql      |   6 +-
 .../queries/PlannerTest/iceberg-v2-update.test     | 206 ++++++---
 .../queries/PlannerTest/insert-sort-by-zorder.test |  12 +-
 .../queries/QueryTest/iceberg-negative.test        |  21 +-
 .../queries/QueryTest/iceberg-update-basic.test    |   4 +-
 .../QueryTest/iceberg-update-partitions.test       | 460 +++++++++++++++++++++
 .../queries/QueryTest/iceberg-update-stress.test   |  39 ++
 tests/query_test/test_iceberg.py                   |  77 +++-
 tests/stress/test_update_stress.py                 |  47 ++-
 35 files changed, 1836 insertions(+), 379 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index e7f27dc1a..e6561045e 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -78,9 +78,12 @@ add_library(Exec
   hdfs-table-sink.cc
   hdfs-table-writer.cc
   hdfs-text-table-writer.cc
+  iceberg-buffered-delete-sink.cc
   iceberg-delete-builder.cc
   iceberg-delete-node.cc
   iceberg-delete-sink.cc
+  iceberg-delete-sink-base.cc
+  iceberg-delete-sink-config.cc
   incr-stats-util.cc
   join-builder.cc
   multi-table-sink.cc
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index dcf7d1894..3f9a1037b 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -27,7 +27,7 @@
 #include "exec/hbase/hbase-table-sink.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/iceberg-delete-builder.h"
-#include "exec/iceberg-delete-sink.h"
+#include "exec/iceberg-delete-sink-config.h"
 #include "exec/multi-table-sink.h"
 #include "exec/kudu/kudu-table-sink.h"
 #include "exec/kudu/kudu-util.h"
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index dd86d4faf..26d48c59f 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -219,7 +219,8 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
       dynamic_partition_key_expr_evals_, &last_row_key);
   if (last_row_key == current_clustered_partition_key_) {
     DCHECK(current_clustered_partition_->second.empty());
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
+        current_clustered_partition_->first.get()));
     return Status::OK();
   }
 
@@ -235,7 +236,9 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
       DCHECK(current_clustered_partition_ != nullptr);
       // Done with previous partition - write rows and close.
       if (!current_clustered_partition_->second.empty()) {
-        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+        RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
+            current_clustered_partition_->first.get(),
+            current_clustered_partition_->second));
         current_clustered_partition_->second.clear();
       }
       RETURN_IF_ERROR(FinalizePartitionFile(state,
@@ -257,7 +260,9 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
     current_clustered_partition_->second.push_back(i);
   }
   // Write final set of rows to the partition but keep its file open.
-  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
+  RETURN_IF_ERROR(WriteRowsToPartition(state, batch,
+      current_clustered_partition_->first.get(), current_clustered_partition_->second));
+  current_clustered_partition_->second.clear();
   return Status::OK();
 }
 
@@ -326,8 +331,11 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
     // Create a new OutputPartition, and add it to partition_keys_to_output_partitions.
     const HdfsPartitionDescriptor* partition_descriptor = GetPartitionDescriptor(key);
     std::unique_ptr<OutputPartition> partition(new OutputPartition());
+    // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
+    // etc.
+    RETURN_IF_ERROR(ConstructPartitionInfo(row, partition.get()));
     Status status =
-        InitOutputPartition(state, *partition_descriptor, row, partition.get(),
+        InitOutputPartition(state, *partition_descriptor, partition.get(),
             no_more_rows);
     if (!status.ok()) {
       // We failed to create the output partition successfully. Clean it up now
@@ -370,7 +378,8 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
     PartitionPair* partition_pair;
     RETURN_IF_ERROR(
         GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &partition_pair, false));
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair));
+    DCHECK(partition_pair->second.empty());
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair->first.get()));
   } else if (input_is_clustered_) {
     RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
   } else {
@@ -385,8 +394,11 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
       partition_pair->second.push_back(i);
     }
     for (PartitionMap::value_type& partition : partition_keys_to_output_partitions_) {
-      if (!partition.second.second.empty()) {
-        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &partition.second));
+      PartitionPair& partition_pair = partition.second;
+      if (!partition_pair.second.empty()) {
+        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair.first.get(),
+            partition_pair.second));
+        partition_pair.second.clear();
       }
     }
   }
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 7211454a6..7c8716f6c 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -138,7 +138,7 @@ class HdfsTableSink : public TableSinkBase {
   /// Staying with the above example this would hold ["a=12/31/11", "b=10"].
   Status ConstructPartitionInfo(
       const TupleRow* row,
-      OutputPartition* output_partition) override;
+      OutputPartition* output_partition);
 
   /// Returns partition descriptor object for the given key.
   const HdfsPartitionDescriptor* GetPartitionDescriptor(const std::string& key);
diff --git a/be/src/exec/iceberg-buffered-delete-sink.cc b/be/src/exec/iceberg-buffered-delete-sink.cc
new file mode 100644
index 000000000..4743d4065
--- /dev/null
+++ b/be/src/exec/iceberg-buffered-delete-sink.cc
@@ -0,0 +1,418 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-buffered-delete-sink.h"
+
+#include <boost/algorithm/string.hpp>
+
+#include "common/object-pool.h"
+#include "exec/iceberg-delete-sink-config.h"
+#include "exec/parquet/hdfs-parquet-table-writer.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "exprs/slot-ref.h"
+#include "kudu/util/url-coding.h"
+#include "runtime/descriptors.h"
+#include "runtime/hdfs-fs-cache.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/coding-util.h"
+#include "util/debug-util.h"
+#include "util/hdfs-util.h"
+#include "util/iceberg-utility-functions.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+class IcebergBufferedDeleteSink::FilePositionsIterator {
+ public:
+  FilePositionsIterator(const FilePositions& file_pos) {
+    DCHECK(!file_pos.empty());
+    file_level_it_ = file_pos.begin();
+    file_level_end_ = file_pos.end();
+    DCHECK(!file_level_it_->second.empty());
+    pos_level_it_ = file_level_it_->second.begin();
+  }
+
+  bool HasNext() { return file_level_it_ != file_level_end_; }
+
+  std::pair<StringValue, int64_t> Next() {
+    /// It is only valid to call Next() if HasNext() returns true.
+    DCHECK(HasNext());
+    StringValue filepath = file_level_it_->first;
+    DCHECK(pos_level_it_ != file_level_it_->second.end());
+    int64_t pos = *pos_level_it_;
+    NextPos();
+    return {filepath, pos};
+  }
+
+ private:
+  void NextPos() {
+    DCHECK(pos_level_it_ != file_level_it_->second.end());
+    ++pos_level_it_;
+    if (pos_level_it_ == file_level_it_->second.end()) {
+      NextFile();
+    }
+  }
+
+  void NextFile() {
+    DCHECK(file_level_it_ != file_level_end_);
+    ++file_level_it_;
+    if (file_level_it_ != file_level_end_) {
+      DCHECK(!file_level_it_->second.empty());
+      pos_level_it_ = file_level_it_->second.begin();
+    }
+  }
+
+  FilePositions::const_iterator file_level_it_;
+  FilePositions::const_iterator file_level_end_;
+  std::vector<int64_t>::const_iterator pos_level_it_;
+};
+
+IcebergBufferedDeleteSink::IcebergBufferedDeleteSink(TDataSinkId sink_id,
+    const IcebergDeleteSinkConfig& sink_config,
+    RuntimeState* state) :
+    IcebergDeleteSinkBase(sink_id, sink_config, "IcebergBufferedDeleteSink", state) {
+}
+
+Status IcebergBufferedDeleteSink::Prepare(RuntimeState* state,
+    MemTracker* parent_mem_tracker) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(IcebergDeleteSinkBase::Prepare(state, parent_mem_tracker));
+
+  position_sort_timer_ = ADD_TIMER(state->runtime_profile(),
+      "IcebergDeletePositionSortTimer");
+  flush_timer_ = ADD_TIMER(state->runtime_profile(), "IcebergDeleteRecordsFlushTime");
+
+  buffered_delete_pool_.reset(new MemPool(mem_tracker()));
+
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::Open(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(IcebergDeleteSinkBase::Open(state));
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  expr_results_pool_->Clear();
+  RETURN_IF_ERROR(state->CheckQueryState());
+  // We don't do any work for an empty batch.
+  if (batch->num_rows() == 0) return Status::OK();
+
+  RETURN_IF_ERROR(BufferDeleteRecords(batch));
+  return Status::OK();
+}
+
+IcebergBufferedDeleteSink::PartitionInfo IcebergBufferedDeleteSink::GetPartitionInfo(
+    TupleRow* row) {
+  if (partition_key_expr_evals_.empty()) {
+    return {table_desc_->IcebergSpecId(), ""};
+  }
+  DCHECK_EQ(partition_key_expr_evals_.size(), 2);
+  ScalarExprEvaluator* spec_id_eval = partition_key_expr_evals_[0];
+  ScalarExprEvaluator* partitions_eval = partition_key_expr_evals_[1];
+  int spec_id = spec_id_eval->GetIntVal(row).val;
+  StringVal partitions_strings_val = partitions_eval->GetStringVal(row);
+  string partition_values(reinterpret_cast<char*>(partitions_strings_val.ptr),
+      partitions_strings_val.len);
+  return {spec_id, partition_values};
+}
+
+std::pair<StringVal, int64_t> IcebergBufferedDeleteSink::GetDeleteRecord(TupleRow* row) {
+  auto filepath_eval = output_expr_evals_[0];
+  auto position_eval = output_expr_evals_[1];
+  StringVal filepath_sv = filepath_eval->GetStringVal(row);
+  DCHECK(!filepath_sv.is_null);
+  BigIntVal position_bi = position_eval->GetBigIntVal(row);
+  DCHECK(!position_bi.is_null);
+  int64_t position = position_bi.val;
+  return {filepath_sv, position};
+}
+
+Status IcebergBufferedDeleteSink::BufferDeleteRecords(RowBatch* batch) {
+  StringVal prev_filepath;
+  vector<int64_t>* prev_vector = nullptr;
+  for (int i = 0; i < batch->num_rows(); ++i) {
+    TupleRow* row = batch->GetRow(i);
+    StringVal filepath_sv;
+    int64_t position;
+    std::tie(filepath_sv, position) = GetDeleteRecord(row);
+
+    if (filepath_sv == prev_filepath) {
+      DCHECK(prev_vector != nullptr);
+      prev_vector->push_back(position);
+      continue;
+    }
+    StringValue filepath(reinterpret_cast<char*>(DCHECK_NOTNULL(filepath_sv.ptr)),
+        filepath_sv.len);
+    PartitionInfo part_info = GetPartitionInfo(row);
+    FilePositions& files_and_positions = partitions_to_file_positions_[part_info];
+
+    if (files_and_positions.find(filepath) == files_and_positions.end()) {
+      // The file path is not in the map yet, and 'filepath_sv.ptr' points to a memory
+      // location that is owned by 'batch'. Therefore we need to deep copy this
+      // file path and put the copied StringValue into 'files_and_positions'.
+      uint8_t* new_ptr;
+      RETURN_IF_ERROR(TryAllocateUnalignedBuffer(filepath_sv.len, &new_ptr));
+      memcpy(new_ptr, filepath_sv.ptr, filepath_sv.len);
+      filepath.Assign(reinterpret_cast<char*>(new_ptr), filepath_sv.len);
+    }
+    auto& positions_vec = files_and_positions[filepath];
+    positions_vec.push_back(position);
+
+    prev_filepath = filepath_sv;
+    prev_vector = &positions_vec;
+  }
+
+  return Status::OK();
+}
+
+void IcebergBufferedDeleteSink::SortBufferedRecords() {
+  SCOPED_TIMER(position_sort_timer_);
+  for (auto& parts_and_file_positions : partitions_to_file_positions_) {
+    FilePositions& files_and_positions = parts_and_file_positions.second;
+    for (auto& file_to_positions : files_and_positions) {
+      std::vector<int64_t>& positions = file_to_positions.second;
+      sort(positions.begin(), positions.end());
+    }
+  }
+}
+
+void IcebergBufferedDeleteSink::VLogBufferedRecords() {
+  if (!VLOG_ROW_IS_ON) return;
+  stringstream ss;
+  for (auto& entry : partitions_to_file_positions_) {
+    const PartitionInfo& part_info = entry.first;
+    int32_t spec_id = part_info.first;
+    string part_encoded;
+    bool succ = kudu::Base64Decode(part_info.second, &part_encoded);
+    DCHECK(succ);
+    ss << endl;
+    ss << Substitute("Entries for (spec_id=$0, partition=$1):", spec_id, part_encoded)
+       << endl;
+    for (auto& file_and_pos : entry.second) {
+      ss << "  " << file_and_pos.first << ": [";
+      std::vector<int64_t>& positions = file_and_pos.second;
+      for (int i = 0; i < positions.size(); ++i) {
+        int64_t pos = positions[i];
+        ss << pos;
+        if (i != positions.size() - 1) ss << ", ";
+      }
+      ss << "]" << endl;
+    }
+  }
+  VLOG_ROW << "IcebergBufferedDeleteSink's buffered entries:" << ss.str();
+}
+
+Status IcebergBufferedDeleteSink::VerifyBufferedRecords() {
+  for (auto& entry : partitions_to_file_positions_) {
+    StringValue prev_file;
+    for (auto& file_and_pos : entry.second) {
+      StringValue file = file_and_pos.first;
+      DCHECK_LT(prev_file, file);
+      prev_file = file;
+      std::vector<int64_t>& positions = file_and_pos.second;
+      DCHECK(!positions.empty());
+      int64_t prev_pos = positions[0];
+      for (int i = 1; i < positions.size(); ++i) {
+        int64_t pos = positions[i];
+        DCHECK_GE(pos, prev_pos);
+        if (pos == prev_pos) {
+          string filepath(file.Ptr(), file.Len());
+          return Status(Substitute(
+              "Duplicated row in DELETE sink. file_path='$0', pos='$1'. "
+              "If this is coming from an UPDATE statement, please check if there are "
+              "multiple matches in the JOIN condition.", filepath, pos));
+        }
+        prev_pos = pos;
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::FlushBufferedRecords(RuntimeState* state) {
+  SCOPED_TIMER(flush_timer_);
+
+  int capacity = state->batch_size();
+  RowBatch row_batch(row_desc_, capacity, mem_tracker());
+  RETURN_IF_ERROR(InitializeOutputRowBatch(&row_batch));
+
+  for (auto& entry : partitions_to_file_positions_) {
+    int32_t spec_id = entry.first.first;
+    const string& partition_encoded = entry.first.second;
+    RETURN_IF_ERROR(SetCurrentPartition(state, spec_id, partition_encoded));
+    FilePositionsIterator it(entry.second);
+    while (it.HasNext()) {
+      row_batch.Reset();
+      RETURN_IF_ERROR(GetNextRowBatch(&row_batch, &it));
+      row_batch.VLogRows("IcebergBufferedDeleteSink");
+      RETURN_IF_ERROR(WriteRowsToPartition(state, &row_batch, current_partition_.get()));
+    }
+    DCHECK(current_partition_ != nullptr);
+    RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.get(),
+        /*is_delete=*/true, &dml_exec_state_));
+    current_partition_->writer->Close();
+  }
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::InitializeOutputRowBatch(RowBatch* batch) {
+  SlotRef* filepath_ref = DCHECK_NOTNULL(dynamic_cast<SlotRef*>(output_exprs_[0]));
+  int tuple_idx = filepath_ref->GetTupleIdx();
+
+  int capacity = batch->capacity();
+  TupleDescriptor* tuple_desc = row_desc_->tuple_descriptors()[tuple_idx];
+  int rows_buffer_size = capacity * tuple_desc->byte_size();
+  uint8_t* rows_buffer;
+  RETURN_IF_ERROR(TryAllocateUnalignedBuffer(rows_buffer_size, &rows_buffer));
+  memset(rows_buffer, 0, rows_buffer_size);
+
+  for (int i = 0; i < capacity; ++i) {
+    TupleRow* row = batch->GetRow(i);
+    row->SetTuple(tuple_idx,
+        reinterpret_cast<Tuple*>(rows_buffer + i * tuple_desc->byte_size()));
+  }
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::SetCurrentPartition(RuntimeState* state,
+    int32_t spec_id, const std::string& partition_encoded) {
+  current_partition_.reset(new OutputPartition());
+  // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
+  // etc.
+  RETURN_IF_ERROR(ConstructPartitionInfo(
+      spec_id, partition_encoded, current_partition_.get()));
+  Status status = InitOutputPartition(state, *prototype_partition_,
+      current_partition_.get(), false);
+  if (!status.ok()) {
+    // We failed to create the output partition successfully. Clean it up now.
+    if (current_partition_->writer != nullptr) {
+      current_partition_->writer->Close();
+    }
+    return status;
+  }
+
+  // With partition evolution it's possible that we have the same partition names
+  // with different spec ids. E.g. in case of TRUNCATE(1000, col) => TRUNCATE(500, col),
+  // we might need to delete rows from partition "col_trunc=1000" with both spec ids. In
+  // this case we might already have "col_trunc=1000" in dml_exec_state, so no need to
+  // add it.
+  if (!dml_exec_state_.PartitionExists(current_partition_->partition_name)) {
+    // Save the partition name so that the coordinator can create the partition
+    // directory structure if needed.
+    dml_exec_state_.AddPartition(
+        current_partition_->partition_name, prototype_partition_->id(),
+        &table_desc_->hdfs_base_dir(),
+        nullptr);
+  }
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::TryAllocateUnalignedBuffer(int buffer_size,
+    uint8_t** buffer) {
+  *buffer = buffered_delete_pool_->TryAllocateUnaligned(buffer_size);
+  if (*buffer == nullptr) {
+    return Status(Substitute("Could not allocate $0 bytes for IcebergBufferedDeleteSink",
+        buffer_size));
+  }
+  return Status::OK();
+}
+
+Status IcebergBufferedDeleteSink::GetNextRowBatch(
+    RowBatch* batch, FilePositionsIterator* iterator) {
+  DCHECK_EQ(batch->num_rows(), 0);
+  int capacity = batch->capacity();
+  while (batch->num_rows() < capacity && iterator->HasNext()) {
+    const auto& next_entry = iterator->Next();
+    int row_idx = batch->AddRow();
+    TupleRow* row = batch->GetRow(row_idx);
+    WriteRow(next_entry.first, next_entry.second, row);
+    batch->CommitRows(1);
+  }
+  return Status::OK();
+}
+
+void IcebergBufferedDeleteSink::WriteRow(
+    StringValue filepath, int64_t offset, TupleRow* row) {
+  SlotRef* filepath_ref = DCHECK_NOTNULL(dynamic_cast<SlotRef*>(output_exprs_[0]));
+  SlotRef* position_ref = DCHECK_NOTNULL(dynamic_cast<SlotRef*>(output_exprs_[1]));
+
+  DCHECK(filepath_ref->type().IsStringType());
+  DCHECK(position_ref->type().IsIntegerType());
+
+  int filepath_tuple_idx = filepath_ref->GetTupleIdx();
+  int position_tuple_idx = position_ref->GetTupleIdx();
+  DCHECK_EQ(filepath_tuple_idx, position_tuple_idx);
+
+  StringValue* filepath_slot = row->GetTuple(filepath_tuple_idx)->
+      GetStringSlot(filepath_ref->GetSlotOffset());
+  int64_t* pos_slot = row->GetTuple(position_tuple_idx)->
+      GetBigIntSlot(position_ref->GetSlotOffset());
+
+  filepath_slot->Assign(filepath);
+  *pos_slot = offset;
+}
+
+Status IcebergBufferedDeleteSink::FlushFinal(RuntimeState* state) {
+  DCHECK(!closed_);
+  SCOPED_TIMER(profile()->total_time_counter());
+
+  SortBufferedRecords();
+  VLogBufferedRecords();
+  RETURN_IF_ERROR(VerifyBufferedRecords());
+  RETURN_IF_ERROR(FlushBufferedRecords(state));
+  return Status::OK();
+}
+
+void IcebergBufferedDeleteSink::Close(RuntimeState* state) {
+  if (closed_) return;
+  SCOPED_TIMER(profile()->total_time_counter());
+
+  DmlExecStatusPB dml_exec_proto;
+  dml_exec_state_.ToProto(&dml_exec_proto);
+  state->dml_exec_state()->Update(dml_exec_proto);
+
+  current_partition_.reset();
+  buffered_delete_pool_->FreeAll();
+
+  IcebergDeleteSinkBase::Close(state);
+  DCHECK(closed_);
+}
+
+string IcebergBufferedDeleteSink::DebugString() const {
+  stringstream out;
+  out << "IcebergBufferedDeleteSink("
+      << " table_desc=" << table_desc_->DebugString()
+      << " output_exprs=" << ScalarExpr::DebugString(output_exprs_);
+  if (!partition_key_exprs_.empty()) {
+    out << " partition_key_exprs=" << ScalarExpr::DebugString(partition_key_exprs_);
+  }
+  out << ")";
+  return out.str();
+}
+
+} // namespace impala
diff --git a/be/src/exec/iceberg-buffered-delete-sink.h b/be/src/exec/iceberg-buffered-delete-sink.h
new file mode 100644
index 000000000..fd51bf6c2
--- /dev/null
+++ b/be/src/exec/iceberg-buffered-delete-sink.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/iceberg-delete-sink-base.h"
+#include "exec/output-partition.h"
+#include "exec/table-sink-base.h"
+#include "runtime/string-value.h"
+
+#include <unordered_map>
+
+namespace impala {
+
+class MemPool;
+
+/// IcebergBufferedDeleteSink buffers the Iceberg position delete records in its
+/// internal data structures. In FlushFinal() it sorts the buffered records (per
+/// partition) and writes them out to position delete files (writing one partition
+/// at a time). So the main difference between IcebergBufferedDeleteSink and
+/// IcebergDeleteSink is that IcebergBufferedDeleteSink doesn't assume a sorted
+/// input of delete records.
+class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase {
+ public:
+  IcebergBufferedDeleteSink(TDataSinkId sink_id,
+      const IcebergDeleteSinkConfig& sink_config,
+      RuntimeState* state);
+
+  /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
+  /// Opens output_exprs and partition_key_exprs.
+  Status Open(RuntimeState* state) override;
+
+  /// Buffers the incoming delete records in 'partitions_to_file_positions_'.
+  Status Send(RuntimeState* state, RowBatch* batch) override;
+
+  /// Writes the buffered records to position delete files in the correct order.
+  Status FlushFinal(RuntimeState* state) override;
+
+  /// Releases buffers and merges the member 'dml_exec_state_' to
+  /// 'state->dml_exec_state()'.
+  void Close(RuntimeState* state) override;
+
+  std::string DebugString() const override;
+
+ private:
+  /// Contains partition spec id and the encoded partition values.
+  typedef std::pair<int32_t, std::string> PartitionInfo;
+  /// Contains the filepaths and the corresponding file positions of deleted records.
+  /// It's necessary to use a std::map so we can get back the file paths in order.
+  typedef std::map<StringValue, std::vector<int64_t>> FilePositions;
+
+  /// Nested iterator class to conveniently iterate over a FilePositions object.
+  class FilePositionsIterator;
+
+  /// Retrieves partition information from 'row'.
+  PartitionInfo GetPartitionInfo(TupleRow* row);
+
+  /// Retrieves delete record data from 'row'.
+  std::pair<impala_udf::StringVal, int64_t> GetDeleteRecord(TupleRow* row);
+
+  /// Iterates over 'batch' and stores delete records in 'partitions_to_file_positions_'.
+  Status BufferDeleteRecords(RowBatch* batch);
+
+  /// Sorts the buffered records.
+  void SortBufferedRecords();
+
+  /// Logs the buffered records at VLOG_ROW level.
+  void VLogBufferedRecords();
+
+  /// Verifies that there are no duplicates in the buffered delete records. It assumes
+  /// that SortBufferedRecords() has been called already.
+  Status VerifyBufferedRecords();
+
+  /// Writes all buffered delete records to position delete files.
+  Status FlushBufferedRecords(RuntimeState* state);
+
+  /// Initializes an empty output batch.
+  Status InitializeOutputRowBatch(RowBatch* batch);
+
+  /// Uses 'iterator' to write delete records from 'partitions_to_file_positions_'
+  /// to 'batch'.
+  Status GetNextRowBatch(RowBatch* batch, FilePositionsIterator* iterator);
+
+  /// Writes a single delete record <filepath, offset> to 'row'.
+  void WriteRow(StringValue filepath, int64_t offset, TupleRow* row);
+
+  /// Tries to allocate a buffer of size 'buffer_size'. Returns an error if the
+  /// request cannot be served.
+  Status TryAllocateUnalignedBuffer(int buffer_size, uint8_t** buffer);
+
+  /// Sets the 'current_partition_' based on 'spec_id' and 'partitions'.
+  Status SetCurrentPartition(RuntimeState* state, int32_t spec_id,
+      const std::string& partitions);
+
+  /// Timer for 'SortBufferedRecords()'.
+  RuntimeProfile::Counter* position_sort_timer_;
+
+  /// Timer for 'FlushBufferedRecords()'.
+  RuntimeProfile::Counter* flush_timer_;
+
+  /// Memory pool to serve allocation requests.
+  std::unique_ptr<MemPool> buffered_delete_pool_;
+
+  /// The sink writes partitions one-by-one.
+  std::unique_ptr<OutputPartition> current_partition_;
+
+  /// We collect the delete records from the input row batches in this member.
+  std::unordered_map<PartitionInfo, FilePositions> partitions_to_file_positions_;
+
+  /// This sink has its own DmlExecState object because in the context of UPDATEs we
+  /// cannot modify the same DmlExecState object simultaneously (from the INSERT and
+  /// DELETE sinks). It is merged into state->dml_exec_state() in Close().
+  DmlExecState dml_exec_state_;
+};
+
+}
diff --git a/be/src/exec/iceberg-delete-sink-base.cc b/be/src/exec/iceberg-delete-sink-base.cc
new file mode 100644
index 000000000..2ad64589e
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink-base.cc
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-delete-sink-base.h"
+
+#include <boost/algorithm/string.hpp>
+
+#include "exec/iceberg-delete-sink-config.h"
+#include "exec/output-partition.h"
+#include "exprs/scalar-expr-evaluator.h"
+#include "kudu/util/url-coding.h"
+#include "util/debug-util.h"
+#include "util/iceberg-utility-functions.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+IcebergDeleteSinkBase::IcebergDeleteSinkBase(TDataSinkId sink_id,
+    const IcebergDeleteSinkConfig& sink_config, const std::string& name,
+    RuntimeState* state) :
+    TableSinkBase(sink_id, sink_config, name, state) {
+}
+
+Status IcebergDeleteSinkBase::Prepare(RuntimeState* state,
+    MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
+  unique_id_str_ = "delete-" + PrintId(state->fragment_instance_id(), "-");
+
+  // Resolve table id and set input tuple descriptor.
+  table_desc_ = static_cast<const HdfsTableDescriptor*>(
+      state->desc_tbl().GetTableDescriptor(table_id_));
+  if (table_desc_ == nullptr) {
+    stringstream error_msg;
+    error_msg << "Failed to get table descriptor for table id: " << table_id_;
+    return Status(error_msg.str());
+  }
+
+  DCHECK_GE(output_expr_evals_.size(),
+      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
+
+  return Status::OK();
+}
+
+Status IcebergDeleteSinkBase::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(TableSinkBase::Open(state));
+  DCHECK_EQ(partition_key_expr_evals_.size(), dynamic_partition_key_expr_evals_.size());
+  return Status::OK();
+}
+
+std::string IcebergDeleteSinkBase::HumanReadablePartitionValue(
+    TIcebergPartitionTransformType::type transform_type, const std::string& value,
+    Status* transform_result) {
+  if (!iceberg::IsTimeBasedPartition(transform_type) ||
+    value == table_desc_->null_partition_key_value()) {
+    *transform_result = Status::OK();
+    return value;
+  }
+  return iceberg::HumanReadableTime(transform_type, value, transform_result);
+}
+
+Status IcebergDeleteSinkBase::ConstructPartitionInfo(
+    const TupleRow* row, OutputPartition* output_partition) {
+  DCHECK(output_partition != nullptr);
+  DCHECK(output_partition->raw_partition_names.empty());
+
+  if (partition_key_expr_evals_.empty()) {
+    output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
+    return Status::OK();
+  }
+
+  DCHECK_EQ(partition_key_expr_evals_.size(), 2);
+
+  ScalarExprEvaluator* spec_id_eval = partition_key_expr_evals_[0];
+  ScalarExprEvaluator* partition_values_eval = partition_key_expr_evals_[1];
+
+  int spec_id = spec_id_eval->GetIntVal(row).val;
+
+  impala_udf::StringVal partition_values_sval = partition_values_eval->GetStringVal(row);
+  string partition_values_str((const char*)partition_values_sval.ptr,
+      partition_values_sval.len);
+  return ConstructPartitionInfo(spec_id, partition_values_str, output_partition);
+}
+
+Status IcebergDeleteSinkBase::ConstructPartitionInfo(int32_t spec_id,
+    const std::string& partition_values_str, OutputPartition* output_partition) {
+  if (partition_key_expr_evals_.empty()) {
+    DCHECK_EQ(spec_id, table_desc_->IcebergSpecId());
+    output_partition->iceberg_spec_id = spec_id;
+    return Status::OK();
+  }
+  output_partition->iceberg_spec_id = spec_id;
+
+  vector<string> non_void_partition_names;
+  vector<TIcebergPartitionTransformType::type> non_void_partition_transforms;
+  if (LIKELY(spec_id == table_desc_->IcebergSpecId())) {
+    // If 'spec_id' is the default spec id, then just copy the already populated
+    // non void partition names and transforms.
+    non_void_partition_names = table_desc_->IcebergNonVoidPartitionNames();
+    non_void_partition_transforms = table_desc_->IcebergNonVoidPartitionTransforms();
+  } else {
+    // Otherwise collect the non-void partition names belonging to 'spec_id'.
+    const TIcebergPartitionSpec& partition_spec =
+        table_desc_->IcebergPartitionSpecs()[spec_id];
+    for (const TIcebergPartitionField& spec_field : partition_spec.partition_fields) {
+      auto transform_type = spec_field.transform.transform_type;
+      if (transform_type != TIcebergPartitionTransformType::VOID) {
+        non_void_partition_names.push_back(spec_field.field_name);
+        non_void_partition_transforms.push_back(transform_type);
+      }
+    }
+  }
+
+  if (non_void_partition_names.empty()) {
+    DCHECK(partition_values_str.empty());
+    return Status::OK();
+  }
+
+  vector<string> partition_values_encoded;
+  boost::split(partition_values_encoded, partition_values_str, boost::is_any_of("."));
+  vector<string> partition_values_decoded;
+  partition_values_decoded.reserve(partition_values_encoded.size());
+  for (const string& encoded_part_val : partition_values_encoded) {
+    string decoded_val;
+    bool success = kudu::Base64Decode(encoded_part_val, &decoded_val);
+    // We encoded this value, so decoding it must succeed.
+    DCHECK(success);
+    partition_values_decoded.push_back(std::move(decoded_val));
+  }
+
+  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_names.size());
+  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_transforms.size());
+
+  stringstream url_encoded_partition_name_ss;
+
+  for (int i = 0; i < partition_values_decoded.size(); ++i) {
+    auto transform_type = non_void_partition_transforms[i];
+    stringstream raw_partition_key_value_ss;
+    stringstream url_encoded_partition_key_value_ss;
+
+    raw_partition_key_value_ss << non_void_partition_names[i] << "=";
+    url_encoded_partition_key_value_ss << non_void_partition_names[i] << "=";
+
+    string& value_str = partition_values_decoded[i];
+    Status transform_status;
+    value_str = HumanReadablePartitionValue(
+        transform_type, value_str, &transform_status);
+    if (!transform_status.ok()) return transform_status;
+    raw_partition_key_value_ss << value_str;
+
+    string part_key_value = UrlEncodePartitionValue(value_str);
+    url_encoded_partition_key_value_ss << part_key_value;
+    if (i < partition_values_decoded.size() - 1) {
+      url_encoded_partition_key_value_ss << "/";
+    }
+    url_encoded_partition_name_ss << url_encoded_partition_key_value_ss.str();
+    output_partition->raw_partition_names.push_back(raw_partition_key_value_ss.str());
+  }
+
+  output_partition->partition_name = url_encoded_partition_name_ss.str();
+
+  return Status::OK();
+}
+
+}
diff --git a/be/src/exec/iceberg-delete-sink-base.h b/be/src/exec/iceberg-delete-sink-base.h
new file mode 100644
index 000000000..e4fe15547
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink-base.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/table-sink-base.h"
+
+namespace impala {
+
+class IcebergDeleteSinkConfig;
+class MemTracker;
+class RowBatch;
+class RuntimeState;
+class TupleRow;
+
+class IcebergDeleteSinkBase : public TableSinkBase {
+ public:
+  IcebergDeleteSinkBase(TDataSinkId sink_id, const IcebergDeleteSinkConfig& sink_config,
+      const std::string& name, RuntimeState* state);
+
+  /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
+  /// Opens output_exprs and partition_key_exprs.
+  Status Open(RuntimeState* state) override;
+
+  TSortingOrder::type sorting_order() const override { return TSortingOrder::LEXICAL; }
+
+ protected:
+  /// Fills output_partition's partition_name, raw_partition_names and
+  /// external_partition_name based on the row's columns. In case of partitioned
+  /// tables 'row' must contain the Iceberg virtual columns PARTITION__SPEC__ID and
+  /// ICEBERG__PARTITION__SERIALIZED. All information needed for 'output_partition' can
+  /// be retrieved from these fields and from the 'table_desc_'.
+  Status ConstructPartitionInfo(const TupleRow* row, OutputPartition* output_partition);
+  Status ConstructPartitionInfo(int32_t spec_id, const std::string& partitions,
+      OutputPartition* output_partition);
+
+  /// Returns the human-readable representation of a partition transform value. It is used
+  /// to create the file paths. IcebergUtil.partitionDataFromDataFile() also expects
+  /// partition values in this representation.
+  /// E.g. if 'transform_type' is MONTH and 'value' is "7" this function returns
+  /// "1970-08".
+  /// Parse errors are set in 'transform_result'. If it is not OK, the return value
+  /// of this function does not contain any meaningful value.
+  std::string HumanReadablePartitionValue(
+      TIcebergPartitionTransformType::type transform_type, const std::string& value,
+      Status* transform_result);
+};
+
+}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-delete-sink-config.cc b/be/src/exec/iceberg-delete-sink-config.cc
new file mode 100644
index 000000000..0ab358915
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink-config.cc
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-delete-sink-config.h"
+
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "exec/iceberg-buffered-delete-sink.h"
+#include "exec/iceberg-delete-sink.h"
+#include "exprs/scalar-expr.h"
+#include "runtime/mem-pool.h"
+
+namespace impala {
+
+DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
+  if (this->tsink_->table_sink.iceberg_delete_sink.is_buffered) {
+    return state->obj_pool()->Add(
+      new IcebergBufferedDeleteSink(sink_id, *this, state));
+  } else {
+    return state->obj_pool()->Add(
+      new IcebergDeleteSink(sink_id, *this, state));
+  }
+}
+
+Status IcebergDeleteSinkConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
+  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
+  DCHECK(tsink_->__isset.table_sink);
+  DCHECK(tsink_->table_sink.__isset.iceberg_delete_sink);
+  RETURN_IF_ERROR(
+      ScalarExpr::Create(tsink_->table_sink.iceberg_delete_sink.partition_key_exprs,
+          *input_row_desc_, state, &partition_key_exprs_));
+  return Status::OK();
+}
+
+}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-delete-sink-config.h b/be/src/exec/iceberg-delete-sink-config.h
new file mode 100644
index 000000000..c34111bb7
--- /dev/null
+++ b/be/src/exec/iceberg-delete-sink-config.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/table-sink-base.h"
+
+namespace impala {
+
+class IcebergDeleteSinkConfig : public TableSinkBaseConfig {
+ public:
+  DataSink* CreateSink(RuntimeState* state) const override;
+
+  ~IcebergDeleteSinkConfig() override {}
+
+ protected:
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      FragmentState* state) override;
+};
+
+}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-delete-sink.cc b/be/src/exec/iceberg-delete-sink.cc
index 1c6e21053..657d75869 100644
--- a/be/src/exec/iceberg-delete-sink.cc
+++ b/be/src/exec/iceberg-delete-sink.cc
@@ -17,22 +17,18 @@
 
 #include "exec/iceberg-delete-sink.h"
 
-#include <boost/algorithm/string.hpp>
-
 #include "common/object-pool.h"
+#include "exec/iceberg-delete-sink-config.h"
 #include "exec/parquet/hdfs-parquet-table-writer.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "kudu/util/url-coding.h"
 #include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/coding-util.h"
-#include "util/debug-util.h"
 #include "util/hdfs-util.h"
-#include "util/iceberg-utility-functions.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
 #include "util/runtime-profile-counters.h"
@@ -41,54 +37,20 @@
 
 namespace impala {
 
-DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
-  TDataSinkId sink_id = state->fragment().idx;
-  return state->obj_pool()->Add(
-      new IcebergDeleteSink(sink_id, *this,
-          this->tsink_->table_sink.iceberg_delete_sink, state));
-}
-
-Status IcebergDeleteSinkConfig::Init(
-    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
-  RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state));
-  DCHECK(tsink_->__isset.table_sink);
-  DCHECK(tsink_->table_sink.__isset.iceberg_delete_sink);
-  RETURN_IF_ERROR(
-      ScalarExpr::Create(tsink_->table_sink.iceberg_delete_sink.partition_key_exprs,
-          *input_row_desc_, state, &partition_key_exprs_));
-  return Status::OK();
-}
-
 IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id,
-    const IcebergDeleteSinkConfig& sink_config, const TIcebergDeleteSink& ice_del_sink,
-    RuntimeState* state) :
-    TableSinkBase(sink_id, sink_config, "IcebergDeleteSink", state) {
+    const IcebergDeleteSinkConfig& sink_config, RuntimeState* state) :
+    IcebergDeleteSinkBase(sink_id, sink_config, "IcebergDeleteSink", state) {
 }
 
 Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker));
-  unique_id_str_ = "delete-" + PrintId(state->fragment_instance_id(), "-");
-
-  // Resolve table id and set input tuple descriptor.
-  table_desc_ = static_cast<const HdfsTableDescriptor*>(
-      state->desc_tbl().GetTableDescriptor(table_id_));
-  if (table_desc_ == nullptr) {
-    stringstream error_msg("Failed to get table descriptor for table id: ");
-    error_msg << table_id_;
-    return Status(error_msg.str());
-  }
-
-  DCHECK_GE(output_expr_evals_.size(),
-      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
-
+  RETURN_IF_ERROR(IcebergDeleteSinkBase::Prepare(state, parent_mem_tracker));
   return Status::OK();
 }
 
 Status IcebergDeleteSink::Open(RuntimeState* state) {
   SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(TableSinkBase::Open(state));
-  DCHECK_EQ(partition_key_expr_evals_.size(), dynamic_partition_key_expr_evals_.size());
+  RETURN_IF_ERROR(IcebergDeleteSinkBase::Open(state));
   return Status::OK();
 }
 
@@ -106,7 +68,8 @@ Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
     if (current_partition_.first == nullptr) {
       RETURN_IF_ERROR(SetCurrentPartition(state, nullptr, ROOT_PARTITION_KEY));
     }
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
+    DCHECK(current_partition_.second.empty());
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get()));
   } else {
     RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
   }
@@ -153,7 +116,10 @@ inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
 
   current_partition_.first.reset(new OutputPartition());
   current_partition_.second.clear();
-  Status status = InitOutputPartition(state, *prototype_partition_, row,
+  // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
+  // etc.
+  RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get()));
+  Status status = InitOutputPartition(state, *prototype_partition_,
       current_partition_.first.get(), false);
   if (!status.ok()) {
     // We failed to create the output partition successfully. Clean it up now.
@@ -165,7 +131,7 @@ inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
 
   // Save the partition name so that the coordinator can create the partition
   // directory structure if needed.
-  dml_exec_state_.AddPartition(
+  state->dml_exec_state()->AddPartition(
       current_partition_.first->partition_name, prototype_partition_->id(),
       &table_desc_->hdfs_base_dir(),
       nullptr);
@@ -193,7 +159,7 @@ Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch*
       dynamic_partition_key_expr_evals_, &last_row_key);
   if (last_row_key == current_clustered_partition_key_) {
     DCHECK(current_partition_.second.empty());
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
+    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get()));
     return Status::OK();
   }
 
@@ -209,11 +175,12 @@ Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch*
       DCHECK(current_partition_.first->writer != nullptr);
       // Done with previous partition - write rows and close.
       if (!current_partition_.second.empty()) {
-        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
+        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get(),
+            current_partition_.second));
         current_partition_.second.clear();
       }
       RETURN_IF_ERROR(FinalizePartitionFile(state,
-          current_partition_.first.get(), /*is_delete=*/true, &dml_exec_state_));
+          current_partition_.first.get(), /*is_delete=*/true));
       if (current_partition_.first->writer.get() != nullptr) {
         current_partition_.first->writer->Close();
       }
@@ -229,7 +196,9 @@ Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch*
     current_partition_.second.push_back(i);
   }
   // Write final set of rows to the partition but keep its file open.
-  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &current_partition_));
+  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get(),
+      current_partition_.second));
+  current_partition_.second.clear();
   return Status::OK();
 }
 
@@ -239,7 +208,7 @@ Status IcebergDeleteSink::FlushFinal(RuntimeState* state) {
 
   if (current_partition_.first != nullptr) {
     RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.first.get(),
-        /*is_delete=*/true, &dml_exec_state_));
+        /*is_delete=*/true));
   }
   return Status::OK();
 }
@@ -248,10 +217,6 @@ void IcebergDeleteSink::Close(RuntimeState* state) {
   if (closed_) return;
   SCOPED_TIMER(profile()->total_time_counter());
 
-  DmlExecStatusPB dml_exec_proto;
-  dml_exec_state_.ToProto(&dml_exec_proto);
-  state->dml_exec_state()->Update(dml_exec_proto);
-
   if (current_partition_.first != nullptr) {
     if (current_partition_.first->writer != nullptr) {
       current_partition_.first->writer->Close();
@@ -261,114 +226,8 @@ void IcebergDeleteSink::Close(RuntimeState* state) {
   }
 
   current_partition_.first.reset();
-  TableSinkBase::Close(state);
-  closed_ = true;
-}
-
-std::string IcebergDeleteSink::HumanReadablePartitionValue(
-    TIcebergPartitionTransformType::type transform_type, const std::string& value,
-    Status* transform_result) {
-  if (!iceberg::IsTimeBasedPartition(transform_type) ||
-    value == table_desc_->null_partition_key_value()) {
-    *transform_result = Status::OK();
-    return value;
-  }
-  return iceberg::HumanReadableTime(transform_type, value, transform_result);
-}
-
-Status IcebergDeleteSink::ConstructPartitionInfo(
-    const TupleRow* row,
-    OutputPartition* output_partition) {
-  DCHECK(output_partition != nullptr);
-  DCHECK(output_partition->raw_partition_names.empty());
-
-  if (partition_key_expr_evals_.empty()) {
-    output_partition->iceberg_spec_id = table_desc_->IcebergSpecId();
-    return Status::OK();
-  }
-
-  DCHECK_EQ(partition_key_expr_evals_.size(), 2);
-
-  ScalarExprEvaluator* spec_id_eval = partition_key_expr_evals_[0];
-  ScalarExprEvaluator* partitions_eval = partition_key_expr_evals_[1];
-
-  int spec_id = spec_id_eval->GetIntVal(row).val;
-  output_partition->iceberg_spec_id = spec_id;
-
-  StringVal partitions_strings_val = partitions_eval->GetStringVal(row);
-  string partition_values_str((const char*)partitions_strings_val.ptr,
-      partitions_strings_val.len);
-
-  vector<string> non_void_partition_names;
-  vector<TIcebergPartitionTransformType::type> non_void_partition_transforms;
-  if (LIKELY(spec_id == table_desc_->IcebergSpecId())) {
-    // If 'spec_id' is the default spec id, then just copy the already populated
-    // non void partition names and transforms.
-    non_void_partition_names = table_desc_->IcebergNonVoidPartitionNames();
-    non_void_partition_transforms = table_desc_->IcebergNonVoidPartitionTransforms();
-  } else {
-    // Otherwise collect the non-void partition names belonging to 'spec_id'.
-    const TIcebergPartitionSpec& partition_spec =
-        table_desc_->IcebergPartitionSpecs()[spec_id];
-    for (const TIcebergPartitionField& spec_field : partition_spec.partition_fields) {
-      auto transform_type = spec_field.transform.transform_type;
-      if (transform_type != TIcebergPartitionTransformType::VOID) {
-        non_void_partition_names.push_back(spec_field.field_name);
-        non_void_partition_transforms.push_back(transform_type);
-      }
-    }
-  }
-
-  if (non_void_partition_names.empty()) {
-    DCHECK(partition_values_str.empty());
-    return Status::OK();
-  }
-
-  vector<string> partition_values_encoded;
-  boost::split(partition_values_encoded, partition_values_str, boost::is_any_of("."));
-  vector<string> partition_values_decoded;
-  partition_values_decoded.reserve(partition_values_encoded.size());
-  for (const string& encoded_part_val : partition_values_encoded) {
-    string decoded_val;
-    bool success = kudu::Base64Decode(encoded_part_val, &decoded_val);
-    // We encoded it, we must succeed decoding it.
-    DCHECK(success);
-    partition_values_decoded.push_back(std::move(decoded_val));
-  }
-
-  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_names.size());
-  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_transforms.size());
-
-  stringstream url_encoded_partition_name_ss;
-  stringstream external_partition_name_ss;
-
-  for (int i = 0; i < partition_values_decoded.size(); ++i) {
-    auto transform_type = non_void_partition_transforms[i];
-    stringstream raw_partition_key_value_ss;
-    stringstream encoded_partition_key_value_ss;
-
-    raw_partition_key_value_ss << non_void_partition_names[i] << "=";
-    encoded_partition_key_value_ss << non_void_partition_names[i] << "=";
-
-    string& value_str = partition_values_decoded[i];
-    Status transform_status;
-    value_str = HumanReadablePartitionValue(
-        transform_type, value_str, &transform_status);
-    if (!transform_status.ok()) return transform_status;
-    raw_partition_key_value_ss << value_str;
-
-    string part_key_value = UrlEncodePartitionValue(value_str);
-    encoded_partition_key_value_ss << part_key_value;
-    if (i < partition_values_decoded.size() - 1) encoded_partition_key_value_ss << "/";
-
-    url_encoded_partition_name_ss << encoded_partition_key_value_ss.str();
-    output_partition->raw_partition_names.push_back(raw_partition_key_value_ss.str());
-  }
-
-  output_partition->partition_name = url_encoded_partition_name_ss.str();
-  output_partition->external_partition_name = external_partition_name_ss.str();
-
-  return Status::OK();
+  IcebergDeleteSinkBase::Close(state);
+  DCHECK(closed_);
 }
 
 string IcebergDeleteSink::DebugString() const {
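
For readers unfamiliar with the ICEBERG__PARTITION__SERIALIZED value consumed by the ConstructPartitionInfo() code above: it is a dot-separated list of base64-encoded partition values (time-based transform values are additionally converted to a human-readable form). The following is a minimal standalone Java sketch of the decoding step only, not Impala code; the serialized value and the partition fields shown are hypothetical.

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class PartitionSerializationSketch {
      public static void main(String[] args) {
        // Hypothetical serialized partition value for two partition fields,
        // e.g. year(ts)=2023 and bucket(5, s)=4: base64 values joined by '.'.
        String serialized = "MjAyMw==.NA==";
        for (String token : serialized.split("\\.")) {
          byte[] decoded = Base64.getDecoder().decode(token);
          System.out.println(new String(decoded, StandardCharsets.UTF_8));
        }
        // Prints:
        // 2023
        // 4
      }
    }
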
diff --git a/be/src/exec/iceberg-delete-sink.h b/be/src/exec/iceberg-delete-sink.h
index cb042cd50..9be88d294 100644
--- a/be/src/exec/iceberg-delete-sink.h
+++ b/be/src/exec/iceberg-delete-sink.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "exec/iceberg-delete-sink-base.h"
 #include "exec/output-partition.h"
 #include "exec/table-sink-base.h"
 
@@ -25,35 +26,24 @@
 namespace impala {
 
 class Expr;
+class IcebergDeleteSinkConfig;
 class TupleDescriptor;
 class TupleRow;
-class RuntimeState;
 class MemTracker;
 
-class IcebergDeleteSinkConfig : public TableSinkBaseConfig {
- public:
-  DataSink* CreateSink(RuntimeState* state) const override;
-
-  ~IcebergDeleteSinkConfig() override {}
-
- protected:
-  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
-      FragmentState* state) override;
-};
-
-class IcebergDeleteSink : public TableSinkBase {
+class IcebergDeleteSink : public IcebergDeleteSinkBase {
  public:
   IcebergDeleteSink(TDataSinkId sink_id, const IcebergDeleteSinkConfig& sink_config,
-    const TIcebergDeleteSink& hdfs_sink, RuntimeState* state);
+    RuntimeState* state);
 
   /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
   Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
 
-  /// Opens output_exprs and partition_key_exprs, prepares the single output partition for
-  /// static inserts, and populates partition_descriptor_map_.
+  /// Opens output_exprs and partition_key_exprs.
   Status Open(RuntimeState* state) override;
 
-  /// Append all rows in batch to the temporary Hdfs files corresponding to partitions.
+  /// Append all rows in batch to position delete files. It is assumed that
+  /// rows are ordered by partitions, filepaths, and positions.
   Status Send(RuntimeState* state, RowBatch* batch) override;
 
   /// Finalize any open files.
@@ -61,23 +51,11 @@ class IcebergDeleteSink : public TableSinkBase {
   Status FlushFinal(RuntimeState* state) override;
 
   /// Closes writers, output_exprs and partition_key_exprs and releases resources.
-  /// The temporary files will be moved to their final destination by the Coordinator.
   void Close(RuntimeState* state) override;
 
-  TSortingOrder::type sorting_order() const override { return TSortingOrder::LEXICAL; }
-
   std::string DebugString() const override;
 
  private:
-  /// Fills output_partition's partition_name, raw_partition_names and
-  /// external_partition_name based on the row's columns. In case of partitioned
-  /// tables 'row' must contain the Iceberg virtual columns PARTITION__SPEC__ID and
-  /// ICEBERG__PARTITION__SERIALIZED. Every information needed for 'output_partition' can
-  /// be retrieved from these fields and from the 'table_desc_'.
-  Status ConstructPartitionInfo(
-      const TupleRow* row,
-      OutputPartition* output_partition) override;
-
   /// Verifies that the row batch does not contain duplicated rows. This can only happen
   /// in the context of UPDATE FROM statements when we are updating a table based on
   /// another table, e.g.:
@@ -87,15 +65,10 @@ class IcebergDeleteSink : public TableSinkBase {
   /// Therefore, we should always raise an error if we find duplicated rows (i.e rows
   /// having the same filepath + position), because that would corrupt the table data
   /// and the delete files as well.
+  /// For a case where deduplication is not possible at the sink level, see the comment
+  /// in IcebergUpdateImpl.buildAndValidateSelectExprs() in the Frontend Java code.
   Status VerifyRowsNotDuplicated(RowBatch* batch);
 
-  /// Returns the human-readable representation of a partition transform value. It is used
-  /// to create the file paths. IcebergUtil.partitionDataFromDataFile() also expects
-  /// partition values in this representation.
-  std::string HumanReadablePartitionValue(
-      TIcebergPartitionTransformType::type transform_type, const std::string& value,
-      Status* transform_result);
-
   /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs
   /// files. The input must be ordered by the partition key expressions.
   Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
@@ -110,11 +83,6 @@ class IcebergDeleteSink : public TableSinkBase {
   /// The sink writes partitions one-by-one.
   PartitionPair current_partition_;
 
-  /// This sink has its own DmlExecState object because in the context of UPADTEs we
-  /// cannot modify the same DmlExecState object simultaneously (from the INSERT and
-  /// DELETE sinks). It is merged into state->dml_exec_state() in Close().
-  DmlExecState dml_exec_state_;
-
   /// Variables necessary for validating that row batches don't contain duplicates.
   std::string prev_file_path_;
   int64_t prev_position_ = -1;
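
Since the duplicate check described above relies on the input being ordered by (filepath, position), here is a minimal standalone Java sketch of the idea behind VerifyRowsNotDuplicated() and the prev_file_path_ / prev_position_ members. This is not Impala code; the file names and positions are made up.

    import java.util.List;

    public class DuplicateDeleteCheckSketch {
      // Toy stand-in for a position-delete record (filepath, position).
      record Delete(String filePath, long position) {}

      public static void main(String[] args) {
        // Input is assumed to be sorted by (filePath, position); the last two
        // entries are duplicates, which would corrupt the delete files.
        List<Delete> deletes = List.of(
            new Delete("/warehouse/tbl/data/f1.parq", 0),
            new Delete("/warehouse/tbl/data/f1.parq", 1),
            new Delete("/warehouse/tbl/data/f1.parq", 1));

        String prevFilePath = null;
        long prevPosition = -1;
        for (Delete d : deletes) {
          if (d.filePath().equals(prevFilePath) && d.position() == prevPosition) {
            System.out.println("Duplicated row: " + d.filePath() + " @ " + d.position());
          }
          prevFilePath = d.filePath();
          prevPosition = d.position();
        }
      }
    }
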
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
index 73285eb8c..4fc9cf7b9 100644
--- a/be/src/exec/table-sink-base.cc
+++ b/be/src/exec/table-sink-base.cc
@@ -192,12 +192,8 @@ void TableSinkBase::BuildHdfsFileNames(
 }
 
 Status TableSinkBase::InitOutputPartition(RuntimeState* state,
-    const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
+    const HdfsPartitionDescriptor& partition_descriptor,
     OutputPartition* output_partition, bool empty_partition) {
-  // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
-  // etc.
-  RETURN_IF_ERROR(ConstructPartitionInfo(row, output_partition));
-
   BuildHdfsFileNames(partition_descriptor, output_partition);
 
   if (ShouldSkipStaging(state, output_partition)) {
@@ -366,16 +362,16 @@ Status TableSinkBase::CreateNewTmpFile(RuntimeState* state,
 }
 
 Status TableSinkBase::WriteRowsToPartition(
-    RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) {
+    RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
+    const std::vector<int32_t>& indices) {
   // The rows of this batch may span multiple files. We repeatedly pass the row batch to
   // the writer until it sets new_file to false, indicating that all rows have been
   // written. The writer tracks where it is in the batch when it returns with new_file
   // set.
   bool new_file;
   while (true) {
-    OutputPartition* output_partition = partition_pair->first.get();
     Status status =
-        output_partition->writer->AppendRows(batch, partition_pair->second, &new_file);
+        output_partition->writer->AppendRows(batch, indices, &new_file);
     if (!status.ok()) {
       // IMPALA-10607: Deletes partition file if staging is skipped when appending rows
       // fails. Otherwise, it leaves the file in un-finalized state.
@@ -390,7 +386,6 @@ Status TableSinkBase::WriteRowsToPartition(
     RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
     RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
   }
-  partition_pair->second.clear();
   return Status::OK();
 }
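
The AppendRows() contract used above (the writer fills the current file, returns with new_file set, and remembers how far into the batch it got, so the caller can roll the file and call again) can be illustrated with a small standalone Java toy. This is not Impala code, and the two-rows-per-file limit is arbitrary.

    import java.util.List;

    public class AppendLoopSketch {
      // Toy writer that remembers its offset into the batch and asks for a new
      // file every 'rowsPerFile' rows, mimicking the new_file contract above.
      static class ToyWriter {
        private int pos = 0;
        private final int rowsPerFile;
        ToyWriter(int rowsPerFile) { this.rowsPerFile = rowsPerFile; }

        /** Appends rows; returns true if a new file is needed to finish the batch. */
        boolean appendRows(List<String> batch) {
          int end = Math.min(batch.size(), pos + rowsPerFile);
          System.out.println("wrote rows " + pos + ".." + (end - 1) + " to current file");
          pos = end;
          return pos < batch.size();
        }
      }

      public static void main(String[] args) {
        List<String> batch = List.of("r0", "r1", "r2", "r3", "r4");
        ToyWriter writer = new ToyWriter(2);
        boolean newFile;
        do {
          newFile = writer.appendRows(batch);
          if (newFile) {
            System.out.println("finalize current file, open a new tmp file");
          }
        } while (newFile);
      }
    }
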
 
diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h
index 85d4213c7..4434599e8 100644
--- a/be/src/exec/table-sink-base.h
+++ b/be/src/exec/table-sink-base.h
@@ -86,15 +86,14 @@ protected:
 
   virtual bool IsHiveAcid() const { return false; }
 
-  virtual Status ConstructPartitionInfo(
-      const TupleRow* row,
-      OutputPartition* output_partition) = 0;
-
   /// Initialises the filenames of a given output partition, and opens the temporary file.
-  /// The partition key is derived from 'row'. If the partition will not have any rows
-  /// added to it, empty_partition must be true.
+  /// The caller of this function must already have filled partition-related information
+  /// in 'output_partition', such as 'iceberg_spec_id', 'partition_name',
+  /// 'raw_partition_names', 'external_partition_name' for table types where these fields
+  /// are applicable.
+  /// If the partition will not have any rows added to it, empty_partition must be true.
   Status InitOutputPartition(RuntimeState* state,
-      const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row,
+      const HdfsPartitionDescriptor& partition_descriptor,
       OutputPartition* output_partition, bool empty_partition) WARN_UNUSED_RESULT;
 
   /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
@@ -130,10 +129,11 @@ protected:
   Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
       bool is_delete = false, DmlExecState* dml_exec_state = nullptr) WARN_UNUSED_RESULT;
 
-  /// Writes all rows referenced by the row index vector in 'partition_pair' to the
-  /// partition's writer and clears the row index vector afterwards.
+  /// Writes all rows in 'batch' referenced by the row index vector in 'indices' to the
+  /// partition's writer. If 'indices' is empty, then it writes all rows in 'batch'.
   Status WriteRowsToPartition(
-      RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair)
+      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
+      const std::vector<int32_t>& indices = {})
       WARN_UNUSED_RESULT;
 
   /// Closes the hdfs file for this partition as well as the writer.
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index 421eaad1e..aee033aeb 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -65,6 +65,7 @@ class SlotRef : public ScalarExpr {
   static const char* LLVM_CLASS_NAME;
   NullIndicatorOffset GetNullIndicatorOffset() const { return null_indicator_offset_; }
   const SlotDescriptor* GetSlotDescriptor() const { return slot_desc_; }
+  int GetTupleIdx() const { return tuple_idx_; }
   int GetSlotOffset() const { return slot_offset_; }
   virtual const TupleDescriptor* GetCollectionTupleDesc() const override;
 
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index a2901b61a..da9f4ff3d 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -66,6 +66,11 @@ class DmlExecState {
   void AddPartition(const std::string& name, int64_t id, const std::string* base_dir,
       const std::string* staging_dir_to_clean_up);
 
+  /// Returns true if partition with 'name' already exists.
+  bool PartitionExists(const std::string& name) {
+    return per_partition_status_.find(name) != per_partition_status_.end();
+  }
+
   /// Merge given values into stats for partition with name 'partition_name'.
   /// Ignores 'insert_stats' if nullptr.
   /// Requires that the partition already exist.
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index ba071d9dd..90ec3e29a 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -111,6 +111,8 @@ struct TIcebergDeleteSink {
   // Partition expressions of this sink. In case of Iceberg DELETEs these are the
   // partition spec id and the serialized partition data.
   1: required list<Exprs.TExpr> partition_key_exprs
+  // True if we are using the buffered delete sink.
+  2: required bool is_buffered = false
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink
diff --git a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
index d2be81dd4..3618a72a0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DmlStatementBase.java
@@ -22,6 +22,7 @@ import org.apache.impala.catalog.FeTable;
 
 import com.google.common.base.Preconditions;
 import org.apache.impala.planner.DataSink;
+import org.apache.impala.thrift.TSortingOrder;
 
 import java.util.List;
 
@@ -67,6 +68,7 @@ public abstract class DmlStatementBase extends StatementBase {
   abstract public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer);
   abstract public List<Expr> getPartitionKeyExprs();
   abstract public List<Expr> getSortExprs();
+  abstract public TSortingOrder getSortingOrder();
 
   /**
    * Return bytes of Kudu transaction token.
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
index fc287313b..29cbcb467 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergUpdateImpl.java
@@ -17,8 +17,6 @@
 
 package org.apache.impala.analysis;
 
-import static java.lang.String.format;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,14 +30,13 @@ import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.IcebergDeleteSink;
+import org.apache.impala.planner.IcebergBufferedDeleteSink;
 import org.apache.impala.planner.MultiDataSink;
 import org.apache.impala.planner.TableSink;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TSortingOrder;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import org.apache.impala.util.ExprUtil;
 import org.apache.impala.util.IcebergUtil;
 
@@ -52,6 +49,10 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
   private List<Expr> insertResultExprs_ = new ArrayList<>();
   private List<Expr> insertPartitionKeyExprs_ = new ArrayList<>();
 
+  private List<Integer> sortColumns_ = new ArrayList<>();
+
+  // The sort order used for tables that have SORT BY columns.
+  private TSortingOrder sortingOrder_ = TSortingOrder.LEXICAL;
   // END: Members that are set in buildAndValidateSelectExprs().
   /////////////////////////////////////////
 
@@ -63,12 +64,6 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
     super.analyze(analyzer);
     deleteTableId_ = analyzer.getDescTbl().addTargetTable(icePosDelTable_);
     IcebergUtil.validateIcebergColumnsForInsert(originalTargetTable_);
-    if (originalTargetTable_.getPartitionSpecs().size() > 1) {
-      throw new AnalysisException(
-          String.format("Table '%s' has multiple partition specs, therefore " +
-              "cannot be used as a target table in an UPDATE statement",
-              originalTargetTable_.getFullName()));
-    }
     String updateMode = originalTargetTable_.getIcebergApiTable().properties().get(
         TableProperties.UPDATE_MODE);
     if (updateMode != null && !updateMode.equals("merge-on-read")) {
@@ -86,13 +81,6 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
     Pair<List<Integer>, TSortingOrder> sortProperties =
         AlterTableSetTblProperties.analyzeSortColumns(originalTargetTable_,
             originalTargetTable_.getMetaStoreTable().getParameters());
-    if (!sortProperties.first.isEmpty()) {
-      throw new AnalysisException(String.format("Impala does not support updating " +
-              "sorted tables. Data files in table '%s' are sorted by the " +
-              "following column(s): %s", originalTargetTable_.getFullName(),
-          originalTargetTable_.getMetaStoreTable().getParameters().get(
-              AlterTableSortByStmt.TBL_PROP_SORT_COLUMNS)));
-    }
     if (originalTargetTable_.getIcebergFileFormat() != TIcebergFileFormat.PARQUET) {
       throw new AnalysisException(String.format("Impala can only write Parquet data " +
           "files, while table '%s' expects '%s' data files.",
@@ -119,13 +107,24 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
 
       IcebergColumn c = (IcebergColumn)lhsSlotRef.getResolvedPath().destColumn();
       rhsExpr = checkTypeCompatiblity(analyzer, c, rhsExpr);
-      if (IcebergUtil.isPartitionColumn(
-          c, originalTargetTable_.getDefaultPartitionSpec())) {
+      // In case of a JOIN, if duplicated rows are shuffled independently, we cannot
+      // do duplicate checking in the SINK. This is the case when all of the following
+      // conditions are true:
+      // * UPDATE FROM statement with multiple table references
+      // * Updating a partition column value with a non-constant expression
+      // Therefore we throw an exception here, because we cannot guarantee
+      // that the result would be valid.
+      // TODO(IMPALA-12531): Mention the MERGE statement in the error message,
+      // as the MERGE statement should be able to do the duplicate checking.
+      if (IcebergUtil.isPartitionColumn(c,
+          originalTargetTable_.getDefaultPartitionSpec()) &&
+          (modifyStmt_.fromClause_ != null && modifyStmt_.fromClause_.size() > 1) &&
+          !rhsExpr.isConstant()) {
         throw new AnalysisException(
-            String.format("Left-hand side in assignment '%s = %s' refers to a " +
-                "partitioning column", lhsSlotRef.toSql(), rhsExpr.toSql()));
+            String.format("Cannot UPDATE partitioning column '%s' via UPDATE FROM " +
+                "statement with multiple table refs, and when right-hand side '%s' is " +
+                "non-constant. ", lhsSlotRef.toSql(), rhsExpr.toSql()));
       }
-
       checkLhsOnlyAppearsOnce(colToExprs, c, lhsSlotRef, rhsExpr);
       colToExprs.put(c.getPosition(), rhsExpr);
     }
@@ -145,7 +144,18 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
     selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
     selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_));
 
-    sortExprs_.addAll(deleteResultExprs_);
+    addSortColumns();
+  }
+
+  private void addSortColumns() throws AnalysisException {
+    Pair<List<Integer>, TSortingOrder> sortProperties =
+        AlterTableSetTblProperties.analyzeSortColumns(originalTargetTable_,
+            originalTargetTable_.getMetaStoreTable().getParameters());
+    sortColumns_ = sortProperties.first;
+    sortingOrder_ = sortProperties.second;
+
+    // Assign sortExprs_ based on sortColumns_.
+    for (Integer colIdx: sortColumns_) sortExprs_.add(insertResultExprs_.get(colIdx));
   }
 
   @Override
@@ -166,9 +176,12 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
 
   @Override
   public List<Expr> getPartitionKeyExprs() {
-    return deletePartitionKeyExprs_;
+    return insertPartitionKeyExprs_;
   }
 
+  @Override
+  public TSortingOrder getSortingOrder() { return sortingOrder_; }
+
   public void substituteResultExprs(ExprSubstitutionMap smap, Analyzer analyzer) {
     super.substituteResultExprs(smap, analyzer);
     insertResultExprs_ = Expr.substituteList(insertResultExprs_, smap, analyzer, true);
@@ -183,9 +196,9 @@ public class IcebergUpdateImpl extends IcebergModifyImpl {
 
     TableSink insertSink = TableSink.create(modifyStmt_.table_, TableSink.Op.INSERT,
         insertPartitionKeyExprs_, insertResultExprs_, Collections.emptyList(), false,
-        false, new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, null,
+        false, new Pair<>(sortColumns_, sortingOrder_), -1, null,
         modifyStmt_.maxTableSinks_);
-    TableSink deleteSink = new IcebergDeleteSink(
+    TableSink deleteSink = new IcebergBufferedDeleteSink(
         icePosDelTable_, deletePartitionKeyExprs_, deleteResultExprs_, deleteTableId_);
 
     MultiDataSink ret = new MultiDataSink();
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index d4e8d6579..05d1b9168 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -1113,6 +1113,7 @@ public class InsertStmt extends DmlStatementBase {
   public boolean isTargetTableKuduTable() { return (table_ instanceof FeKuduTable); }
   public void setWriteId(long writeId) { this.writeId_ = writeId; }
   public boolean isOverwrite() { return overwrite_; }
+  @Override
   public TSortingOrder getSortingOrder() { return sortingOrder_; }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
index 5832e1c40..099d4483d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyImpl.java
@@ -22,6 +22,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.thrift.TSortingOrder;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -171,4 +172,6 @@ abstract class ModifyImpl {
   public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
     sourceStmt_.rewriteExprs(rewriter);
   }
+
+  public TSortingOrder getSortingOrder() { return TSortingOrder.LEXICAL; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
index 0ee571646..eb4b28a27 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java
@@ -33,6 +33,7 @@ import org.apache.impala.rewrite.ExprRewriter;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import org.apache.impala.thrift.TSortingOrder;
 
 /**
  * Abstract super class for statements that modify existing data like
@@ -195,6 +196,9 @@ public abstract class ModifyStmt extends DmlStatementBase {
 
   public List<Pair<SlotRef, Expr>> getAssignments() { return assignments_; }
 
+  @Override
+  public TSortingOrder getSortingOrder() { return modifyImpl_.getSortingOrder(); }
+
   @Override
   public boolean resolveTableMask(Analyzer analyzer) throws AnalysisException {
     return getQueryStmt().resolveTableMask(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index 8e381f8fd..ecbb23b66 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -25,6 +25,7 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.planner.DataSink;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.thrift.TSortingOrder;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -96,6 +97,11 @@ public class OptimizeStmt extends DmlStatementBase {
     throw new NotImplementedException();
   }
 
+  @Override
+  public TSortingOrder getSortingOrder() {
+    throw new NotImplementedException();
+  }
+
   @Override
   public List<Expr> getPartitionKeyExprs() {
     return insertStmt_.getPartitionKeyExprs();
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
similarity index 73%
copy from fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
copy to fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
index 69386b169..fda1f11be 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
@@ -17,12 +17,10 @@
 
 package org.apache.impala.planner;
 
-import java.util.List;
-
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeIcebergTable;
-
+import org.apache.impala.common.ByteUnits;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -31,26 +29,23 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 
-/**
- * Sink for deleting data from Iceberg tables. Impala deletes data via 'merge-on-read'
- * strategy, which means it writes Iceberg position delete files. These files contain
- * information (file_path, position) about the deleted rows. Query engines reading from
- * an Iceberg table need to exclude the deleted rows from the result of the table scan.
- * Impala does this by doing an ANTI JOIN between data files and delete files.
- */
-public class IcebergDeleteSink extends TableSink {
+import java.util.List;
+
+public class IcebergBufferedDeleteSink extends TableSink {
+
   final private int deleteTableId_;
 
   // Exprs for computing the output partition(s).
   protected final List<Expr> partitionKeyExprs_;
 
-  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs,
-      List<Expr> outputExprs) {
+  public IcebergBufferedDeleteSink(FeIcebergTable targetTable,
+      List<Expr> partitionKeyExprs, List<Expr> outputExprs) {
     this(targetTable, partitionKeyExprs, outputExprs, 0);
   }
 
-  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs,
-      List<Expr> outputExprs, int deleteTableId) {
+  public IcebergBufferedDeleteSink(FeIcebergTable targetTable,
+      List<Expr> partitionKeyExprs, List<Expr> outputExprs,
+      int deleteTableId) {
     super(targetTable, Op.DELETE, outputExprs);
     partitionKeyExprs_ = partitionKeyExprs;
     deleteTableId_ = deleteTableId;
@@ -66,30 +61,30 @@ public class IcebergDeleteSink extends TableSink {
   public void computeResourceProfile(TQueryOptions queryOptions) {
     PlanNode inputNode = fragment_.getPlanRoot();
     final int numInstances = fragment_.getNumInstances();
-    // Input is clustered, so it produces a single partition at a time.
-    final long numBufferedPartitionsPerInstance = 1;
     // For regular Parquet files we estimate 1GB memory consumption which is already
     // a conservative, i.e. probably too high memory estimate.
     // Writing out position delete files means we are writing filenames and positions
     // per partition. So assuming 0.5 GB per position delete file writer can be still
     // considered a very conservative estimate.
-    final long perPartitionMemReq = 512L * 1024L * 1024L;
+    final long perPartitionMemReq = 512L * ByteUnits.MEGABYTE;
+    // The writer also buffers the file paths and positions before it can start writing
+    // out files. Let's assume it needs to buffer 20K file paths and 1M positions
+    // per sink, that is around 20K * 200 bytes + 1M * 8 bytes = 12 megabytes. Let's
+    // round it up to 16 MB.
+    final long bufferedData = 16 * ByteUnits.MEGABYTE;
 
     long perInstanceMemEstimate;
     // The estimate is based purely on the per-partition mem req if the input cardinality_
     // or the avg row size is unknown.
     if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
-      perInstanceMemEstimate = numBufferedPartitionsPerInstance * perPartitionMemReq;
+      perInstanceMemEstimate = perPartitionMemReq + bufferedData;
     } else {
       // The per-partition estimate may be higher than the memory required to buffer
       // the entire input data.
       long perInstanceInputCardinality =
           Math.max(1L, inputNode.getCardinality() / numInstances);
-      long perInstanceInputBytes =
+      perInstanceMemEstimate =
           (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize());
-      long perInstanceMemReq =
-          PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, perPartitionMemReq);
-      perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq);
     }
     resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
   }
@@ -97,29 +92,30 @@ public class IcebergDeleteSink extends TableSink {
   @Override
   public void appendSinkExplainString(String prefix, String detailPrefix,
       TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
-    output.append(String.format("%sDELETE FROM ICEBERG [%s]\n", prefix,
+    output.append(String.format("%sBUFFERED DELETE FROM ICEBERG [%s]\n", prefix,
         targetTable_.getFullName()));
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(detailPrefix + "output exprs: ")
-            .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
+          .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
       if (!partitionKeyExprs_.isEmpty()) {
         output.append(detailPrefix + "partition keys: ")
-              .append(Expr.getExplainString(partitionKeyExprs_, explainLevel) + "\n");
+            .append(Expr.getExplainString(partitionKeyExprs_, explainLevel) + "\n");
       }
     }
   }
 
   @Override
   protected String getLabel() {
-    return "ICEBERG DELETER";
+    return "ICEBERG BUFFERED DELETER";
   }
 
   @Override
   protected void toThriftImpl(TDataSink tsink) {
-    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(
-        Expr.treesToThrift(partitionKeyExprs_));
+    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
+    icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_));
+    icebergDeleteSink.setIs_buffered(true);
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
-            TTableSinkType.HDFS, sinkOp_.toThrift());
+        TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.iceberg_delete_sink = icebergDeleteSink;
     tTableSink.setTarget_table_id(deleteTableId_);
     tsink.table_sink = tTableSink;
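
The 16 MB of buffered data added above can be reconstructed as follows; a minimal standalone sketch of the arithmetic only (not Impala code), using the assumptions stated in the planner comment: roughly 20K buffered file paths of ~200 bytes each and ~1M buffered int64 positions per sink.

    public class BufferedDeleteSinkEstimateSketch {
      public static void main(String[] args) {
        final long MEGABYTE = 1024L * 1024L;
        // Conservative per-partition Parquet writer estimate from the planner code.
        final long perPartitionMemReq = 512L * MEGABYTE;
        // Buffered delete records: ~20K file paths * ~200 bytes + ~1M positions * 8 bytes.
        final long bufferedBytes = 20_000L * 200L + 1_000_000L * 8L;   // ~12 MB
        final long bufferedData = 16L * MEGABYTE;                      // rounded up
        final long perInstanceMemEstimate = perPartitionMemReq + bufferedData;
        System.out.println("buffered ~" + bufferedBytes / 1_000_000L + " MB, "
            + "estimate " + perInstanceMemEstimate / MEGABYTE + " MB");  // 12 MB, 528 MB
      }
    }
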
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
index 69386b169..2e2387ddd 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeIcebergTable;
-
+import org.apache.impala.common.ByteUnits;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -73,7 +73,7 @@ public class IcebergDeleteSink extends TableSink {
     // Writing out position delete files means we are writing filenames and positions
     // per partition. So assuming 0.5 GB per position delete file writer can be still
     // considered a very conservative estimate.
-    final long perPartitionMemReq = 512L * 1024L * 1024L;
+    final long perPartitionMemReq = 512L * ByteUnits.MEGABYTE;
 
     long perInstanceMemEstimate;
     // The estimate is based purely on the per-partition mem req if the input cardinality_
@@ -116,8 +116,8 @@ public class IcebergDeleteSink extends TableSink {
 
   @Override
   protected void toThriftImpl(TDataSink tsink) {
-    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(
-        Expr.treesToThrift(partitionKeyExprs_));
+    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
+    icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_));
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
             TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.iceberg_delete_sink = icebergDeleteSink;
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index eb944f8f6..8eb0ca914 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -923,14 +923,19 @@ public class Planner {
       Analyzer analyzer) throws ImpalaException {
     List<Expr> orderingExprs = new ArrayList<>();
 
-    orderingExprs.addAll(dmlStmt.getPartitionKeyExprs());
+    List<Expr> partitionKeyExprs = dmlStmt.getPartitionKeyExprs();
+    orderingExprs.addAll(partitionKeyExprs);
     orderingExprs.addAll(dmlStmt.getSortExprs());
 
+    if (orderingExprs.isEmpty()) return;
+
     // Build sortinfo to sort by the ordering exprs.
     List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
     List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
     SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams,
-        TSortingOrder.LEXICAL);
+        dmlStmt.getSortingOrder());
+    int numPartitionKeys = partitionKeyExprs.size();
+    sortInfo.setNumLexicalKeysInZOrder(numPartitionKeys);
     sortInfo.createSortTupleInfo(dmlStmt.getResultExprs(), analyzer);
     sortInfo.getSortTupleDescriptor().materializeSlots();
     dmlStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);
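
To make the new ordering concrete: the rows fed to the sinks are sorted by the partition key exprs first (always compared lexically, which is what setNumLexicalKeysInZOrder() pins down) and then by the table's SORT BY exprs (lexical or Z-ordered according to getSortingOrder()). Below is a standalone Java toy of the lexical case only, not Impala code; the column names and values are made up.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class DmlSortOrderSketch {
      // Toy row: the new partition value followed by two SORT BY columns.
      record Row(String partitionKey, int i, int j) {}

      public static void main(String[] args) {
        List<Row> rows = new ArrayList<>(List.of(
            new Row("b", 2, 9), new Row("a", 7, 1), new Row("a", 3, 4)));
        // Partition key exprs form the leading sort keys so that each sink sees the
        // rows of one partition contiguously; the SORT BY exprs follow. Under
        // TSortingOrder.ZORDER only the trailing keys would be Z-ordered.
        rows.sort(Comparator.comparing(Row::partitionKey)
            .thenComparingInt(Row::i)
            .thenComparingInt(Row::j));
        System.out.println(rows);  // rows for partition "a" are now adjacent
      }
    }
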
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 491224841..5de22e79d 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3401,7 +3401,11 @@ CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 (ts timestamp, s string, i int, j int)
 PARTITIONED BY SPEC (year(ts), bucket(5, s))
 SORT BY ZORDER (i, j)
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES('format-version'='2');
+---- DEPENDENT_LOAD
+TRUNCATE TABLE {db_name}{db_suffix}.{table_name};
+INSERT INTO {db_name}{db_suffix}.{table_name} VALUES ('2023-12-08 16:15:33', 'Alpaca', 111, 222);
 ====
 ---- DATASET
 functional
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
index dd7fee1f9..a7b61a2a6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-update.test
@@ -2,11 +2,7 @@ UPDATE iceberg_v2_no_deletes set s = concat(s,s) where i = 3
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
-|
-01:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=36B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
@@ -15,11 +11,7 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_no_deletes, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
-|
-01:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=36B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
@@ -30,11 +22,7 @@ UPDATE iceberg_v2_delete_positional SET `data` = concat(`data`,'a') where id = 1
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-03:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=40B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=1
@@ -50,11 +38,7 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-04:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=40B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=40B cardinality=1
@@ -74,11 +58,7 @@ UPDATE iceberg_v2_delete_positional SET id = cast(id+1 as int)
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-03:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=40B cardinality=2
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=2
@@ -93,11 +73,7 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-04:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=40B cardinality=2
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=40B cardinality=2
@@ -116,11 +92,7 @@ UPDATE iceberg_v2_delete_positional SET id = 42 WHERE FILE__POSITION = id
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-03:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=32B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=1
@@ -136,11 +108,7 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_delete_positional, OVERWRITE=false]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-04:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=32B cardinality=1
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=40B cardinality=1
@@ -160,10 +128,10 @@ UPDATE iceberg_v2_partitioned_position_deletes set id = length(action)
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=76B cardinality=10
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -179,13 +147,13 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 05:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=76B cardinality=10
 |
-04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
+04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.action)]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=76B cardinality=10
@@ -204,10 +172,10 @@ UPDATE iceberg_v2_partitioned_position_deletes set id = length(action) where act
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=76B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -224,10 +192,10 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 04:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=76B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
@@ -248,10 +216,10 @@ UPDATE target set user = s from iceberg_v2_partitioned_position_deletes target,
 ---- PLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 09:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=80B cardinality=10
 |
 08:HASH JOIN [INNER JOIN]
@@ -292,13 +260,13 @@ MULTI DATA SINK
 ---- DISTRIBUTEDPLAN
 MULTI DATA SINK
 |->WRITE TO HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes, OVERWRITE=false, PARTITION-KEYS=(action)]
-|->DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 13:SORT
-|  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
+|  order by: action ASC NULLS LAST
 |  row-size=80B cardinality=10
 |
-12:EXCHANGE [HASH(target.PARTITION__SPEC__ID,target.ICEBERG__PARTITION__SERIALIZED)]
+12:EXCHANGE [HASH(target.action)]
 |
 08:HASH JOIN [INNER JOIN, BROADCAST]
 |  hash predicates: target.id = source.i
@@ -342,3 +310,133 @@ MULTI DATA SINK
    runtime filters: RF000 -> target.id
    row-size=68B cardinality=20
 ====
+update iceberg_partition_transforms_zorder set ts = days_add(ts, 10), i = cast(i + 1000 as int)
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(days_add(ts, 10)),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+01:SORT
+|  order by: LEXICAL: year(days_add(ts, 10)) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: CAST(i + 1000 AS INT), j
+|  row-size=80B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=1 size=1.08KB
+   row-size=72B cardinality=1
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(days_add(ts, 10)),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+01:SORT
+|  order by: LEXICAL: year(days_add(ts, 10)) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: CAST(i + 1000 AS INT), j
+|  row-size=80B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=1 size=1.08KB
+   row-size=72B cardinality=1
+====
+update ice_zorder set j = length(action)
+from iceberg_partition_transforms_zorder ice_zorder, iceberg_partitioned source
+where source.id = ice_zorder.i
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+03:SORT
+|  order by: LEXICAL: year(ice_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(ice_zorder.s, 5) ASC NULLS LAST, ZORDER: i, length(action)
+|  row-size=80B cardinality=20
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: source.id = ice_zorder.i
+|  runtime filters: RF000 <- ice_zorder.i
+|  row-size=84B cardinality=20
+|
+|--00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
+|     HDFS partitions=1/1 files=1 size=1.08KB
+|     row-size=68B cardinality=1
+|
+01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   runtime filters: RF000 -> source.id
+   row-size=16B cardinality=20
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+05:SORT
+|  order by: LEXICAL: year(ice_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(ice_zorder.s, 5) ASC NULLS LAST, ZORDER: i, length(action)
+|  row-size=80B cardinality=20
+|
+04:EXCHANGE [HASH(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: source.id = ice_zorder.i
+|  runtime filters: RF000 <- ice_zorder.i
+|  row-size=84B cardinality=20
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
+|     HDFS partitions=1/1 files=1 size=1.08KB
+|     row-size=68B cardinality=1
+|
+01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   runtime filters: RF000 -> source.id
+   row-size=16B cardinality=20
+====
+update ice_zorder set j = length(action)
+from iceberg_partition_transforms_zorder ice_zorder inner join iceberg_partitioned source
+on source.id = ice_zorder.i
+---- PLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+03:SORT
+|  order by: LEXICAL: year(ice_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(ice_zorder.s, 5) ASC NULLS LAST, ZORDER: i, length(action)
+|  row-size=80B cardinality=20
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: source.id = ice_zorder.i
+|  runtime filters: RF000 <- ice_zorder.i
+|  row-size=84B cardinality=20
+|
+|--00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
+|     HDFS partitions=1/1 files=1 size=1.08KB
+|     row-size=68B cardinality=1
+|
+01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   runtime filters: RF000 -> source.id
+   row-size=16B cardinality=20
+---- DISTRIBUTEDPLAN
+MULTI DATA SINK
+|->WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=false, PARTITION-KEYS=(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|->BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_partition_transforms_zorder-POSITION-DELETE]
+|
+05:SORT
+|  order by: LEXICAL: year(ice_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(ice_zorder.s, 5) ASC NULLS LAST, ZORDER: i, length(action)
+|  row-size=80B cardinality=20
+|
+04:EXCHANGE [HASH(year(ice_zorder.ts),iceberg_bucket_transform(ice_zorder.s, 5))]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: source.id = ice_zorder.i
+|  runtime filters: RF000 <- ice_zorder.i
+|  row-size=84B cardinality=20
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder ice_zorder]
+|     HDFS partitions=1/1 files=1 size=1.08KB
+|     row-size=68B cardinality=1
+|
+01:SCAN HDFS [functional_parquet.iceberg_partitioned source]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   runtime filters: RF000 -> source.id
+   row-size=16B cardinality=20
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
index 4c9026e0a..45c5d186e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
@@ -397,21 +397,21 @@ WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE
 |
 01:SORT
 |  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
-|  row-size=44B cardinality=0
+|  row-size=44B cardinality=1
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
-   HDFS partitions=1/1 files=0 size=0B
-   row-size=36B cardinality=0
+   HDFS partitions=1/1 files=1 size=1.08KB
+   row-size=36B cardinality=1
 ---- DISTRIBUTEDPLAN
 WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
 |
 02:SORT
 |  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
-|  row-size=44B cardinality=0
+|  row-size=44B cardinality=1
 |
 01:EXCHANGE [UNPARTITIONED]
 |
 00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
-   HDFS partitions=1/1 files=0 size=0B
-   row-size=36B cardinality=0
+   HDFS partitions=1/1 files=1 size=1.08KB
+   row-size=36B cardinality=1
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index c7ecc8785..e0946c828 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -804,16 +804,6 @@ SELECT ICEBERG__DATA__SEQUENCE__NUMBER FROM functional_parquet.alltypes;
 AnalysisException: Could not resolve column/field reference: 'iceberg__data__sequence__number'
 ====
 ---- QUERY
-update functional_parquet.iceberg_partition_evolution set year = 2023 where id = 14393;
----- CATCH
-AnalysisException: Table 'functional_parquet.iceberg_partition_evolution' has multiple partition specs, therefore cannot be used as a target table in an UPDATE statement
-====
----- QUERY
-update functional_parquet.iceberg_int_partitioned set i = j;
----- CATCH
-AnalysisException: Left-hand side in assignment 'i = j' refers to a partitioning column
-====
----- QUERY
 update functional_parquet.iceberg_int_partitioned set k = 1, k = 2;
 ---- CATCH
 AnalysisException: Left-hand side in assignment appears multiple times 'k=2'
@@ -854,17 +844,10 @@ update ice_v2_timestamptz set i = 3;
 AnalysisException: The Iceberg table has a TIMESTAMPTZ column that Impala cannot write.
 ====
 ---- QUERY
-create table sorted_ice (i int) sort by (i)
-stored by iceberg tblproperties ('format-version'='2');
-update sorted_ice set i = 4;
----- CATCH
-AnalysisException: Impala does not support updating sorted tables. Data files in table '$DATABASE.sorted_ice' are sorted by the following column(s): i
-====
----- QUERY
 # Metadata tables should raise proper error message.
-update $DATABASE.sorted_ice.history set parent_id = 0 where snapshot_id = 3;
+update functional_parquet.iceberg_int_partitioned.history set parent_id = 0 where snapshot_id = 3;
 ---- CATCH
-AnalysisException: Cannot resolve path '$DATABASE.sorted_ice.history' for DML statement.
+AnalysisException: Cannot resolve path 'functional_parquet.iceberg_int_partitioned.history' for DML statement.
 ====
 ---- QUERY
 create table update_orc (i int)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
index d44dd16ec..183b9aa6c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-basic.test
@@ -249,7 +249,7 @@ STRING, STRING, STRING, STRING
 ====
 ---- QUERY
 # Negative test for UPDATE part 3:
-# updating partition column AND right side is non-constant value AND we have a FROM clause
+# updating partition column AND right side is non-constant value AND we have a FROM clause with multiple table refs.
 # For such operations, if there are multiple matches in the JOIN, the duplicated records can
 # get shuffled independently to different sink operators, therefore they cannot check the
 # presence of duplicates. For statements like this we need to raise an error during analysis.
@@ -257,7 +257,7 @@ UPDATE ice_id_partitioned set p = cast(ref_table.bi as int)
 FROM ice_id_partitioned, ref_table
 WHERE ice_id_partitioned.i = ref_table.i;
 ---- CATCH
-AnalysisException: Left-hand side in assignment 'p = CAST(ref_table.bi AS INT)' refers to a partitioning column
+AnalysisException: Cannot UPDATE partitioning column 'p' via UPDATE FROM statement with multiple table refs, and when right-hand side 'CAST(ref_table.bi AS INT)' is non-constant.
 ====
 ---- QUERY
 create table ice_bucket_transform(i int, str string, bi bigint, ts timestamp)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
new file mode 100644
index 000000000..cb3becfa6
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
@@ -0,0 +1,460 @@
+====
+---- QUERY
+CREATE TABLE id_part (i int, s string)
+PARTITIONED BY SPEC (i)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+UPDATE id_part SET i = 1;
+---- DML_RESULTS: id_part
+---- TYPES
+INT,STRING
+====
+---- QUERY
+SHOW FILES IN id_part;
+---- RESULTS
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+INSERT INTO id_part VALUES(1, 'one'), (2, 'two'), (3, 'three');
+UPDATE id_part SET i = cast(i * 10 as int);
+---- DML_RESULTS: id_part
+10,'one'
+20,'two'
+30,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO id_part VALUES(4, 'four'), (5, 'five'), (6, 'six');
+UPDATE id_part SET i = cast(i / 2 as int), s = upper(s);
+---- DML_RESULTS: id_part
+5,'ONE'
+10,'TWO'
+15,'THREE'
+2,'FOUR'
+2,'FIVE'
+3,'SIX'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+SHOW FILES IN id_part;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=1/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=1/delete-.*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=10/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=10/delete-.*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=15/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/delete-.*parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=2/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=20/delete-.*parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=20/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=3/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=3/delete-.*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=30/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=30/delete-.*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/delete-.*parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=4/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=5/(?!delete-).*.parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/delete-.*parq','.*B','','.*'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=6/(?!delete-).*.parq','.*B','','.*'
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/id_part/data/i=15/delete-.*.parq','.*B','','.*'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+CREATE TABLE trunc_part (i int, s string)
+PARTITIONED BY SPEC (truncate(1, s))
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# UPDATE on an empty table is a no-op.
+UPDATE trunc_part SET i = 1;
+---- DML_RESULTS: trunc_part
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO trunc_part VALUES(1, 'one'), (2, 'two'), (3, 'three');
+UPDATE trunc_part SET i = cast(i + 100 as int) WHERE s like 't%';
+---- DML_RESULTS: trunc_part
+1,'one'
+102,'two'
+103,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+INSERT INTO trunc_part VALUES(4, 'four'), (5, 'five'), (6, 'six');
+UPDATE trunc_part SET s = concat(upper(s), s), i = cast(i + 1000 as int) WHERE i % 2 = 0;
+---- DML_RESULTS: trunc_part
+1,'one'
+1102,'TWOtwo'
+103,'three'
+1004,'FOURfour'
+5,'five'
+1006,'SIXsix'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+CREATE TABLE multi_part (i int, s string, f double)
+PARTITIONED BY SPEC (bucket(7, i), truncate(1, s))
+SORT BY (f, i)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# UPDATE on an empty table is a no-op.
+UPDATE multi_part SET i = 1;
+---- DML_RESULTS: multi_part
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+INSERT INTO multi_part VALUES(1, 'one', 1.1), (2, 'two', 2.2), (3, 'three', 3.33);
+UPDATE multi_part
+SET s = concat(s, s), f = 9.9, i = cast(i + 10 as int)
+WHERE i != (select min(i) from multi_part) and
+      i != (select max(i) from multi_part);
+---- DML_RESULTS: multi_part
+1,'one',1.1
+12,'twotwo',9.9
+3,'three',3.33
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+INSERT INTO multi_part VALUES(4, 'four', 4.4), (5, 'five', 5.5), (6, 'six', 6.6);
+UPDATE multi_part SET i = 0, s = 'void', f = 3.14 WHERE i % 2 = 0;
+---- DML_RESULTS: multi_part
+1,'one',1.1
+0,'void',3.14
+3,'three',3.33
+0,'void',3.14
+5,'five',5.5
+0,'void',3.14
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+# No rows match the WHERE condition.
+UPDATE multi_part SET i = 0, s = 'void', f = 3.14
+WHERE f > 1000;
+---- DML_RESULTS: multi_part
+1,'one',1.1
+0,'void',3.14
+3,'three',3.33
+0,'void',3.14
+5,'five',5.5
+0,'void',3.14
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+INSERT INTO multi_part VALUES (null, 'null',0.0);
+---- DML_RESULTS: multi_part
+1,'one',1.1
+0,'void',3.14
+3,'three',3.33
+0,'void',3.14
+5,'five',5.5
+0,'void',3.14
+NULL,'null',0.0
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+UPDATE multi_part SET i = 111, s = 'fox', f = 1.1 where i is null;
+---- DML_RESULTS: multi_part
+1,'one',1.1
+0,'void',3.14
+3,'three',3.33
+0,'void',3.14
+5,'five',5.5
+0,'void',3.14
+111,'fox',1.1
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+CREATE TABLE evolve_part (i int, s string, f double)
+SORT BY ZORDER(i,s)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+INSERT INTO evolve_part VALUES(1, 'one', 1.1), (2, 'two', 2.2), (3, 'three', 3.33);
+UPDATE evolve_part SET i = 222 WHERE i = 2;
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+222,'two',2.2
+3,'three',3.33
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+ALTER TABLE evolve_part SET PARTITION SPEC (i);
+SELECT * FROM evolve_part;
+---- RESULTS
+1,'one',1.1
+222,'two',2.2
+3,'three',3.33
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+INSERT INTO evolve_part VALUES (10, 'ten', 10.10), (20, 'twenty', 20.20);
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+222,'two',2.2
+3,'three',3.33
+10,'ten',10.10
+20,'twenty',20.20
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+UPDATE evolve_part SET i = cast(i + 1000 as int) where s like 't%';
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+1222,'two',2.2
+1003,'three',3.33
+1010,'ten',10.10
+1020,'twenty',20.20
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+ALTER TABLE evolve_part SET PARTITION SPEC (truncate(1, s));
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+1222,'two',2.2
+1003,'three',3.33
+1010,'ten',10.10
+1020,'twenty',20.20
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+INSERT INTO evolve_part VALUES (30, 'thirty', 30.30), (40, 'forty', 40.40), (50, 'fifty', 50.50);
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+1222,'two',2.2
+1003,'three',3.33
+1010,'ten',10.10
+1020,'twenty',20.20
+30,'thirty',30.30
+40,'forty',40.40
+50,'fifty',50.50
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+UPDATE evolve_part SET i = cast(i + 100 as int), s = concat('+++', s);
+---- DML_RESULTS: evolve_part
+101,'+++one',1.1
+1322,'+++two',2.2
+1103,'+++three',3.33
+1110,'+++ten',10.10
+1120,'+++twenty',20.20
+130,'+++thirty',30.30
+140,'+++forty',40.40
+150,'+++fifty',50.50
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+ALTER TABLE evolve_part SET PARTITION SPEC (void(s));
+---- DML_RESULTS: evolve_part
+101,'+++one',1.1
+1322,'+++two',2.2
+1103,'+++three',3.33
+1110,'+++ten',10.10
+1120,'+++twenty',20.20
+130,'+++thirty',30.30
+140,'+++forty',40.40
+150,'+++fifty',50.50
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+UPDATE evolve_part SET s = substr(s, 4), i = cast(i - 100 as int);
+---- DML_RESULTS: evolve_part
+1,'one',1.1
+1222,'two',2.2
+1003,'three',3.33
+1010,'ten',10.10
+1020,'twenty',20.20
+30,'thirty',30.30
+40,'forty',40.40
+50,'fifty',50.50
+---- TYPES
+INT,STRING,DOUBLE
+====
+---- QUERY
+create table date_year_part (i int, d date)
+partitioned by spec (year(d))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into date_year_part values
+  (1, '1968-01-01'), (2, '1969-12-31'), (3, '1970-01-01'), (4, '2023-11-15');
+UPDATE date_year_part SET d = years_add(d, 10), i = cast(i + 10 as int);
+---- DML_RESULTS: date_year_part
+11,1978-01-01
+12,1979-12-31
+13,1980-01-01
+14,2033-11-15
+---- TYPES
+INT, DATE
+====
+---- QUERY
+create table date_month_part (i int, d date)
+partitioned by spec (month(d))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into date_month_part values
+  (1, '1968-01-01'), (2, '1969-12-31'), (3, '1970-01-01'), (4, '2023-11-15');
+UPDATE date_month_part SET d = years_add(d, 10), i = cast(i + 10 as int);
+---- DML_RESULTS: date_month_part
+11,1978-01-01
+12,1979-12-31
+13,1980-01-01
+14,2033-11-15
+---- TYPES
+INT, DATE
+====
+---- QUERY
+create table date_day_part (i int, d date)
+partitioned by spec (day(d))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into date_day_part values
+  (1, '1968-01-01'), (2, '1969-12-31'), (3, '1970-01-01'), (4, '2023-11-15');
+UPDATE date_day_part SET d = years_add(d, 10), i = cast(i + 10 as int);
+---- DML_RESULTS: date_day_part
+11,1978-01-01
+12,1979-12-31
+13,1980-01-01
+14,2033-11-15
+---- TYPES
+INT, DATE
+====
+---- QUERY
+create table ts_year_part (i int, ts timestamp)
+partitioned by spec (year(ts))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ts_year_part values (1, '1968-01-01'),
+  (2, '1969-12-31'), (3, '1970-01-01'), (4, '2023-11-15');
+UPDATE ts_year_part SET ts = years_add(ts, -10), i = cast(i + 100 as int);
+---- DML_RESULTS: ts_year_part
+101,1958-01-01 00:00:00
+102,1959-12-31 00:00:00
+103,1960-01-01 00:00:00
+104,2013-11-15 00:00:00
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+create table ts_month_part (i int, ts timestamp)
+partitioned by spec (month(ts))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ts_month_part values (1, '1968-01-01 01:02:03'),
+  (2, '1969-12-31 23:59:00'), (3, '1970-01-01 00:00:00'),
+  (4, '2023-11-15 15:31:00');
+UPDATE ts_month_part SET ts = years_add(ts, -10), i = cast(i + 100 as int);
+---- DML_RESULTS: ts_month_part
+101,1958-01-01 01:02:03
+102,1959-12-31 23:59:00
+103,1960-01-01 00:00:00
+104,2013-11-15 15:31:00
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+create table ts_day_part (i int, ts timestamp)
+partitioned by spec (day(ts))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ts_day_part values (1, '1968-01-01 01:02:03'),
+  (2, '1969-12-31 23:59:00'), (3, '1970-01-01 00:00:00'),
+  (4, '2023-11-15 15:31:00');
+UPDATE ts_day_part SET ts = years_add(ts, -10), i = cast(i + 100 as int);
+---- DML_RESULTS: ts_day_part
+101,1958-01-01 01:02:03
+102,1959-12-31 23:59:00
+103,1960-01-01 00:00:00
+104,2013-11-15 15:31:00
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+create table ts_hour_part (i int, ts timestamp)
+partitioned by spec (hour(ts))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ts_hour_part values (1, '1968-01-01 01:02:03'),
+  (2, '1969-12-31 23:59:00'), (3, '1970-01-01 00:00:00'),
+  (4, '2023-11-15 15:31:00');
+UPDATE ts_hour_part SET ts = years_add(ts, -10), i = cast(i + 100 as int);
+---- DML_RESULTS: ts_hour_part
+101,1958-01-01 01:02:03
+102,1959-12-31 23:59:00
+103,1960-01-01 00:00:00
+104,2013-11-15 15:31:00
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+create table ts_evolve_part (i int, ts timestamp)
+partitioned by spec (year(ts))
+sort by (ts, i)
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into ts_evolve_part values (1, '1968-01-01 01:02:03');
+alter table ts_evolve_part set partition spec (month(ts));
+insert into ts_evolve_part values (2, '1969-12-31 23:59:00');
+insert into ts_evolve_part values (111, 'invalid');
+alter table ts_evolve_part set partition spec (day(ts));
+insert into ts_evolve_part values (3, '1970-01-01 00:00:00');
+alter table ts_evolve_part set partition spec (hour(ts));
+insert into ts_evolve_part values (4, '2023-11-15 15:31:00');
+---- DML_RESULTS: ts_evolve_part
+1,1968-01-01 01:02:03
+2,1969-12-31 23:59:00
+3,1970-01-01 00:00:00
+4,2023-11-15 15:31:00
+111,NULL
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+UPDATE ts_evolve_part set i = cast(i + 1000 as int), ts = days_add(months_add(years_add(ts, 20), 1), 1);
+---- DML_RESULTS: ts_evolve_part
+1001,1988-02-02 01:02:03
+1002,1990-02-01 23:59:00
+1003,1990-02-02 00:00:00
+1004,2043-12-16 15:31:00
+1111,NULL
+---- TYPES
+INT, TIMESTAMP
+====
+---- QUERY
+create table numeric_truncate (id int, int_col int, bigint_col bigint, dec_8_0 decimal(8, 0), dec_10_2 decimal(10, 2))
+partitioned by spec (truncate(10, int_col), truncate(1000, bigint_col), void(id), truncate(20, dec_8_0), truncate(50, dec_10_2))
+stored by iceberg
+tblproperties ('format-version'='2');
+insert into numeric_truncate values (1, 12, 1222, 135, 20.75);
+UPDATE numeric_truncate SET dec_10_2 = 75.20, dec_8_0 = 531, bigint_col = 2111, int_col = 21, id = 11;
+---- DML_RESULTS: numeric_truncate
+11,21,2111,531,75.20
+---- TYPES
+INT,INT,BIGINT,DECIMAL,DECIMAL
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
new file mode 100644
index 000000000..9ef31cffe
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-stress.test
@@ -0,0 +1,39 @@
+====
+---- QUERY
+CREATE TABLE ice_store_sales
+PARTITIONED BY SPEC (truncate(500, ss_sold_date_sk))
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2')
+AS SELECT * FROM tpcds_parquet.store_sales;
+select count(*), sum(ss_ticket_number) from ice_store_sales where ss_item_sk % 1999 = 0
+---- RESULTS
+1569,188882461
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+update ice_store_sales set ss_ticket_number = ss_ticket_number + 100000 where ss_item_sk % 1999 = 0;
+select count(*), sum(ss_ticket_number) from ice_store_sales where ss_item_sk % 1999 = 0
+---- RESULTS
+1569,345782461
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+alter table ice_store_sales set partition spec (void(ss_sold_date_sk), truncate(1000, ss_sold_date_sk));
+alter table ice_store_sales sort by lexical (ss_ticket_number);
+insert into ice_store_sales SELECT * FROM tpcds_parquet.store_sales;
+select count(*), sum(ss_ticket_number) from ice_store_sales where ss_item_sk % 1999 = 0;
+---- RESULTS
+3138,534664922
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+update ice_store_sales set ss_ticket_number = ss_ticket_number + 100000 where ss_item_sk % 1999 = 0;
+select count(*), sum(ss_ticket_number) from ice_store_sales where ss_item_sk % 1999 = 0
+---- RESULTS
+3138,848464922
+---- TYPES
+BIGINT,BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 8b9c9fe5e..670f4c6bd 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -39,7 +39,6 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
 from tests.common.skip import SkipIf, SkipIfFS, SkipIfDockerizedCluster
 from tests.common.test_dimensions import add_exec_option_dimension
-from tests.common.test_vector import ImpalaTestDimension
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
@@ -1416,6 +1415,82 @@ class TestIcebergV2Table(IcebergTestSuite):
         "4,2023-11-13 18:07:23.0,gray,2500\n" \
         "8,2023-11-01 00:11:11.0,black,722\n"
 
+  def test_update_partitions(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-update-partitions', vector,
+        unique_database)
+    if IS_HDFS and self.should_run_for_hive(vector):
+      self._update_partitions_hive_tests(unique_database)
+
+  def _update_partitions_hive_tests(self, db):
+    def get_hive_results(tbl, order_by_col):
+      stmt = "SELECT * FROM {}.{} ORDER BY {}".format(db, tbl, order_by_col)
+      return self.run_stmt_in_hive(stmt).split("\n", 1)[1]
+
+    hive_results = get_hive_results("id_part", "i, s")
+    assert hive_results == \
+        "2,FIVE\n" \
+        "2,FOUR\n" \
+        "3,SIX\n" \
+        "5,ONE\n" \
+        "10,TWO\n" \
+        "15,THREE\n"
+
+    hive_results = get_hive_results("trunc_part", "i")
+    assert hive_results == \
+        "1,one\n" \
+        "5,five\n" \
+        "103,three\n" \
+        "1004,FOURfour\n" \
+        "1006,SIXsix\n" \
+        "1102,TWOtwo\n"
+
+    hive_results = get_hive_results("multi_part", "i")
+    assert hive_results == \
+        "0,void,3.14\n" \
+        "0,void,3.14\n" \
+        "0,void,3.14\n" \
+        "1,one,1.1\n" \
+        "3,three,3.33\n" \
+        "5,five,5.5\n" \
+        "111,fox,1.1\n"
+
+    hive_results = get_hive_results("evolve_part", "i")
+    assert hive_results == \
+        "1,one,1.1\n" \
+        "30,thirty,30.3\n" \
+        "40,forty,40.4\n" \
+        "50,fifty,50.5\n" \
+        "1003,three,3.33\n" \
+        "1010,ten,10.1\n" \
+        "1020,twenty,20.2\n" \
+        "1222,two,2.2\n"
+
+    hive_results = get_hive_results("date_day_part", "i")
+    assert hive_results == \
+        "11,1978-01-01\n" \
+        "12,1979-12-31\n" \
+        "13,1980-01-01\n" \
+        "14,2033-11-15\n"
+
+    hive_results = get_hive_results("ts_hour_part", "i")
+    assert hive_results == \
+        "101,1958-01-01 01:02:03.0\n" \
+        "102,1959-12-31 23:59:00.0\n" \
+        "103,1960-01-01 00:00:00.0\n" \
+        "104,2013-11-15 15:31:00.0\n"
+
+    hive_results = get_hive_results("ts_evolve_part", "i")
+    assert hive_results == \
+        "1001,1988-02-02 01:02:03.0\n" \
+        "1002,1990-02-01 23:59:00.0\n" \
+        "1003,1990-02-02 00:00:00.0\n" \
+        "1004,2043-12-16 15:31:00.0\n" \
+        "1111,NULL\n"
+
+    hive_results = get_hive_results("numeric_truncate", "id")
+    assert hive_results == \
+        "11,21,2111,531,75.20\n"
+
   def test_optimize(self, vector, unique_database):
     tbl_name = unique_database + ".optimize_iceberg"
     self.execute_query("""create table {0} (i int)
diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py
index 5510dc1db..01c295e9d 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -23,18 +23,61 @@ import time
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
+from tests.common.test_vector import ImpalaTestDimension
 from tests.stress.stress_util import run_tasks, Task
+from tests.util.filesystem_utils import IS_HDFS
+
+
+# Longer-running UPDATE tests are executed here
+class TestIcebergV2UpdateStress(ImpalaTestSuite):
+  """UPDATE tests against Iceberg V2 tables."""
+
+  BATCH_SIZES = [0, 32]
+  EXHAUSTIVE_BATCH_SIZES = [0, 1, 11, 32]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIcebergV2UpdateStress, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('batch_size', *TestIcebergV2UpdateStress.BATCH_SIZES))
+    else:
+      cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('batch_size',
+            *TestIcebergV2UpdateStress.EXHAUSTIVE_BATCH_SIZES))
+
+  def test_update_stress(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-update-stress', vector,
+        unique_database)
+    if IS_HDFS:
+      self._update_stress_hive_tests(unique_database)
+
+  def _update_stress_hive_tests(self, db):
+    stmt = """
+        SELECT count(*), sum(ss_ticket_number)
+        FROM {}.ice_store_sales
+        WHERE ss_item_sk % 1999 = 0""".format(db)
+
+    hive_results = self.run_stmt_in_hive(stmt).split("\n", 1)[1]
+    assert hive_results == \
+        "3138,848464922\n"
 
 
 # Stress test for concurrent UPDATE operations against Iceberg tables.
-class TestIcebergUpdateStress(ImpalaTestSuite):
+class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
     return 'targeted-stress'
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestIcebergUpdateStress, cls).add_test_dimensions()
+    super(TestIcebergConcurrentUpdateStress, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: (v.get_value('table_format').file_format == 'parquet'
             and v.get_value('table_format').compression_codec == 'snappy'))