Posted to commits@impala.apache.org by la...@apache.org on 2023/07/05 22:05:53 UTC

[impala] branch master updated: IMPALA-11619: Improve Iceberg V2 reads with a custom Iceberg Position Delete operator

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fe4c604 IMPALA-11619: Improve Iceberg V2 reads with a custom Iceberg Position Delete operator
d0fe4c604 is described below

commit d0fe4c604f72d41019832513ebf65cfe8f469953
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Thu May 11 09:23:42 2023 +0200

    IMPALA-11619: Improve Iceberg V2 reads with a custom Iceberg Position Delete operator
    
    IcebergDeleteNode and IcebergDeleteBuilder classes are based on their
    PartitionedHashJoin counterparts. The actual "join" part of the node is
    optimized, while the other parts are kept very similar so that features
    of PartitionedHashJoin (partitioning, spilling) can be integrated later
    if needed.
    
    ICEBERG_DELETE_JOIN is added as a join operator that is used only by
    the IcebergDeleteNode.
    
    IcebergDeleteBuilder processes the rows from the relevant delete files
    and stores them in a {file_path: ordered row id vector} hash map.
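    
    A minimal, self-contained sketch of that build-side structure (plain
    std::string keys are used here instead of Impala's StringValue; the
    names are illustrative, not the actual Impala symbols):
    
        #include <algorithm>
        #include <cstdint>
        #include <string>
        #include <unordered_map>
        #include <vector>
    
        using DeleteRowVector = std::vector<int64_t>;
        using DeleteRowHashTable =
            std::unordered_map<std::string, DeleteRowVector>;
    
        // Record one (file_path, pos) pair read from a position delete file.
        void AddDeleteRecord(DeleteRowHashTable& deleted_rows,
                             const std::string& file_path, int64_t pos) {
          deleted_rows[file_path].push_back(pos);
        }
    
        // Once all delete rows are consumed, sort each vector and drop
        // duplicates (Iceberg allows concurrent deletes, so the same
        // position can appear more than once).
        void FinalizeBuild(DeleteRowHashTable& deleted_rows) {
          for (auto& entry : deleted_rows) {
            auto& vec = entry.second;
            std::sort(vec.begin(), vec.end());
            vec.erase(std::unique(vec.begin(), vec.end()), vec.end());
          }
        }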
    
    IcebergDeleteNode tracks the file currently being probed and advances
    through that file's row id vector in parallel with the probe batch to
    check whether each row is deleted. When the data file changes, or there
    is a gap in the probe row ids, it hashes the probe row's file path and
    uses binary search to find the closest deleted row id needed for the
    check.
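    
    A minimal sketch of the probe-side check built on the structure above
    (illustrative names again; the real operator additionally caches the
    current file and only re-runs the binary search when the file changes
    or the probe row ids have a gap):
    
        #include <algorithm>
        #include <cstdint>
        #include <string>
        #include <unordered_map>
        #include <vector>
    
        using DeleteRowHashTable =
            std::unordered_map<std::string, std::vector<int64_t>>;
    
        // Returns true if the probe row (file_path, pos) was deleted.
        bool IsRowDeleted(const DeleteRowHashTable& deleted_rows,
                          const std::string& file_path, int64_t pos) {
          auto it = deleted_rows.find(file_path);      // hash the file path
          if (it == deleted_rows.end()) return false;  // no deletes for file
          const std::vector<int64_t>& vec = it->second;
          // Binary search for the first deleted position >= probe position.
          auto lb = std::lower_bound(vec.begin(), vec.end(), pos);
          return lb != vec.end() && *lb == pos;
        }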
    
    Testing:
      - Duplicated the related planner tests to run both with the new
    operator and with hash join
      - Added a dimension for e2e tests to run both with the new operator
    and with hash join
      - Added new multiblock tests to verify the assumptions the new
    operator relies on to optimize probing
      - Added a new test with BATCH_SIZE=2 to verify in/out batch handling
    with the new operator
    
    Change-Id: I024a61573c83bda5584f243c879d9ff39dd2dcfa
    Reviewed-on: http://gerrit.cloudera.org:8080/19850
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   2 +
 be/src/exec/blocking-join-node.cc                  |   3 +-
 be/src/exec/data-sink.cc                           |  11 +-
 be/src/exec/data-sink.h                            |   3 +-
 be/src/exec/exec-node.cc                           |   4 +
 be/src/exec/iceberg-delete-builder.cc              | 322 ++++++++++++++
 be/src/exec/iceberg-delete-builder.h               | 166 +++++++
 be/src/exec/iceberg-delete-node.cc                 | 475 +++++++++++++++++++++
 be/src/exec/iceberg-delete-node.h                  | 224 ++++++++++
 be/src/exec/join-builder.h                         |   2 +
 be/src/exec/join-op.h                              |   3 +-
 be/src/exec/partitioned-hash-join-node.cc          |   4 +-
 be/src/runtime/query-state.h                       |   4 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/DataSinks.thrift                     |   1 +
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/PlanNodes.thrift                     |  10 +
 common/thrift/Query.thrift                         |   4 +-
 .../org/apache/impala/analysis/JoinOperator.java   |  29 +-
 .../apache/impala/planner/DistributedPlanner.java  | 141 +++++-
 .../org/apache/impala/planner/HashJoinNode.java    |   1 +
 .../apache/impala/planner/IcebergDeleteNode.java   | 225 ++++++++++
 .../apache/impala/planner/IcebergScanPlanner.java  |  47 +-
 .../org/apache/impala/planner/JoinBuildSink.java   |  11 +-
 .../java/org/apache/impala/planner/JoinNode.java   |   9 +-
 .../apache/impala/planner/NestedLoopJoinNode.java  |   1 +
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  11 +-
 .../java/org/apache/impala/analysis/ToSqlTest.java |  12 +-
 .../org/apache/impala/planner/PlannerTest.java     |  10 +
 testdata/data/README                               |  12 +
 ...eb011be-job_16881608248131_0342-1-00001.parquet | Bin 0 -> 1811714 bytes
 ...41f8028e-98cbf79b00000000_453028296_data.0.parq | Bin 0 -> 13614 bytes
 .../01dc3e0b-fe92-4d60-973b-fcbb58f71be5-m0.avro   | Bin 0 -> 5128 bytes
 .../e6680781-452a-41d3-a149-0136fa868069-m0.avro   | Bin 0 -> 4486 bytes
 ...889-1-e6680781-452a-41d3-a149-0136fa868069.avro | Bin 0 -> 2488 bytes
 ...835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro | Bin 0 -> 2332 bytes
 .../metadata/v1.metadata.json                      | 146 +++++++
 .../metadata/v2.metadata.json                      | 182 ++++++++
 .../metadata/v3.metadata.json                      | 212 +++++++++
 .../metadata/version-hint.txt                      |   1 +
 .../functional/functional_schema_template.sql      |  16 +
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/PlannerTest/iceberg-v2-delete.test     |  56 +--
 ...ables.test => iceberg-v2-tables-hash-join.test} |   2 +-
 .../queries/PlannerTest/iceberg-v2-tables.test     | 122 +++---
 .../queries/PlannerTest/tablesample.test           |  24 +-
 .../iceberg-v2-read-position-deletes.test          |  35 ++
 tests/query_test/test_iceberg.py                   |   4 +-
 49 files changed, 2402 insertions(+), 157 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 776f50fee..2496ff7ad 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -76,6 +76,8 @@ add_library(Exec
   hdfs-table-sink.cc
   hdfs-table-writer.cc
   hdfs-text-table-writer.cc
+  iceberg-delete-builder.cc
+  iceberg-delete-node.cc
   iceberg-delete-sink.cc
   incr-stats-util.cc
   join-builder.cc
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index da7c9a5fc..d08a507a3 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -91,7 +91,8 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
   switch (join_op_) {
     case TJoinOp::LEFT_ANTI_JOIN:
     case TJoinOp::LEFT_SEMI_JOIN:
-    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN: {
+    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+    case TJoinOp::ICEBERG_DELETE_JOIN: {
       // Only return the surviving probe-side tuples.
       DCHECK(row_desc()->Equals(probe_row_desc()));
       break;
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index d41bbac8e..bb47b5837 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -21,19 +21,20 @@
 #include <map>
 
 #include "common/logging.h"
+#include "exec/blocking-plan-root-sink.h"
+#include "exec/buffered-plan-root-sink.h"
 #include "exec/exec-node.h"
 #include "exec/hbase/hbase-table-sink.h"
 #include "exec/hdfs-table-sink.h"
+#include "exec/iceberg-delete-builder.h"
 #include "exec/iceberg-delete-sink.h"
 #include "exec/kudu/kudu-table-sink.h"
 #include "exec/kudu/kudu-util.h"
-#include "exec/blocking-plan-root-sink.h"
-#include "exec/buffered-plan-root-sink.h"
 #include "exec/nested-loop-join-builder.h"
 #include "exec/partitioned-hash-join-builder.h"
 #include "exec/plan-root-sink.h"
-#include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "exprs/scalar-expr.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
@@ -120,6 +121,10 @@ Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink,
       *data_sink = pool->Add(new NljBuilderConfig());
       break;
     }
+    case TDataSinkType::ICEBERG_DELETE_BUILDER: {
+      *data_sink = pool->Add(new IcebergDeleteBuilderConfig());
+      break;
+    }
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 8e094d1d5..357f4c8d0 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -196,7 +196,8 @@ class DataSink {
 
 static inline bool IsJoinBuildSink(const TDataSinkType::type& type) {
   return type == TDataSinkType::HASH_JOIN_BUILDER
-      || type == TDataSinkType::NESTED_LOOP_JOIN_BUILDER;
+      || type == TDataSinkType::NESTED_LOOP_JOIN_BUILDER
+      || type == TDataSinkType::ICEBERG_DELETE_BUILDER;
 }
 
 } // namespace impala
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c456d3e69..7779cf2c7 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -37,6 +37,7 @@
 #include "exec/hbase/hbase-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/hdfs-scan-node.h"
+#include "exec/iceberg-delete-node.h"
 #include "exec/kudu/kudu-scan-node-mt.h"
 #include "exec/kudu/kudu-scan-node.h"
 #include "exec/kudu/kudu-util.h"
@@ -222,6 +223,9 @@ Status PlanNode::CreatePlanNode(
     case TPlanNodeType::CARDINALITY_CHECK_NODE:
       *node = pool->Add(new CardinalityCheckPlanNode());
       break;
+    case TPlanNodeType::ICEBERG_DELETE_NODE:
+      *node = pool->Add(new IcebergDeletePlanNode());
+      break;
     default:
       map<int, const char*>::const_iterator i =
           _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
diff --git a/be/src/exec/iceberg-delete-builder.cc b/be/src/exec/iceberg-delete-builder.cc
new file mode 100644
index 000000000..8c24a042a
--- /dev/null
+++ b/be/src/exec/iceberg-delete-builder.cc
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-delete-builder.h"
+
+#include <filesystem>
+
+#include "exec/exec-node.h"
+#include "exec/join-op.h"
+#include "runtime/fragment-state.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/debug-util.h"
+#include "util/pretty-printer.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read "
+    "buffer for stream in hash join node $0. Reducing query concurrency or increasing "
+    "the memory limit may help this query to complete successfully.";
+
+using namespace impala;
+
+DataSink* IcebergDeleteBuilderConfig::CreateSink(RuntimeState* state) const {
+  // We have one fragment per sink, so we can use the fragment index as the sink ID.
+  TDataSinkId sink_id = state->fragment().idx;
+  ObjectPool* pool = state->obj_pool();
+  return pool->Add(new IcebergDeleteBuilder(sink_id, *this, state));
+}
+
+IcebergDeleteBuilder* IcebergDeleteBuilderConfig::CreateSink(
+    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+    int64_t max_row_buffer_size, RuntimeState* state) const {
+  ObjectPool* pool = state->obj_pool();
+  return pool->Add(new IcebergDeleteBuilder(
+      *this, buffer_pool_client, spillable_buffer_size, max_row_buffer_size, state));
+}
+
+Status IcebergDeleteBuilderConfig::CreateConfig(FragmentState* state, int join_node_id,
+    TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+    IcebergDeleteBuilderConfig** sink) {
+  ObjectPool* pool = state->obj_pool();
+  TDataSink* tsink = pool->Add(new TDataSink());
+  IcebergDeleteBuilderConfig* data_sink = pool->Add(new IcebergDeleteBuilderConfig());
+  RETURN_IF_ERROR(data_sink->Init(state, join_node_id, join_op, build_row_desc, tsink));
+  *sink = data_sink;
+  return Status::OK();
+}
+
+void IcebergDeleteBuilderConfig::Close() {
+  DataSinkConfig::Close();
+}
+
+Status IcebergDeleteBuilderConfig::Init(FragmentState* state, int join_node_id,
+    TJoinOp::type join_op, const RowDescriptor* build_row_desc, TDataSink* tsink) {
+  DCHECK(join_op == TJoinOp::ICEBERG_DELETE_JOIN);
+  tsink->__isset.join_build_sink = true;
+  tsink->join_build_sink.__set_dest_node_id(join_node_id);
+  tsink->join_build_sink.__set_join_op(join_op);
+  RETURN_IF_ERROR(JoinBuilderConfig::Init(*tsink, build_row_desc, state));
+  build_row_desc_ = build_row_desc;
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilderConfig::Init(
+    const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) {
+  DCHECK(tsink.join_build_sink.runtime_filters.empty());
+  RETURN_IF_ERROR(JoinBuilderConfig::Init(tsink, input_row_desc, state));
+  build_row_desc_ = input_row_desc;
+  return Status::OK();
+}
+
+IcebergDeleteBuilder::IcebergDeleteBuilder(TDataSinkId sink_id,
+    const IcebergDeleteBuilderConfig& sink_config, RuntimeState* state)
+  : JoinBuilder(sink_id, sink_config,
+        ConstructBuilderName("IcebergDelete", sink_config.join_node_id_), state),
+    runtime_state_(state),
+    runtime_profile_(state->runtime_profile()),
+    build_row_desc_(sink_config.build_row_desc_) {
+  DCHECK(num_probe_threads_ <= 1 || !NeedToProcessUnmatchedBuildRows(join_op_))
+      << "Returning rows with build partitions is not supported with shared builds";
+}
+
+IcebergDeleteBuilder::IcebergDeleteBuilder(const IcebergDeleteBuilderConfig& sink_config,
+    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+    int64_t max_row_buffer_size, RuntimeState* state)
+  : JoinBuilder(-1, sink_config,
+        ConstructBuilderName("IcebergDelete", sink_config.join_node_id_), state),
+    runtime_state_(state),
+    runtime_profile_(state->runtime_profile()),
+    build_row_desc_(sink_config.build_row_desc_) {
+  DCHECK_EQ(1, num_probe_threads_) << "Embedded builders cannot be shared";
+}
+
+IcebergDeleteBuilder::~IcebergDeleteBuilder() {}
+
+Status IcebergDeleteBuilder::CalculateDataFiles() {
+  auto& fragment_state_map = runtime_state_->query_state()->FragmentStateMap();
+  auto fragment_it = fragment_state_map.end();
+  PlanNode* delete_scan_node = nullptr;
+  bool found = false;
+  std::queue<const PlanNode*> q;
+  for (auto it = fragment_state_map.begin(); !found && it != fragment_state_map.end();
+       it++) {
+    q.push(it->second->plan_tree());
+    while (!q.empty()) {
+      auto* current = q.front();
+      q.pop();
+      if (current->tnode_->node_id == join_node_id_) {
+        fragment_it = it;
+        delete_scan_node = current->children_[0];
+        found = true;
+        while (!q.empty()) q.pop();
+        break;
+      }
+      for (auto* child : current->children_) {
+        q.push(child);
+      }
+    }
+  }
+
+  const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
+      fragment_it->second->instance_ctx_pbs();
+  for (auto ctx : instance_ctx_pbs) {
+    ctx->per_node_scan_ranges().size();
+    auto ranges = ctx->per_node_scan_ranges().find(delete_scan_node->tnode_->node_id);
+    if (ranges == ctx->per_node_scan_ranges().end()) continue;
+
+    auto tuple_id = delete_scan_node->tnode_->hdfs_scan_node.tuple_id;
+    auto tuple_desc = runtime_state_->desc_tbl().GetTupleDescriptor(tuple_id);
+    DCHECK(tuple_desc->table_desc() != nullptr);
+    auto hdfs_table = static_cast<const HdfsTableDescriptor*>(tuple_desc->table_desc());
+    DCHECK(hdfs_table->IsIcebergTable());
+
+    for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
+      DCHECK(params.scan_range().has_hdfs_file_split());
+      const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
+
+      HdfsPartitionDescriptor* partition_desc =
+          hdfs_table->GetPartition(split.partition_id());
+
+      std::filesystem::path file_path;
+      if (split.relative_path().empty()) {
+        file_path.append(split.absolute_path());
+      } else {
+        file_path.append(partition_desc->location()).append(split.relative_path());
+      }
+      auto& file_path_str = file_path.native();
+      char* ptr_copy =
+          reinterpret_cast<char*>(expr_results_pool_->Allocate(file_path_str.length()));
+
+      if (ptr_copy == nullptr) {
+        return Status("Failed to allocate memory.");
+      }
+
+      memcpy(ptr_copy, file_path_str.c_str(), file_path_str.length());
+
+      std::pair<DeleteRowHashTable::iterator, bool> retval =
+          deleted_rows_.emplace(std::piecewise_construct,
+              std::forward_as_tuple(ptr_copy, file_path_str.length()),
+              std::forward_as_tuple());
+
+      // emplace succeeded, reserve capacity for the new file
+      if (retval.second) retval.first->second.reserve(INITIAL_DELETE_VECTOR_CAPACITY);
+    }
+  }
+
+  is_distributed_mode_ = deleted_rows_.empty();
+
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilder::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+
+  RETURN_IF_ERROR(CalculateDataFiles());
+
+  num_build_rows_ = ADD_COUNTER(profile(), "BuildRows", TUnit::UNIT);
+
+  RETURN_IF_ERROR(DebugAction(state->query_options(), "ID_BUILDER_PREPARE"));
+
+  const auto& tuple_descs = build_row_desc_->tuple_descriptors();
+  const auto& slot_descs = tuple_descs[0]->slots();
+
+  file_path_offset_ = slot_descs[0]->tuple_offset();
+  pos_offset_ = slot_descs[1]->tuple_offset();
+
+  position_sort_timer_ = ADD_TIMER(runtime_profile_, "IcebergDeletePositionSortTimer");
+
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilder::Open(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(DataSink::Open(state));
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilder::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(AddBatch(batch));
+  COUNTER_ADD(num_build_rows_, batch->num_rows());
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilder::AddBatch(RowBatch* batch) {
+  RETURN_IF_ERROR(ProcessBuildBatch(batch));
+  return Status::OK();
+}
+
+Status IcebergDeleteBuilder::FlushFinal(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  return FinalizeBuild(state);
+}
+
+Status IcebergDeleteBuilder::FinalizeBuild(RuntimeState* state) {
+  {
+    SCOPED_TIMER(position_sort_timer_);
+    for (auto& ids : deleted_rows_) {
+      DeleteRowVector& vec = ids.second;
+      std::sort(vec.begin(), vec.end());
+
+      // Iceberg allows concurrent deletes, so there can be duplicates
+      vec.erase(std::unique(vec.begin(), vec.end()), vec.end());
+    }
+  }
+
+  if (is_separate_build_) {
+    HandoffToProbesAndWait(state);
+  }
+  return Status::OK();
+}
+
+void IcebergDeleteBuilder::Close(RuntimeState* state) {
+  if (closed_) return;
+  obj_pool_.Clear();
+  DataSink::Close(state);
+  closed_ = true;
+}
+
+void IcebergDeleteBuilder::Reset(RowBatch* row_batch) {
+  DCHECK(!is_separate_build_);
+  deleted_rows_.clear();
+  expr_results_pool_->Clear();
+}
+
+string IcebergDeleteBuilder::DebugString() const {
+  stringstream ss;
+  ss << " IcebergDeleteBuilder op=" << join_op_
+     << " is_separate_build=" << is_separate_build_
+     << " num_probe_threads=" << num_probe_threads_ << endl;
+  return ss.str();
+}
+
+Status IcebergDeleteBuilder::ProcessBuildBatch(RowBatch* build_batch) {
+  FOREACH_ROW(build_batch, 0, build_batch_iter) {
+    TupleRow* build_row = build_batch_iter.Get();
+
+    impala::StringValue* file_path =
+        build_row->GetTuple(0)->GetStringSlot(file_path_offset_);
+
+    if (UNLIKELY(file_path->len == 0)) {
+      return Status(Substitute("NULL found as file_path in delete file"));
+    }
+    int64_t* id = build_row->GetTuple(0)->GetBigIntSlot(pos_offset_);
+    const int length = file_path->len;
+    if (is_distributed_mode_) {
+      // Distributed mode: deleted_rows_ is empty after init and only the relevant
+      // delete files are sent to this fragment, so everything is processed.
+
+      auto it = deleted_rows_.find(*file_path);
+      if (it == deleted_rows_.end()) {
+        char* ptr_copy = reinterpret_cast<char*>(expr_results_pool_->Allocate(length));
+        if (ptr_copy == nullptr) {
+          return Status("Failed to allocate memory.");
+        }
+
+        memcpy(ptr_copy, file_path->ptr, length);
+
+        std::pair<DeleteRowHashTable::iterator, bool> retval =
+            deleted_rows_.emplace(std::piecewise_construct,
+                std::forward_as_tuple(ptr_copy, length), std::forward_as_tuple());
+        // emplace succeeded
+        DCHECK(retval.second == true);
+
+        it = retval.first;
+        it->second.reserve(INITIAL_DELETE_VECTOR_CAPACITY);
+      }
+
+      it->second.emplace_back(*id);
+    } else {
+      // Broadcast mode: deleted_rows_ is pre-filled with the relevant data file
+      // names, so only those are processed.
+      auto it = deleted_rows_.find(*file_path);
+      if (it != deleted_rows_.end()) {
+        it->second.emplace_back(*id);
+      }
+    }
+  }
+
+  return Status::OK();
+}
diff --git a/be/src/exec/iceberg-delete-builder.h b/be/src/exec/iceberg-delete-builder.h
new file mode 100644
index 000000000..3174b60b4
--- /dev/null
+++ b/be/src/exec/iceberg-delete-builder.h
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "exec/join-builder.h"
+
+namespace impala {
+
+class IcebergDeleteBuilder;
+class RowDescriptor;
+class RuntimeState;
+
+/// Iceberg Delete Builder Config class. This has a few extra methods to be used
+/// directly by the IcebergDeletePlanNode.
+class IcebergDeleteBuilderConfig : public JoinBuilderConfig {
+ public:
+  DataSink* CreateSink(RuntimeState* state) const override;
+
+  /// Creates an IcebergDeleteBuilder for embedded use within an IcebergDeleteNode.
+  IcebergDeleteBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,
+      int64_t spillable_buffer_size, int64_t max_row_buffer_size,
+      RuntimeState* state) const;
+
+  /// Creates an IcebergDeleteBuilderConfig for embedded use within an
+  /// IcebergDeleteNode. Creates the object in the state's object pool. To be
+  /// used only by IcebergDeletePlanNode.
+  static Status CreateConfig(FragmentState* state, int join_node_id,
+      TJoinOp::type join_op, const RowDescriptor* build_row_desc,
+      IcebergDeleteBuilderConfig** sink);
+
+  void Close() override;
+
+  ~IcebergDeleteBuilderConfig() override {}
+
+  const RowDescriptor* build_row_desc_;
+
+ protected:
+  /// Initialization for separate sink.
+  Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
+      FragmentState* state) override;
+
+ private:
+  /// Helper method used by CreateConfig() to initialize embedded builder.
+  /// 'tsink' does not need to be initialized by the caller - all values to be used are
+  /// passed in as arguments and this function fills in required fields in 'tsink'.
+  Status Init(FragmentState* state, int join_node_id, TJoinOp::type join_op,
+      const RowDescriptor* build_row_desc, TDataSink* tsink);
+};
+
+/// The build side for the IcebergDeleteNode. Processes the scanned data from delete
+/// files and stores it in an unordered_map<file_path, ordered vector of row ids> to
+/// allow fast probing.
+///
+/// Similarly to PartitionedHashJoin, there are 2 modes:
+///
+///   Broadcast: every fragment receives all rows from the delete files, filters them
+///   and stores only the ones that are needed to process the assigned data files.
+///
+///   Partitioned: Both data and delete files are hashed by the file path. This means
+///   there is no need to further filter the delete files, but it can cause minor data
+///   skew due to the imbalance in the number of deleted rows corresponding to different
+///   data files.
+///
+/// Shared Build
+/// ------------
+/// A separate builder can be shared between multiple IcebergDeleteNodes.
+class IcebergDeleteBuilder : public JoinBuilder {
+ public:
+  // Constructor for separate join build.
+  IcebergDeleteBuilder(TDataSinkId sink_id, const IcebergDeleteBuilderConfig& sink_config,
+      RuntimeState* state);
+  // Constructor for join builder embedded in a IcebergDeleteNode. Shares
+  // 'buffer_pool_client' with the parent node and inherits buffer sizes from
+  // the parent node.
+  IcebergDeleteBuilder(const IcebergDeleteBuilderConfig& sink_config,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+      int64_t max_row_buffer_size, RuntimeState* state);
+  ~IcebergDeleteBuilder();
+
+  // Checks the distribution mode and, in broadcast mode, collects the file paths of
+  // the data files assigned to this fragment.
+  Status CalculateDataFiles();
+
+  /// Implementations of DataSink interface methods.
+  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+  Status Open(RuntimeState* state) override;
+  Status Send(RuntimeState* state, RowBatch* batch) override;
+  Status FlushFinal(RuntimeState* state) override;
+  void Close(RuntimeState* state) override;
+
+  /// Resets the builder to the same state as it was in after calling Open().
+  /// Not valid to call on a separate join build.
+  void Reset(RowBatch* row_batch);
+
+  std::string DebugString() const;
+
+  struct StringValueHashWrapper {
+    size_t operator()(const impala::StringValue& str) const {
+      return impala::hash_value(str);
+    }
+  };
+
+  using DeleteRowVector = std::vector<int64_t>;
+  using DeleteRowHashTable =
+      std::unordered_map<impala::StringValue, DeleteRowVector, StringValueHashWrapper>;
+
+  DeleteRowHashTable& deleted_rows() { return deleted_rows_; }
+  bool IsDistributedMode() { return is_distributed_mode_; }
+
+ private:
+  /// Reads the rows in build_batch and collects them into deleted_rows_.
+  Status ProcessBuildBatch(RowBatch* build_batch);
+
+  /// Helper method for Send() that does the actual work apart from updating the
+  /// counters.
+  Status AddBatch(RowBatch* build_batch);
+
+  /// Helper method for FlushFinal() that does the actual work.
+  Status FinalizeBuild(RuntimeState* state);
+
+  RuntimeState* const runtime_state_;
+
+  /// Pool for objects with same lifetime as builder.
+  ObjectPool obj_pool_;
+
+  // Runtime profile for this node. Owned by the QueryState's ObjectPool.
+  RuntimeProfile* const runtime_profile_;
+
+  // Measures the time taken to sort the row ids
+  RuntimeProfile::Counter* position_sort_timer_;
+
+  // The fixed schema of Iceberg delete files allows optimizing data extraction
+  const RowDescriptor* build_row_desc_;
+  int file_path_offset_;
+  int pos_offset_;
+
+  // Distribution mode of the node
+  bool is_distributed_mode_;
+
+  // Use the length of a cache line as initial capacity
+  static constexpr size_t INITIAL_DELETE_VECTOR_CAPACITY = 8;
+
+  // Stores {file_path: ordered row ids vector}
+  DeleteRowHashTable deleted_rows_;
+};
+} // namespace impala
diff --git a/be/src/exec/iceberg-delete-node.cc b/be/src/exec/iceberg-delete-node.cc
new file mode 100644
index 000000000..04f024ca0
--- /dev/null
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/iceberg-delete-node.h"
+
+#include <sstream>
+
+#include "exec/blocking-join-node.inline.h"
+#include "exec/exec-node-util.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+Status IcebergDeletePlanNode::Init(const TPlanNode& tnode, FragmentState* state) {
+  RETURN_IF_ERROR(BlockingJoinPlanNode::Init(tnode, state));
+
+  DCHECK(tnode.__isset.join_node);
+  DCHECK(tnode.join_node.__isset.iceberg_delete_node);
+
+  // TODO: IMPALA-12265: create the config only if it is necessary
+  RETURN_IF_ERROR(IcebergDeleteBuilderConfig::CreateConfig(state, tnode_->node_id,
+      tnode_->join_node.join_op, &build_row_desc(), &id_builder_config_));
+  return Status::OK();
+}
+
+void IcebergDeletePlanNode::Close() {
+  if (id_builder_config_ != nullptr) id_builder_config_->Close();
+  PlanNode::Close();
+}
+
+Status IcebergDeletePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new IcebergDeleteNode(state, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+IcebergDeleteNode::IcebergDeleteNode(
+    RuntimeState* state, const IcebergDeletePlanNode& pnode, const DescriptorTbl& descs)
+  : BlockingJoinNode("IcebergDeleteNode", state->obj_pool(), pnode, descs) {}
+
+IcebergDeleteNode::~IcebergDeleteNode() {}
+
+Status IcebergDeleteNode::Prepare(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+
+  RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
+  runtime_state_ = state;
+  if (!UseSeparateBuild(state->query_options())) {
+    const IcebergDeleteBuilderConfig& builder_config =
+        *static_cast<const IcebergDeletePlanNode&>(plan_node_).id_builder_config_;
+    builder_ = builder_config.CreateSink(buffer_pool_client(),
+        resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size,
+        state);
+    RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+    runtime_profile()->PrependChild(builder_->profile());
+  }
+
+  auto& tuple_descs = probe_row_desc().tuple_descriptors();
+  auto& slot_descs = tuple_descs[0]->slots();
+
+  for (auto& slot : slot_descs) {
+    if (slot->virtual_column_type() == TVirtualColumnType::FILE_POSITION) {
+      pos_offset_ = slot->tuple_offset();
+    }
+    if (slot->virtual_column_type() == TVirtualColumnType::INPUT_FILE_NAME) {
+      file_path_offset_ = slot->tuple_offset();
+    }
+  }
+
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedOpenEventAdder ea(this);
+  JoinBuilder* tmp_builder = nullptr;
+  RETURN_IF_ERROR(BlockingJoinNode::OpenImpl(state, &tmp_builder));
+  if (builder_ == nullptr) {
+    DCHECK(UseSeparateBuild(state->query_options()));
+    builder_ = dynamic_cast<IcebergDeleteBuilder*>(tmp_builder);
+    DCHECK(builder_ != nullptr);
+  }
+
+  // Check for errors and free expr result allocations before opening children.
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+  // The prepare functions of probe expressions may have made result allocations
+  // implicitly (e.g. calling UdfBuiltins::Lower()). The probe expressions' expr result
+  // allocations need to be cleared now as they don't get cleared again till probing.
+  // Other exprs' result allocations are cleared in QueryMaintenance().
+
+  RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_));
+  RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
+  ResetForProbe();
+  probe_state_ = ProbeState::PROBING_IN_BATCH;
+  iceberg_delete_state_.Init(builder_);
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::AcquireResourcesForBuild(RuntimeState* state) {
+  if (!buffer_pool_client()->is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::Reset(RuntimeState* state, RowBatch* row_batch) {
+  builder_->Reset(nullptr);
+  iceberg_delete_state_.Reset();
+  return BlockingJoinNode::Reset(state, row_batch);
+}
+
+void IcebergDeleteNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  // IMPALA-9737: free batches in case attached buffers need to be freed to
+  // transfer reservation to 'builder_'.
+  if (build_batch_ != nullptr) build_batch_->Reset();
+  if (probe_batch_ != nullptr) probe_batch_->Reset();
+  if (builder_ != nullptr) {
+    bool separate_build = UseSeparateBuild(state->query_options());
+    if (!separate_build || waited_for_build_) {
+      builder_->CloseFromProbe(state);
+      waited_for_build_ = false;
+    }
+  }
+  iceberg_delete_state_.Reset();
+  BlockingJoinNode::Close(state);
+}
+
+Status IcebergDeleteNode::NextProbeRowBatch(
+    RuntimeState* state, RowBatch* out_batch, bool* eos) {
+  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
+  RETURN_IF_ERROR(NextProbeRowBatchFromChild(state, out_batch, eos));
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::NextProbeRowBatchFromChild(
+    RuntimeState* state, RowBatch* out_batch, bool* eos) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
+  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
+  *eos = false;
+  do {
+    // Loop until we find a non-empty row batch.
+    probe_batch_->TransferResourceOwnership(out_batch);
+    if (out_batch->AtCapacity()) {
+      // This out batch is full. Need to return it before getting the next batch.
+      probe_batch_pos_ = -1;
+      return Status::OK();
+    }
+    if (probe_side_eos_) {
+      current_probe_row_ = nullptr;
+      probe_batch_pos_ = -1;
+      *eos = true;
+      return Status::OK();
+    }
+    RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
+    COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
+  } while (probe_batch_->num_rows() == 0);
+
+  ResetForProbe();
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::ProcessProbeBatch(RowBatch* out_batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_IN_BATCH);
+  DCHECK_NE(probe_batch_pos_, -1);
+  int rows_added = 0;
+  Status status;
+  TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
+  SCOPED_TIMER(probe_timer_);
+
+  rows_added = ProcessProbeBatch(prefetch_mode, out_batch, &status);
+
+  if (UNLIKELY(rows_added < 0)) {
+    DCHECK(!status.ok());
+    return status;
+  }
+  DCHECK(status.ok());
+  out_batch->CommitRows(rows_added);
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedGetNextEventAdder ea(this, eos);
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  DCHECK(!out_batch->AtCapacity());
+
+  Status status = Status::OK();
+  *eos = false;
+  // Save the number of rows in case GetNext() is called with a non-empty batch,
+  // which can happen in a subplan.
+  int num_rows_before = out_batch->num_rows();
+
+  // This loop executes the 'probe_state_' state machine until either a full batch is
+  // produced, resources are attached to 'out_batch' that require flushing, or eos
+  // is reached (i.e. all rows are returned). The next call into GetNext() will resume
+  // execution of the state machine where the current call into GetNext() left off.
+  // See the definition of ProbeState for description of the state machine and states.
+  do {
+    DCHECK(status.ok());
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(QueryMaintenance(state));
+    switch (probe_state_) {
+      case ProbeState::PROBING_IN_BATCH: {
+        // Finish processing rows in the current probe batch.
+        RETURN_IF_ERROR(ProcessProbeBatch(out_batch));
+        if (probe_batch_pos_ == probe_batch_->num_rows()
+            && current_probe_row_ == nullptr) {
+          probe_state_ = ProbeState::PROBING_END_BATCH;
+        }
+        break;
+      }
+      case ProbeState::PROBING_END_BATCH: {
+        // Try to get the next row batch from the current probe input.
+        bool probe_eos;
+        RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch, &probe_eos));
+        if (probe_batch_pos_ == 0) {
+          // Got a batch, need to process it.
+          probe_state_ = ProbeState::PROBING_IN_BATCH;
+        } else if (probe_eos) {
+          DCHECK_EQ(probe_batch_pos_, -1);
+          // Finished processing all the probe rows
+          RETURN_IF_ERROR(DoneProbing(state, out_batch));
+          probe_state_ = ProbeState::EOS;
+        } else {
+          // Got an empty batch with resources that we need to flush before getting the
+          // next batch.
+          DCHECK_EQ(probe_batch_pos_, -1);
+        }
+        break;
+      }
+      case ProbeState::EOS: {
+        // Ensure that all potential sources of output rows are exhausted.
+        DCHECK(probe_side_eos_);
+        *eos = true;
+        break;
+      }
+      default:
+        DCHECK(false) << "invalid probe_state_" << static_cast<int>(probe_state_);
+        break;
+    }
+  } while (!out_batch->AtCapacity() && !*eos);
+
+  int num_rows_added = out_batch->num_rows() - num_rows_before;
+  DCHECK_GE(num_rows_added, 0);
+
+  if (limit_ != -1 && rows_returned() + num_rows_added > limit_) {
+    // Truncate the row batch if we went over the limit.
+    num_rows_added = limit_ - rows_returned();
+    DCHECK_GE(num_rows_added, 0);
+    out_batch->set_num_rows(num_rows_before + num_rows_added);
+    probe_batch_->TransferResourceOwnership(out_batch);
+    *eos = true;
+  }
+
+  IncrementNumRowsReturned(num_rows_added);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
+  return Status::OK();
+}
+
+Status IcebergDeleteNode::DoneProbing(RuntimeState* state, RowBatch* batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
+  DCHECK_EQ(probe_batch_pos_, -1);
+  VLOG(2) << "Probe Side Consumed\n" << NodeDebugString();
+  return Status::OK();
+}
+
+string IcebergDeleteNode::NodeDebugString() const {
+  stringstream ss;
+  ss << "IcebergDeleteNode (id=" << id() << " op=" << join_op_ << ")" << endl;
+
+  if (builder_ != nullptr) {
+    ss << "IcebergDeleteBuilder: " << builder_->DebugString();
+  }
+
+  return ss.str();
+}
+
+void IcebergDeleteNode::IcebergDeleteState::Init(IcebergDeleteBuilder* builder) {
+  builder_ = builder;
+  current_file_path_ = nullptr;
+  previous_file_path_ = nullptr;
+  current_delete_row_ = nullptr;
+  current_deleted_pos_row_id_ = INVALID_ROW_ID;
+  current_probe_pos_ = INVALID_ROW_ID;
+}
+
+void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
+  DCHECK(current_delete_row_ != nullptr);
+  // We need to use binary search to find the next delete candidate in 2 cases:
+  //   1. new file (or start of row batch)
+  //   2. discontinuity in probe row ids
+  auto next_deleted_pos_it_ = std::lower_bound(
+      current_delete_row_->begin(), current_delete_row_->end(), current_probe_pos_);
+  if (next_deleted_pos_it_ == current_delete_row_->end()) {
+    current_deleted_pos_row_id_ = INVALID_ROW_ID;
+  } else {
+    current_deleted_pos_row_id_ =
+        std::distance(current_delete_row_->begin(), next_deleted_pos_it_);
+  }
+}
+
+void IcebergDeleteNode::IcebergDeleteState::Update(
+    impala::StringValue* file_path, int64_t* probe_pos) {
+  DCHECK(builder_ != nullptr);
+  // Making sure the row ids are in ascending order inside a row batch in broadcast mode
+  DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
+      || current_probe_pos_ < *probe_pos);
+  DCHECK(!builder_->IsDistributedMode() || previous_file_path_ == nullptr
+      || *file_path != *previous_file_path_ || current_probe_pos_ == INVALID_ROW_ID
+      || current_probe_pos_ < *probe_pos);
+  current_probe_pos_ = *probe_pos;
+
+  if (previous_file_path_ != nullptr
+      && (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) {
+    // Fast path if the file did not change, no need to hash again
+    if (current_deleted_pos_row_id_ != INVALID_ROW_ID
+        && current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) {
+      UpdateImpl();
+    }
+  } else {
+    auto it = builder_->deleted_rows().find(*file_path);
+    if (it != builder_->deleted_rows().end()) {
+      current_file_path_ = &it->first;
+      current_delete_row_ = &it->second;
+      UpdateImpl();
+      previous_file_path_ = current_file_path_;
+    }
+  }
+}
+
+bool IcebergDeleteNode::IcebergDeleteState::IsDeleted() const {
+  DCHECK(builder_ != nullptr);
+
+  if (current_deleted_pos_row_id_ == INVALID_ROW_ID) return false;
+
+  DCHECK(current_probe_pos_ != INVALID_ROW_ID);
+  DCHECK(current_delete_row_ != nullptr);
+  DCHECK(current_deleted_pos_row_id_ < current_delete_row_->size());
+
+  return current_probe_pos_ == (*current_delete_row_)[current_deleted_pos_row_id_];
+}
+
+void IcebergDeleteNode::IcebergDeleteState::Delete() {
+  DCHECK(builder_ != nullptr);
+
+  current_deleted_pos_row_id_++;
+  if (current_deleted_pos_row_id_ == current_delete_row_->size()) {
+    current_deleted_pos_row_id_ = INVALID_ROW_ID;
+  }
+}
+
+bool IcebergDeleteNode::IcebergDeleteState::NeedCheck() const {
+  return builder_->IsDistributedMode() || current_deleted_pos_row_id_ != INVALID_ROW_ID
+      || current_probe_pos_ == INVALID_ROW_ID;
+}
+
+void IcebergDeleteNode::IcebergDeleteState::Clear() {
+  current_file_path_ = nullptr;
+  previous_file_path_ = nullptr;
+  current_delete_row_ = nullptr;
+  current_deleted_pos_row_id_ = INVALID_ROW_ID;
+  current_probe_pos_ = INVALID_ROW_ID;
+}
+
+void IcebergDeleteNode::IcebergDeleteState::Reset() {
+  builder_ = nullptr;
+  Clear();
+}
+
+bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  DCHECK(current_probe_row_ != nullptr);
+  TupleRow* out_row = out_batch_iterator->Get();
+  if (!iceberg_delete_state_.IsDeleted()) {
+    out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
+    matched_probe_ = true;
+    --(*remaining_capacity);
+    if (*remaining_capacity == 0) return false;
+    out_row = out_batch_iterator->Next();
+  } else {
+    iceberg_delete_state_.Delete();
+  }
+  return true;
+}
+
+bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRowNoCheck(
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  DCHECK(current_probe_row_ != nullptr);
+  TupleRow* out_row = out_batch_iterator->Get();
+  out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
+  matched_probe_ = true;
+  --(*remaining_capacity);
+  if (*remaining_capacity == 0) return false;
+  out_row = out_batch_iterator->Next();
+  return true;
+}
+
+int IcebergDeleteNode::ProcessProbeBatch(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, Status* __restrict__ status) {
+  DCHECK(!out_batch->AtCapacity());
+  DCHECK_GE(probe_batch_pos_, 0);
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
+  const int max_rows = out_batch->capacity() - out_batch->num_rows();
+  // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
+  RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
+  int remaining_capacity = max_rows;
+
+  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0 && status->ok()) {
+    current_probe_row_ = probe_batch_iterator.Get();
+    if (iceberg_delete_state_.NeedCheck()) {
+      impala::StringValue* file_path =
+          current_probe_row_->GetTuple(0)->GetStringSlot(file_path_offset_);
+      int64_t* current_probe_pos =
+          current_probe_row_->GetTuple(0)->GetBigIntSlot(pos_offset_);
+
+      iceberg_delete_state_.Update(file_path, current_probe_pos);
+      if (!ProcessProbeRow(&out_batch_iterator, &remaining_capacity, status)) {
+        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      }
+    } else {
+      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity, status)) {
+        if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
+      }
+    }
+
+    probe_batch_iterator.Next();
+
+    // Update where we are in the probe batch.
+    probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0));
+  }
+
+  int num_rows_added;
+  if (LIKELY(status->ok())) {
+    num_rows_added = max_rows - remaining_capacity;
+  } else {
+    num_rows_added = -1;
+  }
+
+  // Clear state, as the ascending order of row ids is not guaranteed between probe
+  // row batches
+  if (probe_batch_iterator.AtEnd()) {
+    current_probe_row_ = nullptr;
+    iceberg_delete_state_.Clear();
+  }
+
+  DCHECK_GE(probe_batch_pos_, 0);
+  DCHECK_LE(probe_batch_pos_, probe_batch_->capacity());
+  DCHECK_LE(num_rows_added, max_rows);
+  return num_rows_added;
+}
+} // namespace impala
diff --git a/be/src/exec/iceberg-delete-node.h b/be/src/exec/iceberg-delete-node.h
new file mode 100644
index 000000000..0775f1cad
--- /dev/null
+++ b/be/src/exec/iceberg-delete-node.h
@@ -0,0 +1,224 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/blocking-join-node.h"
+#include "exec/iceberg-delete-builder.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class ExecNode;
+class FragmentState;
+class RowBatch;
+class TupleRow;
+
+class IcebergDeletePlanNode : public BlockingJoinPlanNode {
+ public:
+  Status Init(const TPlanNode& tnode, FragmentState* state) override;
+  void Close() override;
+  Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+
+  ~IcebergDeletePlanNode() {}
+
+  /// Data sink config object for creating an IcebergDeleteBuilder that will eventually
+  /// be used by the exec node.
+  IcebergDeleteBuilderConfig* id_builder_config_;
+};
+
+/// Operator to perform Iceberg position-delete filtering.
+///
+/// The high-level algorithm is as follows:
+///  1. Consume all build input.
+///  2. Construct hash table.
+///  3. Consume the probe input.
+///
+/// IMPLEMENTATION DETAILS:
+/// -----------------------
+/// The iceberg delete algorithm is implemented with the IcebergDeleteNode
+/// and IcebergDeleteBuilder classes. Each delete node has a builder (see
+/// IcebergDeleteBuilder) that stores and builds hash tables over the build
+/// rows.
+///
+/// The above algorithm has the following phases:
+///
+///   1. Read build rows from the right input plan tree. Everything is kept in memory.
+///
+///   2. Read the probe rows from child(0) and filter them based on the data in the
+///      hash table
+///
+///      This phase has sub-states (see ProbeState) that are used in GetNext() to drive
+///      progress.
+///
+class IcebergDeleteNode : public BlockingJoinNode {
+ public:
+  IcebergDeleteNode(RuntimeState* state, const IcebergDeletePlanNode& pnode,
+      const DescriptorTbl& descs);
+  ~IcebergDeleteNode();
+
+  Status Prepare(RuntimeState* state) override;
+  Status Open(RuntimeState* state) override;
+  Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  Status Reset(RuntimeState* state, RowBatch* row_batch) override;
+  void Close(RuntimeState* state) override;
+
+ protected:
+  // Safe to close the build side early because we always rematerialize the build rows.
+  bool CanCloseBuildEarly() const override { return true; }
+  Status AcquireResourcesForBuild(RuntimeState* state) override;
+
+ private:
+  // This enum drives the state machine in GetNext() that processes probe batches and
+  // generates output rows.
+  //
+  // The state transition diagram is below. The state machine handles iterating through
+  // probe batches (PROBING_IN_BATCH <-> PROBING_END_BATCH), with each input probe batch
+  // producing a variable number of output rows. When the processing is done EOS is
+  // entered.
+  //
+  // start
+  //     +------------------+
+  //---->+ PROBING_IN_BATCH |
+  //     +-----+-----+------+
+  //           ^     |
+  //           |     |
+  //           |     v
+  //     +-----+-----+-------+              +----------------+
+  //     + PROBING_END_BATCH +------------->+       EOS      |
+  //     +-------------------+              +----------------+
+  //
+  enum class ProbeState {
+    // Processing probe batches and more rows in the current probe batch must be
+    // processed.
+    PROBING_IN_BATCH,
+    // Processing probe batches and no more rows in the current probe batch to process.
+    PROBING_END_BATCH,
+    // All output rows have been produced - GetNext() should return eos.
+    EOS,
+  };
+
+  /// Probes 'current_probe_row_' against the delete hash table and appends the output
+  /// to the output batch.
+  bool inline ProcessProbeRow(RowBatch::Iterator* out_batch_iterator,
+      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+
+  /// Appends 'current_probe_row_' to the output batch without checking the delete state.
+  bool inline ProcessProbeRowNoCheck(RowBatch::Iterator* out_batch_iterator,
+      int* remaining_capacity, Status* status) WARN_UNUSED_RESULT;
+
+  /// Processes probe rows from probe_batch_. Returns when either out_batch is full or
+  /// probe_batch_ is entirely consumed.
+  /// Returns the number of rows added to out_batch; -1 on error (and *status will
+  /// be set). This function doesn't commit rows to the output batch so it's the caller's
+  /// responsibility to do so.
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, Status* status);
+
+  /// Wrapper that calls ProcessProbeBatch() and commits the rows to 'out_batch' on success.
+  Status ProcessProbeBatch(RowBatch* out_batch);
+
+  /// Called at the end of consuming the probe rows, when 'probe_state_' is
+  /// PROBING_END_BATCH, before transitioning to EOS.
+  Status DoneProbing(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
+
+  /// Gets the next row batch from the probe (left) side (child(0)).
+  /// If we are done consuming the input, sets 'probe_batch_pos_' to -1, otherwise
+  /// sets it to 0. 'probe_state_' must be PROBING_END_BATCH. *eos is true iff
+  /// 'out_batch' contains the last rows from the child.
+  Status NextProbeRowBatch(
+      RuntimeState* state, RowBatch* out_batch, bool* eos) WARN_UNUSED_RESULT;
+
+  /// Get the next row batch from the probe (left) side (child(0)). If we are done
+  /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  /// 'probe_state_' must be PROBING_END_BATCH. *eos is true iff 'out_batch'
+  /// contains the last rows from the child.
+  Status NextProbeRowBatchFromChild(RuntimeState* state, RowBatch* out_batch, bool* eos);
+
+  /// Prepares for probing the next batch. Called after populating 'probe_batch_'
+  /// with rows and entering 'probe_state_' PROBING_IN_BATCH.
+  inline void ResetForProbe() {
+    current_probe_row_ = NULL;
+    probe_batch_pos_ = 0;
+    matched_probe_ = true;
+  }
+
+  std::string NodeDebugString() const;
+
+  RuntimeState* runtime_state_;
+
+  /////////////////////////////////////////
+  /// BEGIN: Members that must be Reset()
+
+  /// State of the probing algorithm. Used to drive the state machine in GetNext().
+  ProbeState probe_state_ = ProbeState::EOS;
+
+  /// The build-side rows of the join. Initialized in Prepare() if the build is embedded
+  /// in the join, otherwise looked up in Open() if it's a separate build. Owned by an
+  /// object pool with query lifetime in either case.
+  IcebergDeleteBuilder* builder_ = nullptr;
+
+  /// END: Members that must be Reset()
+  /////////////////////////////////////////
+
+  int file_path_offset_;
+  int pos_offset_;
+
+  class IcebergDeleteState {
+   public:
+    void Init(IcebergDeleteBuilder* builder);
+
+    // Recalculates the next deleted row id if:
+    //  1. the data file path changed
+    //  2. the probe position is greater than the next deleted row id (there was
+    //     a gap on the probe side)
+    void Update(impala::StringValue* file_path, int64_t* probe_pos);
+
+    // Checks if the current probe row is deleted.
+    bool IsDeleted() const;
+
+    // Advances the deleted row id, or sets it to invalid if we reached the end
+    // of the delete vector.
+    void Delete();
+
+    // Returns false if the rest of the row batch can be passed through without checks
+    bool NeedCheck() const;
+
+    // Clears the state after the row batch is processed
+    void Clear();
+
+    void Reset();
+
+   private:
+    void UpdateImpl();
+    static constexpr int64_t INVALID_ROW_ID = -1;
+
+    // Using pointers and an index instead of iterators to have a nicer default state
+    // when we switch row batches
+    const impala::StringValue* current_file_path_;
+    const impala::StringValue* previous_file_path_;
+    IcebergDeleteBuilder::DeleteRowVector* current_delete_row_;
+    int64_t current_deleted_pos_row_id_;
+    int64_t current_probe_pos_;
+
+    IcebergDeleteBuilder* builder_ = nullptr;
+  };
+
+  IcebergDeleteState iceberg_delete_state_;
+};
+
+} // namespace impala
diff --git a/be/src/exec/join-builder.h b/be/src/exec/join-builder.h
index 02a5fb62a..aaedb9ba7 100644
--- a/be/src/exec/join-builder.h
+++ b/be/src/exec/join-builder.h
@@ -28,6 +28,7 @@ namespace impala {
 
 class NljBuilder;
 class PhjBuilder;
+class IcebergDeleteBuilder;
 
 class JoinBuilderConfig : public DataSinkConfig {
  public:
@@ -37,6 +38,7 @@ class JoinBuilderConfig : public DataSinkConfig {
   friend class JoinBuilder;
   friend class NljBuilder;
   friend class PhjBuilder;
+  friend class IcebergDeleteBuilder;
 
   Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc,
       FragmentState* state) override;
diff --git a/be/src/exec/join-op.h b/be/src/exec/join-op.h
index cd0c99273..925d5e4a8 100644
--- a/be/src/exec/join-op.h
+++ b/be/src/exec/join-op.h
@@ -26,7 +26,8 @@ namespace impala {
 /// build rows.
 inline bool IsLeftSemiJoin(TJoinOp::type join_op) {
   return join_op == TJoinOp::LEFT_ANTI_JOIN || join_op == TJoinOp::LEFT_SEMI_JOIN
-      || join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+      || join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
+      || join_op == TJoinOp::ICEBERG_DELETE_JOIN;
 }
 
 /// Returns true if this is a semi-join that does not return tuple data from the
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a56f885fd..c62dfd3f0 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -83,9 +83,7 @@ Status PartitionedHashJoinPlanNode::Init(
       probe_exprs_, PhjBuilder::HashTableStoresNulls(join_op_, is_not_distinct_from_),
       is_not_distinct_from_));
 
-  // Create the config always. It is only used if UseSeparateBuild() is true, but in
-  // Init(), IsInSubplan() isn't available yet.
-  // TODO: simplify this by ensuring that UseSeparateBuild() is accurate in Init().
+  // TODO: IMPALA-12265: create the config only if it is necessary
   RETURN_IF_ERROR(
       PhjBuilderConfig::CreateConfig(state, tnode_->node_id, tnode_->join_node.join_op,
           &build_row_desc(), eq_join_conjuncts, tnode_->runtime_filters,
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 6f3613f77..ba749b351 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -293,6 +293,10 @@ class QueryState {
     return (it != fragment_state_map_.end()) ? it->second : nullptr;
   }
 
+  const std::unordered_map<TFragmentIdx, FragmentState*>& FragmentStateMap() {
+    return fragment_state_map_;
+  }
+
  private:
   friend class QueryExecMgr;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 493b71be1..7df214152 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1118,6 +1118,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_num_threads_for_table_migration(int32_t_val);
         break;
       }
+      case TImpalaQueryOptions::DISABLE_OPTIMIZED_ICEBERG_V2_READ: {
+        query_options->__set_disable_optimized_iceberg_v2_read(IsTrue(value));
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 931e2009e..2a585c243 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::NUM_THREADS_FOR_TABLE_MIGRATION + 1);                         \
+      TImpalaQueryOptions::DISABLE_OPTIMIZED_ICEBERG_V2_READ + 1);                       \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -295,6 +295,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE, TQueryOptionLevel::DEVELOPMENT)     \
   QUERY_OPT_FN(allow_unsafe_casts, ALLOW_UNSAFE_CASTS, TQueryOptionLevel::DEVELOPMENT)   \
   QUERY_OPT_FN(num_threads_for_table_migration, NUM_THREADS_FOR_TABLE_MIGRATION,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(disable_optimized_iceberg_v2_read, DISABLE_OPTIMIZED_ICEBERG_V2_READ,     \
       TQueryOptionLevel::ADVANCED);
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index b288fc067..325eae58a 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -32,6 +32,7 @@ enum TDataSinkType {
   HASH_JOIN_BUILDER = 2
   PLAN_ROOT_SINK = 3
   NESTED_LOOP_JOIN_BUILDER = 4
+  ICEBERG_DELETE_BUILDER = 5
 }
 
 enum TSinkAction {
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 75e471f0e..2b82825c3 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -803,6 +803,9 @@ enum TImpalaQueryOptions {
   // The maximum number of threads Impala can use for migrating a table to a different
   // type. E.g. from Hive table to Iceberg table.
   NUM_THREADS_FOR_TABLE_MIGRATION = 159;
+
+  // Turns off optimized Iceberg V2 reads and falls back to a hash join.
+  DISABLE_OPTIMIZED_ICEBERG_V2_READ = 160;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 6e5d8886c..120f6c66c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -51,6 +51,7 @@ enum TPlanNodeType {
   KUDU_SCAN_NODE = 15
   CARDINALITY_CHECK_NODE = 16
   MULTI_AGGREGATION_NODE = 17
+  ICEBERG_DELETE_NODE = 18
 }
 
 // phases of an execution node
@@ -414,6 +415,9 @@ enum TJoinOp {
   RIGHT_ANTI_JOIN = 7
   FULL_OUTER_JOIN = 8
   CROSS_JOIN = 9
+
+  // The Iceberg Delete operator is based on a join operation.
+  ICEBERG_DELETE_JOIN = 10
 }
 
 struct THashJoinNode {
@@ -434,6 +438,11 @@ struct TNestedLoopJoinNode {
   1: optional list<Exprs.TExpr> join_conjuncts
 }
 
+struct TIcebergDeleteNode {
+  // equi-join predicates
+  1: required list<TEqJoinCondition> eq_join_conjuncts
+}
+
 // Top-level struct for a join node. Elements that are shared between the different
 // join implementations are top-level variables and elements that are specific to a
 // join implementation live in a specialized struct.
@@ -449,6 +458,7 @@ struct TJoinNode {
   // One of these must be set.
   4: optional THashJoinNode hash_join_node
   5: optional TNestedLoopJoinNode nested_loop_join_node
+  6: optional TIcebergDeleteNode iceberg_delete_node
 }
 
 struct TAggregator {
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index db046dc2b..2e866cd6c 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -631,7 +631,6 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   156: optional double join_selectivity_correlation_factor = 0.0;
-
   // See comment in ImpalaService.thrift
   157: optional i32 max_fragment_instances_per_node = MAX_FRAGMENT_INSTANCES_PER_NODE
 
@@ -644,6 +643,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   160: optional i32 num_threads_for_table_migration = 1;
+
+  // See comment in ImpalaService.thrift
+  161: optional bool disable_optimized_iceberg_v2_read = false;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/analysis/JoinOperator.java b/fe/src/main/java/org/apache/impala/analysis/JoinOperator.java
index e2321f092..0ea9525f1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/JoinOperator.java
+++ b/fe/src/main/java/org/apache/impala/analysis/JoinOperator.java
@@ -32,8 +32,9 @@ public enum JoinOperator {
   // Variant of the LEFT ANTI JOIN that is used for the rewrite of
   // NOT IN subqueries. It can have a single equality join conjunct
   // that returns TRUE when the rhs is NULL.
-  NULL_AWARE_LEFT_ANTI_JOIN("NULL AWARE LEFT ANTI JOIN",
-      TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN);
+  NULL_AWARE_LEFT_ANTI_JOIN(
+      "NULL AWARE LEFT ANTI JOIN", TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN),
+  ICEBERG_DELETE_JOIN("ICEBERG DELETE JOIN", TJoinOp.ICEBERG_DELETE_JOIN);
 
   private final String description_;
   private final TJoinOp thriftJoinOp_;
@@ -58,14 +59,16 @@ public enum JoinOperator {
   }
 
   public boolean isSemiJoin() {
-    return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN ||
-        this == JoinOperator.RIGHT_SEMI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN ||
-        this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
+    return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN
+        || this == JoinOperator.RIGHT_SEMI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN
+        || this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN
+        || this == JoinOperator.ICEBERG_DELETE_JOIN;
   }
 
   public boolean isLeftSemiJoin() {
-    return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN ||
-        this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
+    return this == JoinOperator.LEFT_SEMI_JOIN || this == JoinOperator.LEFT_ANTI_JOIN
+        || this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN
+        || this == JoinOperator.ICEBERG_DELETE_JOIN;
   }
 
   public boolean isRightSemiJoin() {
@@ -85,8 +88,13 @@ public enum JoinOperator {
   }
 
   public boolean isAntiJoin() {
-    return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN ||
-        this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
+    return this == JoinOperator.LEFT_ANTI_JOIN || this == JoinOperator.RIGHT_ANTI_JOIN
+        || this == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN
+        || this == JoinOperator.ICEBERG_DELETE_JOIN;
+  }
+
+  public boolean isIcebergDeleteJoin() {
+    return this == JoinOperator.ICEBERG_DELETE_JOIN;
   }
 
   public JoinOperator invert() {
@@ -97,7 +105,8 @@ public enum JoinOperator {
       case RIGHT_SEMI_JOIN: return LEFT_SEMI_JOIN;
       case LEFT_ANTI_JOIN: return RIGHT_ANTI_JOIN;
       case RIGHT_ANTI_JOIN: return LEFT_ANTI_JOIN;
-      case NULL_AWARE_LEFT_ANTI_JOIN: throw new IllegalStateException("Not implemented");
+      case NULL_AWARE_LEFT_ANTI_JOIN:
+      case ICEBERG_DELETE_JOIN: throw new IllegalStateException("Not implemented");
       default: return this;
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index e76acd340..23601e098 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -22,20 +22,21 @@ import java.util.List;
 
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.Analyzer;
-import org.apache.impala.analysis.DeleteStmt;
+import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DmlStatementBase;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.InsertStmt;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
+import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.QueryStmt;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TPartitionType;
+import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,6 +155,10 @@ public class DistributedPlanner {
     } else if (root instanceof IcebergMetadataScanNode) {
       result = createIcebergMetadataScanFragment(root);
       fragments.add(result);
+    } else if (root instanceof IcebergDeleteNode) {
+      Preconditions.checkState(childFragments.size() == 2);
+      result = createIcebergDeleteFragment((IcebergDeleteNode) root,
+          childFragments.get(1), childFragments.get(0), fragments);
     } else {
       throw new InternalException("Cannot create plan fragment for this node type: "
           + root.getExplainString(ctx_.getQueryOptions()));
@@ -663,6 +668,138 @@ public class DistributedPlanner {
           rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments);
     }
     return hjFragment;
+  }
+
+  /**
+   * Helper function to produce an Iceberg delete fragment.
+   */
+  private PlanFragment createPartitionedIcebergDeleteFragment(IcebergDeleteNode node,
+      PlanFragment leftChildFragment, PlanFragment rightChildFragment,
+      List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs) {
+    Preconditions.checkState(node.getDistributionMode() == DistributionMode.PARTITIONED);
+
+    DataPartition rhsJoinPartition =
+        DataPartition.hashPartitioned(Expr.cloneList(rhsJoinExprs));
+    DataPartition lhsJoinPartition =
+        DataPartition.hashPartitioned(Expr.cloneList(lhsJoinExprs));
+
+    // Create a new parent fragment containing the IcebergDeleteNode with two
+    // ExchangeNodes as inputs; the latter are the destinations of the
+    // left- and rightChildFragments, which now partition their output
+    // on their respective join exprs.
+    // The new fragment is hash-partitioned on the lhs input join exprs.
+    ExchangeNode lhsExchange =
+        new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot());
+    lhsExchange.computeStats(ctx_.getRootAnalyzer());
+    node.setChild(0, lhsExchange);
+    ExchangeNode rhsExchange =
+        new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot());
+    rhsExchange.computeStats(ctx_.getRootAnalyzer());
+    node.setChild(1, rhsExchange);
+
+    // Connect the child fragments in a new fragment, and set the data partition
+    // of the new fragment and its child fragments.
+    DataPartition outputPartition = lhsJoinPartition;
+
+    PlanFragment joinFragment =
+        new PlanFragment(ctx_.getNextFragmentId(), node, outputPartition);
+    leftChildFragment.setDestination(lhsExchange);
+    leftChildFragment.setOutputPartition(lhsJoinPartition);
+    rightChildFragment.setDestination(rhsExchange);
+    rightChildFragment.setOutputPartition(rhsJoinPartition);
+    return joinFragment;
+  }
+
+  /**
+   * Creates either a broadcast join or a repartitioning join depending on the expected
+   * cost and various constraints. See computeDistributionMode() for more details.
+   */
+  private PlanFragment createIcebergDeleteFragment(IcebergDeleteNode node,
+      PlanFragment rightChildFragment, PlanFragment leftChildFragment,
+      List<PlanFragment> fragments) throws ImpalaException {
+    // For both join types, the total cost is calculated as the amount of data
+    // sent over the network; the hash table build cost is roughly the same.
+    // broadcast: send the rightChildFragment's output to each node executing
+    // the leftChildFragment.
+    PlanNode rhsTree = rightChildFragment.getPlanRoot();
+    long rhsDataSize = -1;
+    long broadcastCost = -1;
+    int mt_dop = ctx_.getQueryOptions().mt_dop;
+    int leftChildNodes = leftChildFragment.getNumNodes();
+    if (rhsTree.getCardinality() != -1) {
+      rhsDataSize = Math.round(
+          rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
+
+      Preconditions.checkState(leftChildNodes != -1);
+      broadcastCost = rhsDataSize * leftChildNodes;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("broadcast: cost=" + Long.toString(broadcastCost));
+      LOG.trace("card=" + Long.toString(rhsTree.getCardinality())
+          + " row_size=" + Float.toString(rhsTree.getAvgRowSize())
+          + " #nodes=" + Integer.toString(leftChildNodes));
+    }
+
+    // repartition: both left- and rightChildFragment are partitioned on the
+    // file path, and a hash table is built with the rightChildFragment's output.
+    PlanNode lhsTree = leftChildFragment.getPlanRoot();
+    List<Expr> lhsJoinExprs = new ArrayList<>();
+    List<Expr> rhsJoinExprs = new ArrayList<>();
+
+    Preconditions.checkState(node.getEqJoinConjuncts().size() == 2);
+    BinaryPredicate filePathEq = node.getEqJoinConjuncts().get(1);
+
+    // Verify that the partitioning is based on the file path
+    Preconditions.checkState(
+        ((SlotRef) filePathEq.getChild(0)).getDesc().getVirtualColumnType()
+        == TVirtualColumnType.INPUT_FILE_NAME);
+
+    lhsJoinExprs.add(filePathEq.getChild(0).clone());
+    rhsJoinExprs.add(filePathEq.getChild(1).clone());
+
+    long partitionCost = -1;
+    if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
+      Preconditions.checkState(rhsDataSize != -1);
+      double lhsNetworkCost = Math.round(
+          lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree));
+      double rhsNetworkCost = rhsDataSize;
+      partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("partition: cost=" + Long.toString(partitionCost));
+      LOG.trace("lhs card=" + Long.toString(lhsTree.getCardinality())
+          + " row_size=" + Float.toString(lhsTree.getAvgRowSize()));
+      LOG.trace("rhs card=" + Long.toString(rhsTree.getCardinality())
+          + " row_size=" + Float.toString(rhsTree.getAvgRowSize()));
+      LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions()));
+    }
+
+    DistributionMode distrMode = DistributionMode.fromThrift(
+        ctx_.getQueryOptions().getDefault_join_distribution_mode());
+
+    // Broadcast mode has better fast-path checks, so it could be slightly more
+    // favoured, but network costs dominate probing costs, so it does not matter much.
+    if (broadcastCost != -1 && partitionCost != -1) {
+      if (broadcastCost < partitionCost) distrMode = DistributionMode.BROADCAST;
+      if (partitionCost < broadcastCost) distrMode = DistributionMode.PARTITIONED;
+    }
+
+    node.setDistributionMode(distrMode);
+
+    PlanFragment hjFragment = null;
+    if (distrMode == DistributionMode.BROADCAST) {
+      // Doesn't create a new fragment, but modifies leftChildFragment to execute
+      // the join; the build input is provided by an ExchangeNode, which is the
+      // destination of the rightChildFragment's output
+      node.setChild(0, leftChildFragment.getPlanRoot());
+      connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
+      leftChildFragment.setPlanRoot(node);
+      hjFragment = leftChildFragment;
+    } else {
+      hjFragment = createPartitionedIcebergDeleteFragment(
+          node, leftChildFragment, rightChildFragment, lhsJoinExprs, rhsJoinExprs);
+    }
+    return hjFragment;
  }
 
  /**
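
To make the broadcast-vs-partition comparison above concrete, here is the same arithmetic with made-up sizes (the 10 MB / 1 GB / 20 node figures are illustrative assumptions, not measurements): broadcasting a 10 MB delete-file side to 20 probe nodes costs roughly 200 MB of network traffic, while repartitioning both sides costs roughly 1 GB + 10 MB, so broadcast wins; a large enough delete side flips the decision.

    #include <cstdint>
    #include <iostream>

    // Illustrative only: restates the broadcast-vs-partition cost comparison with
    // assumed sizes; the planner derives these from cardinality and row-size estimates.
    int main() {
      const int64_t kMB = 1LL << 20;
      int64_t rhs_data_size = 10 * kMB;    // serialized size of the delete-file (build) side
      int64_t lhs_data_size = 1024 * kMB;  // serialized size of the data-file (probe) side
      int64_t num_probe_nodes = 20;        // nodes executing the probe fragment

      int64_t broadcast_cost = rhs_data_size * num_probe_nodes;  // rhs sent to every node
      int64_t partition_cost = lhs_data_size + rhs_data_size;    // both sides reshuffled

      std::cout << (broadcast_cost < partition_cost ? "BROADCAST" : "PARTITIONED") << "\n";
      return 0;
    }
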
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index de04ce023..83581ad34 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -56,6 +56,7 @@ public class HashJoinNode extends JoinNode {
         otherJoinConjuncts, "HASH JOIN");
     Preconditions.checkNotNull(eqJoinConjuncts);
     Preconditions.checkState(joinOp_ != JoinOperator.CROSS_JOIN);
+    Preconditions.checkState(joinOp_ != JoinOperator.ICEBERG_DELETE_JOIN);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
new file mode 100644
index 000000000..09beeb9bd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteNode.java
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.JoinOperator;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TEqJoinCondition;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TIcebergDeleteNode;
+import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.apache.impala.util.ExprUtil;
+
+public class IcebergDeleteNode extends JoinNode {
+  private final static Logger LOG = LoggerFactory.getLogger(IcebergDeleteNode.class);
+  public IcebergDeleteNode(
+      PlanNode outer, PlanNode inner, List<BinaryPredicate> eqJoinConjuncts) {
+    super(outer, inner, true, DistributionMode.NONE, JoinOperator.ICEBERG_DELETE_JOIN,
+        eqJoinConjuncts, Collections.emptyList(), "ICEBERG DELETE");
+    Preconditions.checkNotNull(eqJoinConjuncts);
+    Preconditions.checkState(joinOp_ == JoinOperator.ICEBERG_DELETE_JOIN);
+    Preconditions.checkState(conjuncts_.isEmpty());
+    Preconditions.checkState(runtimeFilters_.isEmpty());
+  }
+
+  @Override
+  public boolean isBlockingJoinNode() {
+    return true;
+  }
+
+  @Override
+  public List<BinaryPredicate> getEqJoinConjuncts() {
+    return eqJoinConjuncts_;
+  }
+
+  @Override
+  public void init(Analyzer analyzer) throws ImpalaException {
+    super.init(analyzer);
+    List<BinaryPredicate> newEqJoinConjuncts = new ArrayList<>();
+    ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
+    for (Expr c : eqJoinConjuncts_) {
+      BinaryPredicate eqPred =
+          (BinaryPredicate) c.substitute(combinedChildSmap, analyzer, false);
+      Type t0 = eqPred.getChild(0).getType();
+      Type t1 = eqPred.getChild(1).getType();
+      Preconditions.checkState(t0.matchesType(t1));
+      BinaryPredicate newEqPred =
+          new BinaryPredicate(eqPred.getOp(), eqPred.getChild(0), eqPred.getChild(1));
+      newEqPred.analyze(analyzer);
+      newEqJoinConjuncts.add(newEqPred);
+    }
+    eqJoinConjuncts_ = newEqJoinConjuncts;
+    orderJoinConjunctsByCost();
+    computeStats(analyzer);
+  }
+
+  @Override
+  protected String debugString() {
+    return MoreObjects.toStringHelper(this)
+        .add("eqJoinConjuncts_", eqJoinConjunctsDebugString())
+        .addValue(super.debugString())
+        .toString();
+  }
+
+  private String eqJoinConjunctsDebugString() {
+    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
+    for (Expr entry : eqJoinConjuncts_) {
+      helper.add("lhs", entry.getChild(0)).add("rhs", entry.getChild(1));
+    }
+    return helper.toString();
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg) {
+    msg.node_type = TPlanNodeType.ICEBERG_DELETE_NODE;
+    msg.join_node = joinNodeToThrift();
+    msg.join_node.iceberg_delete_node = new TIcebergDeleteNode();
+    msg.join_node.iceberg_delete_node.setEq_join_conjuncts(getThriftEquiJoinConjuncts());
+  }
+
+  /**
+   * Helper to get the equi-join conjuncts in a thrift representation.
+   */
+  public List<TEqJoinCondition> getThriftEquiJoinConjuncts() {
+    List<TEqJoinCondition> equiJoinConjuncts = new ArrayList<>(eqJoinConjuncts_.size());
+    for (BinaryPredicate bp : eqJoinConjuncts_) {
+      TEqJoinCondition eqJoinCondition = new TEqJoinCondition(
+          bp.getChild(0).treeToThrift(), bp.getChild(1).treeToThrift(),
+          bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
+
+      equiJoinConjuncts.add(eqJoinCondition);
+    }
+    return equiJoinConjuncts;
+  }
+
+  @Override
+  protected String getNodeExplainString(
+      String prefix, String detailPrefix, TExplainLevel detailLevel) {
+    StringBuilder output = new StringBuilder();
+    output.append(
+        String.format("%s%s [%s]\n", prefix, getDisplayLabel(), getDisplayLabelDetail()));
+
+    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+      if (!isDeleteRowsJoin_
+          || detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+        output.append(detailPrefix + "equality predicates: ");
+        for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
+          Expr eqConjunct = eqJoinConjuncts_.get(i);
+          output.append(eqConjunct.toSql());
+          if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
+        }
+        output.append("\n");
+      }
+    }
+    return output.toString();
+  }
+
+  /**
+   * Helper method to compute the resource requirements for the join that can be
+   * called from the builder or the join node. Returns a pair of the probe
+   * resource requirements and the build resource requirements.
+   */
+  @Override
+  public Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
+      TQueryOptions queryOptions) {
+    long perBuildInstanceMemEstimate;
+    long perBuildInstanceDataBytes;
+    int numInstances = fragment_.getNumInstances();
+    if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
+        || numInstances <= 0) {
+      perBuildInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
+      perBuildInstanceDataBytes = -1;
+    } else {
+      long rhsCard = getChild(1).getCardinality();
+      long rhsNdv = 1;
+      // Calculate the NDV of the right child, which is the product of the NDVs
+      // of the right child's join columns.
+      for (Expr eqJoinPredicate : eqJoinConjuncts_) {
+        long rhsPdNdv = getNdv(eqJoinPredicate.getChild(1));
+        rhsPdNdv = Math.min(rhsPdNdv, rhsCard);
+        if (rhsPdNdv != -1) {
+          rhsNdv = PlanNode.checkedMultiply(rhsNdv, rhsPdNdv);
+        }
+      }
+      // The memory of the data stored in the hash table is the file paths of the
+      // data files that have delete files, plus 8 bytes per deleted row position.
+      int numberOfDataFilesWithDelete = 0;
+      if (distrMode_ == DistributionMode.PARTITIONED) {
+        numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0).getChild(0))
+                                          .getFileDescriptorsWithLimit(null, false, -1)
+                                          .size();
+      } else {
+        numberOfDataFilesWithDelete = ((IcebergScanNode) getChild(0))
+                                          .getFileDescriptorsWithLimit(null, false, -1)
+                                          .size();
+      }
+
+      perBuildInstanceDataBytes = (long) Math.ceil(
+          numberOfDataFilesWithDelete * getChild(1).getAvgRowSize() + 8 * rhsCard);
+
+      // In both modes, the data in the hash tables is on average distributed evenly
+      // among the instances.
+      perBuildInstanceMemEstimate = perBuildInstanceDataBytes / numInstances;
+    }
+
+    // Almost all resource consumption is in the build, or shared between the build and
+    // the probe. These are accounted for in the build profile.
+    ResourceProfile probeProfile =
+        new ResourceProfileBuilder().setMemEstimateBytes(0).build();
+    ResourceProfile buildProfile =
+        new ResourceProfileBuilder()
+            .setMemEstimateBytes(perBuildInstanceMemEstimate)
+            .setMinMemReservationBytes(perBuildInstanceMemEstimate)
+            .build();
+    return Pair.create(probeProfile, buildProfile);
+  }
+
+  @Override
+  public Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost() {
+    // Assume 'eqJoinConjuncts_' will be applied to all rows from lhs and rhs side.
+    float eqJoinPredicateEvalCost = ExprUtil.computeExprsTotalCost(eqJoinConjuncts_);
+
+    // Compute the processing cost for lhs.
+    ProcessingCost probeProcessingCost =
+        ProcessingCost.basicCost(getDisplayLabel() + " Probe side (eqJoinConjuncts_)",
+            getChild(0).getCardinality(), eqJoinPredicateEvalCost);
+
+    // Compute the processing cost for rhs.
+    ProcessingCost buildProcessingCost =
+        ProcessingCost.basicCost(getDisplayLabel() + " Build side",
+            getChild(1).getCardinality(), eqJoinPredicateEvalCost);
+    return Pair.create(probeProcessingCost, buildProcessingCost);
+  }
+}
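
As a worked example of the memory estimate above (all numbers are illustrative assumptions, not defaults): with 100 data files that have deletes, an average file-path row size of 64 bytes, 1,000,000 deleted positions on the build side and 10 fragment instances, the estimate is ceil(100 * 64 + 8 * 1,000,000) / 10 = 800,640 bytes per instance. A minimal restatement of that arithmetic:

    #include <cmath>
    #include <cstdint>
    #include <iostream>

    // Illustrative only: the per-instance build memory estimate with assumed inputs.
    int main() {
      int64_t data_files_with_deletes = 100;   // file paths kept in the build-side map
      double avg_file_path_row_size = 64.0;    // assumed bytes per file-path row
      int64_t deleted_positions = 1000000;     // build-side cardinality (deleted rows)
      int64_t num_instances = 10;

      int64_t data_bytes = static_cast<int64_t>(std::ceil(
          data_files_with_deletes * avg_file_path_row_size + 8.0 * deleted_positions));
      std::cout << data_bytes / num_instances << " bytes per instance\n";  // 800640
      return 0;
    }
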
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index 806cdf62e..d336f446c 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -80,9 +80,11 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.planner.IcebergDeleteNode;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TColumnStats;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
+import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.ExprUtil;
 import org.apache.impala.util.IcebergUtil;
@@ -258,9 +260,16 @@ public class IcebergScanPlanner {
     List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
             analyzer_, tblRef_.getDesc(), deleteDeltaRef.getDesc());
 
-    JoinNode joinNode = new HashJoinNode(dataScanNode, deleteScanNode,
-        /*straight_join=*/true, DistributionMode.NONE, JoinOperator.LEFT_ANTI_JOIN,
-        positionJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
+    TQueryOptions queryOpts = analyzer_.getQueryCtx().client_request.query_options;
+    JoinNode joinNode = null;
+    if (queryOpts.disable_optimized_iceberg_v2_read) {
+      joinNode = new HashJoinNode(dataScanNode, deleteScanNode,
+          /*straight_join=*/true, DistributionMode.NONE, JoinOperator.LEFT_ANTI_JOIN,
+          positionJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
+    } else {
+      joinNode =
+          new IcebergDeleteNode(dataScanNode, deleteScanNode, positionJoinConjuncts);
+    }
     joinNode.setId(ctx_.getNextNodeId());
     joinNode.init(analyzer_);
     joinNode.setIsDeleteRowsJoin();
@@ -306,6 +315,8 @@ public class IcebergScanPlanner {
       TupleDescriptor insertTupleDesc, TupleDescriptor deleteTupleDesc)
       throws AnalysisException {
     List<BinaryPredicate> ret = new ArrayList<>();
+    BinaryPredicate filePathEq = null;
+    BinaryPredicate posEq = null;
     for (SlotDescriptor deleteSlotDesc : deleteTupleDesc.getSlots()) {
       boolean foundMatch = false;
       Column col = deleteSlotDesc.getParent().getTable().getColumns().get(
@@ -314,23 +325,29 @@ public class IcebergScanPlanner {
       int fieldId = ((IcebergColumn)col).getFieldId();
       for (SlotDescriptor insertSlotDesc : insertTupleDesc.getSlots()) {
         TVirtualColumnType virtColType = insertSlotDesc.getVirtualColumnType();
-        if (fieldId == IcebergTable.V2_FILE_PATH_FIELD_ID &&
-            virtColType != TVirtualColumnType.INPUT_FILE_NAME) {
-          continue;
+        if (fieldId == IcebergTable.V2_FILE_PATH_FIELD_ID
+            && virtColType == TVirtualColumnType.INPUT_FILE_NAME) {
+          foundMatch = true;
+          filePathEq = new BinaryPredicate(
+              Operator.EQ, new SlotRef(insertSlotDesc), new SlotRef(deleteSlotDesc));
+          filePathEq.analyze(analyzer);
+          break;
         }
-        if (fieldId == IcebergTable.V2_POS_FIELD_ID &&
-            virtColType != TVirtualColumnType.FILE_POSITION) {
-          continue;
+        if (fieldId == IcebergTable.V2_POS_FIELD_ID
+            && virtColType == TVirtualColumnType.FILE_POSITION) {
+          foundMatch = true;
+          posEq = new BinaryPredicate(
+              Operator.EQ, new SlotRef(insertSlotDesc), new SlotRef(deleteSlotDesc));
+          posEq.analyze(analyzer);
+          break;
         }
-        foundMatch = true;
-        BinaryPredicate pred = new BinaryPredicate(
-            Operator.EQ, new SlotRef(insertSlotDesc), new SlotRef(deleteSlotDesc));
-        pred.analyze(analyzer);
-        ret.add(pred);
-        break;
       }
       Preconditions.checkState(foundMatch);
     }
+    Preconditions.checkState(filePathEq != null);
+    Preconditions.checkState(posEq != null);
+    ret.add(filePathEq);
+    ret.add(posEq);
     return ret;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index ab268c0c4..8f6698119 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -84,6 +84,11 @@ public class JoinBuildSink extends DataSink {
           ((HashJoinNode)joinNode_).getThriftEquiJoinConjuncts());
       tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
     }
+    if (joinNode_ instanceof IcebergDeleteNode) {
+      tBuildSink.setEq_join_conjuncts(
+          ((IcebergDeleteNode) joinNode_).getThriftEquiJoinConjuncts());
+      tBuildSink.setHash_seed(joinNode_.getFragment().getHashSeed());
+    }
     for (RuntimeFilter filter : runtimeFilters_) {
       tBuildSink.addToRuntime_filters(filter.toThrift());
     }
@@ -95,9 +100,11 @@ public class JoinBuildSink extends DataSink {
   protected TDataSinkType getSinkType() {
     if (joinNode_ instanceof HashJoinNode) {
       return TDataSinkType.HASH_JOIN_BUILDER;
-    } else {
-      Preconditions.checkState(joinNode_ instanceof NestedLoopJoinNode);
+    } else if (joinNode_ instanceof NestedLoopJoinNode) {
       return TDataSinkType.NESTED_LOOP_JOIN_BUILDER;
+    } else {
+      Preconditions.checkState(joinNode_ instanceof IcebergDeleteNode);
+      return TDataSinkType.ICEBERG_DELETE_BUILDER;
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 005ff1e05..4c5d51db3 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -150,7 +150,8 @@ public abstract class JoinNode extends PlanNode {
     switch (joinOp_) {
       case LEFT_ANTI_JOIN:
       case LEFT_SEMI_JOIN:
-      case NULL_AWARE_LEFT_ANTI_JOIN: {
+      case NULL_AWARE_LEFT_ANTI_JOIN:
+      case ICEBERG_DELETE_JOIN: {
         tupleIds_.addAll(outer.getTupleIds());
         break;
       }
@@ -723,7 +724,8 @@ public abstract class JoinNode extends PlanNode {
           break;
         }
         case LEFT_ANTI_JOIN:
-        case NULL_AWARE_LEFT_ANTI_JOIN: {
+        case NULL_AWARE_LEFT_ANTI_JOIN:
+        case ICEBERG_DELETE_JOIN: {
           selectivity = (double) Math.max(lhsNdv - rhsNdv, lhsNdv) / (double) lhsNdv;
           break;
         }
@@ -814,7 +816,8 @@ public abstract class JoinNode extends PlanNode {
         break;
       }
       case LEFT_ANTI_JOIN:
-      case NULL_AWARE_LEFT_ANTI_JOIN: {
+      case NULL_AWARE_LEFT_ANTI_JOIN:
+      case ICEBERG_DELETE_JOIN: {
         if (leftCard != -1) {
           cardinality_ = Math.min(leftCard, cardinality_);
         }
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index ecd5c03ae..56cbc340a 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -55,6 +55,7 @@ public class NestedLoopJoinNode extends JoinNode {
     super(outer, inner, isStraightJoin, distrMode, joinOp,
         Collections.<BinaryPredicate>emptyList(), otherJoinConjuncts,
         "NESTED LOOP JOIN");
+    Preconditions.checkState(joinOp_ != JoinOperator.ICEBERG_DELETE_JOIN);
   }
 
   @Override
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 8307dc6b0..fd0dbc043 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -84,7 +84,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Test join types. Parent/collection joins do not require an ON or USING clause.
     for (JoinOperator joinOp: JoinOperator.values()) {
-      if (joinOp.isNullAwareLeftAntiJoin()) continue;
+      if (joinOp.isNullAwareLeftAntiJoin() || joinOp.isIcebergDeleteJoin()) continue;
       TblsAnalyzeOk(String.format("select 1 from $TBL %s allcomplextypes.%s",
           joinOp, collectionTable), tbl);
       TblsAnalyzeOk(String.format("select 1 from $TBL a %s a.%s",
@@ -102,9 +102,8 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         collectionField, collectionTable), tbl);
     // Non parent/collection outer or semi  joins require an ON or USING clause.
     for (JoinOperator joinOp: JoinOperator.values()) {
-      if (joinOp.isNullAwareLeftAntiJoin()
-          || joinOp.isCrossJoin()
-          || joinOp.isInnerJoin()) {
+      if (joinOp.isNullAwareLeftAntiJoin() || joinOp.isCrossJoin() || joinOp.isInnerJoin()
+          || joinOp.isIcebergDeleteJoin()) {
         continue;
       }
       AnalysisError(String.format(
@@ -321,7 +320,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Test that parent/collection joins without an ON clause analyze ok.
     for (JoinOperator joinOp: JoinOperator.values()) {
-      if (joinOp.isNullAwareLeftAntiJoin()) continue;
+      if (joinOp.isNullAwareLeftAntiJoin() || joinOp.isIcebergDeleteJoin()) continue;
       AnalyzesOk(String.format(
           "select 1 from functional.allcomplextypes a %s a.int_array_col b", joinOp));
       AnalyzesOk(String.format(
@@ -1568,7 +1567,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
 
     // Test that joins without an ON clause analyze ok if the rhs table is correlated.
     for (JoinOperator joinOp: JoinOperator.values()) {
-      if (joinOp.isNullAwareLeftAntiJoin()) continue;
+      if (joinOp.isNullAwareLeftAntiJoin() || joinOp.isIcebergDeleteJoin()) continue;
       AnalyzesOk(String.format("select 1 from functional.allcomplextypes a %s " +
           "(select item from a.int_array_col) v", joinOp));
       AnalyzesOk(String.format("select 1 from functional.allcomplextypes a %s " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 3acbf93e8..c10d4b9de 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -56,15 +56,17 @@ public class ToSqlTest extends FrontendTestBase {
 
   static {
     // Exclude the NULL AWARE LEFT ANTI JOIN operator because it cannot
-    // be directly expressed via SQL.
-    joinTypes_ = new String[JoinOperator.values().length - 2];
-    int numNonSemiJoinTypes = JoinOperator.values().length - 2 -
-        leftSemiJoinTypes_.length - rightSemiJoinTypes_.length;
+    // be directly expressed via SQL, and the ICEBERG DELETE JOIN operator because
+    // it is not a real join.
+    joinTypes_ = new String[JoinOperator.values().length - 3];
+    int numNonSemiJoinTypes = JoinOperator.values().length - 3 - leftSemiJoinTypes_.length
+        - rightSemiJoinTypes_.length;
     nonSemiJoinTypes_ = new String[numNonSemiJoinTypes];
     int i = 0;
     int j = 0;
     for (JoinOperator op: JoinOperator.values()) {
-      if (op.isCrossJoin() || op.isNullAwareLeftAntiJoin()) continue;
+      if (op.isCrossJoin() || op.isNullAwareLeftAntiJoin() || op.isIcebergDeleteJoin())
+        continue;
       joinTypes_[i++] = op.toString();
       if (op.isSemiJoin()) continue;
       nonSemiJoinTypes_[j++] = op.toString();
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 2a274fb2b..b8dbfa89d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1282,6 +1282,16 @@ public class PlannerTest extends PlannerTestBase {
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
+  /**
+   * Check that Iceberg V2 table scans work as expected with the hash join fallback.
+   */
+  @Test
+  public void testIcebergV2TableScansHashJoin() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setDisable_optimized_iceberg_v2_read(true);
+    runPlannerTestFile("iceberg-v2-tables-hash-join", "functional_parquet", options,
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
+  }
 
   /**
    * Check that Iceberg V2 DELETE statements work as expected.
diff --git a/testdata/data/README b/testdata/data/README
index 7c1560527..c5f8f4ed0 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -941,6 +941,18 @@ Generated by Hive
 create table iceberg_create_table_like_parquet_test (col_int int, col_float float, col_double double, col_string string, col_struct struct<col_int:int, col_float:float>, col_array array<string>, col_map map<string,array<int>>) stored as parquet;
 insert into iceberg_create_table_like_parquet_test values (0, 1.0, 2.0, "3", named_struct("col_int", 4, "col_float", cast(5.0 as float)), array("6","7","8"), map("A", array(11,12), "B", array(21,22)));
 
+iceberg_lineitem_multiblock
+Generated by Hive, see testdata/LineItemMultiBlock/README.dox for more details
+set parquet.block.size=4086;
+create table functional_parquet.iceberg_lineitem_multiblock like tpch.lineitem stored by iceberg location 'hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock' tblproperties('format-version'='2');
+insert into functional_parquet.iceberg_lineitem_multiblock select * from tpch.lineitem limit 20000;
+Deleted by Impala:
+delete from functional_parquet.iceberg_lineitem_multiblock where l_linenumber%5=0;
+Then saved the contents from HDFS to the local directory ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice
+And converted the HiveCatalog metadata to HadoopCatalog metadata via scripts at iceberg_v2_no_deletes
+And rewrote metadata content to the correct lengths with
+testdata/bin/rewrite-iceberg-metadata.py "" testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/
+
 simple_arrays_big.parq:
 Generated with RandomNestedDataGenerator.java from the following schema:
 {
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/00000-0-data-gfurnstahl_20230705104815_d51294d1-5658-497c-94f2-54269eb011be-job_16881608248131_0342-1-00001.parquet b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/00000-0-data-gfurnstahl_20230705104815_d51294d1-5658-497c-94f2-54269eb011be-job_16881608248131_0342-1-00001.parquet
new file mode 100644
index 000000000..8e301c048
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/00000-0-data-gfurnstahl_20230705104815_d51294d1-5658-497c-94f2-54269eb011be-job_16881608248131_0342-1-00001.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/delete-74487c6141f8028e-98cbf79b00000000_453028296_data.0.parq b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/delete-74487c6141f8028e-98cbf79b00000000_453028296_data.0.parq
new file mode 100644
index 000000000..31457e149
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/data/delete-74487c6141f8028e-98cbf79b00000000_453028296_data.0.parq differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/01dc3e0b-fe92-4d60-973b-fcbb58f71be5-m0.avro b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/01dc3e0b-fe92-4d60-973b-fcbb58f71be5-m0.avro
new file mode 100644
index 000000000..df0722db1
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/01dc3e0b-fe92-4d60-973b-fcbb58f71be5-m0.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/e6680781-452a-41d3-a149-0136fa868069-m0.avro b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/e6680781-452a-41d3-a149-0136fa868069-m0.avro
new file mode 100644
index 000000000..82c0650d9
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/e6680781-452a-41d3-a149-0136fa868069-m0.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-4821756033809199889-1-e6680781-452a-41d3-a149-0136fa868069.avro b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-4821756033809199889-1-e6680781-452a-41d3-a149-0136fa868069.avro
new file mode 100644
index 000000000..e631dd8cd
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-4821756033809199889-1-e6680781-452a-41d3-a149-0136fa868069.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-7963820769190930835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-7963820769190930835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro
new file mode 100644
index 000000000..1902e9b93
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-7963820769190930835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v1.metadata.json b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v1.metadata.json
new file mode 100644
index 000000000..7789adffe
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v1.metadata.json
@@ -0,0 +1,146 @@
+{
+  "table-uuid": "69c2bb75-7833-4112-bf3e-b848315ef761", 
+  "statistics": [], 
+  "snapshot-log": [], 
+  "last-partition-id": 999, 
+  "last-updated-ms": 1688546893502, 
+  "last-column-id": 16, 
+  "refs": {}, 
+  "format-version": 2, 
+  "default-sort-order-id": 0, 
+  "partition-specs": [
+    {
+      "fields": [], 
+      "spec-id": 0
+    }
+  ], 
+  "properties": {
+    "serialization.format": "|", 
+    "field.delim": "|", 
+    "created_with_ctlt": "true", 
+    "write.delete.mode": "merge-on-read", 
+    "write.update.mode": "merge-on-read", 
+    "TRANSLATED_TO_EXTERNAL": "TRUE", 
+    "engine.hive.enabled": "true", 
+    "EXTERNAL": "TRUE", 
+    "storage_handler": "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler", 
+    "write.merge.mode": "merge-on-read"
+  }, 
+  "metadata-log": [], 
+  "location": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock", 
+  "last-sequence-number": 0, 
+  "current-snapshot-id": -1, 
+  "default-spec-id": 0, 
+  "current-schema-id": 0, 
+  "sort-orders": [
+    {
+      "fields": [], 
+      "order-id": 0
+    }
+  ], 
+  "snapshots": [], 
+  "schemas": [
+    {
+      "schema-id": 0, 
+      "type": "struct", 
+      "fields": [
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 1, 
+          "name": "l_orderkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 2, 
+          "name": "l_partkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 3, 
+          "name": "l_suppkey"
+        }, 
+        {
+          "required": false, 
+          "type": "int", 
+          "id": 4, 
+          "name": "l_linenumber"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 5, 
+          "name": "l_quantity"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 6, 
+          "name": "l_extendedprice"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 7, 
+          "name": "l_discount"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 8, 
+          "name": "l_tax"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 9, 
+          "name": "l_returnflag"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 10, 
+          "name": "l_linestatus"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 11, 
+          "name": "l_shipdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 12, 
+          "name": "l_commitdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 13, 
+          "name": "l_receiptdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 14, 
+          "name": "l_shipinstruct"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 15, 
+          "name": "l_shipmode"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 16, 
+          "name": "l_comment"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v2.metadata.json b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v2.metadata.json
new file mode 100644
index 000000000..c78486552
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v2.metadata.json
@@ -0,0 +1,182 @@
+{
+  "table-uuid": "69c2bb75-7833-4112-bf3e-b848315ef761", 
+  "statistics": [], 
+  "snapshot-log": [
+    {
+      "timestamp-ms": 1688546907580, 
+      "snapshot-id": 7963820769190930835
+    }
+  ], 
+  "last-partition-id": 999, 
+  "last-updated-ms": 1688546907580, 
+  "last-column-id": 16, 
+  "refs": {
+    "main": {
+      "snapshot-id": 7963820769190930835, 
+      "type": "branch"
+    }
+  }, 
+  "format-version": 2, 
+  "default-sort-order-id": 0, 
+  "partition-specs": [
+    {
+      "fields": [], 
+      "spec-id": 0
+    }
+  ], 
+  "properties": {
+    "serialization.format": "|", 
+    "field.delim": "|", 
+    "created_with_ctlt": "true", 
+    "write.delete.mode": "merge-on-read", 
+    "write.update.mode": "merge-on-read", 
+    "TRANSLATED_TO_EXTERNAL": "TRUE", 
+    "engine.hive.enabled": "true", 
+    "EXTERNAL": "TRUE", 
+    "storage_handler": "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler", 
+    "write.merge.mode": "merge-on-read"
+  }, 
+  "metadata-log": [
+    {
+      "timestamp-ms": 1688546893502, 
+      "metadata-file": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/00000-683e61e4-f3bd-4fbe-8c08-698fdb1489ec.metadata.json"
+    }
+  ], 
+  "location": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock", 
+  "last-sequence-number": 1, 
+  "current-snapshot-id": 7963820769190930835, 
+  "default-spec-id": 0, 
+  "current-schema-id": 0, 
+  "sort-orders": [
+    {
+      "fields": [], 
+      "order-id": 0
+    }
+  ], 
+  "snapshots": [
+    {
+      "timestamp-ms": 1688546907580, 
+      "sequence-number": 1, 
+      "snapshot-id": 7963820769190930835, 
+      "manifest-list": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-7963820769190930835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro", 
+      "summary": {
+        "changed-partition-count": "1", 
+        "added-files-size": "1811714", 
+        "added-data-files": "1", 
+        "added-records": "20000", 
+        "total-data-files": "1", 
+        "total-equality-deletes": "0", 
+        "total-records": "20000", 
+        "operation": "append", 
+        "total-files-size": "1811714", 
+        "total-delete-files": "0", 
+        "total-position-deletes": "0"
+      }, 
+      "schema-id": 0
+    }
+  ], 
+  "schemas": [
+    {
+      "schema-id": 0, 
+      "type": "struct", 
+      "fields": [
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 1, 
+          "name": "l_orderkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 2, 
+          "name": "l_partkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 3, 
+          "name": "l_suppkey"
+        }, 
+        {
+          "required": false, 
+          "type": "int", 
+          "id": 4, 
+          "name": "l_linenumber"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 5, 
+          "name": "l_quantity"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 6, 
+          "name": "l_extendedprice"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 7, 
+          "name": "l_discount"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 8, 
+          "name": "l_tax"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 9, 
+          "name": "l_returnflag"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 10, 
+          "name": "l_linestatus"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 11, 
+          "name": "l_shipdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 12, 
+          "name": "l_commitdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 13, 
+          "name": "l_receiptdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 14, 
+          "name": "l_shipinstruct"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 15, 
+          "name": "l_shipmode"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 16, 
+          "name": "l_comment"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v3.metadata.json b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v3.metadata.json
new file mode 100644
index 000000000..f0e98e2bb
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/v3.metadata.json
@@ -0,0 +1,212 @@
+{
+  "table-uuid": "69c2bb75-7833-4112-bf3e-b848315ef761", 
+  "statistics": [], 
+  "snapshot-log": [
+    {
+      "timestamp-ms": 1688546907580, 
+      "snapshot-id": 7963820769190930835
+    }, 
+    {
+      "timestamp-ms": 1688546917176, 
+      "snapshot-id": 4821756033809199889
+    }
+  ], 
+  "last-partition-id": 999, 
+  "last-updated-ms": 1688546917176, 
+  "last-column-id": 16, 
+  "refs": {
+    "main": {
+      "snapshot-id": 4821756033809199889, 
+      "type": "branch"
+    }
+  }, 
+  "format-version": 2, 
+  "default-sort-order-id": 0, 
+  "partition-specs": [
+    {
+      "fields": [], 
+      "spec-id": 0
+    }
+  ], 
+  "properties": {
+    "serialization.format": "|", 
+    "field.delim": "|", 
+    "created_with_ctlt": "true", 
+    "write.delete.mode": "merge-on-read", 
+    "write.update.mode": "merge-on-read", 
+    "TRANSLATED_TO_EXTERNAL": "TRUE", 
+    "engine.hive.enabled": "true", 
+    "EXTERNAL": "TRUE", 
+    "storage_handler": "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler", 
+    "write.merge.mode": "merge-on-read"
+  }, 
+  "metadata-log": [
+    {
+      "timestamp-ms": 1688546893502, 
+      "metadata-file": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/00000-683e61e4-f3bd-4fbe-8c08-698fdb1489ec.metadata.json"
+    }, 
+    {
+      "timestamp-ms": 1688546907580, 
+      "metadata-file": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/00001-fa91f1f6-b259-42c5-949e-b0ce7184da08.metadata.json"
+    }
+  ], 
+  "location": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock", 
+  "last-sequence-number": 2, 
+  "current-snapshot-id": 4821756033809199889, 
+  "default-spec-id": 0, 
+  "current-schema-id": 0, 
+  "sort-orders": [
+    {
+      "fields": [], 
+      "order-id": 0
+    }
+  ], 
+  "snapshots": [
+    {
+      "timestamp-ms": 1688546907580, 
+      "sequence-number": 1, 
+      "snapshot-id": 7963820769190930835, 
+      "manifest-list": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-7963820769190930835-1-01dc3e0b-fe92-4d60-973b-fcbb58f71be5.avro", 
+      "summary": {
+        "changed-partition-count": "1", 
+        "added-files-size": "1811714", 
+        "added-data-files": "1", 
+        "added-records": "20000", 
+        "total-data-files": "1", 
+        "total-equality-deletes": "0", 
+        "total-records": "20000", 
+        "operation": "append", 
+        "total-files-size": "1811714", 
+        "total-delete-files": "0", 
+        "total-position-deletes": "0"
+      }, 
+      "schema-id": 0
+    }, 
+    {
+      "timestamp-ms": 1688546917176, 
+      "sequence-number": 2, 
+      "snapshot-id": 4821756033809199889, 
+      "parent-snapshot-id": 7963820769190930835, 
+      "manifest-list": "/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/snap-4821756033809199889-1-e6680781-452a-41d3-a149-0136fa868069.avro", 
+      "summary": {
+        "changed-partition-count": "1", 
+        "added-files-size": "13614", 
+        "total-equality-deletes": "0", 
+        "added-delete-files": "1", 
+        "total-data-files": "1", 
+        "total-records": "20000", 
+        "added-position-delete-files": "1", 
+        "added-position-deletes": "2114", 
+        "operation": "overwrite", 
+        "total-files-size": "1825328", 
+        "total-delete-files": "1", 
+        "total-position-deletes": "2114"
+      }, 
+      "schema-id": 0
+    }
+  ], 
+  "schemas": [
+    {
+      "schema-id": 0, 
+      "type": "struct", 
+      "fields": [
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 1, 
+          "name": "l_orderkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 2, 
+          "name": "l_partkey"
+        }, 
+        {
+          "required": false, 
+          "type": "long", 
+          "id": 3, 
+          "name": "l_suppkey"
+        }, 
+        {
+          "required": false, 
+          "type": "int", 
+          "id": 4, 
+          "name": "l_linenumber"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 5, 
+          "name": "l_quantity"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 6, 
+          "name": "l_extendedprice"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 7, 
+          "name": "l_discount"
+        }, 
+        {
+          "required": false, 
+          "type": "decimal(12, 2)", 
+          "id": 8, 
+          "name": "l_tax"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 9, 
+          "name": "l_returnflag"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 10, 
+          "name": "l_linestatus"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 11, 
+          "name": "l_shipdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 12, 
+          "name": "l_commitdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 13, 
+          "name": "l_receiptdate"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 14, 
+          "name": "l_shipinstruct"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 15, 
+          "name": "l_shipmode"
+        }, 
+        {
+          "required": false, 
+          "type": "string", 
+          "id": 16, 
+          "name": "l_comment"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/version-hint.txt b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/version-hint.txt
new file mode 100644
index 000000000..00750edc0
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock/metadata/version-hint.txt
@@ -0,0 +1 @@
+3
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index cbaf445db..31ccd7d17 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3695,6 +3695,22 @@ INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(2, 'orc', 1.5, false)
 ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.default'='parquet');
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, false);
 
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+iceberg_lineitem_multiblock
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+              'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
+              'iceberg.table_identifier'='ice.iceberg_lineitem_multiblock',
+              'format-version'='2');
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
+
 ====
 ---- DATASET
 functional
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index f567cac56..8e8ae9fa7 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -96,6 +96,7 @@ table_name:iceberg_v2_partitioned_position_deletes_orc, constraint:restrict_to,
 table_name:iceberg_multiple_storage_locations, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_avro_format, constraint:restrict_to, table_format:parquet/none/none
 table_name:iceberg_mixed_file_format, constraint:restrict_to, table_format:parquet/none/none
+table_name:iceberg_lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
 
 # TODO: Support Avro. Data loading currently fails for Avro because complex types
 # cannot be converted to the corresponding Avro types yet.
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
index 210a7a4ad..b54bd67e1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
@@ -30,7 +30,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=20B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=28B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -48,7 +48,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=20B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=28B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
@@ -80,7 +80,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  |  output: min(id)
 |  |  row-size=8B cardinality=1
 |  |
-|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=28B cardinality=3
 |  |
 |  |--04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -91,7 +91,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |     HDFS partitions=1/1 files=1 size=662B
 |     row-size=28B cardinality=3
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=28B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -126,7 +126,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  |  output: min(id)
 |  |  row-size=8B cardinality=1
 |  |
-|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  |  row-size=28B cardinality=3
 |  |
 |  |--09:EXCHANGE [BROADCAST]
@@ -139,7 +139,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |     HDFS partitions=1/1 files=1 size=662B
 |     row-size=28B cardinality=3
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=28B cardinality=3
 |
 |--08:EXCHANGE [BROADCAST]
@@ -161,7 +161,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=20B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=28B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -179,7 +179,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 |  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=20B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=28B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
@@ -201,7 +201,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=36B cardinality=2
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=2
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -221,16 +221,16 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |
 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=40B cardinality=2
 |
-|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
+|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
 |     row-size=204B cardinality=10
 |
-03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
+03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
@@ -245,7 +245,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=36B cardinality=6
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=36B cardinality=6
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -263,7 +263,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=36B cardinality=6
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=36B cardinality=6
 |
 |--03:EXCHANGE [BROADCAST]
@@ -285,7 +285,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=36B cardinality=2
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=48B cardinality=2
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -305,16 +305,16 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |
 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=48B cardinality=2
 |
-|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
+|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
 |     row-size=204B cardinality=10
 |
-03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
+03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
@@ -339,7 +339,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  |  output: max(id)
 |  |  row-size=8B cardinality=1
 |  |
-|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=28B cardinality=3
 |  |
 |  |--04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -350,7 +350,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |     HDFS partitions=1/1 files=1 size=662B
 |     row-size=28B cardinality=3
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=20
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -387,7 +387,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  |  output: max(id)
 |  |  row-size=8B cardinality=1
 |  |
-|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  05:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  |  row-size=28B cardinality=3
 |  |
 |  |--10:EXCHANGE [BROADCAST]
@@ -400,16 +400,16 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |     HDFS partitions=1/1 files=1 size=662B
 |     row-size=28B cardinality=3
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=40B cardinality=20
 |
-|--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
+|--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
 |     row-size=204B cardinality=10
 |
-08:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
+08:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
@@ -424,7 +424,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |  order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
 |  row-size=36B cardinality=2
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=2
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -444,16 +444,16 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-
 |
 05:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=40B cardinality=2
 |
-|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
+|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
 |     row-size=204B cardinality=10
 |
-03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
+03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
similarity index 99%
copy from testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
copy to testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
index 66a4f8954..1ee7848d7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables-hash-join.test
@@ -1151,4 +1151,4 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    row-size=20B cardinality=6
-====
\ No newline at end of file
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index 66a4f8954..d825285f9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -50,7 +50,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=20B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -73,7 +73,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=20B cardinality=3
 |
 |--04:EXCHANGE [BROADCAST]
@@ -90,7 +90,7 @@ SELECT * from iceberg_v2_delete_positional;
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=40B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
@@ -105,7 +105,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=40B cardinality=3
 |
 |--03:EXCHANGE [BROADCAST]
@@ -122,7 +122,7 @@ SELECT * from iceberg_v2_positional_delete_all_rows;
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=36B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_delete_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_delete_all_rows-position-delete]
@@ -137,7 +137,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=36B cardinality=3
 |
 |--03:EXCHANGE [BROADCAST]
@@ -173,7 +173,7 @@ SELECT * from iceberg_v2_positional_delete_all_rows limit 1
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  limit: 1
 |  row-size=36B cardinality=1
 |
@@ -190,7 +190,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  limit: 1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  limit: 1
 |  row-size=36B cardinality=1
 |
@@ -213,7 +213,7 @@ PLAN-ROOT SINK
 |  limit: 1
 |  row-size=36B cardinality=1
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -238,16 +238,16 @@ PLAN-ROOT SINK
 |  limit: 1
 |  row-size=36B cardinality=1
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=6
 |  |
-|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -265,7 +265,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -288,16 +288,16 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=6
 |  |
-|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -329,7 +329,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=20B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -352,7 +352,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=20B cardinality=3
 |
 |--04:EXCHANGE [BROADCAST]
@@ -373,7 +373,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=20B cardinality=6
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -396,16 +396,16 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=20B cardinality=6
 |
-|--05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|--05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |     HDFS partitions=1/1 files=2 size=5.33KB
 |     row-size=267B cardinality=4
 |
-04:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+04:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
@@ -419,7 +419,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=6
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=3
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
@@ -442,7 +442,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=6
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  |  row-size=36B cardinality=3
 |  |
 |  |--05:EXCHANGE [BROADCAST]
@@ -463,7 +463,7 @@ SELECT * from iceberg_v2_partitioned_position_deletes
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=64B cardinality=20
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -478,16 +478,16 @@ PLAN-ROOT SINK
 |
 05:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=64B cardinality=20
 |
-|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.pos,functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
+|--04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
 |     HDFS partitions=1/1 files=3 size=9.47KB
 |     row-size=204B cardinality=10
 |
-03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.file__position,functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
+03:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
    HDFS partitions=1/1 files=3 size=3.48KB
@@ -502,7 +502,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=2
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=1
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -527,16 +527,16 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=2
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=1
 |  |
-|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -570,7 +570,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -613,16 +613,16 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=6
 |  |
-|  |--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  08:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  08:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -641,7 +641,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=2
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=1
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -666,16 +666,16 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=2
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=1
 |  |
-|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  05:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -706,7 +706,7 @@ PLAN-ROOT SINK
 |  |  pass-through-operands: all
 |  |  row-size=24B cardinality=6
 |  |
-|  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |--07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
@@ -725,7 +725,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -767,7 +767,7 @@ PLAN-ROOT SINK
 |  |  pass-through-operands: all
 |  |  row-size=24B cardinality=6
 |  |
-|  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |--07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--14:EXCHANGE [BROADCAST]
@@ -788,16 +788,16 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  row-size=36B cardinality=10
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=36B cardinality=6
 |  |
-|  |--13:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--13:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -813,7 +813,7 @@ select * from iceberg_v2_partitioned_position_deletes where action = 'download'
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=64B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -829,7 +829,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=64B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
@@ -848,7 +848,7 @@ where action = 'download' and user = 'Lisa';
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=64B cardinality=1
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -864,7 +864,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=64B cardinality=1
 |
 |--03:EXCHANGE [BROADCAST]
@@ -1010,7 +1010,7 @@ select * from iceberg_v2_partitioned_position_deletes where action = 'download';
 ---- PLAN
 PLAN-ROOT SINK
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=64B cardinality=6
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
@@ -1026,7 +1026,7 @@ PLAN-ROOT SINK
 |
 04:EXCHANGE [UNPARTITIONED]
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, BROADCAST]
 |  row-size=64B cardinality=6
 |
 |--03:EXCHANGE [BROADCAST]
@@ -1053,7 +1053,7 @@ PLAN-ROOT SINK
 |  |  output: count(*)
 |  |  row-size=8B cardinality=1
 |  |
-|  07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  |  row-size=20B cardinality=6
 |  |
 |  |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -1075,7 +1075,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=20B cardinality=6
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -1103,16 +1103,16 @@ PLAN-ROOT SINK
 |  |  output: count(*)
 |  |  row-size=8B cardinality=1
 |  |
-|  07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+|  07:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  |  row-size=20B cardinality=6
 |  |
-|  |--17:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|  |--17:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
 |  |  06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
 |  |     row-size=267B cardinality=4
 |  |
-|  16:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+|  16:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |  |
 |  05:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
@@ -1137,18 +1137,18 @@ PLAN-ROOT SINK
 |  output: count(*)
 |  row-size=8B cardinality=1
 |
-02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
+02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, PARTITIONED]
 |  row-size=20B cardinality=6
 |
-|--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
+|--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |
 |  01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |     HDFS partitions=1/1 files=2 size=5.33KB
 |     row-size=267B cardinality=4
 |
-11:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
+11:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name)]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    row-size=20B cardinality=6
-====
\ No newline at end of file
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index d4c347c76..3193f0254 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -169,19 +169,19 @@ PLAN-ROOT SINK
 select id from functional_parquet.alltypes tablesample system(10) repeatable(1234)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=20.00MB mem-reservation=4.01MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=20.00MB mem-reservation=4.02MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: id
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=3/24 files=3 size=23.71KB
+   HDFS partitions=3/24 files=3 size=23.83KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=11.95K
+     partitions: 0/24 rows=12.01K
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=1.20K
    in pipelines: 00(GETNEXT)
 ====
@@ -270,7 +270,7 @@ PLAN-ROOT SINK
 select count(*) from functional_parquet.iceberg_non_partitioned tablesample system(10) repeatable(1234)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=32.02MB mem-reservation=8.00KB thread-reservation=2
+|  Per-Host Resources: mem-estimate=1.02MB mem-reservation=8.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
@@ -287,7 +287,7 @@ PLAN-ROOT SINK
      table: rows=20 size=22.90KB
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6
-   mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1
+   mem-estimate=1.00MB mem-reservation=8.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=20
    in pipelines: 00(GETNEXT)
 ====
@@ -331,7 +331,7 @@ PLAN-ROOT SINK
    parquet statistics predicates: id > CAST(0 AS INT), action = 'click'
    parquet dictionary predicates: id > CAST(0 AS INT), action = 'click'
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=44B cardinality=4
+   tuple-ids=0 row-size=44B cardinality=1
    in pipelines: 00(GETNEXT)
 ====
 # Sampling Iceberg V2 tables. Delete files are not sampled, only the data files. So we
@@ -340,7 +340,7 @@ select * from functional_parquet.iceberg_v2_positional_not_all_data_files_have_d
 tablesample system(10) repeatable(1234)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=101.94MB mem-reservation=5.98MB thread-reservation=3
+|  Per-Host Resources: mem-estimate=100.00MB mem-reservation=4.05MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -351,9 +351,9 @@ PLAN-ROOT SINK
 |  tuple-ids=0 row-size=36B cardinality=4
 |  in pipelines: 03(GETNEXT), 00(GETNEXT)
 |
-|--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path
-|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|--02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
+|  |  equality predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path
+|  |  mem-estimate=566B mem-reservation=566B thread-reservation=0
 |  |  tuple-ids=0 row-size=36B cardinality=3
 |  |  in pipelines: 00(GETNEXT), 01(OPEN)
 |  |
@@ -364,7 +364,7 @@ PLAN-ROOT SINK
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=2
 |  |     mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
-|  |     tuple-ids=1 row-size=245B cardinality=4
+|  |     tuple-ids=1 row-size=267B cardinality=4
 |  |     in pipelines: 01(GETNEXT)
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index e9c7b985f..c40a968ef 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -656,3 +656,38 @@ regex:'$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned/data/event_time
 ---- TYPES
 STRING, INT, STRING, STRING, TIMESTAMP
 ====
+---- QUERY
+SELECT count(*) from iceberg_lineitem_multiblock;
+---- RESULTS
+17886
+---- TYPES
+bigint
+====
+---- QUERY
+SET BATCH_SIZE=2;
+SELECT count(*) from iceberg_lineitem_multiblock;
+---- RESULTS
+17886
+---- TYPES
+bigint
+====
+---- QUERY
+SELECT * from iceberg_v2_partitioned_position_deletes;
+---- RESULTS
+6,'Alex','view',2020-01-01 09:00:00
+20,'Alex','view',2020-01-01 09:00:00
+4,'Alex','view',2020-01-01 09:00:00
+18,'Alan','click',2020-01-01 10:00:00
+12,'Alan','click',2020-01-01 10:00:00
+10,'Alan','click',2020-01-01 10:00:00
+2,'Lisa','download',2020-01-01 11:00:00
+14,'Lisa','download',2020-01-01 11:00:00
+8,'Lisa','download',2020-01-01 11:00:00
+16,'Lisa','download',2020-01-01 11:00:00
+---- TYPES
+INT, STRING, STRING, TIMESTAMP
+====
+---- QUERY
+# Changing back in case we add new tests later
+SET BATCH_SIZE=0;
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 831f70c0a..08f9cff34 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -37,6 +37,7 @@ import json
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
 from tests.common.skip import SkipIf, SkipIfFS, SkipIfDockerizedCluster
+from tests.common.test_vector import ImpalaTestDimension
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
@@ -1119,7 +1120,8 @@ class TestIcebergV2Table(IcebergTestSuite):
     super(TestIcebergV2Table, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format == 'parquet')
-
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
+      'disable_optimized_iceberg_v2_read', 0, 1))
   # The test uses pre-written Iceberg tables where the position delete files refer to
   # the data files via full URI, i.e. they start with 'hdfs://localhost:2050/...'. In the
   # dockerised environment the namenode is accessible on a different hostname/port.