Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/30 05:09:48 UTC

[doris] branch branch-1.2-lts updated: [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 36b61d4826 [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
36b61d4826 is described below

commit 36b61d4826eeb1f172d7043a32d471aaeb64e1e1
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Fri Dec 30 08:57:55 2022 +0800

    [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
    
    **Optimize**
    PR #14470 used `Expr` to filter delete rows so that they match the current data file,
    but the rows in the delete file are [sorted by file_path then position](https://iceberg.apache.org/spec/#position-delete-files)
    to optimize filtering rows while scanning, so this PR removes the `Expr` and uses binary search to filter delete rows.
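    
    A minimal sketch of the binary-search idea (illustrative only; names and types are
    simplified and do not match the actual code in iceberg_reader.cpp): because delete
    rows are sorted by file_path first and position second, the positions belonging to
    one data file form a contiguous run that can be located without a per-row predicate.
    
        #include <algorithm>
        #include <cstdint>
        #include <string>
        #include <vector>
        
        // `paths` and `positions` are parallel columns of a position delete file,
        // already sorted by (file_path, pos) as required by the Iceberg spec.
        std::vector<int64_t> deleted_positions(const std::vector<std::string>& paths,
                                               const std::vector<int64_t>& positions,
                                               const std::string& data_file_path) {
            // The rows for one data file are contiguous, so equal_range finds them in O(log n).
            auto range = std::equal_range(paths.begin(), paths.end(), data_file_path);
            size_t first = range.first - paths.begin();
            size_t last = range.second - paths.begin();
            // The matching positions are already sorted ascending.
            return std::vector<int64_t>(positions.begin() + first, positions.begin() + last);
        }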
    
    In addition, delete files are likely to be dictionary-encoded, and it is time-consuming to decode the `file_path`
    column into `ColumnString`, so this PR uses `ColumnDictionary` to read the `file_path` column.
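    
    A rough sketch of the dictionary shortcut (illustrative only; the real reader works on
    `ColumnDictI32` backed by the Parquet dictionary page): look up the target path's code once,
    then binary-search the int32 codes instead of comparing strings row by row. It returns the
    same <skip_count, valid_count> shape as the `_binary_search` helper added in this PR.
    
        #include <algorithm>
        #include <cstdint>
        #include <string>
        #include <unordered_map>
        #include <utility>
        #include <vector>
        
        // `dict` maps each distinct file_path to its dictionary code; `codes` holds one code per
        // row. Assuming rows are sorted by file_path and codes are assigned in order of first
        // appearance, `codes` is non-decreasing and can be binary-searched.
        std::pair<int, int> path_code_range(const std::unordered_map<std::string, int32_t>& dict,
                                            const std::vector<int32_t>& codes,
                                            const std::string& data_file_path) {
            auto it = dict.find(data_file_path);
            if (it == dict.end()) {
                return {static_cast<int>(codes.size()), 0}; // no delete rows touch this data file
            }
            auto first = std::lower_bound(codes.begin(), codes.end(), it->second);
            auto last = std::upper_bound(first, codes.end(), it->second);
            // {skip_count, valid_count}: rows [skip_count, skip_count + valid_count) match.
            return {static_cast<int>(first - codes.begin()), static_cast<int>(last - first)};
        }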
    
    In testing, the merge-on-read (MOR) performance for Iceberg v2 tables improved by more than 30%.
    
    **Fix Bug**
    The lazy-read block may not contain the filter column when the whole row group has been filtered out by `Expr`
    and `batch_eof` is generated from the next batch.
---
 be/src/vec/columns/column_dictionary.h             |  11 ++
 be/src/vec/exec/format/parquet/parquet_common.cpp  |  56 +++++++
 be/src/vec/exec/format/parquet/parquet_common.h    |  21 ++-
 .../parquet/vparquet_column_chunk_reader.cpp       |   4 +
 .../format/parquet/vparquet_column_chunk_reader.h  |   1 +
 .../exec/format/parquet/vparquet_group_reader.cpp  |  10 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  19 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 184 ++++++++++++++++-----
 be/src/vec/exec/format/table/iceberg_reader.h      |  28 +++-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   4 +-
 .../planner/external/IcebergScanProvider.java      |  69 --------
 .../doris/planner/external/IcebergSplit.java       |   2 -
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 14 files changed, 274 insertions(+), 141 deletions(-)

diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index 729957946f..422e2fdfbb 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -211,6 +211,17 @@ public:
         LOG(FATAL) << "should not call replace_column_data_default in ColumnDictionary";
     }
 
+    /**
+     * Just insert dictionary data items; the items will be appended to _dict.
+     */
+    void insert_many_dict_data(const StringRef* dict_array, uint32_t dict_num) {
+        _dict.reserve(_dict.size() + dict_num);
+        for (uint32_t i = 0; i < dict_num; ++i) {
+            auto value = StringValue(dict_array[i].data, dict_array[i].size);
+            _dict.insert_value(value);
+        }
+    }
+
     void insert_many_dict_data(const int32_t* data_array, size_t start_index,
                                const StringRef* dict_array, size_t data_num,
                                uint32_t dict_num) override {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 98f0a7de1a..5f8656fcd9 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -18,6 +18,7 @@
 #include "parquet_common.h"
 
 #include "util/coding.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/data_types/data_type_nullable.h"
 
 namespace doris::vectorized {
@@ -277,6 +278,36 @@ void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) {
     }
 }
 
+Status Decoder::_decode_dict_values(MutableColumnPtr& doris_column,
+                                    ColumnSelectVector& select_vector) {
+    DCHECK(doris_column->is_column_dictionary());
+    size_t dict_index = 0;
+    ColumnSelectVector::DataReadType read_type;
+    auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data();
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
+        switch (read_type) {
+        case ColumnSelectVector::CONTENT: {
+            uint32_t* start_index = &_indexes[0];
+            column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
+            dict_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::NULL_DATA: {
+            doris_column->insert_many_defaults(run_length);
+            break;
+        }
+        case ColumnSelectVector::FILTERED_CONTENT: {
+            dict_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::FILTERED_NULL: {
+            break;
+        }
+        }
+    }
+    return Status::OK();
+}
+
 Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
                                   size_t num_values) {
     if (num_values * _type_length != length) {
@@ -321,11 +352,26 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
                                        ColumnSelectVector& select_vector) {
     size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
     if (_has_dict) {
+        if (doris_column->is_column_dictionary() &&
+            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
+            std::vector<StringRef> dict_items;
+            dict_items.reserve(_dict_items.size());
+            for (int i = 0; i < _dict_items.size(); ++i) {
+                dict_items.emplace_back(_dict_items[i], _type_length);
+            }
+            assert_cast<ColumnDictI32&>(*doris_column)
+                    .insert_many_dict_data(&dict_items[0], dict_items.size());
+        }
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
     } else if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
         return Status::IOError("Out-of-bounds access in parquet data decoder");
     }
+
+    if (doris_column->is_column_dictionary()) {
+        return _decode_dict_values(doris_column, select_vector);
+    }
+
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
     switch (logical_type) {
 #define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
@@ -507,9 +553,19 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
                                        ColumnSelectVector& select_vector) {
     size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
     if (_has_dict) {
+        if (doris_column->is_column_dictionary() &&
+            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
+            assert_cast<ColumnDictI32&>(*doris_column)
+                    .insert_many_dict_data(&_dict_items[0], _dict_items.size());
+        }
         _indexes.resize(non_null_size);
         _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
     }
+
+    if (doris_column->is_column_dictionary()) {
+        return _decode_dict_values(doris_column, select_vector);
+    }
+
     TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
     switch (logical_type) {
     case TypeIndex::String:
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 112be6a223..a3d3e0d8a4 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -178,11 +178,23 @@ public:
     }
 
 protected:
+    /**
+     * Decode dictionary-coded values into doris_column. doris_column must be of type ColumnDictI32,
+     * and the coded values must have been read into _indexes beforehand.
+     */
+    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
+
     int32_t _type_length;
     Slice* _data = nullptr;
     uint32_t _offset = 0;
     FieldSchema* _field_schema = nullptr;
     std::unique_ptr<DecodeParams> _decode_params = nullptr;
+
+    // For dictionary encoding
+    bool _has_dict = false;
+    std::unique_ptr<uint8_t[]> _dict = nullptr;
+    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+    std::vector<uint32_t> _indexes;
 };
 
 template <typename DecimalPrimitiveType>
@@ -253,12 +265,9 @@ protected:
     if (!_has_dict) _offset += _type_length
 
     tparquet::Type::type _physical_type;
+
     // For dictionary encoding
-    bool _has_dict = false;
-    std::unique_ptr<uint8_t[]> _dict = nullptr;
     std::vector<char*> _dict_items;
-    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
-    std::vector<uint32_t> _indexes;
 };
 
 template <typename Numeric>
@@ -567,11 +576,7 @@ protected:
                                   ColumnSelectVector& select_vector);
 
     // For dictionary encoding
-    bool _has_dict = false;
-    std::unique_ptr<uint8_t[]> _dict = nullptr;
     std::vector<StringRef> _dict_items;
-    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
-    std::vector<uint32_t> _indexes;
 };
 
 template <typename DecimalPrimitiveType>
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 2d398b5409..e1c2c0af64 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -176,6 +176,7 @@ Status ColumnChunkReader::_decode_dict_page() {
                                            header.dictionary_page_header.num_values));
     _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder);
 
+    _has_dict = true;
     return Status::OK();
 }
 
@@ -218,6 +219,9 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
 Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                         ColumnSelectVector& select_vector) {
     SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+    if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) {
+        return Status::IOError("Not dictionary coded");
+    }
     if (UNLIKELY(_remaining_num_values < select_vector.num_values())) {
         return Status::IOError("Decode too many values in current page");
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index f76c05b735..1bb1d017ce 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -168,6 +168,7 @@ private:
     Slice _page_data;
     std::unique_ptr<uint8_t[]> _decompress_buf;
     size_t _decompress_buf_size = 0;
+    bool _has_dict = false;
     Decoder* _page_decoder = nullptr;
     // Map: encoding -> Decoder
     // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index e3f841f1b1..0d7df17e4b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -257,8 +257,14 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
 
     // filter data in predicate columns, and remove filter column
     if (select_vector.has_filter()) {
-        Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
-                            origin_column_num);
+        if (block->columns() == origin_column_num) {
+            // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is
+            // generated from next batch, so the filter column is removed ahead.
+            DCHECK_EQ(block->rows(), 0);
+        } else {
+            Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
+                                origin_column_num);
+        }
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6613b1f987..a27eddce71 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -141,7 +141,6 @@ Status ParquetReader::_open_file() {
                                                         _scan_range.file_size, 0, _file_reader));
     }
     if (_file_metadata == nullptr) {
-        SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
         RETURN_IF_ERROR(_file_reader->open());
         if (_file_reader->size() == 0) {
             return Status::EndOfFile("Empty Parquet File");
@@ -341,31 +340,31 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
 }
 
 Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    if (_current_group_reader == nullptr) {
+    if (_current_group_reader == nullptr || _row_group_eof) {
         if (_read_row_groups.size() > 0) {
             RETURN_IF_ERROR(_next_row_group_reader());
         } else {
+            _current_group_reader.reset(nullptr);
+            _row_group_eof = true;
             *read_rows = 0;
             *eof = true;
             return Status::OK();
         }
     }
     DCHECK(_current_group_reader != nullptr);
-    bool batch_eof = false;
     {
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
         RETURN_IF_ERROR(
-                _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof));
+                _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof));
     }
-    if (batch_eof) {
+    if (_row_group_eof) {
         auto column_st = _current_group_reader->statistics();
         _column_statistics.merge(column_st);
         _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
-        Status st = _next_row_group_reader();
-        if (st.is_end_of_file()) {
+        if (_read_row_groups.size() == 0) {
             *eof = true;
-        } else if (!st.ok()) {
-            return st;
+        } else {
+            *eof = false;
         }
     }
     return Status::OK();
@@ -390,6 +389,7 @@ RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx(
 
 Status ParquetReader::_next_row_group_reader() {
     if (_read_row_groups.empty()) {
+        _row_group_eof = true;
         _current_group_reader.reset(nullptr);
         return Status::EndOfFile("No next RowGroupReader");
     }
@@ -406,6 +406,7 @@ Status ParquetReader::_next_row_group_reader() {
     _current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns,
                                                    row_group_index.row_group_id, row_group, _ctz,
                                                    position_delete_ctx, _lazy_read_ctx));
+    _row_group_eof = false;
     return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
                                        _col_offsets);
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 2bfc74f823..19a7b2534d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -154,7 +154,9 @@ private:
     std::unique_ptr<FileReader> _file_reader = nullptr;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
-    std::unique_ptr<RowGroupReader> _current_group_reader;
+    std::unique_ptr<RowGroupReader> _current_group_reader = nullptr;
+    // whether the current row group reader has been read to the end
+    bool _row_group_eof = true;
     int32_t _total_groups;                  // num of groups(stripes) of a parquet(orc) file
     std::map<std::string, int> _map_column; // column-name <---> column-index
     std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 332aa44e54..1bdae3a6e2 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -26,10 +26,16 @@ namespace doris::vectorized {
 
 const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
 const std::string ICEBERG_ROW_POS = "pos";
+const std::string ICEBERG_FILE_PATH = "file_path";
 
 IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
-                                       RuntimeState* state, const TFileScanRangeParams& params)
-        : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
+                                       RuntimeState* state, const TFileScanRangeParams& params,
+                                       const TFileRangeDesc& range)
+        : TableFormatReader(file_format_reader),
+          _profile(profile),
+          _state(state),
+          _params(params),
+          _range(range) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -38,12 +44,8 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
             ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
     _iceberg_profile.delete_files_read_time =
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
-}
-
-IcebergTableReader::~IcebergTableReader() {
-    if (_data_path_conjunct_ctx != nullptr) {
-        _data_path_conjunct_ctx->close(_state);
-    }
+    _iceberg_profile.delete_rows_sort_time =
+            ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
@@ -76,15 +78,6 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     }
     if (delete_file_type == POSITION_DELETE) {
         // position delete
-        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
-        auto row_desc = RowDescriptor(_state->desc_tbl(),
-                                      std::vector<TupleId>({table_desc.delete_table_tuple_id}),
-                                      std::vector<bool>({false}));
-        RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct,
-                                                &_data_path_conjunct_ctx));
-        RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
-        RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
-
         ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
         RowRange whole_range = parquet_reader->get_whole_range();
         bool init_schema = false;
@@ -100,6 +93,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
                 delete_rows_iter++;
                 continue;
             }
+            SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
             std::vector<int64_t>& delete_rows = *delete_rows_iter;
             TFileRangeDesc delete_range;
             delete_range.path = delete_file.path;
@@ -112,49 +106,90 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
                 delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
                 init_schema = true;
             }
-            RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr,
-                                                      _data_path_conjunct_ctx, false));
+            std::string data_file_path = _range.path;
+            // the path in _range has the namenode prefix removed, while the file_path
+            // in the delete file is the full path, so we should add the prefix back.
+            if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
+                std::string fs_name = _params.hdfs_params.fs_name;
+                if (!starts_with(data_file_path, fs_name)) {
+                    data_file_path = fs_name + data_file_path;
+                }
+            }
+            RETURN_IF_ERROR(
+                    delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false));
             std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
                     partition_columns;
             std::unordered_map<std::string, VExprContext*> missing_columns;
             delete_reader.set_fill_columns(partition_columns, missing_columns);
 
             bool eof = false;
+            // We can only know whether a parquet file is dictionary-encoded after reading the first block,
+            // so we assume it is dictionary-encoded first, and reset the flag to false if an error is thrown.
+            bool dictionary_coded = true;
             while (!eof) {
                 Block block = Block();
                 for (int i = 0; i < delete_file_col_names.size(); ++i) {
                     DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
-                            delete_file_col_types[i], true);
-                    MutableColumnPtr data_column = data_type->create_column();
-                    block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
-                                                       delete_file_col_names[i]));
+                            delete_file_col_types[i], false);
+                    if (delete_file_col_names[i] == ICEBERG_FILE_PATH && dictionary_coded) {
+                        // the dictionary data in ColumnDictI32 is referenced by StringValue and is not
+                        // owned by the column, so the upper caller should keep the dictionary data
+                        // alive for at least as long as the ColumnDictI32 is in use.
+                        MutableColumnPtr dict_column = ColumnDictI32::create();
+                        block.insert(ColumnWithTypeAndName(std::move(dict_column), data_type,
+                                                           delete_file_col_names[i]));
+                    } else {
+                        MutableColumnPtr data_column = data_type->create_column();
+                        block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                                           delete_file_col_names[i]));
+                    }
                 }
                 eof = false;
                 size_t read_rows = 0;
-                RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+                Status st = delete_reader.get_next_block(&block, &read_rows, &eof);
+                if (!st.ok()) {
+                    if (st.to_string() == "[IO_ERROR]Not dictionary coded") {
+                        dictionary_coded = false;
+                        continue;
+                    }
+                    return st;
+                }
                 if (read_rows > 0) {
-                    auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
-                    ColumnPtr pos_column = pos_type_column.column;
-                    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
-                    if (pos_type_column.type->is_nullable()) {
-                        pos_column = assert_cast<const ColumnNullable&>(*pos_column)
-                                             .get_nested_column_ptr();
+                    ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
+                    DCHECK_EQ(path_column->size(), read_rows);
+                    std::pair<int, int> path_range;
+                    if (dictionary_coded) {
+                        path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column),
+                                                    data_file_path);
+                    } else {
+                        path_range = _binary_search(assert_cast<const ColumnString&>(*path_column),
+                                                    data_file_path);
                     }
-                    const int64_t* src_data =
-                            assert_cast<const ColumnType&>(*pos_column).get_data().data();
-                    const int64_t* src_data_end = src_data + read_rows;
-                    const int64_t* cpy_start =
-                            std::lower_bound(src_data, src_data_end, whole_range.first_row);
-                    const int64_t* cpy_end =
-                            std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
-                    int64_t cpy_count = cpy_end - cpy_start;
-
-                    if (cpy_count > 0) {
-                        int64_t origin_size = delete_rows.size();
-                        delete_rows.resize(origin_size + cpy_count);
-                        int64_t* dest_position = &delete_rows[origin_size];
-                        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
-                        num_delete_rows += cpy_count;
+
+                    int skip_count = path_range.first;
+                    int valid_count = path_range.second;
+                    if (valid_count > 0) {
+                        // delete position
+                        ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+                        CHECK_EQ(pos_column->size(), read_rows);
+                        using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+                        const int64_t* src_data =
+                                assert_cast<const ColumnType&>(*pos_column).get_data().data() +
+                                skip_count;
+                        const int64_t* src_data_end = src_data + valid_count;
+                        const int64_t* cpy_start =
+                                std::lower_bound(src_data, src_data_end, whole_range.first_row);
+                        const int64_t* cpy_end =
+                                std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
+                        int64_t cpy_count = cpy_end - cpy_start;
+
+                        if (cpy_count > 0) {
+                            int64_t origin_size = delete_rows.size();
+                            delete_rows.resize(origin_size + cpy_count);
+                            int64_t* dest_position = &delete_rows[origin_size];
+                            memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
+                            num_delete_rows += cpy_count;
+                        }
                     }
                 }
             }
@@ -168,6 +203,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
                     iter++;
                 }
             }
+            SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
             _merge_sort(delete_rows_list, num_delete_rows);
             parquet_reader->set_delete_rows(&_delete_rows);
             COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
@@ -178,6 +214,64 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     return Status::OK();
 }
 
+std::pair<int, int> IcebergTableReader::_binary_search(const ColumnDictI32& file_path_column,
+                                                       const std::string& data_file_path) {
+    size_t read_rows = file_path_column.get_data().size();
+
+    int data_file_code = file_path_column.find_code(StringValue(data_file_path));
+    if (data_file_code == -2) { // -1 is null code
+        return std::make_pair(read_rows, 0);
+    }
+
+    const int* coded_path = file_path_column.get_data().data();
+    const int* coded_path_end = coded_path + read_rows;
+    const int* path_start = std::lower_bound(coded_path, coded_path_end, data_file_code);
+    const int* path_end = std::lower_bound(path_start, coded_path_end, data_file_code + 1);
+    int skip_count = path_start - coded_path;
+    int valid_count = path_end - path_start;
+
+    return std::make_pair(skip_count, valid_count);
+}
+
+std::pair<int, int> IcebergTableReader::_binary_search(const ColumnString& file_path_column,
+                                                       const std::string& data_file_path) {
+    const int read_rows = file_path_column.size();
+    if (read_rows == 0) {
+        return std::make_pair(0, 0);
+    }
+    StringRef data_file(data_file_path);
+
+    int left = 0;
+    int right = read_rows - 1;
+    if (file_path_column.get_data_at(left) > data_file ||
+        file_path_column.get_data_at(right) < data_file) {
+        return std::make_pair(read_rows, 0);
+    }
+    while (left < right) {
+        int mid = (left + right) / 2;
+        if (file_path_column.get_data_at(mid) < data_file) {
+            left = mid;
+        } else {
+            right = mid;
+        }
+    }
+    if (file_path_column.get_data_at(left) == data_file) {
+        int start = left;
+        int end = read_rows - 1;
+        while (start < end) {
+            int pivot = (start + end) / 2;
+            if (file_path_column.get_data_at(pivot) > data_file) {
+                end = pivot;
+            } else {
+                start = pivot;
+            }
+        }
+        return std::make_pair(left, end - left + 1);
+    } else {
+        return std::make_pair(read_rows, 0);
+    }
+}
+
 void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
                                      int64_t num_delete_rows) {
     if (delete_rows_list.empty()) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 0a9d4ef71f..d689fbcf51 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -20,6 +20,7 @@
 #include <queue>
 
 #include "table_format_reader.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/exec/format/generic_reader.h"
 #include "vec/exec/format/parquet/parquet_common.h"
 #include "vec/exprs/vexpr.h"
@@ -29,8 +30,9 @@ namespace doris::vectorized {
 class IcebergTableReader : public TableFormatReader {
 public:
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
-                       RuntimeState* state, const TFileScanRangeParams& params);
-    ~IcebergTableReader() override;
+                       RuntimeState* state, const TFileScanRangeParams& params,
+                       const TFileRangeDesc& range);
+    ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
 
@@ -51,14 +53,34 @@ private:
         RuntimeProfile::Counter* num_delete_files;
         RuntimeProfile::Counter* num_delete_rows;
         RuntimeProfile::Counter* delete_files_read_time;
+        RuntimeProfile::Counter* delete_rows_sort_time;
     };
 
+    /**
+     * https://iceberg.apache.org/spec/#position-delete-files
+     * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
+     * Sorting by file_path allows filter pushdown by file in columnar storage formats.
+     * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
+     *
+     * So, use merge-sort to merge delete rows from different files.
+     */
     void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
 
+    /**
+     * Delete rows are sorted by file_path, so binary search is used to locate the delete rows for the current data file.
+     * @return a pair of \<skip_count, valid_count\>,
+     * where the range [skip_count, skip_count + valid_count) covers the delete rows for the current data file.
+     */
+    std::pair<int, int> _binary_search(const ColumnDictI32& file_path_column,
+                                       const std::string& data_file_path);
+
+    std::pair<int, int> _binary_search(const ColumnString& file_path_column,
+                                       const std::string& data_file_path);
+
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
-    VExprContext* _data_path_conjunct_ctx = nullptr;
+    const TFileRangeDesc& _range;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _delete_rows;
 };
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index e675583bd3..4043174a4a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -493,8 +493,8 @@ Status VFileScanner::_get_next_reader() {
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
                 IcebergTableReader* iceberg_reader = new IcebergTableReader(
-                        (GenericReader*)parquet_reader, _profile, _state, _params);
-                iceberg_reader->init_row_filters(range);
+                        (GenericReader*)parquet_reader, _profile, _state, _params, range);
+                RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
                 _cur_reader.reset((GenericReader*)parquet_reader);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index e51b39144f..46adddfde6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -18,19 +18,9 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BaseTableRef;
-import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.StringLiteral;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HMSResource;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -41,12 +31,8 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
-import org.apache.doris.thrift.TIcebergTable;
-import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableFormatFileDesc;
-import org.apache.doris.thrift.TTableType;
 
-import lombok.Data;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
@@ -76,10 +62,6 @@ import java.util.OptionalLong;
 public class IcebergScanProvider extends HiveScanProvider {
 
     private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
-    public static final String V2_DELETE_DB = "iceberg#delete#db";
-    private static final DeleteFileTempTable scanDeleteTable =
-            new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
     private final Analyzer analyzer;
 
     public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
@@ -98,7 +80,6 @@ public class IcebergScanProvider extends HiveScanProvider {
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
-            setPathSelectConjunct(fileDesc, icebergSplit);
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                 deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -127,19 +108,6 @@ public class IcebergScanProvider extends HiveScanProvider {
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
-    private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
-                throws UserException {
-        BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
-        fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
-        SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
-        lhs.analyze(icebergSplit.getAnalyzer());
-        lhs.getDesc().setIsMaterialized(true);
-        StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
-        BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
-        pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
-        fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
-    }
-
     @Override
     public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
         TFileFormatType type;
@@ -173,14 +141,6 @@ public class IcebergScanProvider extends HiveScanProvider {
         }
         List<InputSplit> splits = new ArrayList<>();
         int formatVersion = ((BaseTable) table).operations().current().formatVersion();
-        BaseTableRef tableRef = null;
-        if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-            TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
-            fullName.analyze(analyzer);
-            TableRef ref = new TableRef(fullName, fullName.toString(), null);
-            tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
-            tableRef.analyze(analyzer);
-        }
         for (FileScanTask task : scan.planFiles()) {
             for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
                 String dataFilePath = spitTask.file().path().toString();
@@ -189,7 +149,6 @@ public class IcebergScanProvider extends HiveScanProvider {
                 split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
-                    split.setDeleteTableRef(tableRef);
                 }
                 split.setTableFormatType(TableFormatType.ICEBERG);
                 split.setAnalyzer(analyzer);
@@ -239,32 +198,4 @@ public class IcebergScanProvider extends HiveScanProvider {
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
         return Collections.emptyList();
     }
-
-    @Data
-    static class DeleteFileTempTable extends ExternalTable {
-        public static final String DATA_FILE_PATH = "file_path";
-        private final TableName tableName;
-        private final List<Column> fullSchema = new ArrayList<>();
-
-        public DeleteFileTempTable(TableType type) {
-            super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
-            this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
-            Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
-            this.fullSchema.add(dataFilePathCol);
-        }
-
-        @Override
-        public List<Column> getFullSchema() {
-            return fullSchema;
-        }
-
-        @Override
-        public TTableDescriptor toThrift() {
-            TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
-            TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
-                    fullSchema.size(), 0, getName(), "");
-            tTableDescriptor.setIcebergTable(tIcebergTable);
-            return tTableDescriptor;
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 14fce6caf8..b9607a7f00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BaseTableRef;
 
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +33,6 @@ public class IcebergSplit extends HiveSplit {
     private Analyzer analyzer;
     private String dataFilePath;
     private Integer formatVersion;
-    private BaseTableRef deleteTableRef;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
 }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 613c884a06..ff97361c6e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -274,7 +274,9 @@ struct TIcebergFileDesc {
     2: optional i32 content;
     // When open a delete file, filter the data file path with the 'file_path' property
     3: optional list<TIcebergDeleteFileDesc> delete_files;
+    // Deprecated
     4: optional Types.TTupleId delete_table_tuple_id;
+    // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org