You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/18 12:51:12 UTC

[doris] branch master updated: [feature](multi-catalog) read parquet file by start/offset (#10843)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a366c9ba2 [feature](multi-catalog) read parquet file by start/offset (#10843)
8a366c9ba2 is described below

commit 8a366c9ba2fc56c523708b3ced9e65e69f17f9ae
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:51:08 2022 +0800

    [feature](multi-catalog) read parquet file by start/offset (#10843)
    
    To avoid reading a repeated row group, we should align the offsets
---
 be/src/common/config.h                         |   2 +-
 be/src/exec/arrow/parquet_reader.cpp           |  15 ++-
 be/src/exec/arrow/parquet_reader.h             |   6 +-
 be/src/exec/arrow/parquet_row_group_reader.cpp | 144 ++++++++++++++++++-------
 be/src/exec/arrow/parquet_row_group_reader.h   |  17 ++-
 be/src/exec/parquet_scanner.cpp                |   2 +-
 be/src/vec/exec/file_arrow_scanner.cpp         |  16 ++-
 be/src/vec/exec/file_arrow_scanner.h           |   9 +-
 be/src/vec/exec/varrow_scanner.cpp             |   5 +-
 be/src/vec/exec/varrow_scanner.h               |   3 +-
 be/src/vec/exec/vorc_scanner.cpp               |   3 +-
 be/src/vec/exec/vorc_scanner.h                 |   3 +-
 be/src/vec/exec/vparquet_scanner.cpp           |   7 +-
 be/src/vec/exec/vparquet_scanner.h             |   3 +-
 14 files changed, 169 insertions(+), 66 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index ff4184eb62..f0fb39800e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -769,7 +769,7 @@ CONF_Int32(object_pool_buffer_size, "100");
 
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
-CONF_Bool(parquet_predicate_push_down, "false");
+CONF_Bool(parquet_predicate_push_down, "true");
 
 // When the rows number reached this limit, will check the filter rate the of bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index c14022bc53..c0f45f52af 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -37,11 +37,14 @@ namespace doris {
 
 // Broker
 ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
-                                     int32_t num_of_columns_from_file)
+                                     int32_t num_of_columns_from_file, int64_t range_start_offset,
+                                     int64_t range_size)
         : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
           _rows_of_group(0),
           _current_line_of_group(0),
-          _current_line_of_batch(0) {}
+          _current_line_of_batch(0),
+          _range_start_offset(range_start_offset),
+          _range_size(range_size) {}
 
 ParquetReaderWrap::~ParquetReaderWrap() {
     _closed = true;
@@ -101,8 +104,12 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
 
         RETURN_IF_ERROR(column_indices(tuple_slot_descs));
         if (config::parquet_predicate_push_down) {
-            _row_group_reader.reset(new RowGroupReader(conjunct_ctxs, _file_metadata, this));
-            _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids);
+            int64_t file_size = 0;
+            size(&file_size);
+            _row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
+                                                       conjunct_ctxs, _file_metadata, this));
+            _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids,
+                                                  file_size);
         }
         _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
         return Status::OK();
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index 6aaab5d24d..3bf4cf4814 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -62,8 +62,8 @@ class RowGroupReader;
 class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not used here
-    ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
-                      int32_t num_of_columns_from_file);
+    ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
+                      int64_t range_start_offset, int64_t range_size);
     ~ParquetReaderWrap() override;
 
     // Read
@@ -100,6 +100,8 @@ private:
     int _current_line_of_group;
     int _current_line_of_batch;
     std::string _timezone;
+    int64_t _range_start_offset;
+    int64_t _range_size;
 
 private:
     std::atomic<bool> _closed = false;
diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp
index bbc607f02b..517309db76 100644
--- a/be/src/exec/arrow/parquet_row_group_reader.cpp
+++ b/be/src/exec/arrow/parquet_row_group_reader.cpp
@@ -57,30 +57,37 @@
         return true;                                  \
     }
 
-#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
-    std::vector<T> in_values;                                        \
-    for (auto val : in_pred_values) {                                \
-        T value = reinterpret_cast<T*>(val)[0];                      \
-        in_values.emplace_back(value);                               \
-    }                                                                \
-    if (in_values.empty()) {                                         \
-        return false;                                                \
-    }                                                                \
-    std::sort(in_values.begin(), in_values.end());                   \
-    T in_min = in_values.front();                                    \
-    T in_max = in_values.back();                                     \
-    const T group_min = reinterpret_cast<const T*>(min_bytes)[0];    \
-    const T group_max = reinterpret_cast<const T*>(max_bytes)[0];    \
-    if (in_max < group_min || in_min > group_max) {                  \
-        return true;                                                 \
+#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes)       \
+    std::vector<T> in_values;                                              \
+    for (auto val : in_pred_values) {                                      \
+        T value = reinterpret_cast<T*>(val)[0];                            \
+        in_values.emplace_back(value);                                     \
+    }                                                                      \
+    if (in_values.empty()) {                                               \
+        return false;                                                      \
+    }                                                                      \
+    auto result = std::minmax_element(in_values.begin(), in_values.end()); \
+    T in_min = *result.first;                                              \
+    T in_max = *result.second;                                             \
+    const T group_min = reinterpret_cast<const T*>(min_bytes)[0];          \
+    const T group_max = reinterpret_cast<const T*>(max_bytes)[0];          \
+    if (in_max < group_min || in_min > group_max) {                        \
+        return true;                                                       \
     }
 
+#define PARQUET_HEAD 4
+
 namespace doris {
 
-RowGroupReader::RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
+RowGroupReader::RowGroupReader(int64_t range_start_offset, int64_t range_size,
+                               const std::vector<ExprContext*>& conjunct_ctxs,
                                std::shared_ptr<parquet::FileMetaData>& file_metadata,
                                ParquetReaderWrap* parent)
-        : _conjunct_ctxs(conjunct_ctxs), _file_metadata(file_metadata), _parent(parent) {}
+        : _range_start_offset(range_start_offset),
+          _range_size(range_size),
+          _conjunct_ctxs(conjunct_ctxs),
+          _file_metadata(file_metadata),
+          _parent(parent) {}
 
 RowGroupReader::~RowGroupReader() {
     _slot_conjuncts.clear();
@@ -89,20 +96,67 @@ RowGroupReader::~RowGroupReader() {
 
 Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
                                           const std::map<std::string, int>& map_column,
-                                          const std::vector<int>& include_column_ids) {
+                                          const std::vector<int>& include_column_ids,
+                                          int64_t file_size) {
+    int total_group = _file_metadata->num_row_groups();
+    // It will not filter if head_group_offset equals tail_group_offset
+    int64_t head_group_offset = _range_start_offset;
+    int64_t tail_group_offset = _range_start_offset;
+    int64_t range_end_offset = _range_start_offset + _range_size;
+    if (_range_size > 0 && file_size > 0) {
+        // todo: extract to function
+        for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
+            int64_t cur_group_offset = _get_group_offset(row_group_id);
+            // when a whole file is in a split, range_end_offset is the EOF offset
+            if (row_group_id == total_group - 1) {
+                if (cur_group_offset < _range_start_offset) {
+                    head_group_offset = cur_group_offset;
+                }
+                if (range_end_offset >= file_size) {
+                    tail_group_offset = file_size;
+                } else {
+                    tail_group_offset = cur_group_offset;
+                }
+                break;
+            }
+            int64_t next_group_offset = _get_group_offset(row_group_id + 1);
+            if (_range_start_offset >= cur_group_offset &&
+                _range_start_offset < next_group_offset) {
+                // Enter the branch only the first time to find head group
+                head_group_offset = cur_group_offset;
+            }
+            if (range_end_offset < next_group_offset) {
+                tail_group_offset = cur_group_offset;
+                // find tail, break
+                break;
+            }
+        }
+        if (tail_group_offset < head_group_offset) {
+            tail_group_offset = head_group_offset;
+        }
+    }
+
     std::unordered_set<int> parquet_column_ids(include_column_ids.begin(),
                                                include_column_ids.end());
     _init_conjuncts(tuple_desc, map_column, parquet_column_ids);
-    int total_group = _file_metadata->num_row_groups();
     _parent->statistics()->total_groups = total_group;
     _parent->statistics()->total_rows = _file_metadata->num_rows();
 
-    int32_t filtered_num_row_groups = 0;
-    int64_t filtered_num_rows = 0;
-    int64_t filtered_total_byte_size = 0;
     bool update_statistics = false;
     for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
         auto row_group_meta = _file_metadata->RowGroup(row_group_id);
+        if (_range_size > 0 && file_size > 0) {
+            int64_t start_offset = _get_group_offset(row_group_id);
+            int64_t end_offset = row_group_id == total_group - 1
+                                         ? file_size
+                                         : _get_group_offset(row_group_id + 1);
+            if (start_offset >= tail_group_offset || end_offset <= head_group_offset) {
+                _filter_group.emplace(row_group_id);
+                VLOG_DEBUG << "Filter extra row group id: " << row_group_id;
+                continue;
+            }
+        }
+        // if head_read_offset <= start_offset < end_offset <= tail_read_offset
         for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) {
             const std::string& col_name = tuple_desc->slots()[slot_id]->col_name();
             auto col_iter = map_column.find(col_name);
@@ -129,26 +183,36 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
             bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max);
             if (group_need_filter) {
                 update_statistics = true;
-                filtered_num_row_groups++;
-                filtered_num_rows += row_group_meta->num_rows();
-                filtered_total_byte_size += row_group_meta->total_byte_size();
+                _add_filter_group(row_group_id, row_group_meta);
                 VLOG_DEBUG << "Filter row group id: " << row_group_id;
-                _filter_group.emplace(row_group_id);
                 break;
             }
         }
     }
+
     if (update_statistics) {
-        _parent->statistics()->filtered_row_groups = filtered_num_row_groups;
-        _parent->statistics()->filtered_rows = filtered_num_rows;
-        _parent->statistics()->filtered_total_bytes = filtered_total_byte_size;
+        _parent->statistics()->filtered_row_groups = _filtered_num_row_groups;
+        _parent->statistics()->filtered_rows = _filtered_num_rows;
+        _parent->statistics()->filtered_total_bytes = _filtered_total_byte_size;
         VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name()
                    << ", Num of read row group: " << total_group
-                   << ", and num of skip row group: " << filtered_num_row_groups;
+                   << ", and num of skip row group: " << _filtered_num_row_groups;
     }
     return Status::OK();
 }
 
+int64_t RowGroupReader::_get_group_offset(int row_group_id) {
+    return _file_metadata->RowGroup(row_group_id)->ColumnChunk(0)->file_offset() - PARQUET_HEAD;
+}
+
+void RowGroupReader::_add_filter_group(int row_group_id,
+                                       std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta) {
+    _filtered_num_row_groups++;
+    _filtered_num_rows += row_group_meta->num_rows();
+    _filtered_total_byte_size += row_group_meta->total_byte_size();
+    _filter_group.emplace(row_group_id);
+}
+
 void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
                                      const std::map<std::string, int>& map_column,
                                      const std::unordered_set<int>& include_column_ids) {
@@ -292,15 +356,15 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*
     case TYPE_DATETIME: {
         std::vector<const char*> in_values;
         for (auto val : in_pred_values) {
-            const char* value = ((std::string*)val)->c_str();
+            const char* value = ((std::string*)val)->data();
             in_values.emplace_back(value);
         }
         if (in_values.empty()) {
             return false;
         }
-        std::sort(in_values.begin(), in_values.end());
-        const char* in_min = in_values.front();
-        const char* in_max = in_values.back();
+        auto result = std::minmax_element(in_values.begin(), in_values.end());
+        const char* in_min = *result.first;
+        const char* in_max = *result.second;
         if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
             return true;
         }
@@ -350,7 +414,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch
     case TYPE_CHAR:
     case TYPE_DATE:
     case TYPE_DATETIME: {
-        const char* conjunct_value = ((std::string*)value)->c_str();
+        const char* conjunct_value = ((std::string*)value)->data();
         if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
             return true;
         }
@@ -400,7 +464,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch
     case TYPE_DATE:
     case TYPE_DATETIME: {
         //            case TYPE_TIME:
-        const char* conjunct_value = ((std::string*)value)->c_str();
+        const char* conjunct_value = ((std::string*)value)->data();
         if (strcmp(max_bytes, conjunct_value) <= 0) {
             return true;
         }
@@ -450,7 +514,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch
     case TYPE_DATE:
     case TYPE_DATETIME: {
         //            case TYPE_TIME:
-        const char* conjunct_value = ((std::string*)value)->c_str();
+        const char* conjunct_value = ((std::string*)value)->data();
         if (strcmp(max_bytes, conjunct_value) < 0) {
             return true;
         }
@@ -500,7 +564,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch
     case TYPE_DATE:
     case TYPE_DATETIME: {
         //            case TYPE_TIME:
-        const char* conjunct_value = ((std::string*)value)->c_str();
+        const char* conjunct_value = ((std::string*)value)->data();
         if (strcmp(min_bytes, conjunct_value) >= 0) {
             return true;
         }
@@ -550,7 +614,7 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch
     case TYPE_DATE:
     case TYPE_DATETIME: {
         //            case TYPE_TIME:
-        const char* conjunct_value = ((std::string*)value)->c_str();
+        const char* conjunct_value = ((std::string*)value)->data();
         if (strcmp(min_bytes, conjunct_value) > 0) {
             return true;
         }
diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h
index 98a6b9ff95..6cd5724828 100644
--- a/be/src/exec/arrow/parquet_row_group_reader.h
+++ b/be/src/exec/arrow/parquet_row_group_reader.h
@@ -37,18 +37,24 @@ class ParquetReaderWrap;
 
 class RowGroupReader {
 public:
-    RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
+    RowGroupReader(int64_t range_start_offset, int64_t range_size,
+                   const std::vector<ExprContext*>& conjunct_ctxs,
                    std::shared_ptr<parquet::FileMetaData>& file_metadata,
                    ParquetReaderWrap* parent);
     ~RowGroupReader();
 
     Status init_filter_groups(const TupleDescriptor* tuple_desc,
                               const std::map<std::string, int>& map_column,
-                              const std::vector<int>& include_column_ids);
+                              const std::vector<int>& include_column_ids, int64_t file_size);
 
     std::unordered_set<int> filter_groups() { return _filter_group; };
 
 private:
+    void _add_filter_group(int row_group_id,
+                           std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta);
+
+    int64_t _get_group_offset(int row_group_id);
+
     void _init_conjuncts(const TupleDescriptor* tuple_desc,
                          const std::map<std::string, int>& _map_column,
                          const std::unordered_set<int>& include_column_ids);
@@ -78,11 +84,18 @@ private:
     bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
 
 private:
+    int64_t _range_start_offset;
+    int64_t _range_size;
+    int64_t _file_size;
     std::map<int, std::vector<ExprContext*>> _slot_conjuncts;
     std::unordered_set<int> _filter_group;
 
     std::vector<ExprContext*> _conjunct_ctxs;
     std::shared_ptr<parquet::FileMetaData> _file_metadata;
     ParquetReaderWrap* _parent;
+
+    int32_t _filtered_num_row_groups = 0;
+    int64_t _filtered_num_rows = 0;
+    int64_t _filtered_total_byte_size = 0;
 };
 } // namespace doris
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 76caf48e21..d7aa4041e2 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -107,7 +107,7 @@ Status ParquetScanner::open_next_reader() {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
         _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
-                                                 num_of_columns_from_file);
+                                                 num_of_columns_from_file, 0, 0);
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
         Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
                                                       _state->timezone());
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 79c6037c36..55199335bd 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -66,8 +66,9 @@ Status FileArrowScanner::_open_next_reader() {
 
         int32_t num_of_columns_from_file = _file_slot_descs.size();
 
-        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
-                                             num_of_columns_from_file);
+        _cur_file_reader =
+                _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                  num_of_columns_from_file, range.start_offset, range.size);
 
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
         Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
@@ -217,8 +218,11 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr
 }
 
 ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                                        int32_t num_of_columns_from_file) {
-    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+                                                        int32_t num_of_columns_from_file,
+                                                        int64_t range_start_offset,
+                                                        int64_t range_size) {
+    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
+                                 range_start_offset, range_size);
 }
 
 void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@@ -237,7 +241,9 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
         : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
 
 ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                                    int32_t num_of_columns_from_file) {
+                                                    int32_t num_of_columns_from_file,
+                                                    int64_t range_start_offset,
+                                                    int64_t range_size) {
     return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
 }
 
diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h
index 8172c9b83c..3f7537c134 100644
--- a/be/src/vec/exec/file_arrow_scanner.h
+++ b/be/src/vec/exec/file_arrow_scanner.h
@@ -53,7 +53,8 @@ public:
 
 protected:
     virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                               int32_t num_of_columns_from_file) = 0;
+                                               int32_t num_of_columns_from_file,
+                                               int64_t range_start_offset, int64_t range_size) = 0;
     virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
 
 private:
@@ -82,7 +83,8 @@ public:
 
 protected:
     ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                       int32_t num_of_columns_from_file) override;
+                                       int32_t num_of_columns_from_file, int64_t range_start_offset,
+                                       int64_t range_size) override;
 
     void _init_profiles(RuntimeProfile* profile) override;
     void _update_profile(std::shared_ptr<Statistics>& statistics) override;
@@ -105,7 +107,8 @@ public:
 
 protected:
     ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                       int32_t num_of_columns_from_file) override;
+                                       int32_t num_of_columns_from_file, int64_t range_start_offset,
+                                       int64_t range_size) override;
     void _init_profiles(RuntimeProfile* profile) override {};
 };
 
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 3244de5b90..200e467810 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -76,8 +76,9 @@ Status VArrowScanner::_open_next_reader() {
         if (range.__isset.num_of_columns_from_file) {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
-        _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
-                                             num_of_columns_from_file);
+        _cur_file_reader =
+                _new_arrow_reader(file_reader.release(), _state->batch_size(),
+                                  num_of_columns_from_file, range.start_offset, range.size);
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
         Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
                                                       _state->timezone());
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 109ce53177..7eff7ab329 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -64,7 +64,8 @@ public:
 
 protected:
     virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                               int32_t num_of_columns_from_file) = 0;
+                                               int32_t num_of_columns_from_file,
+                                               int64_t range_start_offset, int64_t range_size) = 0;
 
 private:
     // Read next buffer from reader
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
index ca5c7c2aef..9f71d01cfb 100644
--- a/be/src/vec/exec/vorc_scanner.cpp
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -30,7 +30,8 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
                         counter) {}
 
 ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                                int32_t num_of_columns_from_file) {
+                                                int32_t num_of_columns_from_file,
+                                                int64_t range_start_offset, int64_t range_size) {
     return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
 }
 
diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h
index 12510e9731..b7bd1fdf67 100644
--- a/be/src/vec/exec/vorc_scanner.h
+++ b/be/src/vec/exec/vorc_scanner.h
@@ -47,7 +47,8 @@ public:
 
 protected:
     ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                       int32_t num_of_columns_from_file) override;
+                                       int32_t num_of_columns_from_file, int64_t range_start_offset,
+                                       int64_t range_size) override;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index cb59ae60bc..f4d74a6207 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -31,8 +31,11 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                         counter) {}
 
 ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                                    int32_t num_of_columns_from_file) {
-    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+                                                    int32_t num_of_columns_from_file,
+                                                    int64_t range_start_offset,
+                                                    int64_t range_size) {
+    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
+                                 range_start_offset, range_size);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 367e2e7472..d8cf597dbe 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -48,7 +48,8 @@ public:
 
 protected:
     ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
-                                       int32_t num_of_columns_from_file) override;
+                                       int32_t num_of_columns_from_file, int64_t range_start_offset,
+                                       int64_t range_size) override;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org