Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/19 05:10:44 UTC

[doris] 10/21: [fix](iceberg-v2) icebergv2 filter data path (#14470)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d23e7905b09c1d1f4dfe4849d21e5a79034dbf4e
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Thu Dec 15 10:18:12 2022 +0800

    [fix](iceberg-v2) icebergv2 filter data path (#14470)

    1. An iceberg v2 delete file may cover many data file paths, so the path of a file split is required as a predicate to filter the rows of the delete file:
    - create a delete file structure to save the predicate parameters
    - create a predicate on the file path
    2. add logging to print the row ranges
    3. fix a bug when creating file metadata
---
 be/src/vec/exec/format/parquet/parquet_common.h    |   8 ++
 be/src/vec/exec/format/parquet/schema_desc.h       |   4 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  10 +-
 .../vec/exec/format/parquet/vparquet_page_index.h  |   4 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  58 +++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   3 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 104 ++++++++++++++++-----
 be/src/vec/exec/format/table/iceberg_reader.h      |  26 ++++--
 be/src/vec/exec/format/table/table_format_reader.h |   3 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   6 +-
 .../planner/external/ExternalFileScanNode.java     |   2 +-
 .../apache/doris/planner/external/HiveSplit.java   |   1 +
 .../planner/external/IcebergScanProvider.java      |  86 +++++++++++++++--
 .../doris/planner/external/IcebergSplit.java       |   5 +
 .../doris/planner/external/QueryScanProvider.java  |   9 +-
 gensrc/thrift/PlanNodes.thrift                     |   7 +-
 16 files changed, 252 insertions(+), 84 deletions(-)
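
For readers new to Iceberg v2 position deletes, the core idea of this commit in one standalone sketch (the names below are hypothetical illustrations, not Doris APIs): a position delete file stores (file_path, pos) rows covering many data files, so a reader for one data split must keep only the rows whose file_path equals its own path. The commit pushes that down as an equality conjunct on the delete file's 'file_path' column; here it is modeled as a plain filter.

    #include <cstdint>
    #include <string>
    #include <vector>

    // One row of an Iceberg v2 position delete file.
    struct PositionDelete {
        std::string file_path; // data file the deleted row belongs to
        int64_t pos;           // row position within that data file
    };

    // Keep only the delete positions that target the given data split,
    // i.e. the effect of the conjunct: file_path = '<split path>'.
    std::vector<int64_t> deleted_positions_for_split(
            const std::vector<PositionDelete>& delete_rows,
            const std::string& split_data_file_path) {
        std::vector<int64_t> positions;
        for (const PositionDelete& row : delete_rows) {
            if (row.file_path == split_data_file_path) {
                positions.push_back(row.pos);
            }
        }
        return positions;
    }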

diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index d124c2fb9a..611ce969f1 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -46,6 +46,14 @@ struct RowRange {
 
     int64_t first_row;
     int64_t last_row;
+
+    bool operator<(const RowRange& range) const { return first_row < range.first_row; }
+
+    std::string debug_string() const {
+        std::stringstream ss;
+        ss << "[" << first_row << "," << last_row << ")";
+        return ss.str();
+    }
 };
 
 struct ParquetReadColumn {
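
The new operator< is what allows delete ranges to live in a std::set<RowRange> ordered by first_row (used by merge_delete_row_ranges below), and debug_string makes the half-open [first_row, last_row) convention visible in logs. A minimal standalone illustration:

    #include <cstdint>
    #include <iostream>
    #include <set>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // exclusive: the range is [first_row, last_row)
        bool operator<(const RowRange& r) const { return first_row < r.first_row; }
    };

    int main() {
        std::set<RowRange> ranges;
        ranges.insert({40, 50});
        ranges.insert({10, 20}); // iterates before {40, 50} thanks to operator<
        for (const RowRange& r : ranges) {
            std::cout << "[" << r.first_row << "," << r.last_row << ")\n";
        }
        return 0;
    }
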
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 73f9f97d97..c45e96f236 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -40,11 +40,13 @@ struct FieldSchema {
     tparquet::Type::type physical_type;
     // The index order in FieldDescriptor._physical_fields
     int physical_column_index = -1;
-
     int16_t definition_level = 0;
     int16_t repetition_level = 0;
     std::vector<FieldSchema> children;
 
+    FieldSchema() = default;
+    ~FieldSchema() = default;
+    FieldSchema(const FieldSchema& fieldSchema) = default;
     std::string debug_string() const;
 };
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 235a5f3e3b..c0557c482a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -67,17 +67,17 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
     }
     int index = _row_range_index;
     while (index < _row_ranges->size()) {
-        const RowRange& read_range = (*_row_ranges)[index];
-        if (read_range.last_row <= start_index) {
+        const RowRange& row_range = (*_row_ranges)[index];
+        if (row_range.last_row <= start_index) {
             index++;
             _row_range_index++;
             continue;
         }
-        if (read_range.first_row >= end_index) {
+        if (row_range.first_row >= end_index) {
             break;
         }
-        int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
-        int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
+        int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row;
+        int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index;
         read_ranges.emplace_back(start, end);
         index++;
     }
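
The renamed loop above intersects the current batch window [start_index, end_index) with the kept row ranges. The same logic, written standalone under the assumption that ranges are half-open and sorted by first_row:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // half-open: [first_row, last_row)
    };

    // Clip each kept row range against the window [start_index, end_index)
    // and collect the overlapping pieces, mirroring _generate_read_ranges.
    std::vector<RowRange> clip_to_window(const std::vector<RowRange>& row_ranges,
                                         int64_t start_index, int64_t end_index) {
        std::vector<RowRange> out;
        for (const RowRange& r : row_ranges) {
            if (r.last_row <= start_index) {
                continue; // lies entirely before the window
            }
            if (r.first_row >= end_index) {
                break; // ranges are sorted, nothing later can overlap
            }
            out.push_back({std::max(r.first_row, start_index),
                           std::min(r.last_row, end_index)});
        }
        return out;
    }
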
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 978a798bf4..4a27593abf 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -36,9 +36,9 @@ public:
                                       std::vector<int>& skipped_ranges);
     bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
     Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              tparquet::ColumnIndex* _column_index);
+                              tparquet::ColumnIndex* column_index);
     Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              tparquet::OffsetIndex* _offset_index);
+                              tparquet::OffsetIndex* offset_index);
 
 private:
     friend class ParquetReader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 01c226b1e4..c8d4f6b3c4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -324,11 +324,13 @@ Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
                                                         _scan_range.start_offset,
                                                         _scan_range.file_size, 0, _file_reader));
     }
-    RETURN_IF_ERROR(_file_reader->open());
-    if (_file_reader->size() == 0) {
-        return Status::EndOfFile("Empty Parquet File");
+    if (_file_metadata == nullptr) {
+        RETURN_IF_ERROR(_file_reader->open());
+        if (_file_reader->size() == 0) {
+            return Status::EndOfFile("Empty Parquet File");
+        }
+        RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
     }
-    RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
     _t_metadata = &_file_metadata->to_thrift();
 
     _total_groups = _t_metadata->row_groups.size();
@@ -360,57 +362,64 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-void ParquetReader::merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges) {
+void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) {
     if (_row_ranges.empty()) {
+        _current_group_reader->set_row_ranges(_row_ranges);
         return;
     }
-    std::vector<RowRange> candidate_ranges;
-    auto start_range = _row_ranges.begin();
     if (!delete_row_ranges.empty()) {
+        std::vector<RowRange> candidate_ranges;
+        auto start_range = _row_ranges.begin();
         auto delete_range = delete_row_ranges.begin();
-        int64_t range_start_idx = start_range->first_row;
+        int64_t processed_range_start_idx = start_range->first_row;
         while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) {
             int64_t delete_start = delete_range->first_row;
             int64_t delete_end = delete_range->last_row;
             int64_t range_start = start_range->first_row;
             int64_t range_end = start_range->last_row;
-
-            if (delete_end >= range_end) {
-                if (range_start < delete_start) {
+            if (delete_end > range_end) {
+                if (range_start < processed_range_start_idx) {
+                    // rows before processed_range_start_idx have been processed
+                    range_start = processed_range_start_idx;
+                }
+                if (range_end < delete_start) {
+                    /**
+                     *      start_range
+                     *    || --------- || - |--------- |
+                     *                      delete_range
+                     */
+                    candidate_ranges.emplace_back(range_start, range_end);
+                } else if (range_start < delete_start) {
                     /**
                      *          row_range
                      *    || --------|-------- || ----- |
                      *         delete_start       delete_end
                      */
                     candidate_ranges.emplace_back(range_start, delete_start);
-                } else if (range_end <= delete_start) {
-                    /**
-                     *      start_range
-                     *    || --------- || ----------- |
-                     *                   delete_range
-                     */
-                    candidate_ranges.emplace_back(range_start, range_end);
                 }
+                // otherwise [range_start, range_end) is fully covered by the delete range; nothing survives
                 start_range++;
             } else {
                // delete_end <= range_end; most of the time, we will use this branch
-                if (range_start <= delete_start) {
+                if (processed_range_start_idx < delete_start) {
                     /**
                      *   row_range_start           row_range_end
                      *       || --- | --------- | --- ||
                      *               delete_range
                      */
-                    candidate_ranges.emplace_back(range_start_idx, delete_start);
-                    range_start_idx = delete_end + 1;
+                    candidate_ranges.emplace_back(processed_range_start_idx, delete_start);
                 }
+                // delete_end falls inside the current row_range, so it can be assigned to processed_range_start_idx
+                processed_range_start_idx = delete_end;
                 delete_range++;
                 if (delete_range == delete_row_ranges.end()) {
-                    candidate_ranges.emplace_back(delete_end + 1, range_end);
+                    range_end = _row_ranges[_row_ranges.size() - 1].last_row;
+                    if (processed_range_start_idx != range_end) {
+                        candidate_ranges.emplace_back(processed_range_start_idx, range_end);
+                    }
                 }
             }
         }
-    }
-    if (!candidate_ranges.empty()) {
         _row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end());
     }
     _current_group_reader->set_row_ranges(_row_ranges);
@@ -545,7 +554,6 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
     if (!_has_page_index(row_group.columns, page_index)) {
         return Status::OK();
     }
-    //    int64_t buffer_size = page_index._column_index_size;
     uint8_t col_index_buff[page_index._column_index_size];
     int64_t bytes_read = 0;
     RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
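
The reworked merge_delete_row_ranges is easiest to reason about as interval subtraction: remove a sorted set of half-open delete ranges from the sorted kept row ranges, and keep whatever survives. A standalone sketch of that computation (simplified control flow, not the exact Doris loop):

    #include <algorithm>
    #include <cstdint>
    #include <set>
    #include <vector>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // half-open: [first_row, last_row)
        bool operator<(const RowRange& r) const { return first_row < r.first_row; }
    };

    // Subtract sorted delete ranges from sorted kept ranges; what remains is
    // the set of rows the reader should still return.
    std::vector<RowRange> subtract_delete_ranges(const std::vector<RowRange>& keep,
                                                 const std::set<RowRange>& del) {
        std::vector<RowRange> out;
        auto d = del.begin();
        for (const RowRange& k : keep) {
            int64_t cursor = k.first_row; // start of the not-yet-deleted remainder
            while (d != del.end() && d->first_row < k.last_row) {
                if (d->last_row <= cursor) { // delete range ends behind the cursor
                    ++d;
                    continue;
                }
                if (d->first_row > cursor) { // surviving piece before this delete range
                    out.push_back({cursor, d->first_row});
                }
                cursor = std::max(cursor, d->last_row);
                if (d->last_row >= k.last_row) {
                    break; // this delete range may also cover the next kept range
                }
                ++d;
            }
            if (cursor < k.last_row) { // tail left over after the delete ranges
                out.push_back({cursor, k.last_row});
            }
        }
        return out;
    }

    // Example: keep {[0,10), [20,30)}, delete {[5,25)} -> {[0,5), [25,30)}.
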
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index aaae115bd3..f5891754e7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -79,7 +79,7 @@ public:
 
     Status file_metadata(FileMetaData** metadata);
 
-    void merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges);
+    void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges);
 
     int64_t size() const { return _file_reader->size(); }
 
@@ -149,7 +149,6 @@ private:
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     std::unique_ptr<FileReader> _file_reader = nullptr;
-    std::vector<RowRange> _delete_row_ranges;
     std::vector<RowRange> _row_ranges;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 041c70438f..9e2f659772 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -29,38 +29,64 @@ namespace doris::vectorized {
 const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
 const std::string ICEBERG_ROW_POS = "pos";
 
+IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
+                                       RuntimeState* state, const TFileScanRangeParams& params)
+        : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
+    static const char* iceberg_profile = "IcebergProfile";
+    ADD_TIMER(_profile, iceberg_profile);
+    _iceberg_profile._delete_files_init_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile);
+    _iceberg_profile._delete_files_read_total_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
+}
+
+IcebergTableReader::~IcebergTableReader() {
+    if (_data_path_conjunct_ctx != nullptr) {
+        _data_path_conjunct_ctx->close(_state);
+    }
+}
+
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     return _file_format_reader->get_next_block(block, read_rows, eof);
 }
 
+Status IcebergTableReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
 Status IcebergTableReader::get_columns(
         std::unordered_map<std::string, TypeDescriptor>* name_to_type,
         std::unordered_set<std::string>* missing_cols) {
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-void IcebergTableReader::filter_rows() {
+void IcebergTableReader::filter_rows(const TFileRangeDesc& range) {
     if (_cur_delete_file_reader == nullptr) {
         return;
     }
-    auto& table_desc = _params.table_format_params.iceberg_params;
+    SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time);
+    auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
         return;
     }
     bool eof = false;
-    std::vector<RowRange> delete_row_ranges;
+    std::set<RowRange> delete_row_ranges;
     while (!eof) {
         size_t read_rows = 0;
         Block block = Block();
-        for (const FieldSchema* field : _column_schemas) {
-            DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field->type, true);
+        for (const FieldSchema& field : _column_schemas) {
+            DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true);
             MutableColumnPtr data_column = data_type->create_column();
-            block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field->name));
+            block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name));
         }
         Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof);
         if (!st.ok() || eof) {
             if (!_delete_file_readers.empty()) {
+                eof = false;
                 _cur_delete_file_reader = std::move(_delete_file_readers.front());
                 _delete_file_readers.pop_front();
             }
@@ -88,40 +114,58 @@ void IcebergTableReader::filter_rows() {
                 int64_t row_id = delete_row_ids[i];
                 int64_t row_range_start = row_id;
                 int64_t row_range_end = row_id;
-                // todo: add debug info
-                // todo: asure reading delete file data in file_range only
                 while (i + 1 < num_deleted_ids) {
                     if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) {
                         row_range_end = delete_row_ids[i + 1];
                         i++;
                         continue;
                     } else {
-                        delete_row_ranges.emplace_back(row_range_start, row_range_end);
+                        delete_row_ranges.emplace(row_range_start, row_range_end + 1);
                         row_range_start = ++row_range_end;
                         break;
                     }
                 }
                 if (i == num_deleted_ids - 1) {
-                    delete_row_ranges.emplace_back(row_range_start,
-                                                   delete_row_ids[num_deleted_ids - 1]);
+                    delete_row_ranges.emplace(row_range_start,
+                                              delete_row_ids[num_deleted_ids - 1] + 1);
                 }
                 row_range_start = delete_row_ids[i + 1];
                 i++;
             }
         }
     }
+    if (VLOG_IS_ON(3)) {
+        if (!delete_row_ranges.empty()) {
+            std::stringstream out;
+            out << "[";
+            for (const RowRange& delete_row_range : delete_row_ranges) {
+                out << " " << delete_row_range.debug_string();
+            }
+            out << " ]";
+            VLOG_NOTICE << "Delete row range info: " << out.str();
+        }
+    }
     ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
     parquet_reader->merge_delete_row_ranges(delete_row_ranges);
 }
 
-Status IcebergTableReader::init_row_filters() {
-    auto& table_desc = _params.table_format_params.iceberg_params;
+Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+    auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) {
+        SCOPED_TIMER(_iceberg_profile._delete_files_init_time);
         auto& delete_file_type = table_desc.content;
         auto files = table_desc.delete_files;
         if (delete_file_type == POSITON_DELELE) {
             // position delete
+            auto row_desc = RowDescriptor(_state->desc_tbl(),
+                                          std::vector<TupleId>({table_desc.delete_table_tuple_id}),
+                                          std::vector<bool>({false}));
+            RETURN_IF_ERROR(VExpr::create_expr_tree(
+                    _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx));
+            RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
+            RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
+            vector<std::string> names;
             for (auto& delete_file : files) {
                 _position_delete_params.low_bound_index = delete_file.position_lower_bound;
                 _position_delete_params.upper_bound_index = delete_file.position_upper_bound;
@@ -134,17 +178,29 @@ Status IcebergTableReader::init_row_filters() {
                 ParquetReader* delete_reader = new ParquetReader(
                         _profile, _params, delete_range, _state->query_options().batch_size,
                         const_cast<cctz::time_zone*>(&_state->timezone_obj()));
-                FileMetaData* metadata = nullptr;
-                RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
-
-                auto& delete_file_schema = metadata->schema();
-                vector<std::string> names;
-                for (auto i = 0; i < delete_file_schema.size(); ++i) {
-                    const FieldSchema* field = delete_file_schema.get_column(i);
-                    _column_schemas.emplace_back(field);
-                    names.emplace_back(field->name);
+                if (_delete_file_schema == nullptr) {
+                    FileMetaData* metadata = nullptr;
+                    RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
+                    if (metadata == nullptr) {
+                        break;
+                    }
+                    _delete_file_schema = &metadata->schema();
+                    int num_of_col = _delete_file_schema->size();
+                    for (auto i = 0; i < num_of_col; ++i) {
+                        const FieldSchema* field = _delete_file_schema->get_column(i);
+                        _column_schemas.emplace_back(*field);
+                        names.emplace_back(field->name);
+                    }
                 }
-                Status d_st = delete_reader->init_reader(names, false);
+                DCHECK_EQ(_column_schemas.size(), _delete_file_schema->size());
+                // The expr ensures that only delete file rows for the current file range are read
+                Status d_st =
+                        delete_reader->init_reader(names, nullptr, _data_path_conjunct_ctx, false);
+                std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                        partition_columns;
+                std::unordered_map<std::string, VExprContext*> missing_columns;
+                delete_reader->set_fill_columns(partition_columns, missing_columns);
+
                 _delete_file_readers.emplace_back((GenericReader*)delete_reader);
 
                 ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
@@ -161,7 +217,7 @@ Status IcebergTableReader::init_row_filters() {
         }
     }
     // todo: equality delete
-    filter_rows();
+    filter_rows(range);
     return Status::OK();
 }
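
The inner loop of filter_rows above collapses the sorted delete row ids read from the 'pos' column into half-open RowRanges; the emplace(row_range_start, row_range_end + 1) calls are what make the upper bound exclusive. The same grouping, written standalone:

    #include <cstdint>
    #include <set>
    #include <vector>

    struct RowRange {
        int64_t first_row;
        int64_t last_row; // half-open: [first_row, last_row)
        bool operator<(const RowRange& r) const { return first_row < r.first_row; }
    };

    // Group consecutive sorted row ids into half-open ranges, e.g.
    // {3, 4, 5, 9} -> {[3, 6), [9, 10)}.
    std::set<RowRange> collapse_delete_ids(const std::vector<int64_t>& sorted_ids) {
        std::set<RowRange> ranges;
        size_t i = 0;
        while (i < sorted_ids.size()) {
            int64_t start = sorted_ids[i];
            int64_t end = start + 1;
            while (i + 1 < sorted_ids.size() && sorted_ids[i + 1] == sorted_ids[i] + 1) {
                ++i;
                end = sorted_ids[i] + 1; // extend the run while ids stay consecutive
            }
            ranges.insert({start, end});
            ++i;
        }
        return ranges;
    }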
 
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index e862fe6171..c71ffdaa9c 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -23,22 +23,25 @@
 
 #include "table_format_reader.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
 
 class IcebergTableReader : public TableFormatReader {
 public:
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
-                       RuntimeState* state, const TFileScanRangeParams& params)
-            : TableFormatReader(file_format_reader),
-              _profile(profile),
-              _state(state),
-              _params(params) {}
-    Status init_row_filters();
-    void filter_rows() override;
+                       RuntimeState* state, const TFileScanRangeParams& params);
+    ~IcebergTableReader() override;
+    Status init_row_filters(const TFileRangeDesc& range);
+    void filter_rows(const TFileRangeDesc& range) override;
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
@@ -52,13 +55,20 @@ public:
     };
 
 private:
+    struct IcebergProfile {
+        RuntimeProfile::Counter* _delete_files_init_time;
+        RuntimeProfile::Counter* _delete_files_read_total_time;
+    };
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
-    std::vector<const FieldSchema*> _column_schemas;
+    std::vector<FieldSchema> _column_schemas;
     std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
     std::unique_ptr<GenericReader> _cur_delete_file_reader;
     PositionDeleteParams _position_delete_params;
+    const FieldDescriptor* _delete_file_schema = nullptr;
+    VExprContext* _data_path_conjunct_ctx = nullptr;
+    IcebergProfile _iceberg_profile;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 3409135dee..188d41a2b9 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -29,6 +29,7 @@ namespace doris::vectorized {
 class TableFormatReader : public GenericReader {
 public:
     TableFormatReader(GenericReader* file_format_reader);
+    ~TableFormatReader() override = default;
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
         return _file_format_reader->get_next_block(block, read_rows, eof);
     }
@@ -37,7 +38,7 @@ public:
         return _file_format_reader->get_columns(name_to_type, missing_cols);
     }
 
-    virtual void filter_rows() = 0;
+    virtual void filter_rows(const TFileRangeDesc& range) = 0;
 
 protected:
     std::string _table_format;                          // hudi, iceberg
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6d170a6130..cf61384e81 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -491,11 +491,11 @@ Status VFileScanner::_get_next_reader() {
             }
             init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range,
                                                       _push_down_expr);
-            if (_params.__isset.table_format_params &&
-                _params.table_format_params.table_format_type == "iceberg") {
+            if (range.__isset.table_format_params &&
+                range.table_format_params.table_format_type == "iceberg") {
                 IcebergTableReader* iceberg_reader = new IcebergTableReader(
                         (GenericReader*)parquet_reader, _profile, _state, _params);
-                iceberg_reader->init_row_filters();
+                iceberg_reader->init_row_filters(range);
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
                 _cur_reader.reset((GenericReader*)parquet_reader);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index c42c97041b..4117f20539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -207,7 +207,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange);
                 break;
             case ICEBERG:
-                scanProvider = new IcebergScanProvider(hmsTable, desc, columnNameToRange);
+                scanProvider = new IcebergScanProvider(hmsTable, analyzer, desc, columnNameToRange);
                 break;
             case HIVE:
                 scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
index a1a1749c2f..6c8f916a5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.FileSplit;
 
 @Data
 public class HiveSplit extends FileSplit {
+    public HiveSplit() {}
 
     public HiveSplit(Path file, long start, long length, String[] hosts) {
         super(file, start, length, hosts);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index 30b8888fd7..7681d6a66d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -17,8 +17,19 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -26,10 +37,15 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableFormatFileDesc;
+import org.apache.doris.thrift.TTableType;
 
+import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
@@ -59,13 +75,20 @@ import java.util.OptionalLong;
 public class IcebergScanProvider extends HiveScanProvider {
 
     private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+    public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
+    public static final String V2_DELETE_DB = "iceberg#delete#db";
+    private static final DeleteFileTempTable scanDeleteTable =
+            new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
+    private final Analyzer analyzer;
 
-    public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
-            Map<String, ColumnRange> columnNameToRange) {
+    public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
+                               Map<String, ColumnRange> columnNameToRange) {
         super(hmsTable, desc, columnNameToRange);
+        this.analyzer = analyzer;
     }
 
-    public static void setIcebergParams(ExternalFileScanNode.ParamCreateContext context, IcebergSplit icebergSplit) {
+    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit)
+            throws UserException {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -74,6 +97,7 @@ public class IcebergScanProvider extends HiveScanProvider {
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
+            setPathSelectConjunct(fileDesc, icebergSplit);
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                 deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -99,7 +123,20 @@ public class IcebergScanProvider extends HiveScanProvider {
             }
         }
         tableFormatFileDesc.setIcebergParams(fileDesc);
-        context.params.setTableFormatParams(tableFormatFileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
+                throws UserException {
+        BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
+        fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
+        SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
+        lhs.analyze(icebergSplit.getAnalyzer());
+        lhs.getDesc().setIsMaterialized(true);
+        StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
+        BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
+        pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
+        fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
     }
 
     @Override
@@ -134,18 +171,27 @@ public class IcebergScanProvider extends HiveScanProvider {
             scan = scan.filter(predicate);
         }
         List<InputSplit> splits = new ArrayList<>();
-
         int formatVersion = ((BaseTable) table).operations().current().formatVersion();
+        BaseTableRef tableRef = null;
+        if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
+            TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
+            fullName.analyze(analyzer);
+            TableRef ref = new TableRef(fullName, fullName.toString(), null);
+            tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
+            tableRef.analyze(analyzer);
+        }
         for (FileScanTask task : scan.planFiles()) {
             for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
                 String dataFilePath = spitTask.file().path().toString();
                 IcebergSplit split = new IcebergSplit(new Path(dataFilePath), spitTask.start(),
                         spitTask.length(), new String[0]);
                 split.setFormatVersion(formatVersion);
-                if (formatVersion == 2) {
+                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
+                    split.setDeleteTableRef(tableRef);
                 }
                 split.setTableFormatType(TableFormatType.ICEBERG);
+                split.setAnalyzer(analyzer);
                 splits.add(split);
             }
         }
@@ -193,4 +239,32 @@ public class IcebergScanProvider extends HiveScanProvider {
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
         return Collections.emptyList();
     }
+
+    @Data
+    static class DeleteFileTempTable extends ExternalTable {
+        public static final String DATA_FILE_PATH = "file_path";
+        private final TableName tableName;
+        private final List<Column> fullSchema = new ArrayList<>();
+
+        public DeleteFileTempTable(TableType type) {
+            super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
+            this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
+            Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
+            this.fullSchema.add(dataFilePathCol);
+        }
+
+        @Override
+        public List<Column> getFullSchema() {
+            return fullSchema;
+        }
+
+        @Override
+        public TTableDescriptor toThrift() {
+            TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
+            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
+                    fullSchema.size(), 0, getName(), "");
+            tTableDescriptor.setIcebergTable(tIcebergTable);
+            return tTableDescriptor;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 05ae3c69d3..14fce6caf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
@@ -28,8 +31,10 @@ public class IcebergSplit extends HiveSplit {
         super(file, start, length, hosts);
     }
 
+    private Analyzer analyzer;
     private String dataFilePath;
     private Integer formatVersion;
+    private BaseTableRef deleteTableRef;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 3066b7354a..a2ce044bb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -73,9 +73,6 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
                 context.params.setFileAttributes(getFileAttributes());
             }
 
-            if (inputSplit instanceof IcebergSplit) {
-                IcebergScanProvider.setIcebergParams(context, (IcebergSplit) inputSplit);
-            }
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
             if (locationType == TFileType.FILE_HDFS) {
@@ -96,7 +93,6 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
             } else if (locationType == TFileType.FILE_S3) {
                 context.params.setProperties(locationProperties);
             }
-
             TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
 
             FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
@@ -108,7 +104,10 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
                         pathPartitionKeys, false);
 
                 TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
-
+                // external data lake table
+                if (split instanceof IcebergSplit) {
+                    IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) inputSplit);
+                }
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                         curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2725c67acc..0dd3373b74 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -268,7 +268,10 @@ struct TIcebergFileDesc {
     1: optional i32 format_version;
     // Iceberg file type, 0: data, 1: position delete, 2: equality delete.
     2: optional i32 content;
+    // When opening a delete file, filter its rows on the data file path using the 'file_path' property
     3: optional list<TIcebergDeleteFileDesc> delete_files;
+    4: optional Types.TTupleId delete_table_tuple_id;
+    5: optional Exprs.TExpr file_select_conjunct;
 }
 
 struct TTableFormatFileDesc {
@@ -308,7 +311,7 @@ struct TFileScanRangeParams {
     14: optional list<Types.TNetworkAddress> broker_addresses
     15: optional TFileAttributes file_attributes
     16: optional Exprs.TExpr pre_filter_exprs
-    // For data lake table format
+    // Deprecated. For data lake table format
     17: optional TTableFormatFileDesc table_format_params
     // For csv query task, same the column index in file, order by dest_tuple
     18: optional list<i32> column_idxs
@@ -329,6 +332,8 @@ struct TFileRangeDesc {
     6: optional list<string> columns_from_path;
     // column names from file path, in the same order with columns_from_path
     7: optional list<string> columns_from_path_keys;
+    // For data lake table format
+    8: optional TTableFormatFileDesc table_format_params
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.

