Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/24 09:10:06 UTC

[doris] 14/15: [fix](multi-catalog) fix and optimize iceberg v2 reader (#15274)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f251343fa42878e4e4454723c8d92924eca381ea
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sat Dec 24 16:02:07 2022 +0800

    [fix](multi-catalog) fix and optimize iceberg v2 reader (#15274)
    
    Fix three bugs when reading Iceberg v2 tables:
    1. A `delete position` in a `delete file` gives the position of the deleted row within the entire parquet file,
    while the `read range` in `RowGroupReader` is relative to the current row group. Therefore, we need to subtract
    the position of the first row of the current row group from the `delete position` (see the sketch after this list).
    2. When only the partition columns are read, `RowGroupReader` skips processing the `delete position`.
    3. If the `delete position`s delete all rows in a row group, the `read range` is empty, but we still read the
    whole row group in that case.
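
    A minimal sketch of the adjustment in bug 1 (a hypothetical helper for illustration, not the patch's API):

        // A delete position is file-relative, while RowGroupReader's read ranges are
        // group-relative, so shift by the group's first file row id before comparing.
        bool delete_hits_range(int64_t delete_pos, int64_t group_first_row_id,
                               int64_t range_first_row, int64_t range_last_row) {
            int64_t group_relative = delete_pos - group_first_row_id;
            return range_first_row <= group_relative && group_relative < range_last_row;
        }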
    
    Address four performance issues:
    1. Previously we converted each `delete position` into a `delete range`, and then merged the `delete range`s and
    `read range`s into the final read ranges. That process is tedious and time-consuming; we can merge `delete
    position` and `read range` directly.
    2. `delete position`s are ordered within a `delete file`, so we can merge them with a single merge-sort style
    pass instead of an ordered set (see the merge sketch after this list).
    3. Initialize each `RowGroupReader` only when it is read, instead of initializing all row groups when opening the
    `ParquetReader`; this saves memory, and the same applies to `IcebergReader`.
    4. Change the recursive call in `_do_lazy_read` to a loop.
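
    To illustrate optimizations 1 and 2, here is a standalone sketch of merging sorted delete positions into sorted
    read ranges in one linear pass. It is simplified from the `_merge_read_ranges` logic in this patch: `Range`
    stands in for `RowRange` with half-open [first_row, last_row) semantics, and the delete positions are assumed
    to be already group-relative.

        #include <cstdint>
        #include <vector>

        struct Range { int64_t first_row; int64_t last_row; }; // [first_row, last_row)

        // Both inputs are sorted, so one pass suffices; no ordered set is needed.
        std::vector<Range> merge_deletes(const std::vector<Range>& read_ranges,
                                         const std::vector<int64_t>& delete_rows) {
            std::vector<Range> result;
            size_t di = 0;
            for (const Range& range : read_ranges) {
                int64_t start = range.first_row;
                while (di < delete_rows.size() && delete_rows[di] < range.last_row) {
                    int64_t del = delete_rows[di++];
                    if (del < start) continue;                  // delete row before this range
                    if (start < del) result.push_back({start, del});
                    start = del + 1;                            // skip the deleted row
                }
                if (start < range.last_row) result.push_back({start, range.last_row});
            }
            return result;
        }

    For example, merge_deletes({{0, 10}}, {3, 7}) yields {0, 3}, {4, 7}, {8, 10}, and
    merge_deletes({{0, 2}}, {0, 1}) yields an empty result: a fully deleted row group produces an empty read range,
    matching bug 3 above.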
---
 be/src/service/internal_service.cpp                |   2 +-
 be/src/vec/exec/format/csv/csv_reader.cpp          |   4 +-
 be/src/vec/exec/format/csv/csv_reader.h            |   4 +-
 be/src/vec/exec/format/generic_reader.h            |   6 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |   4 +-
 be/src/vec/exec/format/json/new_json_reader.h      |   4 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |   4 +-
 be/src/vec/exec/format/orc/vorc_reader.h           |   4 +-
 be/src/vec/exec/format/parquet/parquet_common.h    |   2 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  30 +--
 .../exec/format/parquet/vparquet_column_reader.h   |  19 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 199 +++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h    |  83 ++++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 243 +++++++----------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  30 ++-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 293 ++++++++++-----------
 be/src/vec/exec/format/table/iceberg_reader.h      |  30 +--
 be/src/vec/exec/format/table/table_format_reader.h |   5 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |  12 +-
 19 files changed, 512 insertions(+), 466 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 28b9b8f4ca..381782af85 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -478,7 +478,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     }
     std::vector<std::string> col_names;
     std::vector<TypeDescriptor> col_types;
-    st = reader->get_parsered_schema(&col_names, &col_types);
+    st = reader->get_parsed_schema(&col_names, &col_types);
     if (!st.ok()) {
         LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
         st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 2e710ca44f..48f8d84dc7 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -202,8 +202,8 @@ Status CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
     return Status::OK();
 }
 
-Status CsvReader::get_parsered_schema(std::vector<std::string>* col_names,
-                                      std::vector<TypeDescriptor>* col_types) {
+Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                    std::vector<TypeDescriptor>* col_types) {
     size_t read_line = 0;
     bool is_parse_name = false;
     RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 5083c00d2d..5bb14523e3 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -49,8 +49,8 @@ public:
     // 1. header_type is empty, get schema from first line.
     // 2. header_type is CSV_WITH_NAMES, get schema from first line.
     // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line.
-    Status get_parsered_schema(std::vector<std::string>* col_names,
-                               std::vector<TypeDescriptor>* col_types) override;
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<TypeDescriptor>* col_types) override;
 
 private:
     // used for stream/broker load of csv file.
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index dd2bdd249c..30e93aacd8 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -39,9 +39,9 @@ public:
         return Status::NotSupported("get_columns is not implemented");
     }
 
-    virtual Status get_parsered_schema(std::vector<std::string>* col_names,
-                                       std::vector<TypeDescriptor>* col_types) {
-        return Status::NotSupported("get_parser_schema is not implemented for this reader.");
+    virtual Status get_parsed_schema(std::vector<std::string>* col_names,
+                                     std::vector<TypeDescriptor>* col_types) {
+        return Status::NotSupported("get_parsed_schema is not implemented for this reader.");
     }
     virtual ~GenericReader() = default;
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 2228f9701d..68a3f089e5 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -139,8 +139,8 @@ Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-Status NewJsonReader::get_parsered_schema(std::vector<std::string>* col_names,
-                                          std::vector<TypeDescriptor>* col_types) {
+Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                        std::vector<TypeDescriptor>* col_types) {
     RETURN_IF_ERROR(_get_range_params());
 
     RETURN_IF_ERROR(_open_file_reader());
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 6b003c30fe..5a057d32fe 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -48,8 +48,8 @@ public:
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
-    Status get_parsered_schema(std::vector<std::string>* col_names,
-                               std::vector<TypeDescriptor>* col_types) override;
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<TypeDescriptor>* col_types) override;
 
 private:
     Status _get_range_params();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4998bc2486..2748160e97 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -176,8 +176,8 @@ Status OrcReader::init_reader(
     return Status::OK();
 }
 
-Status OrcReader::get_parsered_schema(std::vector<std::string>* col_names,
-                                      std::vector<TypeDescriptor>* col_types) {
+Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                    std::vector<TypeDescriptor>* col_types) {
     if (_file_reader == nullptr) {
         std::unique_ptr<FileReader> inner_reader;
         RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path,
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 9b0e4b6a44..98c2fc7c02 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -99,8 +99,8 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-    Status get_parsered_schema(std::vector<std::string>* col_names,
-                               std::vector<TypeDescriptor>* col_types) override;
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<TypeDescriptor>* col_types) override;
 
 private:
     struct OrcProfile {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 611ce969f1..112be6a223 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -41,7 +41,7 @@ namespace doris::vectorized {
 using level_t = int16_t;
 
 struct RowRange {
-    RowRange() {}
+    RowRange() = default;
     RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
 
     int64_t first_row;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index c0557c482a..75f43b4730 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -19,7 +19,6 @@
 
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
-#include <vec/columns/columns_number.h>
 
 #include "schema_desc.h"
 #include "vec/data_types/data_type_array.h"
@@ -28,7 +27,9 @@
 namespace doris::vectorized {
 
 Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
-                                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+                                   const ParquetReadColumn& column,
+                                   const tparquet::RowGroup& row_group,
+                                   const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
                                    std::unique_ptr<ParquetColumnReader>& reader,
                                    size_t max_buf_size) {
     if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
@@ -36,13 +37,13 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
     }
     if (field->type.type == TYPE_ARRAY) {
         tparquet::ColumnChunk chunk = row_group.columns[field->children[0].physical_column_index];
-        ArrayColumnReader* array_reader = new ArrayColumnReader(ctz);
+        ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz);
         array_reader->init_column_metadata(chunk);
         RETURN_IF_ERROR(array_reader->init(file, field, &chunk, max_buf_size));
         reader.reset(array_reader);
     } else {
         tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
-        ScalarColumnReader* scalar_reader = new ScalarColumnReader(ctz);
+        ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, ctz);
         scalar_reader->init_column_metadata(chunk);
         RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, max_buf_size));
         reader.reset(scalar_reader);
@@ -61,23 +62,19 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun
 
 void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index,
                                                 std::list<RowRange>& read_ranges) {
-    if (_row_ranges->empty()) {
-        read_ranges.emplace_back(start_index, end_index);
-        return;
-    }
     int index = _row_range_index;
-    while (index < _row_ranges->size()) {
-        const RowRange& row_range = (*_row_ranges)[index];
-        if (row_range.last_row <= start_index) {
+    while (index < _row_ranges.size()) {
+        const RowRange& read_range = _row_ranges[index];
+        if (read_range.last_row <= start_index) {
             index++;
             _row_range_index++;
             continue;
         }
-        if (row_range.first_row >= end_index) {
+        if (read_range.first_row >= end_index) {
             break;
         }
-        int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row;
-        int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index;
+        int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
+        int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
         read_ranges.emplace_back(start, end);
         index++;
     }
@@ -216,7 +213,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
             // lazy read
             size_t remaining_num_values = 0;
             for (auto& range : read_ranges) {
-                remaining_num_values = range.last_row - range.first_row;
+                remaining_num_values += range.last_row - range.first_row;
             }
             if (batch_size >= remaining_num_values &&
                 select_vector.can_filter_all(remaining_num_values)) {
@@ -330,6 +327,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
         data_column = doris_column->assume_mutable();
     }
 
+    // generate array offset
     size_t real_batch_size = 0;
     size_t num_values = 0;
     std::vector<size_t> element_offsets;
@@ -339,7 +337,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
     level_t* definitions = _def_levels_buf.get();
     _chunk_reader->get_def_levels(definitions, num_values);
     _def_offset = 0;
-    // read_range   delete_row_range
+
     // generate the row ranges that should be read
     std::list<RowRange> read_ranges;
     _generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index b7d062e7c6..de0ec185b9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -96,7 +96,8 @@ public:
         }
     };
 
-    ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {};
+    ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+            : _row_ranges(row_ranges), _ctz(ctz) {};
     virtual ~ParquetColumnReader() {
         if (_stream_reader != nullptr) {
             delete _stream_reader;
@@ -106,12 +107,12 @@ public:
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t batch_size,
                                     size_t* read_rows, bool* eof) = 0;
-    static Status create(FileReader* file, FieldSchema* field, const tparquet::RowGroup& row_group,
-                         cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader,
-                         size_t max_buf_size);
+    static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
+                         const tparquet::RowGroup& row_group,
+                         const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                         std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
-    void set_row_ranges(const std::vector<RowRange>* row_ranges) { _row_ranges = row_ranges; };
     Statistics statistics() {
         return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(),
                           _decode_null_map_time);
@@ -124,7 +125,7 @@ protected:
 
     BufferedFileStreamReader* _stream_reader;
     std::unique_ptr<ParquetColumnMetadata> _metadata;
-    const std::vector<RowRange>* _row_ranges;
+    const std::vector<RowRange>& _row_ranges;
     cctz::time_zone* _ctz;
     std::unique_ptr<ColumnChunkReader> _chunk_reader;
     tparquet::OffsetIndex* _offset_index;
@@ -135,7 +136,8 @@ protected:
 
 class ScalarColumnReader : public ParquetColumnReader {
 public:
-    ScalarColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
+    ScalarColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+            : ParquetColumnReader(row_ranges, ctz) {};
     ~ScalarColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
                 size_t max_buf_size);
@@ -150,7 +152,8 @@ public:
 
 class ArrayColumnReader : public ParquetColumnReader {
 public:
-    ArrayColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
+    ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+            : ParquetColumnReader(row_ranges, ctz) {};
     ~ArrayColumnReader() override { close(); };
     Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
                 size_t max_buf_size);
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 42baf3a283..24441896a9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -23,26 +23,31 @@
 
 namespace doris::vectorized {
 
+const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
+
 RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
                                const std::vector<ParquetReadColumn>& read_columns,
-                               const RowGroupIndex& row_group_idx,
-                               const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+                               const int32_t row_group_id, const tparquet::RowGroup& row_group,
+                               cctz::time_zone* ctz,
+                               const PositionDeleteContext& position_delete_ctx,
                                const LazyReadContext& lazy_read_ctx)
         : _file_reader(file_reader),
           _read_columns(read_columns),
-          _row_group_idx(row_group_idx),
+          _row_group_id(row_group_id),
           _row_group_meta(row_group),
           _remaining_rows(row_group.num_rows),
           _ctz(ctz),
+          _position_delete_ctx(position_delete_ctx),
           _lazy_read_ctx(lazy_read_ctx) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
 }
 
-Status RowGroupReader::init(const FieldDescriptor& schema,
+Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                             std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
-    if (_read_columns.size() == 0) {
+    _merge_read_ranges(row_ranges);
+    if (_read_columns.empty()) {
         // Query task that only select columns in path.
         return Status::OK();
     }
@@ -52,15 +57,15 @@ Status RowGroupReader::init(const FieldDescriptor& schema,
     for (auto& read_col : _read_columns) {
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
-        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta, _ctz,
-                                                    reader, max_buf_size));
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
+                                                    _read_ranges, _ctz, reader, max_buf_size));
         auto col_iter = col_offsets.find(read_col._parquet_col_id);
         if (col_iter != col_offsets.end()) {
             tparquet::OffsetIndex oi = col_iter->second;
             reader->add_offset_index(&oi);
         }
         if (reader == nullptr) {
-            VLOG_DEBUG << "Init row group(" << _row_group_idx.row_group_id << ") reader failed";
+            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
             return Status::Corruption("Init row group reader failed");
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
@@ -68,17 +73,11 @@ Status RowGroupReader::init(const FieldDescriptor& schema,
     return Status::OK();
 }
 
-void RowGroupReader::set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges) {
-    for (auto& read_col : _read_columns) {
-        _column_readers[read_col._file_slot_name]->set_row_ranges(&row_ranges);
-    }
-}
-
 Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
-                                  bool* _batch_eof) {
+                                  bool* batch_eof) {
     // Process external table query task that select columns are all from path.
     if (_read_columns.empty()) {
-        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof));
+        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
@@ -90,11 +89,11 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
     }
     if (_lazy_read_ctx.can_lazy_read) {
         // call _do_lazy_read recursively when current batch is skipped
-        return _do_lazy_read(block, batch_size, read_rows, _batch_eof);
+        return _do_lazy_read(block, batch_size, read_rows, batch_eof);
     } else {
         ColumnSelectVector run_length_vector;
         RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
-                                          read_rows, _batch_eof, run_length_vector));
+                                          read_rows, batch_eof, run_length_vector));
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
@@ -106,8 +105,39 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
     }
 }
 
+void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
+    // row_ranges is generated from the page index, and its row index begins at 0 in each row group.
+    // _position_delete_ctx is generated from the delete file, and its row index begins at 0 in the parquet file.
+    for (auto& range : row_ranges) {
+        int64_t start_row_id = range.first_row;
+        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+            const int64_t& delete_row_id =
+                    _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
+                    _position_delete_ctx.first_row_id;
+            if (delete_row_id < range.first_row) {
+                _position_delete_ctx.index++;
+            } else if (delete_row_id < range.last_row) {
+                if (start_row_id < delete_row_id) {
+                    _read_ranges.emplace_back(start_row_id, delete_row_id);
+                }
+                start_row_id = delete_row_id + 1;
+                _position_delete_ctx.index++;
+            } else { // delete_row_id >= range.last_row
+                if (start_row_id < range.last_row) {
+                    _read_ranges.emplace_back(start_row_id, range.last_row);
+                    start_row_id = range.last_row + 1;
+                }
+                break;
+            }
+        }
+        if (start_row_id < range.last_row) {
+            _read_ranges.emplace_back(start_row_id, range.last_row);
+        }
+    }
+}
+
 Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
-                                         size_t batch_size, size_t* read_rows, bool* _batch_eof,
+                                         size_t batch_size, size_t* read_rows, bool* batch_eof,
                                          ColumnSelectVector& select_vector) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
@@ -138,57 +168,74 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
         col_idx++;
     }
     *read_rows = batch_read_rows;
-    *_batch_eof = has_eof;
+    *batch_eof = has_eof;
     return Status::OK();
 }
 
 Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
                                      bool* batch_eof) {
-    // read predicate columns
+    std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
     size_t pre_read_rows;
     bool pre_eof;
-    ColumnSelectVector run_length_vector;
-    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
-                                      &pre_read_rows, &pre_eof, run_length_vector));
-    RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
-                                            _lazy_read_ctx.predicate_partition_columns));
-    RETURN_IF_ERROR(
-            _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns));
-    // generate filter vector
-    if (_lazy_read_ctx.resize_first_column) {
-        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
-        // The following process may be tricky and time-consuming, but we have no other way.
-        block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
-    }
     size_t origin_column_num = block->columns();
     int filter_column_id = -1;
-    RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
-    ColumnPtr& sv = block->get_by_position(filter_column_id).column;
-    if (_lazy_read_ctx.resize_first_column) {
-        // We have to clean the first column to insert right data.
-        block->get_by_position(0).column->assume_mutable()->clear();
-    }
-
-    // build filter map
-    bool can_filter_all = false;
-    const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all);
-    ColumnSelectVector select_vector(filter_map, pre_read_rows, can_filter_all);
-    if (select_vector.filter_all() && !pre_eof) {
-        // If continuous batches are skipped, we can cache them to skip a whole page
-        _cached_filtered_rows += pre_read_rows;
-        for (auto& col : _lazy_read_ctx.predicate_columns) {
-            // clean block to read predicate columns
-            block->get_by_name(col).column->assume_mutable()->clear();
+    while (true) {
+        // read predicate columns
+        pre_read_rows = 0;
+        pre_eof = false;
+        ColumnSelectVector run_length_vector;
+        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
+                                          &pre_read_rows, &pre_eof, run_length_vector));
+        if (pre_read_rows == 0) {
+            DCHECK_EQ(pre_eof, true);
+            break;
         }
-        for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
-            block->get_by_name(col.first).column->assume_mutable()->clear();
+        RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
+                                                _lazy_read_ctx.predicate_partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
+                                              _lazy_read_ctx.predicate_missing_columns));
+        // generate filter vector
+        if (_lazy_read_ctx.resize_first_column) {
+            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
+            // The following process may be tricky and time-consuming, but we have no other way.
+            block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
         }
-        for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
-            block->get_by_name(col.first).column->assume_mutable()->clear();
+        RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
+        ColumnPtr& sv = block->get_by_position(filter_column_id).column;
+        if (_lazy_read_ctx.resize_first_column) {
+            // We have to clean the first column to insert right data.
+            block->get_by_position(0).column->assume_mutable()->clear();
         }
-        Block::erase_useless_column(block, origin_column_num);
-        return _do_lazy_read(block, batch_size, read_rows, batch_eof);
+
+        // build filter map
+        bool can_filter_all = false;
+        const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all);
+        select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
+        if (select_vector_ptr->filter_all() && !pre_eof) {
+            // If continuous batches are skipped, we can cache them to skip a whole page
+            _cached_filtered_rows += pre_read_rows;
+            for (auto& col : _lazy_read_ctx.predicate_columns) {
+                // clean block to read predicate columns
+                block->get_by_name(col).column->assume_mutable()->clear();
+            }
+            for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+                block->get_by_name(col.first).column->assume_mutable()->clear();
+            }
+            for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+                block->get_by_name(col.first).column->assume_mutable()->clear();
+            }
+            Block::erase_useless_column(block, origin_column_num);
+        } else {
+            break;
+        }
+    }
+    if (select_vector_ptr == nullptr) {
+        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
+        *read_rows = 0;
+        *batch_eof = true;
+        return Status::OK();
     }
+
+    ColumnSelectVector& select_vector = *select_vector_ptr;
     std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
     if (_cached_filtered_rows != 0) {
         _rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows);
@@ -345,15 +392,37 @@ Status RowGroupReader::_fill_missing_columns(
     return Status::OK();
 }
 
-Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
-    if (batch_size < _remaining_rows) {
-        *read_rows = batch_size;
-        _remaining_rows -= batch_size;
-        *_batch_eof = false;
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) {
+    if (_position_delete_ctx.has_filter) {
+        int64_t start_row_id = _position_delete_ctx.current_row_id;
+        int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
+                                      _position_delete_ctx.last_row_id);
+        int64_t num_delete_rows = 0;
+        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+            const int64_t& delete_row_id =
+                    _position_delete_ctx.delete_rows[_position_delete_ctx.index];
+            if (delete_row_id < start_row_id) {
+                _position_delete_ctx.index++;
+            } else if (delete_row_id < end_row_id) {
+                num_delete_rows++;
+                _position_delete_ctx.index++;
+            } else { // delete_row_id >= end_row_id
+                break;
+            }
+        }
+        *read_rows = end_row_id - start_row_id - num_delete_rows;
+        _position_delete_ctx.current_row_id = end_row_id;
+        *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;
     } else {
-        *read_rows = _remaining_rows;
-        _remaining_rows = 0;
-        *_batch_eof = true;
+        if (batch_size < _remaining_rows) {
+            *read_rows = batch_size;
+            _remaining_rows -= batch_size;
+            *batch_eof = false;
+        } else {
+            *read_rows = _remaining_rows;
+            _remaining_rows = 0;
+            *batch_eof = true;
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ef73eba21c..53e8a052c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -25,16 +25,18 @@
 
 namespace doris::vectorized {
 
-struct RowGroupIndex {
-    int32_t row_group_id;
-    int32_t first_row;
-    int32_t last_row;
-    RowGroupIndex(int32_t id, int32_t first, int32_t last)
-            : row_group_id(id), first_row(first), last_row(last) {}
-};
-
 class RowGroupReader {
 public:
+    static const std::vector<int64_t> NO_DELETE;
+
+    struct RowGroupIndex {
+        int32_t row_group_id;
+        int64_t first_row;
+        int64_t last_row;
+        RowGroupIndex(int32_t id, int64_t first, int64_t last)
+                : row_group_id(id), first_row(first), last_row(last) {}
+    };
+
     struct LazyReadContext {
         VExprContext* vconjunct_ctx = nullptr;
         bool can_lazy_read = false;
@@ -56,26 +58,66 @@ public:
         std::unordered_map<std::string, VExprContext*> missing_columns;
     };
 
+    /**
+     * Support row-level delete in iceberg:
+     * https://iceberg.apache.org/spec/#position-delete-files
+     */
+    struct PositionDeleteContext {
+        // the filtered rows in current row group
+        const std::vector<int64_t>& delete_rows;
+        // the first row id of current row group in parquet file
+        const int64_t first_row_id;
+        // the number of rows in current row group
+        const int64_t num_rows;
+        const int64_t last_row_id;
+        // current row id to read in the row group
+        int64_t current_row_id;
+        // start index in delete_rows
+        const int64_t start_index;
+        // end index in delete_rows
+        const int64_t end_index;
+        // current index in delete_rows
+        int64_t index;
+        const bool has_filter;
+
+        PositionDeleteContext(const std::vector<int64_t>& delete_rows, const int64_t num_rows,
+                              const int64_t first_row_id, const int64_t start_index,
+                              const int64_t end_index)
+                : delete_rows(delete_rows),
+                  first_row_id(first_row_id),
+                  num_rows(num_rows),
+                  last_row_id(first_row_id + num_rows),
+                  current_row_id(first_row_id),
+                  start_index(start_index),
+                  end_index(end_index),
+                  index(start_index),
+                  has_filter(end_index > start_index) {}
+
+        PositionDeleteContext(const int64_t num_rows, const int64_t first_row)
+                : PositionDeleteContext(NO_DELETE, num_rows, first_row, 0, 0) {}
+
+        PositionDeleteContext(const PositionDeleteContext& filter) = default;
+    };
+
     RowGroupReader(doris::FileReader* file_reader,
-                   const std::vector<ParquetReadColumn>& read_columns,
-                   const RowGroupIndex& _row_group_idx, const tparquet::RowGroup& row_group,
-                   cctz::time_zone* ctz, const LazyReadContext& lazy_read_ctx);
+                   const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
+                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+                   const PositionDeleteContext& position_delete_ctx,
+                   const LazyReadContext& lazy_read_ctx);
 
     ~RowGroupReader();
-    Status init(const FieldDescriptor& schema,
+    Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
-    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
-    int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; }
-    const RowGroupIndex& index() { return _row_group_idx; }
-    void set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges);
+    Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
     int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
 
     ParquetColumnReader::Statistics statistics();
 
 private:
-    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof);
+    void _merge_read_ranges(std::vector<RowRange>& row_ranges);
+    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof);
     Status _read_column_data(Block* block, const std::vector<std::string>& columns,
-                             size_t batch_size, size_t* read_rows, bool* _batch_eof,
+                             size_t batch_size, size_t* read_rows, bool* batch_eof,
                              ColumnSelectVector& select_vector);
     Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
     const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
@@ -92,10 +134,13 @@ private:
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
     const std::vector<ParquetReadColumn>& _read_columns;
-    const RowGroupIndex& _row_group_idx;
+    const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
     int64_t _remaining_rows;
     cctz::time_zone* _ctz;
+    PositionDeleteContext _position_delete_ctx;
+    // merge the row ranges generated from page index and position delete.
+    std::vector<RowRange> _read_ranges;
 
     const LazyReadContext& _lazy_read_ctx;
     int64_t _lazy_read_filtered_rows = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c8d4f6b3c4..6613b1f987 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 
+#include "common/status.h"
 #include "io/file_factory.h"
 #include "parquet_pred_cmp.h"
 #include "parquet_thrift_util.h"
@@ -150,24 +151,12 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
-Status ParquetReader::file_metadata(FileMetaData** metadata) {
-    Status open_status = _open_file();
-    if (!open_status.ok()) {
-        return open_status;
-    }
-    *metadata = _file_metadata.get();
-    return Status::OK();
-}
-
 Status ParquetReader::init_reader(
         const std::vector<std::string>& column_names,
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         VExprContext* vconjunct_ctx, bool filter_groups) {
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-    Status open_status = _open_file();
-    if (!open_status.ok()) {
-        return open_status;
-    }
+    RETURN_IF_ERROR(_open_file());
     _column_names = &column_names;
     _t_metadata = &_file_metadata->to_thrift();
     _total_groups = _t_metadata->row_groups.size();
@@ -182,7 +171,7 @@ Status ParquetReader::init_reader(
     RETURN_IF_ERROR(_init_read_columns());
     // build column predicates for column lazy read
     _lazy_read_ctx.vconjunct_ctx = vconjunct_ctx;
-    RETURN_IF_ERROR(_init_row_group_readers(filter_groups));
+    RETURN_IF_ERROR(_init_row_groups(filter_groups));
     return Status::OK();
 }
 
@@ -317,20 +306,9 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type(
     return map;
 }
 
-Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
-                                          std::vector<TypeDescriptor>* col_types) {
-    if (_file_reader == nullptr) {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path,
-                                                        _scan_range.start_offset,
-                                                        _scan_range.file_size, 0, _file_reader));
-    }
-    if (_file_metadata == nullptr) {
-        RETURN_IF_ERROR(_file_reader->open());
-        if (_file_reader->size() == 0) {
-            return Status::EndOfFile("Empty Parquet File");
-        }
-        RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
-    }
+Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
+                                        std::vector<TypeDescriptor>* col_types) {
+    RETURN_IF_ERROR(_open_file());
     _t_metadata = &_file_metadata->to_thrift();
 
     _total_groups = _t_metadata->row_groups.size();
@@ -362,143 +340,90 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
     return Status::OK();
 }
 
-void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) {
-    if (_row_ranges.empty()) {
-        _current_group_reader->set_row_ranges(_row_ranges);
-        return;
-    }
-    if (!delete_row_ranges.empty()) {
-        std::vector<RowRange> candidate_ranges;
-        auto start_range = _row_ranges.begin();
-        auto delete_range = delete_row_ranges.begin();
-        int64_t processed_range_start_idx = start_range->first_row;
-        while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) {
-            int64_t delete_start = delete_range->first_row;
-            int64_t delete_end = delete_range->last_row;
-            int64_t range_start = start_range->first_row;
-            int64_t range_end = start_range->last_row;
-            if (delete_end > range_end) {
-                if (range_start < processed_range_start_idx) {
-                    // rows before processed_range_start_idx have been processed
-                    range_start = processed_range_start_idx;
-                }
-                if (range_end < delete_start) {
-                    /**
-                     *      start_range
-                     *    || --------- || - |--------- |
-                     *                      delete_range
-                     */
-                    candidate_ranges.emplace_back(range_start, range_end);
-                } else if (range_start < delete_start) {
-                    /**
-                     *          row_range
-                     *    || --------|-------- || ----- |
-                     *         delete_start       delete_end
-                     */
-                    candidate_ranges.emplace_back(range_start, delete_start);
-                }
-                // range_end > delete_end && range_start > delete_start
-                start_range++;
-            } else {
-                // delete_end < range_end,most of the time, we will use this branch
-                if (processed_range_start_idx < delete_start) {
-                    /**
-                     *   row_range_start           row_range_end
-                     *       || --- | --------- | --- ||
-                     *               delete_range
-                     */
-                    candidate_ranges.emplace_back(processed_range_start_idx, delete_start);
-                }
-                // delete_end is in row_range, so it can assign to processed_range_start_idx
-                processed_range_start_idx = delete_end;
-                delete_range++;
-                if (delete_range == delete_row_ranges.end()) {
-                    range_end = _row_ranges[_row_ranges.size() - 1].last_row;
-                    if (processed_range_start_idx != range_end) {
-                        candidate_ranges.emplace_back(processed_range_start_idx, range_end);
-                    }
-                }
-            }
-        }
-        _row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end());
-    }
-    _current_group_reader->set_row_ranges(_row_ranges);
-}
-
 Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    int32_t num_of_readers = _row_group_readers.size();
-    DCHECK(num_of_readers <= _read_row_groups.size());
-    if (_read_row_groups.empty()) {
-        *eof = true;
-        return Status::OK();
+    if (_current_group_reader == nullptr) {
+        if (_read_row_groups.size() > 0) {
+            RETURN_IF_ERROR(_next_row_group_reader());
+        } else {
+            *read_rows = 0;
+            *eof = true;
+            return Status::OK();
+        }
     }
-    bool _batch_eof = false;
+    DCHECK(_current_group_reader != nullptr);
+    bool batch_eof = false;
     {
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
         RETURN_IF_ERROR(
-                _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof));
+                _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof));
     }
-    if (_batch_eof) {
+    if (batch_eof) {
         auto column_st = _current_group_reader->statistics();
         _column_statistics.merge(column_st);
         _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
-        if (!_next_row_group_reader()) {
+        Status st = _next_row_group_reader();
+        if (st.is_end_of_file()) {
             *eof = true;
+        } else if (!st.ok()) {
+            return st;
         }
     }
-    VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows();
     return Status::OK();
 }
 
-bool ParquetReader::_next_row_group_reader() {
-    if (_row_group_readers.empty()) {
-        return false;
-    }
-    _current_group_reader = _row_group_readers.front();
-    _row_group_readers.pop_front();
-    return true;
+RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx(
+        const tparquet::RowGroup& row_group, const RowGroupReader::RowGroupIndex& row_group_index) {
+    if (_delete_rows == nullptr) {
+        return RowGroupReader::PositionDeleteContext(row_group.num_rows, row_group_index.first_row);
+    }
+    int64_t* delete_rows = const_cast<int64_t*>(&(*_delete_rows)[0]);
+    int64_t* delete_rows_end = delete_rows + _delete_rows->size();
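+    // delete_rows is sorted, so binary search locates the slice of deletes inside this row group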
+    int64_t* start_pos = std::lower_bound(delete_rows + _delete_rows_index, delete_rows_end,
+                                          row_group_index.first_row);
+    int64_t start_index = start_pos - delete_rows;
+    int64_t* end_pos = std::lower_bound(start_pos, delete_rows_end, row_group_index.last_row);
+    int64_t end_index = end_pos - delete_rows;
+    _delete_rows_index = end_index;
+    return RowGroupReader::PositionDeleteContext(*_delete_rows, row_group.num_rows,
+                                                 row_group_index.first_row, start_index, end_index);
 }
 
-Status ParquetReader::_init_row_group_readers(const bool& filter_groups) {
-    std::vector<RowGroupIndex> group_indexes;
-    RETURN_IF_ERROR(_filter_row_groups(filter_groups, group_indexes));
-    DCHECK_EQ(group_indexes.size(), _read_row_groups.size());
-    auto group_index = group_indexes.begin();
-    for (auto row_group_id : _read_row_groups) {
-        auto& row_group = _t_metadata->row_groups[row_group_id];
-        std::shared_ptr<RowGroupReader> row_group_reader;
-        row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, *group_index,
-                                                  row_group, _ctz, _lazy_read_ctx));
-        group_index++;
-        RETURN_IF_ERROR(_process_page_index(row_group));
-        if (_row_ranges.empty()) {
-            _row_ranges.emplace_back(0, row_group.num_rows);
-            _statistics.read_rows += row_group.num_rows;
-        }
-        RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _col_offsets));
-        row_group_reader->set_row_ranges(_row_ranges);
-        _row_group_readers.emplace_back(row_group_reader);
-    }
-    if (!_next_row_group_reader()) {
-        return Status::EndOfFile("No next reader");
-    }
-    return Status::OK();
+Status ParquetReader::_next_row_group_reader() {
+    if (_read_row_groups.empty()) {
+        _current_group_reader.reset(nullptr);
+        return Status::EndOfFile("No next RowGroupReader");
+    }
+    RowGroupReader::RowGroupIndex row_group_index = _read_row_groups.front();
+    _read_row_groups.pop_front();
+
+    // process page index and generate the ranges to read
+    auto& row_group = _t_metadata->row_groups[row_group_index.row_group_id];
+    std::vector<RowRange> candidate_row_ranges;
+    RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
+
+    RowGroupReader::PositionDeleteContext position_delete_ctx =
+            _get_position_delete_ctx(row_group, row_group_index);
+    _current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns,
+                                                   row_group_index.row_group_id, row_group, _ctz,
+                                                   position_delete_ctx, _lazy_read_ctx));
+    return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
+                                       _col_offsets);
 }
 
-Status ParquetReader::_filter_row_groups(const bool& enabled,
-                                         std::vector<RowGroupIndex>& group_indexes) {
+Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
     SCOPED_RAW_TIMER(&_statistics.row_group_filter_time);
-    if (enabled && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
-        return Status::EndOfFile("No row group need read");
+    if (is_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
+        return Status::EndOfFile("No row group to read");
     }
-    int32_t start_row_id = 0;
+    int64_t row_index = 0;
     for (int32_t row_group_idx = 0; row_group_idx < _total_groups; row_group_idx++) {
         const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx];
-        if (enabled && _is_misaligned_range_group(row_group)) {
+        if (is_filter_groups && _is_misaligned_range_group(row_group)) {
+            row_index += row_group.num_rows;
             continue;
         }
         bool filter_group = false;
-        if (enabled) {
+        if (is_filter_groups) {
             RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
         }
         int64_t group_size = 0; // only calculate the needed columns
@@ -508,13 +433,12 @@ Status ParquetReader::_filter_row_groups(const bool& enabled,
                 group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size;
             }
         }
-        //  record row group physical id
-        int32_t first_row_index = start_row_id;
-        int32_t last_row_index = first_row_index + row_group.num_rows;
-        start_row_id = last_row_index + 1;
         if (!filter_group) {
-            group_indexes.emplace_back(row_group_idx, first_row_index, last_row_index);
-            _read_row_groups.emplace_back(row_group_idx);
+            _read_row_groups.emplace_back(row_group_idx, row_index, row_index + row_group.num_rows);
+            if (_statistics.read_row_groups == 0) {
+                _whole_range.first_row = row_index;
+            }
+            _whole_range.last_row = row_index + row_group.num_rows;
             _statistics.read_row_groups++;
             _statistics.read_bytes += group_size;
         } else {
@@ -522,6 +446,11 @@ Status ParquetReader::_filter_row_groups(const bool& enabled,
             _statistics.filtered_bytes += group_size;
             _statistics.filtered_group_rows += row_group.num_rows;
         }
+        row_index += row_group.num_rows;
+    }
+
+    if (_read_row_groups.empty()) {
+        return Status::EndOfFile("No row group to read");
     }
     return Status::OK();
 }
@@ -545,13 +474,22 @@ bool ParquetReader::_has_page_index(const std::vector<tparquet::ColumnChunk>& co
     return page_index.check_and_get_page_index_ranges(columns);
 }
 
-Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
+Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
+                                          std::vector<RowRange>& candidate_row_ranges) {
     SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
+
+    std::function<void()> read_whole_row_group = [&]() {
+        candidate_row_ranges.emplace_back(0, row_group.num_rows);
+        _statistics.read_rows += row_group.num_rows;
+    };
+
     if (_colname_to_value_range == nullptr || _colname_to_value_range->empty()) {
+        read_whole_row_group();
         return Status::OK();
     }
     PageIndex page_index;
     if (!_has_page_index(row_group.columns, page_index)) {
+        read_whole_row_group();
         return Status::OK();
     }
     uint8_t col_index_buff[page_index._column_index_size];
@@ -573,12 +511,12 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
         auto& chunk = row_group.columns[read_col._parquet_col_id];
         tparquet::ColumnIndex column_index;
         if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
-            return Status::OK();
+            continue;
         }
         RETURN_IF_ERROR(page_index.parse_column_index(chunk, col_index_buff, &column_index));
         const int num_of_pages = column_index.null_pages.size();
         if (num_of_pages <= 0) {
-            break;
+            continue;
         }
         auto& conjuncts = conjunct_iter->second;
         std::vector<int> skipped_page_range;
@@ -600,6 +538,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
         _col_offsets.emplace(read_col._parquet_col_id, offset_index);
     }
     if (skipped_row_ranges.empty()) {
+        read_whole_row_group();
         return Status::OK();
     }
 
@@ -617,14 +556,14 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
             }
         } else {
             // read row with candidate ranges rather than skipped ranges
-            _row_ranges.emplace_back(skip_end, skip_range.first_row);
+            candidate_row_ranges.emplace_back(skip_end, skip_range.first_row);
             read_rows += skip_range.first_row - skip_end;
             skip_end = skip_range.last_row;
         }
     }
     DCHECK_LE(skip_end, row_group.num_rows);
     if (skip_end != row_group.num_rows) {
-        _row_ranges.emplace_back(skip_end, row_group.num_rows);
+        candidate_row_ranges.emplace_back(skip_end, row_group.num_rows);
         read_rows += row_group.num_rows - skip_end;
     }
     _statistics.read_rows += read_rows;
@@ -676,13 +615,13 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
 void ParquetReader::_init_chunk_dicts() {}
 
 Status ParquetReader::_process_dict_filter(bool* filter_group) {
-    return Status();
+    return Status::OK();
 }
 
 void ParquetReader::_init_bloom_filter() {}
 
 Status ParquetReader::_process_bloom_filter(bool* filter_group) {
-    return Status();
+    return Status::OK();
 }
 
 int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index f5891754e7..2bfc74f823 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -77,9 +77,10 @@ public:
 
     void close();
 
-    Status file_metadata(FileMetaData** metadata);
+    RowRange get_whole_range() { return _whole_range; }
 
-    void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges);
+    // set the rows deleted in the current parquet file
+    void set_delete_rows(const std::vector<int64_t>* delete_rows) { _delete_rows = delete_rows; }
 
     int64_t size() const { return _file_reader->size(); }
 
@@ -87,8 +88,8 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-    Status get_parsered_schema(std::vector<std::string>* col_names,
-                               std::vector<TypeDescriptor>* col_types) override;
+    Status get_parsed_schema(std::vector<std::string>* col_names,
+                             std::vector<TypeDescriptor>* col_types) override;
 
     Statistics& statistics() { return _statistics; }
 
@@ -125,12 +126,16 @@ private:
 
     Status _open_file();
     void _init_profile();
-    bool _next_row_group_reader();
+    Status _next_row_group_reader();
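+    // Iceberg position deletes are file-global row ids, so the delete context
+    // must be rebuilt for every row group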
+    RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
+            const tparquet::RowGroup& row_group,
+            const RowGroupReader::RowGroupIndex& row_group_index);
     Status _init_read_columns();
-    Status _init_row_group_readers(const bool& filter_groups);
+    Status _init_row_groups(const bool& is_filter_groups);
     // Page Index Filter
     bool _has_page_index(const std::vector<tparquet::ColumnChunk>& columns, PageIndex& page_index);
-    Status _process_page_index(const tparquet::RowGroup& row_group);
+    Status _process_page_index(const tparquet::RowGroup& row_group,
+                               std::vector<RowRange>& candidate_row_ranges);
 
     // Row Group Filter
     bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
@@ -141,28 +146,27 @@ private:
     Status _process_dict_filter(bool* filter_group);
     void _init_bloom_filter();
     Status _process_bloom_filter(bool* filter_group);
-    Status _filter_row_groups(const bool& enabled, std::vector<RowGroupIndex>& group_indexes);
     int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column);
 
-private:
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     std::unique_ptr<FileReader> _file_reader = nullptr;
-    std::vector<RowRange> _row_ranges;
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
-    std::list<std::shared_ptr<RowGroupReader>> _row_group_readers;
-    std::shared_ptr<RowGroupReader> _current_group_reader;
+    std::unique_ptr<RowGroupReader> _current_group_reader;
     int32_t _total_groups;                  // num of groups(stripes) of a parquet(orc) file
     std::map<std::string, int> _map_column; // column-name <---> column-index
     std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
     std::vector<ParquetReadColumn> _read_columns;
+    RowRange _whole_range = RowRange(0, 0);
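+    // sorted positions of deleted rows, owned by the table reader;
+    // _delete_rows_index marks the next position to consume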
+    const std::vector<int64_t>* _delete_rows = nullptr;
+    int64_t _delete_rows_index = 0;
 
     // Used for column lazy read.
     RowGroupReader::LazyReadContext _lazy_read_ctx;
 
-    std::list<int32_t> _read_row_groups;
+    std::list<RowGroupReader::RowGroupIndex> _read_row_groups;
     // parquet file reader object
     size_t _batch_size;
     int64_t _range_start_offset;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9e2f659772..332aa44e54 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -17,12 +17,10 @@
 
 #include "iceberg_reader.h"
 
-#include <vec/core/column_with_type_and_name.h>
-#include <vec/exec/format/parquet/vparquet_reader.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
 #include "vec/common/assert_cast.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/format/parquet/vparquet_reader.h"
 
 namespace doris::vectorized {
 
@@ -34,9 +32,11 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
         : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
-    _iceberg_profile._delete_files_init_time =
-            ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile);
-    _iceberg_profile._delete_files_read_total_time =
+    _iceberg_profile.num_delete_files =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
+    _iceberg_profile.num_delete_rows =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
+    _iceberg_profile.delete_files_read_time =
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
 }
 
@@ -63,162 +63,159 @@ Status IcebergTableReader::get_columns(
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-void IcebergTableReader::filter_rows(const TFileRangeDesc& range) {
-    if (_cur_delete_file_reader == nullptr) {
-        return;
-    }
-    SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time);
+Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
     auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
-        return;
+        return Status::OK();
     }
-    bool eof = false;
-    std::set<RowRange> delete_row_ranges;
-    while (!eof) {
-        size_t read_rows = 0;
-        Block block = Block();
-        for (const FieldSchema& field : _column_schemas) {
-            DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true);
-            MutableColumnPtr data_column = data_type->create_column();
-            block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name));
-        }
-        Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof);
-        if (!st.ok() || eof) {
-            if (!_delete_file_readers.empty()) {
-                eof = false;
-                _cur_delete_file_reader = std::move(_delete_file_readers.front());
-                _delete_file_readers.pop_front();
-            }
-        }
-        if (read_rows != 0) {
-            auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
-            ColumnPtr pos_column = pos_type_column.column;
-            using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
-            if (pos_type_column.type->is_nullable()) {
-                pos_column =
-                        assert_cast<const ColumnNullable&>(*pos_column).get_nested_column_ptr();
-            }
-            auto& data = assert_cast<const ColumnType&>(*pos_column).get_data();
-            std::vector<int64_t> delete_row_ids;
-            for (int row_id = 0; row_id < read_rows; row_id++) {
-                delete_row_ids.emplace_back(data[row_id]);
+    auto& delete_file_type = table_desc.content;
+    auto files = table_desc.delete_files;
+    if (files.empty()) {
+        return Status::OK();
+    }
+    if (delete_file_type == POSITION_DELETE) {
+        // position delete
+        SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
+        auto row_desc = RowDescriptor(_state->desc_tbl(),
+                                      std::vector<TupleId>({table_desc.delete_table_tuple_id}),
+                                      std::vector<bool>({false}));
+        RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct,
+                                                &_data_path_conjunct_ctx));
+        RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
+        RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
+
+        ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
+        RowRange whole_range = parquet_reader->get_whole_range();
+        bool init_schema = false;
+        std::vector<std::string> delete_file_col_names;
+        std::vector<TypeDescriptor> delete_file_col_types;
+        std::list<std::vector<int64_t>> delete_rows_list;
+        delete_rows_list.resize(files.size());
+        int64_t num_delete_rows = 0;
+        auto delete_rows_iter = delete_rows_list.begin();
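+        // skip delete files whose position bounds do not overlap the current range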
+        for (auto& delete_file : files) {
+            if (whole_range.last_row <= delete_file.position_lower_bound ||
+                whole_range.first_row > delete_file.position_upper_bound) {
+                delete_rows_iter++;
+                continue;
             }
-            if (delete_row_ids.empty()) {
-                return;
+            std::vector<int64_t>& delete_rows = *delete_rows_iter;
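+            // build a file range that covers the whole delete file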
+            TFileRangeDesc delete_range;
+            delete_range.path = delete_file.path;
+            delete_range.start_offset = 0;
+            delete_range.size = -1;
+            delete_range.file_size = -1;
+            ParquetReader delete_reader(_profile, _params, delete_range, 102400,
+                                        const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+            if (!init_schema) {
+                delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
+                init_schema = true;
             }
-
-            int num_deleted_ids = delete_row_ids.size();
-            int i = 0;
-            while (i < num_deleted_ids) {
-                int64_t row_id = delete_row_ids[i];
-                int64_t row_range_start = row_id;
-                int64_t row_range_end = row_id;
-                while (i + 1 < num_deleted_ids) {
-                    if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) {
-                        row_range_end = delete_row_ids[i + 1];
-                        i++;
-                        continue;
-                    } else {
-                        delete_row_ranges.emplace(row_range_start, row_range_end + 1);
-                        row_range_start = ++row_range_end;
-                        break;
-                    }
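+            // the data-path conjunct restricts the scan to delete rows that
+            // reference the current data file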
+            RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr,
+                                                      _data_path_conjunct_ctx, false));
+            std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                    partition_columns;
+            std::unordered_map<std::string, VExprContext*> missing_columns;
+            delete_reader.set_fill_columns(partition_columns, missing_columns);
+
+            bool eof = false;
+            while (!eof) {
+                Block block = Block();
+                for (int i = 0; i < delete_file_col_names.size(); ++i) {
+                    DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
+                            delete_file_col_types[i], true);
+                    MutableColumnPtr data_column = data_type->create_column();
+                    block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                                       delete_file_col_names[i]));
                 }
-                if (i == num_deleted_ids - 1) {
-                    delete_row_ranges.emplace(row_range_start,
-                                              delete_row_ids[num_deleted_ids - 1] + 1);
+                eof = false;
+                size_t read_rows = 0;
+                RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+                if (read_rows > 0) {
+                    auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
+                    ColumnPtr pos_column = pos_type_column.column;
+                    using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+                    if (pos_type_column.type->is_nullable()) {
+                        pos_column = assert_cast<const ColumnNullable&>(*pos_column)
+                                             .get_nested_column_ptr();
+                    }
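+                    // positions in a delete file are sorted, so binary search the
+                    // sub-range that falls inside the current file's row range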
+                    const int64_t* src_data =
+                            assert_cast<const ColumnType&>(*pos_column).get_data().data();
+                    const int64_t* src_data_end = src_data + read_rows;
+                    const int64_t* cpy_start =
+                            std::lower_bound(src_data, src_data_end, whole_range.first_row);
+                    const int64_t* cpy_end =
+                            std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
+                    int64_t cpy_count = cpy_end - cpy_start;
+
+                    if (cpy_count > 0) {
+                        int64_t origin_size = delete_rows.size();
+                        delete_rows.resize(origin_size + cpy_count);
+                        int64_t* dest_position = &delete_rows[origin_size];
+                        memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
+                        num_delete_rows += cpy_count;
+                    }
                 }
-                row_range_start = delete_row_ids[i + 1];
-                i++;
             }
+            delete_rows_iter++;
         }
-    }
-    if (VLOG_IS_ON(3)) {
-        if (!delete_row_ranges.empty()) {
-            std::stringstream out;
-            out << "[";
-            for (const RowRange& delete_row_range : delete_row_ranges) {
-                out << " " << delete_row_range.debug_string();
+        if (num_delete_rows > 0) {
+            for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) {
+                if (iter->empty()) {
+                    delete_rows_list.erase(iter++);
+                } else {
+                    iter++;
+                }
             }
-            out << " ]";
-            VLOG_NOTICE << "Delete row range info: " << out.str();
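+            // merge the per-file sorted vectors into one ascending vector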
+            _merge_sort(delete_rows_list, num_delete_rows);
+            parquet_reader->set_delete_rows(&_delete_rows);
+            COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
         }
     }
-    ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
-    parquet_reader->merge_delete_row_ranges(delete_row_ranges);
+    // TODO: equality delete
+    COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+    return Status::OK();
 }
 
-Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
-    auto& table_desc = range.table_format_params.iceberg_params;
-    auto& version = table_desc.format_version;
-    if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) {
-        SCOPED_TIMER(_iceberg_profile._delete_files_init_time);
-        auto& delete_file_type = table_desc.content;
-        auto files = table_desc.delete_files;
-        if (delete_file_type == POSITON_DELELE) {
-            // position delete
-            auto row_desc = RowDescriptor(_state->desc_tbl(),
-                                          std::vector<TupleId>({table_desc.delete_table_tuple_id}),
-                                          std::vector<bool>({false}));
-            RETURN_IF_ERROR(VExpr::create_expr_tree(
-                    _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx));
-            RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
-            RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
-            vector<std::string> names;
-            for (auto& delete_file : files) {
-                _position_delete_params.low_bound_index = delete_file.position_lower_bound;
-                _position_delete_params.upper_bound_index = delete_file.position_upper_bound;
-
-                TFileRangeDesc delete_range;
-                delete_range.path = delete_file.path;
-                delete_range.start_offset = 0;
-                delete_range.size = -1;
-                delete_range.file_size = -1;
-                ParquetReader* delete_reader = new ParquetReader(
-                        _profile, _params, delete_range, _state->query_options().batch_size,
-                        const_cast<cctz::time_zone*>(&_state->timezone_obj()));
-                if (_delete_file_schema == nullptr) {
-                    FileMetaData* metadata = nullptr;
-                    RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
-                    if (metadata == nullptr) {
-                        break;
-                    }
-                    _delete_file_schema = &metadata->schema();
-                    int num_of_col = _delete_file_schema->size();
-                    for (auto i = 0; i < num_of_col; ++i) {
-                        const FieldSchema* field = _delete_file_schema->get_column(i);
-                        _column_schemas.emplace_back(*field);
-                        names.emplace_back(field->name);
-                    }
-                }
-                DCHECK_EQ(_column_schemas.size(), _delete_file_schema->size());
-                // The expr assure reading delete file data from current file range only
-                Status d_st =
-                        delete_reader->init_reader(names, nullptr, _data_path_conjunct_ctx, false);
-                std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
-                        partition_columns;
-                std::unordered_map<std::string, VExprContext*> missing_columns;
-                delete_reader->set_fill_columns(partition_columns, missing_columns);
-
-                _delete_file_readers.emplace_back((GenericReader*)delete_reader);
-
-                ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
-                FileMetaData* file_metadata = nullptr;
-                RETURN_IF_ERROR(parquet_reader->file_metadata(&file_metadata));
-                _position_delete_params.total_file_rows = file_metadata->to_thrift().num_rows;
-            }
-            if (!_delete_file_readers.empty()) {
-                _cur_delete_file_reader = std::move(_delete_file_readers.front());
-                _delete_file_readers.pop_front();
-            } else {
-                _cur_delete_file_reader = nullptr;
-            }
+void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
+                                     int64_t num_delete_rows) {
+    if (delete_rows_list.empty()) {
+        return;
+    }
+    if (delete_rows_list.size() == 1) {
+        _delete_rows.resize(num_delete_rows);
+        memcpy(&_delete_rows[0], &(delete_rows_list.front()[0]), sizeof(int64_t) * num_delete_rows);
+        return;
+    }
+    if (delete_rows_list.size() == 2) {
+        _delete_rows.resize(num_delete_rows);
+        std::merge(delete_rows_list.front().begin(), delete_rows_list.front().end(),
+                   delete_rows_list.back().begin(), delete_rows_list.back().end(),
+                   _delete_rows.begin());
+        return;
+    }
+
+    // general case: k-way merge using a min-heap of (current, end) iterator pairs
+    using vec_pair =
+            std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::const_iterator>;
+    auto cmp = [](const vec_pair& a, const vec_pair& b) { return *a.first > *b.first; };
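+    // the '>' comparator turns std::priority_queue, a max-heap by default,
+    // into a min-heap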
+    std::priority_queue<vec_pair, std::vector<vec_pair>, decltype(cmp)> pq(cmp);
+    for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end(); ++iter) {
+        if (iter->size() > 0) {
+            pq.push({iter->begin(), iter->end()});
+        }
+    }
+    _delete_rows.reserve(num_delete_rows);
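+    // pop the smallest head element, emit it, then re-push its advanced iterator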
+    while (!pq.empty()) {
+        vec_pair p = pq.top();
+        pq.pop();
+        _delete_rows.emplace_back(*p.first);
+        p.first++;
+        if (p.first != p.second) {
+            pq.push(p);
         }
     }
-    // todo: equality delete
-    filter_rows(range);
-    return Status::OK();
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c71ffdaa9c..0a9d4ef71f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -17,12 +17,11 @@
 
 #pragma once
 
-#include <vec/exec/format/parquet/parquet_common.h>
-
 #include <queue>
 
 #include "table_format_reader.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_common.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
@@ -32,8 +31,8 @@ public:
     IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params);
     ~IcebergTableReader() override;
-    Status init_row_filters(const TFileRangeDesc& range);
-    void filter_rows(const TFileRangeDesc& range) override;
+
+    Status init_row_filters(const TFileRangeDesc& range) override;
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
@@ -45,30 +44,23 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-public:
-    enum { DATA, POSITON_DELELE, EQULITY_DELELE };
-    struct PositionDeleteParams {
-        int64_t low_bound_index = -1;
-        int64_t upper_bound_index = -1;
-        int64_t last_delete_row_index = -1;
-        int64_t total_file_rows = 0;
-    };
+    enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
 
 private:
     struct IcebergProfile {
-        RuntimeProfile::Counter* _delete_files_init_time;
-        RuntimeProfile::Counter* _delete_files_read_total_time;
+        RuntimeProfile::Counter* num_delete_files;
+        RuntimeProfile::Counter* num_delete_rows;
+        RuntimeProfile::Counter* delete_files_read_time;
     };
+
+    void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
-    std::vector<FieldSchema> _column_schemas;
-    std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
-    std::unique_ptr<GenericReader> _cur_delete_file_reader;
-    PositionDeleteParams _position_delete_params;
-    const FieldDescriptor* _delete_file_schema = nullptr;
     VExprContext* _data_path_conjunct_ctx = nullptr;
     IcebergProfile _iceberg_profile;
+    std::vector<int64_t> _delete_rows;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 188d41a2b9..1c2ebec7d3 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -17,12 +17,11 @@
 
 #pragma once
 
-#include <vec/exec/format/parquet/parquet_common.h>
-
 #include <string>
 
 #include "runtime/runtime_state.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_common.h"
 
 namespace doris::vectorized {
 
@@ -38,7 +37,7 @@ public:
         return _file_format_reader->get_columns(name_to_type, missing_cols);
     }
 
-    virtual void filter_rows(const TFileRangeDesc& range) = 0;
+    virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
 
 protected:
     std::string _table_format;                          // hudi, iceberg
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c8eeb5d00b..d1f59e9234 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -413,13 +413,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
     auto row_group = t_metadata.row_groups[0];
     std::shared_ptr<RowGroupReader> row_group_reader;
-    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns,
-                                              RowGroupIndex(0, 0, 10000), row_group, &ctz,
-                                              lazy_read_ctx));
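+    // a delete-free context: every row of the row group remains readable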
+    RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
+    row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz,
+                                              position_delete_ctx, lazy_read_ctx));
+    std::vector<RowRange> row_ranges;
+    row_ranges.emplace_back(0, row_group.num_rows);
     auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
-    auto stg = row_group_reader->init(meta_data->schema(), col_offsets);
-    std::vector<RowRange> row_ranges = std::vector<RowRange>();
-    row_group_reader->set_row_ranges(row_ranges);
+    auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets);
     EXPECT_TRUE(stg.ok());
 
     vectorized::Block block;

