Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/08 14:14:59 UTC

[doris] branch branch-1.2-lts updated: [opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 48a097b040 [opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)
48a097b040 is described below

commit 48a097b040bda777dcf49ac30639f9fb8a5fde3f
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sat Jan 7 22:29:11 2023 +0800

    [opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)
    
    A delete file may apply to multiple data files, and each data file reads all of its delete files,
    so the same delete file may be read repeatedly. By caching the parsed delete files, multiple
    data files can reuse the content read on the first access.
    
    Performance improves by 60% in the single-threaded case and by 30% in the multi-threaded case.
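    
    A minimal sketch of the idea (not part of the patch; parse_position_delete_file is a
    hypothetical helper, while KVCache::get is the helper added below): the first reader that
    needs a delete file parses it and stores the result under the delete file path, and later
    readers that reference the same path reuse the cached result.
    
        std::vector<int64_t>* rows = kv_cache.get<std::vector<int64_t>>(
                delete_file.path, [&]() -> std::vector<int64_t>* {
                    // runs only on the first access to this delete file path
                    return parse_position_delete_file(delete_file.path);
                });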
---
 be/src/vec/exec/format/format_common.h          |  60 +++++
 be/src/vec/exec/format/table/iceberg_reader.cpp | 345 +++++++++++++-----------
 be/src/vec/exec/format/table/iceberg_reader.h   |  26 +-
 be/src/vec/exec/scan/new_file_scan_node.cpp     |   4 +-
 be/src/vec/exec/scan/new_file_scan_node.h       |   2 +
 be/src/vec/exec/scan/vfile_scanner.cpp          |  10 +-
 be/src/vec/exec/scan/vfile_scanner.h            |   6 +-
 7 files changed, 275 insertions(+), 178 deletions(-)

diff --git a/be/src/vec/exec/format/format_common.h b/be/src/vec/exec/format/format_common.h
index c5570f967d..18277c1ed4 100644
--- a/be/src/vec/exec/format/format_common.h
+++ b/be/src/vec/exec/format/format_common.h
@@ -47,4 +47,64 @@ struct DecimalScaleParams {
     }
 };
 
+/**
+ * Key-Value Cache Helper.
+ *
+ * It stores object instances globally. The user invokes the get method with a key and an
+ * object creator callback. If an instance is already stored in the cache, a void pointer to
+ * it is returned; otherwise, the creator callback is invoked to create a new instance, which
+ * is stored globally and then returned.
+ *
+ * The stored objects are deleted when the cache is destructed, so the user does not need to
+ * delete the returned pointer.
+ *
+ * The user can invoke the erase method with a key to delete the cached data.
+ *
+ * @tparam KType is the key type
+ */
+template <typename KType>
+class KVCache {
+public:
+    KVCache() = default;
+
+    ~KVCache() {
+        for (auto& kv : _storage) {
+            _delete_fn[kv.first](kv.second);
+        }
+    }
+
+    void erase(const KType& key) {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto it = _storage.find(key);
+        if (it != _storage.end()) {
+            _delete_fn[key](_storage[key]);
+            _storage.erase(key);
+            _delete_fn.erase(key);
+        }
+    }
+
+    template <class T>
+    T* get(const KType& key, const std::function<T*()> createIfNotExists) {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto it = _storage.find(key);
+        if (it != _storage.end()) {
+            return reinterpret_cast<T*>(it->second);
+        } else {
+            T* rawPtr = createIfNotExists();
+            if (rawPtr != nullptr) {
+                _delete_fn[key] = [](void* obj) { delete reinterpret_cast<T*>(obj); };
+                _storage[key] = rawPtr;
+            }
+            return rawPtr;
+        }
+    }
+
+private:
+    using DeleteFn = void (*)(void*);
+
+    std::mutex _lock;
+    std::unordered_map<KType, DeleteFn> _delete_fn;
+    std::unordered_map<KType, void*> _storage;
+};
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a4e5bbd618..f506597fc4 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -24,18 +24,25 @@
 
 namespace doris::vectorized {
 
+using DeleteRows = std::vector<int64_t>;
+using DeleteFile = phmap::parallel_flat_hash_map<
+        std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>,
+        std::equal_to<std::string>,
+        std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, std::mutex>;
+
 const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
 const std::string ICEBERG_ROW_POS = "pos";
 const std::string ICEBERG_FILE_PATH = "file_path";
 
 IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                                        RuntimeState* state, const TFileScanRangeParams& params,
-                                       const TFileRangeDesc& range)
+                                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache)
         : TableFormatReader(file_format_reader),
           _profile(profile),
           _state(state),
           _params(params),
-          _range(range) {
+          _range(range),
+          _kv_cache(kv_cache) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -72,29 +79,49 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
         return Status::OK();
     }
     auto& delete_file_type = table_desc.content;
-    auto files = table_desc.delete_files;
+    const std::vector<TIcebergDeleteFileDesc>& files = table_desc.delete_files;
     if (files.empty()) {
         return Status::OK();
     }
     if (delete_file_type == POSITION_DELETE) {
-        // position delete
-        ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
-        RowRange whole_range = parquet_reader->get_whole_range();
-        bool init_schema = false;
-        std::vector<std::string> delete_file_col_names;
-        std::vector<TypeDescriptor> delete_file_col_types;
-        std::list<std::vector<int64_t>> delete_rows_list;
-        delete_rows_list.resize(files.size());
-        int64_t num_delete_rows = 0;
-        auto delete_rows_iter = delete_rows_list.begin();
-        for (auto& delete_file : files) {
-            if (whole_range.last_row <= delete_file.position_lower_bound ||
-                whole_range.first_row > delete_file.position_upper_bound) {
-                delete_rows_iter++;
-                continue;
-            }
-            SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
-            std::vector<int64_t>& delete_rows = *delete_rows_iter;
+        RETURN_IF_ERROR(_position_delete(files));
+    }
+    // todo: equality delete
+    COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+    return Status::OK();
+}
+
+Status IcebergTableReader::_position_delete(
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+    std::string data_file_path = _range.path;
+    // the path in _range has the namenode prefix removed,
+    // while the file_path in the delete file is the full path, so we should add it back.
+    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
+        std::string fs_name = _params.hdfs_params.fs_name;
+        if (!starts_with(data_file_path, fs_name)) {
+            data_file_path = fs_name + data_file_path;
+        }
+    }
+
+    // position delete
+    ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
+    RowRange whole_range = parquet_reader->get_whole_range();
+    bool init_schema = false;
+    std::vector<std::string> delete_file_col_names;
+    std::vector<TypeDescriptor> delete_file_col_types;
+    std::vector<DeleteRows*> delete_rows_array;
+    int64_t num_delete_rows = 0;
+    std::vector<DeleteFile*> erase_data;
+    for (auto& delete_file : delete_files) {
+        if (whole_range.last_row <= delete_file.position_lower_bound ||
+            whole_range.first_row > delete_file.position_upper_bound) {
+            continue;
+        }
+
+        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
+        Status create_status = Status::OK();
+        DeleteFile* delete_file_cache = _kv_cache.get<
+                DeleteFile>(delete_file.path, [&]() -> DeleteFile* {
             TFileRangeDesc delete_range;
             delete_range.path = delete_file.path;
             delete_range.start_offset = 0;
@@ -106,41 +133,34 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
                 delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
                 init_schema = true;
             }
-            std::string data_file_path = _range.path;
-            // the path in _range is remove the namenode prefix,
-            // and the file_path in delete file is full path, so we should add it back.
-            if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
-                std::string fs_name = _params.hdfs_params.fs_name;
-                if (!starts_with(data_file_path, fs_name)) {
-                    data_file_path = fs_name + data_file_path;
-                }
-            }
-            Status init_st =
+            create_status =
                     delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false);
-            if (init_st.is_end_of_file()) {
-                continue;
-            } else if (!init_st.ok()) {
-                return init_st;
+            if (!create_status.ok()) {
+                return nullptr;
             }
+
             std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
                     partition_columns;
             std::unordered_map<std::string, VExprContext*> missing_columns;
             delete_reader.set_fill_columns(partition_columns, missing_columns);
 
-            bool dictionary_coded = false;
+            bool dictionary_coded = true;
             const tparquet::FileMetaData* meta_data = delete_reader.get_meta_data();
             for (int i = 0; i < delete_file_col_names.size(); ++i) {
                 if (delete_file_col_names[i] == ICEBERG_FILE_PATH) {
-                    // ParquetReader wil return EndOfFile if there's no row group
-                    auto& column_chunk = meta_data->row_groups[0].columns[i];
-                    if (column_chunk.__isset.meta_data &&
-                        column_chunk.meta_data.__isset.dictionary_page_offset) {
-                        dictionary_coded = true;
+                    for (int j = 0; j < meta_data->row_groups.size(); ++j) {
+                        auto& column_chunk = meta_data->row_groups[j].columns[i];
+                        if (!(column_chunk.__isset.meta_data &&
+                              column_chunk.meta_data.__isset.dictionary_page_offset)) {
+                            dictionary_coded = false;
+                            break;
+                        }
                     }
                     break;
                 }
             }
 
+            DeleteFile* position_delete = new DeleteFile;
             bool eof = false;
             while (!eof) {
                 Block block = Block();
@@ -162,161 +182,170 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
                 }
                 eof = false;
                 size_t read_rows = 0;
-                RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+                create_status = delete_reader.get_next_block(&block, &read_rows, &eof);
+                if (!create_status.ok()) {
+                    return nullptr;
+                }
                 if (read_rows > 0) {
                     ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
                     DCHECK_EQ(path_column->size(), read_rows);
-                    std::pair<int, int> path_range;
+                    ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+                    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+                    const int64_t* src_data =
+                            assert_cast<const ColumnType&>(*pos_column).get_data().data();
+                    IcebergTableReader::PositionDeleteRange range;
                     if (dictionary_coded) {
-                        path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column),
-                                                    data_file_path);
+                        range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
                     } else {
-                        path_range = _binary_search(assert_cast<const ColumnString&>(*path_column),
-                                                    data_file_path);
+                        range = _get_range(assert_cast<const ColumnString&>(*path_column));
                     }
-
-                    int skip_count = path_range.first;
-                    int valid_count = path_range.second;
-                    if (valid_count > 0) {
-                        // delete position
-                        ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
-                        CHECK_EQ(pos_column->size(), read_rows);
-                        using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
-                        const int64_t* src_data =
-                                assert_cast<const ColumnType&>(*pos_column).get_data().data() +
-                                skip_count;
-                        const int64_t* src_data_end = src_data + valid_count;
-                        const int64_t* cpy_start =
-                                std::lower_bound(src_data, src_data_end, whole_range.first_row);
-                        const int64_t* cpy_end =
-                                std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
-                        int64_t cpy_count = cpy_end - cpy_start;
-
-                        if (cpy_count > 0) {
-                            int64_t origin_size = delete_rows.size();
-                            delete_rows.resize(origin_size + cpy_count);
-                            int64_t* dest_position = &delete_rows[origin_size];
-                            memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
-                            num_delete_rows += cpy_count;
+                    for (int i = 0; i < range.range.size(); ++i) {
+                        std::string key = range.data_file_path[i];
+                        auto iter = position_delete->find(key);
+                        DeleteRows* delete_rows;
+                        if (iter == position_delete->end()) {
+                            delete_rows = new DeleteRows;
+                            std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
+                            (*position_delete)[key] = std::move(delete_rows_ptr);
+                        } else {
+                            delete_rows = iter->second.get();
                         }
+                        const int64_t* cpy_start = src_data + range.range[i].first;
+                        const int64_t cpy_count = range.range[i].second - range.range[i].first;
+                        int64_t origin_size = delete_rows->size();
+                        delete_rows->resize(origin_size + cpy_count);
+                        int64_t* dest_position = &(*delete_rows)[origin_size];
+                        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
                     }
                 }
             }
-            delete_rows_iter++;
+            return position_delete;
+        });
+        if (create_status.is_end_of_file()) {
+            continue;
+        } else if (!create_status.ok()) {
+            return create_status;
         }
-        if (num_delete_rows > 0) {
-            for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) {
-                if (iter->empty()) {
-                    delete_rows_list.erase(iter++);
-                } else {
-                    iter++;
+
+        DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
+        auto get_value = [&](const auto& v) {
+            DeleteRows* row_ids;
+            // remove this compatibility code when we finish upgrading phmap.
+            if constexpr (std::is_same_v<const typename DeleteFile::mapped_type&, decltype(v)>) {
+                row_ids = v.get();
+            } else {
+                row_ids = v.second.get();
+            }
+            if (row_ids->size() > 0) {
+                delete_rows_array.emplace_back(row_ids);
+                num_delete_rows += row_ids->size();
+                if (row_ids->front() >= whole_range.first_row &&
+                    row_ids->back() < whole_range.last_row) {
+                    erase_data.emplace_back(delete_file_cache);
                 }
             }
-            SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
-            _merge_sort(delete_rows_list, num_delete_rows);
-            parquet_reader->set_delete_rows(&_delete_rows);
-            COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
-        }
+        };
+        delete_file_map.if_contains(data_file_path, get_value);
+    }
+    if (num_delete_rows > 0) {
+        SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
+        _sort_delete_rows(delete_rows_array, num_delete_rows);
+        parquet_reader->set_delete_rows(&_delete_rows);
+        COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
+    }
+    // the delete rows have been copied out, so we can erase them.
+    for (auto& erase_item : erase_data) {
+        erase_item->erase(data_file_path);
     }
-    // todo: equality delete
-    COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
     return Status::OK();
 }
 
-std::pair<int, int> IcebergTableReader::_binary_search(const ColumnDictI32& file_path_column,
-                                                       const std::string& data_file_path) {
-    size_t read_rows = file_path_column.get_data().size();
-
-    int data_file_code = file_path_column.find_code(StringValue(data_file_path));
-    if (data_file_code == -2) { // -1 is null code
-        return std::make_pair(read_rows, 0);
+IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
+        const ColumnDictI32& file_path_column) {
+    IcebergTableReader::PositionDeleteRange range;
+    int read_rows = file_path_column.get_data().size();
+    int* code_path = const_cast<int*>(file_path_column.get_data().data());
+    int* code_path_start = code_path;
+    int* code_path_end = code_path + read_rows;
+    while (code_path < code_path_end) {
+        int code = code_path[0];
+        int* code_end = std::upper_bound(code_path, code_path_end, code);
+        range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
+        range.range.emplace_back(
+                std::make_pair(code_path - code_path_start, code_end - code_path_start));
+        code_path = code_end;
     }
-
-    const int* coded_path = file_path_column.get_data().data();
-    const int* coded_path_end = coded_path + read_rows;
-    const int* path_start = std::lower_bound(coded_path, coded_path_end, data_file_code);
-    const int* path_end = std::lower_bound(path_start, coded_path_end, data_file_code + 1);
-    int skip_count = path_start - coded_path;
-    int valid_count = path_end - path_start;
-
-    return std::make_pair(skip_count, valid_count);
+    return range;
 }
 
-std::pair<int, int> IcebergTableReader::_binary_search(const ColumnString& file_path_column,
-                                                       const std::string& data_file_path) {
-    const int read_rows = file_path_column.size();
-    if (read_rows == 0) {
-        return std::make_pair(0, 0);
-    }
-    StringRef data_file(data_file_path);
-
-    int left = 0;
-    int right = read_rows - 1;
-    if (file_path_column.get_data_at(left) > data_file ||
-        file_path_column.get_data_at(right) < data_file) {
-        return std::make_pair(read_rows, 0);
-    }
-    while (left < right) {
-        int mid = (left + right) / 2;
-        if (file_path_column.get_data_at(mid) < data_file) {
-            left = mid;
-        } else {
-            right = mid;
-        }
-    }
-    if (file_path_column.get_data_at(left) == data_file) {
-        int start = left;
-        int end = read_rows - 1;
-        while (start < end) {
-            int pivot = (start + end) / 2;
-            if (file_path_column.get_data_at(pivot) > data_file) {
-                end = pivot;
+IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
+        const ColumnString& file_path_column) {
+    IcebergTableReader::PositionDeleteRange range;
+    int read_rows = file_path_column.size();
+    int index = 0;
+    while (index < read_rows) {
+        StringRef data_path = file_path_column.get_data_at(index);
+        int left = index;
+        int right = read_rows - 1;
+        while (left < right) {
+            int mid = left + (right - left) / 2;
+            if (file_path_column.get_data_at(mid) > data_path) {
+                right = mid;
             } else {
-                start = pivot;
+                left = mid;
             }
         }
-        return std::make_pair(left, end - left + 1);
-    } else {
-        return std::make_pair(read_rows, 0);
+        range.data_file_path.emplace_back(data_path.to_string());
+        range.range.emplace_back(std::make_pair(index, left + 1));
+        index = left + 1;
     }
+    return range;
 }
 
-void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
-                                     int64_t num_delete_rows) {
-    if (delete_rows_list.empty()) {
+void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
+                                           int64_t num_delete_rows) {
+    if (delete_rows_array.empty()) {
         return;
     }
-    if (delete_rows_list.size() == 1) {
+    if (delete_rows_array.size() == 1) {
         _delete_rows.resize(num_delete_rows);
-        memcpy(&_delete_rows[0], &(delete_rows_list.front()[0]), sizeof(int64_t) * num_delete_rows);
+        memcpy(&_delete_rows[0], &((*delete_rows_array.front())[0]),
+               sizeof(int64_t) * num_delete_rows);
         return;
     }
-    if (delete_rows_list.size() == 2) {
+    if (delete_rows_array.size() == 2) {
         _delete_rows.resize(num_delete_rows);
-        std::merge(delete_rows_list.front().begin(), delete_rows_list.front().end(),
-                   delete_rows_list.back().begin(), delete_rows_list.back().end(),
+        std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
+                   delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
                    _delete_rows.begin());
         return;
     }
 
-    // merge sort
-    using vec_pair =
-            std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::const_iterator>;
-    auto cmp = [](const vec_pair& a, const vec_pair& b) { return *a.first > *b.first; };
-    std::priority_queue<vec_pair, vector<vec_pair>, decltype(cmp)> pq(cmp);
-    for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end(); ++iter) {
-        if (iter->size() > 0) {
-            pq.push({iter->begin(), iter->end()});
+    using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
+    _delete_rows.resize(num_delete_rows);
+    auto row_id_iter = _delete_rows.begin();
+    auto iter_end = _delete_rows.end();
+    std::vector<vec_pair> rows_array;
+    for (auto rows : delete_rows_array) {
+        if (rows->size() > 0) {
+            rows_array.push_back({rows->begin(), rows->end()});
         }
     }
-    _delete_rows.reserve(num_delete_rows);
-    while (!pq.empty()) {
-        vec_pair p = pq.top();
-        pq.pop();
-        _delete_rows.emplace_back(*p.first);
-        p.first++;
-        if (p.first != p.second) {
-            pq.push(p);
+    int array_size = rows_array.size();
+    while (row_id_iter != iter_end) {
+        int min_index = 0;
+        int min = *rows_array[0].first;
+        for (int i = 0; i < array_size; ++i) {
+            if (*rows_array[i].first < min) {
+                min_index = i;
+                min = *rows_array[i].first;
+            }
+        }
+        *row_id_iter++ = min;
+        rows_array[min_index].first++;
+        if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
+            rows_array.erase(rows_array.begin() + min_index);
+            array_size--;
         }
     }
 }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index d689fbcf51..301546b6df 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -29,9 +29,14 @@ namespace doris::vectorized {
 
 class IcebergTableReader : public TableFormatReader {
 public:
+    struct PositionDeleteRange {
+        std::vector<std::string> data_file_path;
+        std::vector<std::pair<int, int>> range;
+    };
+
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range);
+                       const TFileRangeDesc& range, KVCache<std::string>& kv_cache);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
@@ -56,31 +61,26 @@ private:
         RuntimeProfile::Counter* delete_rows_sort_time;
     };
 
+    Status _position_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files);
+
     /**
      * https://iceberg.apache.org/spec/#position-delete-files
      * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
      * Sorting by file_path allows filter pushdown by file in columnar storage formats.
      * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
-     *
-     * So, use merge-sort to merge delete rows from different files.
      */
-    void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+    void _sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
+                           int64_t num_delete_rows);
 
-    /**
-     * Delete rows is sorted by file_path, using binary-search to locate the right delete rows for current data file.
-     * @return a pair of \<skip_count, valid_count\>,
-     * and the range of [skip_count, skip_count + valid_count) is the delete rows for current data file.
-     */
-    std::pair<int, int> _binary_search(const ColumnDictI32& file_path_column,
-                                       const std::string& data_file_path);
+    PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
 
-    std::pair<int, int> _binary_search(const ColumnString& file_path_column,
-                                       const std::string& data_file_path);
+    PositionDeleteRange _get_range(const ColumnString& file_path_column);
 
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
     const TFileRangeDesc& _range;
+    KVCache<std::string>& _kv_cache;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _delete_rows;
 };
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 49319f1e6b..59d3ca8453 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -100,8 +100,8 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
 }
 
 VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
-    VScanner* scanner =
-            new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
+    VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range,
+                                         runtime_profile(), _kv_cache);
     ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
     _scanner_pool.add(scanner);
     // TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead?
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h
index 53b11e408d..26eed2310b 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "vec/exec/format/format_common.h"
 #include "vec/exec/scan/vscan_node.h"
 
 namespace doris::vectorized {
@@ -41,5 +42,6 @@ private:
 
 private:
     std::vector<TScanRangeParams> _scan_ranges;
+    KVCache<std::string> _kv_cache;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 4043174a4a..d0492ea9c7 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -22,7 +22,6 @@
 
 #include <vec/data_types/data_type_factory.hpp>
 
-#include "../format/table/iceberg_reader.h"
 #include "common/logging.h"
 #include "common/utils.h"
 #include "exec/arrow/orc_reader.h"
@@ -42,7 +41,8 @@
 namespace doris::vectorized {
 
 VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                           const TFileScanRange& scan_range, RuntimeProfile* profile)
+                           const TFileScanRange& scan_range, RuntimeProfile* profile,
+                           KVCache<std::string>& kv_cache)
         : VScanner(state, static_cast<VScanNode*>(parent), limit),
           _params(scan_range.params),
           _ranges(scan_range.ranges),
@@ -51,6 +51,7 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
           _cur_reader_eof(false),
           _mem_pool(std::make_unique<MemPool>()),
           _profile(profile),
+          _kv_cache(kv_cache),
           _strict_mode(false) {
     if (scan_range.params.__isset.strict_mode) {
         _strict_mode = scan_range.params.strict_mode;
@@ -492,8 +493,9 @@ Status VFileScanner::_get_next_reader() {
                                                       _push_down_expr);
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
-                IcebergTableReader* iceberg_reader = new IcebergTableReader(
-                        (GenericReader*)parquet_reader, _profile, _state, _params, range);
+                IcebergTableReader* iceberg_reader =
+                        new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
+                                               _params, range, _kv_cache);
                 RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 6ac802f60c..28c0e3d347 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -23,6 +23,7 @@
 #include "exprs/function_filter.h"
 #include "io/file_factory.h"
 #include "runtime/tuple.h"
+#include "vec/exec/format/format_common.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/scan/vscanner.h"
 
@@ -33,7 +34,8 @@ class NewFileScanNode;
 class VFileScanner : public VScanner {
 public:
     VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                 const TFileScanRange& scan_range, RuntimeProfile* profile);
+                 const TFileScanRange& scan_range, RuntimeProfile* profile,
+                 KVCache<string>& kv_cache);
 
     Status open(RuntimeState* state) override;
 
@@ -107,6 +109,8 @@ protected:
     // Profile
     RuntimeProfile* _profile;
 
+    KVCache<std::string>& _kv_cache;
+
     bool _scanner_eof = false;
     int _rows = 0;
     int _num_of_columns_from_file;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org