You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/18 12:51:12 UTC
[doris] branch master updated: [feature](multi-catalog) read parquet file by start/offset (#10843)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8a366c9ba2 [feature](multi-catalog) read parquet file by start/offset (#10843)
8a366c9ba2 is described below
commit 8a366c9ba2fc56c523708b3ced9e65e69f17f9ae
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:51:08 2022 +0800
[feature](multi-catalog) read parquet file by start/offset (#10843)
To avoid reading the same row group repeatedly, we should align the split offsets to row group boundaries
---
be/src/common/config.h | 2 +-
be/src/exec/arrow/parquet_reader.cpp | 15 ++-
be/src/exec/arrow/parquet_reader.h | 6 +-
be/src/exec/arrow/parquet_row_group_reader.cpp | 144 ++++++++++++++++++-------
be/src/exec/arrow/parquet_row_group_reader.h | 17 ++-
be/src/exec/parquet_scanner.cpp | 2 +-
be/src/vec/exec/file_arrow_scanner.cpp | 16 ++-
be/src/vec/exec/file_arrow_scanner.h | 9 +-
be/src/vec/exec/varrow_scanner.cpp | 5 +-
be/src/vec/exec/varrow_scanner.h | 3 +-
be/src/vec/exec/vorc_scanner.cpp | 3 +-
be/src/vec/exec/vorc_scanner.h | 3 +-
be/src/vec/exec/vparquet_scanner.cpp | 7 +-
be/src/vec/exec/vparquet_scanner.h | 3 +-
14 files changed, 169 insertions(+), 66 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ff4184eb62..f0fb39800e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -769,7 +769,7 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
-CONF_Bool(parquet_predicate_push_down, "false");
+CONF_Bool(parquet_predicate_push_down, "true");
// When the rows number reached this limit, will check the filter rate the of bloomfilter
// if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index c14022bc53..c0f45f52af 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -37,11 +37,14 @@ namespace doris {
// Broker
ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file)
+ int32_t num_of_columns_from_file, int64_t range_start_offset,
+ int64_t range_size)
: ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
_rows_of_group(0),
_current_line_of_group(0),
- _current_line_of_batch(0) {}
+ _current_line_of_batch(0),
+ _range_start_offset(range_start_offset),
+ _range_size(range_size) {}
ParquetReaderWrap::~ParquetReaderWrap() {
_closed = true;
@@ -101,8 +104,12 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
if (config::parquet_predicate_push_down) {
- _row_group_reader.reset(new RowGroupReader(conjunct_ctxs, _file_metadata, this));
- _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids);
+ int64_t file_size = 0;
+ size(&file_size);
+ _row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size,
+ conjunct_ctxs, _file_metadata, this));
+ _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids,
+ file_size);
}
_thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
return Status::OK();
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index 6aaab5d24d..3bf4cf4814 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -62,8 +62,8 @@ class RowGroupReader;
class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not use here
- ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file);
+ ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file,
+ int64_t range_start_offset, int64_t range_size);
~ParquetReaderWrap() override;
// Read
@@ -100,6 +100,8 @@ private:
int _current_line_of_group;
int _current_line_of_batch;
std::string _timezone;
+ int64_t _range_start_offset;
+ int64_t _range_size;
private:
std::atomic<bool> _closed = false;
diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp
index bbc607f02b..517309db76 100644
--- a/be/src/exec/arrow/parquet_row_group_reader.cpp
+++ b/be/src/exec/arrow/parquet_row_group_reader.cpp
@@ -57,30 +57,37 @@
return true; \
}
-#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
- std::vector<T> in_values; \
- for (auto val : in_pred_values) { \
- T value = reinterpret_cast<T*>(val)[0]; \
- in_values.emplace_back(value); \
- } \
- if (in_values.empty()) { \
- return false; \
- } \
- std::sort(in_values.begin(), in_values.end()); \
- T in_min = in_values.front(); \
- T in_max = in_values.back(); \
- const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
- const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
- if (in_max < group_min || in_min > group_max) { \
- return true; \
+#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
+ std::vector<T> in_values; \
+ for (auto val : in_pred_values) { \
+ T value = reinterpret_cast<T*>(val)[0]; \
+ in_values.emplace_back(value); \
+ } \
+ if (in_values.empty()) { \
+ return false; \
+ } \
+ auto result = std::minmax_element(in_values.begin(), in_values.end()); \
+ T in_min = *result.first; \
+ T in_max = *result.second; \
+ const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
+ const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
+ if (in_max < group_min || in_min > group_max) { \
+ return true; \
}
+#define PARQUET_HEAD 4
+
namespace doris {
-RowGroupReader::RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
+RowGroupReader::RowGroupReader(int64_t range_start_offset, int64_t range_size,
+ const std::vector<ExprContext*>& conjunct_ctxs,
std::shared_ptr<parquet::FileMetaData>& file_metadata,
ParquetReaderWrap* parent)
- : _conjunct_ctxs(conjunct_ctxs), _file_metadata(file_metadata), _parent(parent) {}
+ : _range_start_offset(range_start_offset),
+ _range_size(range_size),
+ _conjunct_ctxs(conjunct_ctxs),
+ _file_metadata(file_metadata),
+ _parent(parent) {}
RowGroupReader::~RowGroupReader() {
_slot_conjuncts.clear();
@@ -89,20 +96,67 @@ RowGroupReader::~RowGroupReader() {
Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
- const std::vector<int>& include_column_ids) {
+ const std::vector<int>& include_column_ids,
+ int64_t file_size) {
+ int total_group = _file_metadata->num_row_groups();
+ // It will not filter if head_group_offset equals tail_group_offset
+ int64_t head_group_offset = _range_start_offset;
+ int64_t tail_group_offset = _range_start_offset;
+ int64_t range_end_offset = _range_start_offset + _range_size;
+ if (_range_size > 0 && file_size > 0) {
+ // todo: extract to function
+ for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
+ int64_t cur_group_offset = _get_group_offset(row_group_id);
+ // when a whole file is in a split, range_end_offset is the EOF offset
+ if (row_group_id == total_group - 1) {
+ if (cur_group_offset < _range_start_offset) {
+ head_group_offset = cur_group_offset;
+ }
+ if (range_end_offset >= file_size) {
+ tail_group_offset = file_size;
+ } else {
+ tail_group_offset = cur_group_offset;
+ }
+ break;
+ }
+ int64_t next_group_offset = _get_group_offset(row_group_id + 1);
+ if (_range_start_offset >= cur_group_offset &&
+ _range_start_offset < next_group_offset) {
+ // Enter the branch only the first time to find the head group
+ head_group_offset = cur_group_offset;
+ }
+ if (range_end_offset < next_group_offset) {
+ tail_group_offset = cur_group_offset;
+ // find tail, break
+ break;
+ }
+ }
+ if (tail_group_offset < head_group_offset) {
+ tail_group_offset = head_group_offset;
+ }
+ }
+
std::unordered_set<int> parquet_column_ids(include_column_ids.begin(),
include_column_ids.end());
_init_conjuncts(tuple_desc, map_column, parquet_column_ids);
- int total_group = _file_metadata->num_row_groups();
_parent->statistics()->total_groups = total_group;
_parent->statistics()->total_rows = _file_metadata->num_rows();
- int32_t filtered_num_row_groups = 0;
- int64_t filtered_num_rows = 0;
- int64_t filtered_total_byte_size = 0;
bool update_statistics = false;
for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
auto row_group_meta = _file_metadata->RowGroup(row_group_id);
+ if (_range_size > 0 && file_size > 0) {
+ int64_t start_offset = _get_group_offset(row_group_id);
+ int64_t end_offset = row_group_id == total_group - 1
+ ? file_size
+ : _get_group_offset(row_group_id + 1);
+ if (start_offset >= tail_group_offset || end_offset <= head_group_offset) {
+ _filter_group.emplace(row_group_id);
+ VLOG_DEBUG << "Filter extra row group id: " << row_group_id;
+ continue;
+ }
+ }
+ // Keep only groups satisfying head_group_offset <= start_offset < end_offset <= tail_group_offset
for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) {
const std::string& col_name = tuple_desc->slots()[slot_id]->col_name();
auto col_iter = map_column.find(col_name);
@@ -129,26 +183,36 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max);
if (group_need_filter) {
update_statistics = true;
- filtered_num_row_groups++;
- filtered_num_rows += row_group_meta->num_rows();
- filtered_total_byte_size += row_group_meta->total_byte_size();
+ _add_filter_group(row_group_id, row_group_meta);
VLOG_DEBUG << "Filter row group id: " << row_group_id;
- _filter_group.emplace(row_group_id);
break;
}
}
}
+
if (update_statistics) {
- _parent->statistics()->filtered_row_groups = filtered_num_row_groups;
- _parent->statistics()->filtered_rows = filtered_num_rows;
- _parent->statistics()->filtered_total_bytes = filtered_total_byte_size;
+ _parent->statistics()->filtered_row_groups = _filtered_num_row_groups;
+ _parent->statistics()->filtered_rows = _filtered_num_rows;
+ _parent->statistics()->filtered_total_bytes = _filtered_total_byte_size;
VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name()
<< ", Num of read row group: " << total_group
- << ", and num of skip row group: " << filtered_num_row_groups;
+ << ", and num of skip row group: " << _filtered_num_row_groups;
}
return Status::OK();
}
+int64_t RowGroupReader::_get_group_offset(int row_group_id) {
+ return _file_metadata->RowGroup(row_group_id)->ColumnChunk(0)->file_offset() - PARQUET_HEAD;
+}
+
+void RowGroupReader::_add_filter_group(int row_group_id,
+ std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta) {
+ _filtered_num_row_groups++;
+ _filtered_num_rows += row_group_meta->num_rows();
+ _filtered_total_byte_size += row_group_meta->total_byte_size();
+ _filter_group.emplace(row_group_id);
+}
+
void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
const std::unordered_set<int>& include_column_ids) {
@@ -292,15 +356,15 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*
case TYPE_DATETIME: {
std::vector<const char*> in_values;
for (auto val : in_pred_values) {
- const char* value = ((std::string*)val)->c_str();
+ const char* value = ((std::string*)val)->data();
in_values.emplace_back(value);
}
if (in_values.empty()) {
return false;
}
- std::sort(in_values.begin(), in_values.end());
- const char* in_min = in_values.front();
- const char* in_max = in_values.back();
+ auto result = std::minmax_element(in_values.begin(), in_values.end());
+ const char* in_min = *result.first;
+ const char* in_max = *result.second;
if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
return true;
}
@@ -350,7 +414,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch
case TYPE_CHAR:
case TYPE_DATE:
case TYPE_DATETIME: {
- const char* conjunct_value = ((std::string*)value)->c_str();
+ const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
return true;
}
@@ -400,7 +464,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
- const char* conjunct_value = ((std::string*)value)->c_str();
+ const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) <= 0) {
return true;
}
@@ -450,7 +514,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
- const char* conjunct_value = ((std::string*)value)->c_str();
+ const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(max_bytes, conjunct_value) < 0) {
return true;
}
@@ -500,7 +564,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
- const char* conjunct_value = ((std::string*)value)->c_str();
+ const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) >= 0) {
return true;
}
@@ -550,7 +614,7 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch
case TYPE_DATE:
case TYPE_DATETIME: {
// case TYPE_TIME:
- const char* conjunct_value = ((std::string*)value)->c_str();
+ const char* conjunct_value = ((std::string*)value)->data();
if (strcmp(min_bytes, conjunct_value) > 0) {
return true;
}
diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h
index 98a6b9ff95..6cd5724828 100644
--- a/be/src/exec/arrow/parquet_row_group_reader.h
+++ b/be/src/exec/arrow/parquet_row_group_reader.h
@@ -37,18 +37,24 @@ class ParquetReaderWrap;
class RowGroupReader {
public:
- RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs,
+ RowGroupReader(int64_t range_start_offset, int64_t range_size,
+ const std::vector<ExprContext*>& conjunct_ctxs,
std::shared_ptr<parquet::FileMetaData>& file_metadata,
ParquetReaderWrap* parent);
~RowGroupReader();
Status init_filter_groups(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& map_column,
- const std::vector<int>& include_column_ids);
+ const std::vector<int>& include_column_ids, int64_t file_size);
std::unordered_set<int> filter_groups() { return _filter_group; };
private:
+ void _add_filter_group(int row_group_id,
+ std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta);
+
+ int64_t _get_group_offset(int row_group_id);
+
void _init_conjuncts(const TupleDescriptor* tuple_desc,
const std::map<std::string, int>& _map_column,
const std::unordered_set<int>& include_column_ids);
@@ -78,11 +84,18 @@ private:
bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
private:
+ int64_t _range_start_offset;
+ int64_t _range_size;
+ int64_t _file_size;
std::map<int, std::vector<ExprContext*>> _slot_conjuncts;
std::unordered_set<int> _filter_group;
std::vector<ExprContext*> _conjunct_ctxs;
std::shared_ptr<parquet::FileMetaData> _file_metadata;
ParquetReaderWrap* _parent;
+
+ int32_t _filtered_num_row_groups = 0;
+ int64_t _filtered_num_rows = 0;
+ int64_t _filtered_total_byte_size = 0;
};
} // namespace doris
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 76caf48e21..d7aa4041e2 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -107,7 +107,7 @@ Status ParquetScanner::open_next_reader() {
num_of_columns_from_file = range.num_of_columns_from_file;
}
_cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
- num_of_columns_from_file);
+ num_of_columns_from_file, 0, 0);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
_state->timezone());
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 79c6037c36..55199335bd 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -66,8 +66,9 @@ Status FileArrowScanner::_open_next_reader() {
int32_t num_of_columns_from_file = _file_slot_descs.size();
- _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
- num_of_columns_from_file);
+ _cur_file_reader =
+ _new_arrow_reader(file_reader.release(), _state->batch_size(),
+ num_of_columns_from_file, range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
@@ -217,8 +218,11 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr
}
ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) {
- return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset,
+ int64_t range_size) {
+ return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
+ range_start_offset, range_size);
}
void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
@@ -237,7 +241,9 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
: FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) {
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset,
+ int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
}
diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h
index 8172c9b83c..3f7537c134 100644
--- a/be/src/vec/exec/file_arrow_scanner.h
+++ b/be/src/vec/exec/file_arrow_scanner.h
@@ -53,7 +53,8 @@ public:
protected:
virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) = 0;
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset, int64_t range_size) = 0;
virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
private:
@@ -82,7 +83,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) override;
+ int32_t num_of_columns_from_file, int64_t range_start_offset,
+ int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override;
void _update_profile(std::shared_ptr<Statistics>& statistics) override;
@@ -105,7 +107,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) override;
+ int32_t num_of_columns_from_file, int64_t range_start_offset,
+ int64_t range_size) override;
void _init_profiles(RuntimeProfile* profile) override {};
};
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 3244de5b90..200e467810 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -76,8 +76,9 @@ Status VArrowScanner::_open_next_reader() {
if (range.__isset.num_of_columns_from_file) {
num_of_columns_from_file = range.num_of_columns_from_file;
}
- _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
- num_of_columns_from_file);
+ _cur_file_reader =
+ _new_arrow_reader(file_reader.release(), _state->batch_size(),
+ num_of_columns_from_file, range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
_state->timezone());
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 109ce53177..7eff7ab329 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -64,7 +64,8 @@ public:
protected:
virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) = 0;
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset, int64_t range_size) = 0;
private:
// Read next buffer from reader
diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp
index ca5c7c2aef..9f71d01cfb 100644
--- a/be/src/vec/exec/vorc_scanner.cpp
+++ b/be/src/vec/exec/vorc_scanner.cpp
@@ -30,7 +30,8 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile,
counter) {}
ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) {
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset, int64_t range_size) {
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file);
}
diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h
index 12510e9731..b7bd1fdf67 100644
--- a/be/src/vec/exec/vorc_scanner.h
+++ b/be/src/vec/exec/vorc_scanner.h
@@ -47,7 +47,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) override;
+ int32_t num_of_columns_from_file, int64_t range_start_offset,
+ int64_t range_size) override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index cb59ae60bc..f4d74a6207 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -31,8 +31,11 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
counter) {}
ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) {
- return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+ int32_t num_of_columns_from_file,
+ int64_t range_start_offset,
+ int64_t range_size) {
+ return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file,
+ range_start_offset, range_size);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 367e2e7472..d8cf597dbe 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -48,7 +48,8 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file) override;
+ int32_t num_of_columns_from_file, int64_t range_start_offset,
+ int64_t range_size) override;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org