You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/08 15:11:33 UTC
[doris] branch master updated: [feature-wip] support parquet predicate push down (#10512)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c358a43f35 [feature-wip] support parquet predicate push down (#10512)
c358a43f35 is described below
commit c358a43f3571059c9b641f3851ec84d8390fa95f
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Fri Jul 8 23:11:25 2022 +0800
[feature-wip] support parquet predicate push down (#10512)
---
be/src/common/config.h | 1 +
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/arrow/arrow_reader.cpp | 1 +
be/src/exec/arrow/arrow_reader.h | 15 +-
be/src/exec/arrow/orc_reader.cpp | 4 +-
be/src/exec/arrow/orc_reader.h | 4 +-
be/src/exec/arrow/parquet_reader.cpp | 93 ++--
be/src/exec/arrow/parquet_reader.h | 14 +-
be/src/exec/arrow/parquet_row_group_reader.cpp | 568 +++++++++++++++++++++++++
be/src/exec/arrow/parquet_row_group_reader.h | 89 ++++
be/src/exec/base_scanner.cpp | 6 +
be/src/exec/base_scanner.h | 8 +
be/src/exec/broker_scan_node.cpp | 1 +
be/src/exec/parquet_scanner.cpp | 11 +-
be/src/exprs/expr_context.h | 1 +
be/src/vec/exec/file_arrow_scanner.cpp | 7 +-
be/src/vec/exec/file_scan_node.cpp | 1 +
be/src/vec/exec/file_scanner.cpp | 6 +
be/src/vec/exec/file_scanner.h | 8 +
be/src/vec/exec/varrow_scanner.cpp | 22 +-
be/src/vec/exec/varrow_scanner.h | 10 +
be/src/vec/exec/vbroker_scan_node.cpp | 1 +
be/src/vec/exec/vparquet_scanner.cpp | 2 +-
be/test/exec/parquet_scanner_test.cpp | 64 ++-
24 files changed, 889 insertions(+), 49 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2f5ee5cb71..ca0c66654d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -752,6 +752,7 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
+// When true, parquet readers evaluate the scan conjuncts against each row group's
+// column min/max statistics and skip row groups that cannot contain a matching row.
+CONF_Bool(parquet_predicate_push_down, "false");
// When the rows number reached this limit, will check the filter rate the of bloomfilter
// if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index cc94f5fe9a..9a25789dd9 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,6 +25,7 @@ set(EXEC_FILES
arrow/arrow_reader.cpp
arrow/orc_reader.cpp
arrow/parquet_reader.cpp
+ arrow/parquet_row_group_reader.cpp
analytic_eval_node.cpp
blocking_join_node.cpp
broker_scan_node.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 94289a990b..9d20697148 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -43,6 +43,7 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
_rb_reader = nullptr;
_total_groups = 0;
_current_group = 0;
+ _statistics = std::make_shared<Statistics>();
}
ArrowReaderWrap::~ArrowReaderWrap() {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 5a1e00f022..ad888061be 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -33,6 +33,7 @@
#include <string>
#include "common/status.h"
+#include "exprs/expr_context.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
@@ -48,6 +49,14 @@ class SlotDescriptor;
class MemPool;
class FileReader;
+// Row-group pruning counters exposed through ArrowReaderWrap::statistics(); filled in
+// by the parquet predicate push-down path (RowGroupReader::init_filter_groups).
+struct Statistics {
+ // Number of row groups skipped by predicate push-down.
+ int32_t filtered_row_groups = 0;
+ // Number of row groups in the file.
+ int32_t total_groups = 0;
+ // Number of rows contained in the skipped row groups.
+ int64_t filtered_rows = 0;
+ // Number of rows in the file.
+ int64_t total_rows = 0;
+ // Sum of the skipped row groups' total_byte_size().
+ int64_t filtered_total_bytes = 0;
+};
+
class ArrowFile : public arrow::io::RandomAccessFile {
public:
ArrowFile(FileReader* file);
@@ -72,7 +81,9 @@ public:
ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
virtual ~ArrowReaderWrap();
- virtual Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ virtual Status init_reader(const TupleDescriptor* tuple_desc,
+ const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) = 0;
// for row
virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
@@ -81,6 +92,7 @@ public:
}
// for vec
virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
+ std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
@@ -96,6 +108,7 @@ protected:
int _current_group; // current group(stripe)
std::map<std::string, int> _map_column; // column-name <---> column-index
std::vector<int> _include_column_ids; // columns that need to get from file
+ std::shared_ptr<Statistics> _statistics;
};
} // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 536b852ad6..f94d24610c 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -34,7 +34,9 @@ ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
_cur_file_eof = false;
}
-Status ORCReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
+ const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) {
// Open ORC file reader
auto maybe_reader =
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index dd7853efe7..3b46cc6073 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -35,7 +35,9 @@ public:
ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
~ORCReaderWrap() override = default;
- Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ Status init_reader(const TupleDescriptor* tuple_desc,
+ const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index 6885ebb781..f24768369e 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -36,13 +36,13 @@
namespace doris {
// Broker
-
-ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
- int32_t num_of_columns_from_file)
+ParquetReaderWrap::ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader,
+ int64_t batch_size, int32_t num_of_columns_from_file)
: ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
_rows_of_group(0),
_current_line_of_group(0),
- _current_line_of_batch(0) {}
+ _current_line_of_batch(0),
+ _profile(profile) {}
ParquetReaderWrap::~ParquetReaderWrap() {
_closed = true;
@@ -52,7 +52,9 @@ ParquetReaderWrap::~ParquetReaderWrap() {
}
}
-Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
+ const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) {
try {
parquet::ArrowReaderProperties arrow_reader_properties =
@@ -99,22 +101,12 @@ Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_
_timezone = timezone;
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
-
- _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
-
- // read batch
- RETURN_IF_ERROR(read_next_batch());
- _current_line_of_batch = 0;
- //save column type
- std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
- for (int i = 0; i < _include_column_ids.size(); i++) {
- std::shared_ptr<arrow::Field> field = field_schema->field(i);
- if (!field) {
- LOG(WARNING) << "Get field schema failed. Column order:" << i;
- return Status::InternalError(_status.ToString());
- }
- _parquet_column_type.emplace_back(field->type()->id());
+ if (config::parquet_predicate_push_down) {
+ _row_group_reader.reset(
+ new RowGroupReader(_profile, conjunct_ctxs, _file_metadata, this));
+ _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids);
}
+ _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
return Status::OK();
} catch (parquet::ParquetException& e) {
std::stringstream str_error;
@@ -188,19 +180,25 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) {
}
Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
- if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) {
- RETURN_IF_ERROR(read_record_batch(eof));
+ std::unique_lock<std::mutex> lock(_mtx);
+ while (!_closed && _queue.empty()) {
+ if (_batch_eof) {
+ _include_column_ids.clear();
+ *eof = true;
+ _batch_eof = false;
+ return Status::OK();
+ }
+ _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
}
- *batch = get_batch();
+ if (UNLIKELY(_closed)) {
+ return Status::InternalError(_status.message());
+ }
+ *batch = _queue.front();
+ _queue.pop_front();
+ _queue_writer_cond.notify_one();
return Status::OK();
}
-const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() {
- _current_line_of_batch += _batch->num_rows();
- _current_line_of_group += _batch->num_rows();
- return _batch;
-}
-
Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
@@ -240,8 +238,32 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::Timestam
return Status::OK();
}
+// Pulls the first prefetched batch and records the arrow type id of every requested
+// column, in the order of _include_column_ids, into _parquet_column_type.
+// Returns OK without recording anything when no batch is available (the prefetch
+// thread reached the end, e.g. every row group was skipped).
+Status ParquetReaderWrap::init_parquet_type() {
+    // read batch
+    RETURN_IF_ERROR(read_next_batch());
+    _current_line_of_batch = 0;
+    if (_batch == nullptr) {
+        return Status::OK();
+    }
+    // save column type
+    std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
+    const int num_requested = static_cast<int>(_include_column_ids.size());
+    for (int i = 0; i < num_requested; i++) {
+        // arrow::Schema::field() does not bounds-check, so guard against a batch that
+        // carries fewer columns than were requested.
+        std::shared_ptr<arrow::Field> field;
+        if (i < field_schema->num_fields()) {
+            field = field_schema->field(i);
+        }
+        if (!field) {
+            LOG(WARNING) << "Get field schema failed. Column order:" << i;
+            // Do not report _status here: it may be OK (yielding a meaningless error)
+            // and it is written concurrently by the prefetch thread.
+            return Status::InternalError("Get field schema failed. Column order: " +
+                                         std::to_string(i));
+        }
+        _parquet_column_type.emplace_back(field->type()->id());
+    }
+    return Status::OK();
+}
+
Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
MemPool* mem_pool, bool* eof) {
+ if (_batch == nullptr) {
+ _current_line_of_group += _rows_of_group;
+ return read_record_batch(eof);
+ }
uint8_t tmp_buf[128] = {0};
int32_t wbytes = 0;
const uint8_t* value = nullptr;
@@ -535,8 +557,18 @@ void ParquetReaderWrap::prefetch_batch() {
int total_groups = _total_groups;
while (true) {
if (_closed || current_group >= total_groups) {
+ _batch_eof = true;
+ _queue_reader_cond.notify_one();
return;
}
+ if (config::parquet_predicate_push_down) {
+ auto filter_group_set = _row_group_reader->filter_groups();
+ if (filter_group_set.end() != filter_group_set.find(current_group)) {
+ // find filter group, skip
+ current_group++;
+ continue;
+ }
+ }
_status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
if (!_status.ok()) {
_closed = true;
@@ -556,6 +588,9 @@ void ParquetReaderWrap::prefetch_batch() {
Status ParquetReaderWrap::read_next_batch() {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
+ if (_batch_eof) {
+ return Status::OK();
+ }
_queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
}
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index 870c560fe4..8d9a420f55 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -41,6 +41,7 @@
#include "common/config.h"
#include "common/status.h"
#include "exec/arrow/arrow_reader.h"
+#include "exec/arrow/parquet_row_group_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
@@ -55,12 +56,13 @@ class Tuple;
class SlotDescriptor;
class MemPool;
class FileReader;
+class RowGroupReader;
// Reader of parquet file
class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not use here
- ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+ ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file);
~ParquetReaderWrap() override;
@@ -68,8 +70,11 @@ public:
Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
MemPool* mem_pool, bool* eof) override;
Status size(int64_t* size) override;
- Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ Status init_reader(const TupleDescriptor* tuple_desc,
+ const std::vector<SlotDescriptor*>& tuple_slot_descs,
+ const std::vector<ExprContext*>& conjunct_ctxs,
const std::string& timezone) override;
+ Status init_parquet_type();
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
private:
@@ -77,7 +82,6 @@ private:
int32_t len);
Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
Status read_record_batch(bool* eof);
- const std::shared_ptr<arrow::RecordBatch>& get_batch();
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);
@@ -95,16 +99,18 @@ private:
int _rows_of_group; // rows in a group.
int _current_line_of_group;
int _current_line_of_batch;
-
+ RuntimeProfile* _profile;
std::string _timezone;
private:
std::atomic<bool> _closed = false;
+ std::atomic<bool> _batch_eof = false;
arrow::Status _status;
std::mutex _mtx;
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+ std::unique_ptr<doris::RowGroupReader> _row_group_reader;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
std::thread _thread;
};
diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp
new file mode 100644
index 0000000000..48e382e88c
--- /dev/null
+++ b/be/src/exec/arrow/parquet_row_group_reader.cpp
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/arrow/parquet_row_group_reader.h"
+
+#include <exprs/expr_context.h>
+#include <exprs/in_predicate.h>
+#include <parquet/encoding.h>
+
+#include <algorithm>
+#include <cstring>
+#include <string_view>
+#include <vector>
+
+#include "runtime/string_value.h"
+
+// Declares `out_min`/`out_max` by reinterpreting the first sizeof(T) bytes of the
+// plain-encoded statistics buffers as T, and `out_value` by reading a T through the
+// conjunct value pointer.
+#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min, out_max) \
+ const T out_min = reinterpret_cast<const T*>(min_bytes)[0]; \
+ const T out_max = reinterpret_cast<const T*>(max_bytes)[0]; \
+ T out_value = *((T*)value);
+
+// Single-bound variant of _PLAIN_DECODE: declares `out` from `bytes` and
+// `conjunct_value` from the conjunct value pointer.
+#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \
+ const T out = reinterpret_cast<const T*>(bytes)[0]; \
+ T conjunct_value = *((T*)value);
+
+// The _FILTER_GROUP_BY_* macros `return true` from the enclosing function when the
+// row group's min/max prove that no row can satisfy the predicate.
+// slot = v: impossible when v lies outside [min, max].
+#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \
+ if (conjunct_value < min || conjunct_value > max) { \
+ return true; \
+ }
+
+// slot > v: impossible when max <= v.
+#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \
+ if (max <= conjunct_value) { \
+ return true; \
+ }
+
+// slot >= v: impossible when max < v.
+#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \
+ if (max < conjunct_value) { \
+ return true; \
+ }
+
+// slot < v: impossible when min >= v.
+#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \
+ if (min >= conjunct_value) { \
+ return true; \
+ }
+
+// slot <= v: impossible when min > v.
+#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \
+ if (min > conjunct_value) { \
+ return true; \
+ }
+
+// slot IN (values): reads each value as T, and `return true` when the IN-list range
+// [in_min, in_max] does not overlap the group's [min, max]; `return false` for an
+// empty list. Uses std::sort, so <algorithm> must be available.
+#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
+ std::vector<T> in_values; \
+ for (auto val : in_pred_values) { \
+ T value = reinterpret_cast<T*>(val)[0]; \
+ in_values.emplace_back(value); \
+ } \
+ if (in_values.empty()) { \
+ return false; \
+ } \
+ std::sort(in_values.begin(), in_values.end()); \
+ T in_min = in_values.front(); \
+ T in_max = in_values.back(); \
+ const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \
+ const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \
+ if (in_max < group_min || in_min > group_max) { \
+ return true; \
+ }
+
+namespace doris {
+
+RowGroupReader::RowGroupReader(RuntimeProfile* profile,
+ const std::vector<ExprContext*>& conjunct_ctxs,
+ std::shared_ptr<parquet::FileMetaData>& file_metadata,
+ ParquetReaderWrap* parent)
+ : _conjunct_ctxs(conjunct_ctxs),
+ _file_metadata(file_metadata),
+ _profile(profile),
+ _parent(parent) {}
+
+// Drops the set of skipped row-group ids and the per-slot conjunct lists.
+RowGroupReader::~RowGroupReader() {
+    _filter_group.clear();
+    _slot_conjuncts.clear();
+}
+
+// Evaluates the pushed-down conjuncts against every row group's column min/max
+// statistics and records, in _filter_group, the ids of row groups that cannot contain
+// a matching row. Also fills the parent reader's Statistics counters.
+// @param tuple_desc         slots of the scan tuple
+// @param map_column         column name -> parquet column index
+// @param include_column_ids parquet column indices that are actually read
+// @return Status::OK()
+Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
+                                          const std::map<std::string, int>& map_column,
+                                          const std::vector<int>& include_column_ids) {
+    std::unordered_set<int> parquet_column_ids(include_column_ids.begin(),
+                                               include_column_ids.end());
+    _init_conjuncts(tuple_desc, map_column, parquet_column_ids);
+    int total_group = _file_metadata->num_row_groups();
+    _parent->statistics()->total_groups = total_group;
+    _parent->statistics()->total_rows = _file_metadata->num_rows();
+
+    int32_t filtered_num_row_groups = 0;
+    int64_t filtered_num_rows = 0;
+    int64_t filtered_total_byte_size = 0;
+    for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
+        auto row_group_meta = _file_metadata->RowGroup(row_group_id);
+        for (const SlotDescriptor* slot : tuple_desc->slots()) {
+            auto col_iter = map_column.find(slot->col_name());
+            if (col_iter == map_column.end()) {
+                continue;
+            }
+            int parquet_col_id = col_iter->second;
+            if (parquet_column_ids.end() == parquet_column_ids.find(parquet_col_id)) {
+                // Column is not read from the parquet file
+                continue;
+            }
+            // _slot_conjuncts is keyed by the slot descriptor id (SlotRef::slot_id()),
+            // not by the slot's position inside the tuple.
+            auto slot_iter = _slot_conjuncts.find(slot->id());
+            if (slot_iter == _slot_conjuncts.end()) {
+                continue;
+            }
+            auto statistic = row_group_meta->ColumnChunk(parquet_col_id)->statistics();
+            // Column statistics are optional in parquet; without them nothing can be pruned.
+            if (statistic == nullptr || !statistic->HasMinMax()) {
+                continue;
+            }
+            // Min-max of statistic is plain-encoded value
+            const std::string min = statistic->EncodeMin();
+            const std::string max = statistic->EncodeMax();
+
+            if (_determine_filter_row_group(slot_iter->second, min, max)) {
+                filtered_num_row_groups++;
+                filtered_num_rows += row_group_meta->num_rows();
+                filtered_total_byte_size += row_group_meta->total_byte_size();
+                VLOG_DEBUG << "Filter row group id: " << row_group_id;
+                _filter_group.emplace(row_group_id);
+                // One column proving "no match" is enough to skip the whole row group.
+                break;
+            }
+        }
+    }
+    if (filtered_num_row_groups > 0) {
+        _parent->statistics()->filtered_row_groups = filtered_num_row_groups;
+        _parent->statistics()->filtered_rows = filtered_num_rows;
+        _parent->statistics()->filtered_total_bytes = filtered_total_byte_size;
+        VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name()
+                   << ", Num of read row group: " << total_group
+                   << ", and num of skip row group: " << filtered_num_row_groups;
+    }
+    return Status::OK();
+}
+
+// Groups the scan conjuncts whose first child is a SlotRef by that slot's id, keeping
+// only slots whose column is present in map_column and read from the parquet file.
+// Result goes to _slot_conjuncts (slot id -> all conjuncts on that slot).
+void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
+                                     const std::map<std::string, int>& map_column,
+                                     const std::unordered_set<int>& include_column_ids) {
+    for (const SlotDescriptor* slot : tuple_desc->slots()) {
+        auto col_iter = map_column.find(slot->col_name());
+        if (col_iter == map_column.end()) {
+            continue;
+        }
+        if (include_column_ids.end() == include_column_ids.find(col_iter->second)) {
+            continue;
+        }
+        const SlotId slot_id = slot->id();
+        for (ExprContext* ctx : _conjunct_ctxs) {
+            Expr* conjunct = ctx->root();
+            if (conjunct->get_num_children() == 0) {
+                continue;
+            }
+            Expr* raw_slot = conjunct->get_child(0);
+            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
+                continue;
+            }
+            if (static_cast<SlotRef*>(raw_slot)->slot_id() == slot_id) {
+                // Append in place so every conjunct on the slot is kept; operator[]
+                // creates the empty list on first use.
+                _slot_conjuncts[slot_id].emplace_back(ctx);
+            }
+        }
+    }
+}
+
+// Byte width of the PLAIN-encoded statistics value that the fixed-width decoders in
+// this file read for `type`, or 0 when no width check applies (strings / unhandled
+// types). Parquet stores INT(8)/INT(16)/INT(32) with the 4-byte INT32 physical type.
+static size_t expected_plain_width(PrimitiveType type) {
+    switch (type) {
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_FLOAT:
+        return 4;
+    case TYPE_BIGINT:
+    case TYPE_DOUBLE:
+        return 8;
+    default:
+        return 0;
+    }
+}
+
+// Returns true when at least one of `conjuncts` (all on the same slot, implicitly
+// ANDed) proves that no row of the row group with the given plain-encoded min/max can
+// match, i.e. the row group can be skipped.
+bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
+                                                 const std::string& encoded_min,
+                                                 const std::string& encoded_max) {
+    const char* min_bytes = encoded_min.data();
+    const char* max_bytes = encoded_max.data();
+    for (ExprContext* ctx : conjuncts) {
+        Expr* conjunct = ctx->root();
+        if (conjunct->get_num_children() < 2) {
+            continue;
+        }
+        // If the file's physical width does not match the constant's type (e.g. a BIGINT
+        // slot over an INT32 column), decoding would read past the buffer or misread it.
+        const size_t width = expected_plain_width(conjunct->get_child(1)->type().type);
+        if (width != 0 && (encoded_min.size() != width || encoded_max.size() != width)) {
+            continue;
+        }
+        // Fresh result per conjunct: a later non-rejecting conjunct must not clear an
+        // earlier rejection.
+        bool rejects_group = false;
+        if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
+            _eval_binary_predicate(ctx, min_bytes, max_bytes, rejects_group);
+        } else if (TExprNodeType::IN_PRED == conjunct->node_type()) {
+            _eval_in_predicate(ctx, min_bytes, max_bytes, rejects_group);
+        }
+        if (rejects_group) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Evaluates `slot <op> constant` against a row group's plain-encoded min/max and sets
+// need_filter to whether the row group can be skipped. Leaves need_filter untouched for
+// unsupported operators (e.g. NE) or when the right-hand side cannot be evaluated.
+void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
+                                            const char* max_bytes, bool& need_filter) {
+    Expr* conjunct = ctx->root();
+    Expr* expr = conjunct->get_child(1);
+    // supported conjunct example: slot_ref < 123, slot_ref > func(123), ..
+    // The right-hand side is evaluated without a row, so it must be constant.
+    if (expr == nullptr || !expr->is_constant()) {
+        return;
+    }
+    auto conjunct_type = expr->type().type;
+    void* conjunct_value = ctx->get_value(expr, nullptr);
+    // A NULL constant (e.g. `col = NULL`) evaluates to nullptr; the decoders would
+    // dereference it.
+    if (conjunct_value == nullptr) {
+        return;
+    }
+    switch (conjunct->op()) {
+    case TExprOpcode::EQ:
+        need_filter = _eval_eq(conjunct_type, conjunct_value, min_bytes, max_bytes);
+        break;
+    case TExprOpcode::NE:
+        break;
+    case TExprOpcode::GT:
+        need_filter = _eval_gt(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::GE:
+        need_filter = _eval_ge(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::LT:
+        need_filter = _eval_lt(conjunct_type, conjunct_value, min_bytes);
+        break;
+    case TExprOpcode::LE:
+        need_filter = _eval_le(conjunct_type, conjunct_value, min_bytes);
+        break;
+    default:
+        break;
+    }
+}
+
+// Evaluates `slot IN (constants)` against a row group's plain-encoded min/max and sets
+// need_filter to whether the row group can be skipped. NOT IN is never used to prune.
+// Returns without touching need_filter if the IN list contains a NULL value.
+void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
+                                        const char* max_bytes, bool& need_filter) {
+    Expr* conjunct = ctx->root();
+    const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
+    auto conjunct_type = conjunct->get_child(1)->type().type;
+    const bool is_string =
+            conjunct_type == TYPE_STRING || conjunct_type == TYPE_VARCHAR || conjunct_type == TYPE_CHAR;
+    // String values are copied out of the iterator: a string-set iterator may hand back
+    // the same scratch StringValue on every get_value() call, so keeping the raw pointers
+    // would make every entry alias the last value. (Must outlive in_pred_values.)
+    std::vector<StringValue> string_values;
+    std::vector<void*> in_pred_values;
+    HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
+    // TODO: process expr: in(func(123),123)
+    while (iter->has_next()) {
+        const void* value = iter->get_value();
+        if (nullptr == value) {
+            return;
+        }
+        if (is_string) {
+            string_values.push_back(*static_cast<const StringValue*>(value));
+        } else {
+            in_pred_values.emplace_back(const_cast<void*>(value));
+        }
+        iter->next();
+    }
+    // Take addresses only after string_values has stopped growing.
+    for (StringValue& sv : string_values) {
+        in_pred_values.emplace_back(&sv);
+    }
+    switch (conjunct->op()) {
+    case TExprOpcode::FILTER_IN:
+        need_filter = _eval_in_val(conjunct_type, in_pred_values, min_bytes, max_bytes);
+        break;
+    // case TExprOpcode::FILTER_NOT_IN:
+    default:
+        need_filter = false;
+    }
+}
+
+// Returns true when no value of the IN list can lie inside the row group's
+// [min, max] range, i.e. the row group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
+                                  const char* min_bytes, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_INT: {
+        _FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        if (in_pred_values.empty()) {
+            return false;
+        }
+        // String values are StringValue (ptr/len, not NUL-terminated). Find the range by
+        // content; sorting the raw char pointers would order them by address.
+        std::string_view in_min;
+        std::string_view in_max;
+        for (size_t i = 0; i < in_pred_values.size(); ++i) {
+            const auto* sv = static_cast<const StringValue*>(in_pred_values[i]);
+            const std::string_view value(sv->ptr, sv->len);
+            if (i == 0 || value < in_min) {
+                in_min = value;
+            }
+            if (i == 0 || value > in_max) {
+                in_max = value;
+            }
+        }
+        // NOTE(review): bounds arrive as NUL-terminated char*, so one containing a NUL
+        // byte is truncated here.
+        const std::string_view group_min(min_bytes);
+        const std::string_view group_max(max_bytes);
+        if (in_max < group_min || in_min > group_max) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME are deliberately not pruned: their values are not string data and
+    // parquet encodes these types numerically, so a byte-wise string comparison could
+    // skip row groups that do match.
+    default:
+        return false;
+    }
+    return false;
+}
+
+// `slot = *value`: returns true when the constant lies outside the row group's
+// [min, max], i.e. the row group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+                              const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        // TINYINT constants are 1 byte wide; decoding them as int16_t over-read the value.
+        _PLAIN_DECODE(int8_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE(double, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE(float, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        // ExprContext::get_value() yields a StringValue (ptr/len, not NUL-terminated)
+        // for string types, not a std::string.
+        const auto* sv = static_cast<const StringValue*>(value);
+        const std::string_view conjunct_value(sv->ptr, sv->len);
+        // NOTE(review): bounds are measured with strlen; a NUL byte inside one truncates it.
+        if (conjunct_value < std::string_view(min_bytes) ||
+            conjunct_value > std::string_view(max_bytes)) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME are not pruned: their constants are not string data and parquet
+    // encodes them numerically, so a string comparison is meaningless.
+    default:
+        return false;
+    }
+    return false;
+}
+
+// `slot > *value`: returns true when the row group's max <= constant, i.e. the row
+// group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        // ExprContext::get_value() yields a StringValue (ptr/len, not NUL-terminated)
+        // for string types, not a std::string.
+        const auto* sv = static_cast<const StringValue*>(value);
+        const std::string_view conjunct_value(sv->ptr, sv->len);
+        // NOTE(review): max is measured with strlen; a NUL byte inside it truncates it.
+        if (std::string_view(max_bytes) <= conjunct_value) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME (and TIME) are not pruned: their constants are not string data and
+    // parquet encodes them numerically.
+    default:
+        return false;
+    }
+    return false;
+}
+
+// `slot >= *value`: returns true when the row group's max < constant, i.e. the row
+// group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        // ExprContext::get_value() yields a StringValue (ptr/len, not NUL-terminated)
+        // for string types, not a std::string.
+        const auto* sv = static_cast<const StringValue*>(value);
+        const std::string_view conjunct_value(sv->ptr, sv->len);
+        // NOTE(review): max is measured with strlen; a NUL byte inside it truncates it.
+        if (std::string_view(max_bytes) < conjunct_value) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME (and TIME) are not pruned: their constants are not string data and
+    // parquet encodes them numerically.
+    default:
+        return false;
+    }
+    return false;
+}
+
+// `slot < *value`: returns true when the row group's min >= constant, i.e. the row
+// group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        // ExprContext::get_value() yields a StringValue (ptr/len, not NUL-terminated)
+        // for string types, not a std::string.
+        const auto* sv = static_cast<const StringValue*>(value);
+        const std::string_view conjunct_value(sv->ptr, sv->len);
+        if (std::string_view(min_bytes) >= conjunct_value) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME (and TIME) are not pruned: their constants are not string data and
+    // parquet encodes them numerically.
+    default:
+        return false;
+    }
+    return false;
+}
+
+// `slot <= *value`: returns true when the row group's min > constant, i.e. the row
+// group can be skipped. Unsupported types return false.
+bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        // ExprContext::get_value() yields a StringValue (ptr/len, not NUL-terminated)
+        // for string types, not a std::string.
+        const auto* sv = static_cast<const StringValue*>(value);
+        const std::string_view conjunct_value(sv->ptr, sv->len);
+        if (std::string_view(min_bytes) > conjunct_value) {
+            return true;
+        }
+        break;
+    }
+    // DATE/DATETIME (and TIME) are not pruned: their constants are not string data and
+    // parquet encodes them numerically.
+    default:
+        return false;
+    }
+    return false;
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h
new file mode 100644
index 0000000000..d29a6c2e05
--- /dev/null
+++ b/be/src/exec/arrow/parquet_row_group_reader.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/type_fwd.h>
+#include <exprs/expr.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/encoding.h>
+#include <parquet/file_reader.h>
+#include <parquet/metadata.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include <unordered_set>
+
+#include "common/status.h"
+#include "exec/arrow/parquet_reader.h"
+
+namespace doris {
+
+class ParquetReaderWrap;
+
+// Evaluates the scan's pushed-down conjuncts against parquet row-group statistics
+// (encoded min/max values). It collects the row groups that can be skipped into a
+// set exposed by filter_groups().
+class RowGroupReader {
+public:
+ RowGroupReader(RuntimeProfile* profile, const std::vector<ExprContext*>& conjunct_ctxs,
+ std::shared_ptr<parquet::FileMetaData>& file_metadata,
+ ParquetReaderWrap* parent);
+ ~RowGroupReader();
+
+ // Runs the conjuncts against the row groups' statistics and fills the set
+ // returned by filter_groups().
+ // NOTE(review): map_column presumably maps column names to file column
+ // indexes, and include_column_ids lists the columns being read. Verify both
+ // against ParquetReaderWrap.
+ Status init_filter_groups(const TupleDescriptor* tuple_desc,
+ const std::map<std::string, int>& map_column,
+ const std::vector<int>& include_column_ids);
+
+ // Indexes of the row groups selected for filtering by init_filter_groups().
+ // Returned by const reference so callers do not copy the whole set on every call.
+ const std::unordered_set<int>& filter_groups() const { return _filter_group; }
+
+private:
+ // Collects the conjuncts that reference the read columns into _slot_conjuncts.
+ void _init_conjuncts(const TupleDescriptor* tuple_desc,
+ const std::map<std::string, int>& map_column,
+ const std::unordered_set<int>& include_column_ids);
+
+ // Returns true when `conjuncts` show that a row group with the given encoded
+ // min/max statistics can be skipped.
+ bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
+ const std::string& encoded_min,
+ const std::string& encoded_max);
+
+ // Predicate evaluators. Each one sets `need_filter` for a binary or IN predicate.
+ void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+ bool& need_filter);
+
+ void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+ bool& need_filter);
+
+ // The _eval_* helpers below return true when the row group can be filtered out,
+ // i.e. no value in the statistics range can satisfy the comparison.
+ bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
+ const char* min_bytes, const char* max_bytes);
+
+ bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+ const char* max_bytes);
+
+ bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes);
+
+ bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes);
+
+ bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes);
+
+ bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
+
+private:
+ // Conjuncts grouped per column. Presumably keyed by column index; confirm in the .cpp.
+ std::map<int, std::vector<ExprContext*>> _slot_conjuncts;
+ // Row groups selected for filtering (see filter_groups()).
+ std::unordered_set<int> _filter_group;
+
+ std::vector<ExprContext*> _conjunct_ctxs;
+ std::shared_ptr<parquet::FileMetaData> _file_metadata;
+ // Non-null after construction; the default initializers only guard against
+ // reading indeterminate pointers.
+ RuntimeProfile* _profile = nullptr;
+ ParquetReaderWrap* _parent = nullptr;
+};
+} // namespace doris
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 3ec5aba1cd..ef85e56dd3 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -95,6 +95,12 @@ Status BaseScanner::open() {
return Status::OK();
}
+// Records the scan node's tuple id and conjunct contexts. open_next_reader() later
+// passes them to the file reader so predicates can be pushed into the file format.
+void BaseScanner::reg_conjunct_ctxs(const TupleId& tupleId, const std::vector<ExprContext*>& conjunct_ctxs) {
+ _tupleId = tupleId;
+ _conjunct_ctxs = conjunct_ctxs;
+}
+
Status BaseScanner::init_expr_ctxes() {
// Construct _src_slot_descs
const TupleDescriptor* src_tuple_desc =
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index fe3e088d4e..7efdee0f2f 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -64,6 +64,10 @@ public:
}
}
+ // Register conjuncts for push down
+ virtual void reg_conjunct_ctxs(const TupleId& tupleId,
+ const std::vector<ExprContext*>& conjunct_ctxs);
+
virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
virtual Status open();
@@ -142,6 +146,10 @@ protected:
vectorized::Block _src_block;
int _num_of_columns_from_file;
+ // slot_ids for parquet predicate push down are in tuple desc
+ TupleId _tupleId;
+ std::vector<ExprContext*> _conjunct_ctxs;
+
private:
Status _filter_src_block();
void _fill_columns_from_path();
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 532a9d7d06..49fed4c3ce 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -263,6 +263,7 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
_pre_filter_texprs, counter);
}
}
+ scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
std::unique_ptr<BaseScanner> scanner(scan);
return scanner;
}
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 776115427e..2e9ea1f3eb 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -106,11 +106,11 @@ Status ParquetScanner::open_next_reader() {
if (range.__isset.num_of_columns_from_file) {
num_of_columns_from_file = range.num_of_columns_from_file;
}
- _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
- num_of_columns_from_file);
-
- Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
-
+ _cur_file_reader = new ParquetReaderWrap(_profile, file_reader.release(),
+ _state->batch_size(), num_of_columns_from_file);
+ auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+ Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
+ _state->timezone());
if (status.is_end_of_file()) {
continue;
} else {
@@ -118,6 +118,7 @@ Status ParquetScanner::open_next_reader() {
return Status::InternalError("file: {}, error:{}", range.path,
status.get_error_msg());
} else {
+ RETURN_IF_ERROR(_cur_file_reader->init_parquet_type());
return status;
}
}
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 7fcb277bb0..6543ff1b9c 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -164,6 +164,7 @@ private:
friend class BloomFilterPredicate;
friend class OlapScanNode;
friend class EsPredicate;
+ friend class RowGroupReader;
friend class vectorized::VOlapScanNode;
/// FunctionContexts for each registered expression. The FunctionContexts are created
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 57defdb8ef..9adad2d71a 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -70,8 +70,9 @@ Status FileArrowScanner::_open_next_reader() {
_cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file);
- Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone());
-
+ auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+ Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
+ _state->timezone());
if (status.is_end_of_file()) {
continue;
} else {
@@ -207,7 +208,7 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr
ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
- return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+ return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file);
}
VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index 4105f8dc01..741b66dd81 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -315,6 +315,7 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, _pre_filter_texprs, counter);
}
+ scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
std::unique_ptr<FileScanner> scanner(scan);
return scanner;
}
diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp
index d9780f8dbe..3168e220d5 100644
--- a/be/src/vec/exec/file_scanner.cpp
+++ b/be/src/vec/exec/file_scanner.cpp
@@ -69,6 +69,12 @@ Status FileScanner::open() {
return Status::OK();
}
+// Records the scan node's tuple id and conjunct contexts. The arrow-based readers
+// opened later receive them for file-format predicate push down.
+void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId, const std::vector<ExprContext*>& conjunct_ctxs) {
+ _tupleId = tupleId;
+ _conjunct_ctxs = conjunct_ctxs;
+}
+
Status FileScanner::_init_expr_ctxes() {
const TupleDescriptor* src_tuple_desc =
_state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h
index cb21b876d5..abdc2eaa4e 100644
--- a/be/src/vec/exec/file_scanner.h
+++ b/be/src/vec/exec/file_scanner.h
@@ -34,6 +34,9 @@ public:
virtual ~FileScanner() = default;
+ virtual void reg_conjunct_ctxs(const TupleId& tupleId,
+ const std::vector<ExprContext*>& conjunct_ctxs);
+
// Open this scanner, will initialize information need to
virtual Status open();
@@ -85,6 +88,11 @@ protected:
std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
int _num_of_columns_from_file;
+ // File formats based push down predicate
+ std::vector<ExprContext*> _conjunct_ctxs;
+ // slot_ids for parquet predicate push down are in tuple desc
+ TupleId _tupleId;
+
private:
Status _init_expr_ctxes();
Status _filter_block(vectorized::Block* output_block);
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 05329f1d04..9d2bfb8a52 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -37,7 +37,13 @@ VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
_cur_file_reader(nullptr),
_cur_file_eof(false),
_batch(nullptr),
- _arrow_batch_cur_idx(0) {}
+ _arrow_batch_cur_idx(0) {
+ _filtered_row_groups_counter = ADD_COUNTER(_profile, "FileFilteredRowGroups", TUnit::UNIT);
+ _filtered_rows_counter = ADD_COUNTER(_profile, "FileFilteredRows", TUnit::UNIT);
+ _filtered_bytes_counter = ADD_COUNTER(_profile, "FileFilteredBytes", TUnit::BYTES);
+ _total_rows_counter = ADD_COUNTER(_profile, "FileTotalRows", TUnit::UNIT);
+ _total_groups_counter = ADD_COUNTER(_profile, "FileTotalRowGroups", TUnit::UNIT);
+}
VArrowScanner::~VArrowScanner() {
close();
@@ -72,8 +78,9 @@ Status VArrowScanner::_open_next_reader() {
}
_cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
num_of_columns_from_file);
-
- Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
+ auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+ Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
+ _state->timezone());
if (status.is_end_of_file()) {
continue;
@@ -82,12 +89,21 @@ Status VArrowScanner::_open_next_reader() {
return Status::InternalError(" file: {} error:{}", range.path,
status.get_error_msg());
} else {
+ update_profile(_cur_file_reader->statistics());
return status;
}
}
}
}
+// Adds the file reader's row-group filtering statistics to this scanner's
+// runtime-profile counters (totals first, then the filtered amounts).
+void VArrowScanner::update_profile(std::shared_ptr<Statistics>& statistics) {
+ const Statistics& stats = *statistics;
+ COUNTER_UPDATE(_total_groups_counter, stats.total_groups);
+ COUNTER_UPDATE(_total_rows_counter, stats.total_rows);
+ COUNTER_UPDATE(_filtered_row_groups_counter, stats.filtered_row_groups);
+ COUNTER_UPDATE(_filtered_rows_counter, stats.filtered_rows);
+ COUNTER_UPDATE(_filtered_bytes_counter, stats.filtered_total_bytes);
+}
+
Status VArrowScanner::open() {
RETURN_IF_ERROR(BaseScanner::open());
if (_ranges.empty()) {
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 176ddb58f2..109ce53177 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -29,6 +29,7 @@
#include <vector>
#include "common/status.h"
+#include "exec/arrow/parquet_reader.h"
#include "exec/base_scanner.h"
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
@@ -56,6 +57,9 @@ public:
virtual Status get_next(Block* block, bool* eof) override;
+ // Update file predicate filter profile
+ void update_profile(std::shared_ptr<Statistics>& statistics);
+
virtual void close() override;
protected:
@@ -77,6 +81,12 @@ private:
bool _cur_file_eof; // is read over?
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx;
+
+ RuntimeProfile::Counter* _filtered_row_groups_counter;
+ RuntimeProfile::Counter* _filtered_rows_counter;
+ RuntimeProfile::Counter* _filtered_bytes_counter;
+ RuntimeProfile::Counter* _total_rows_counter;
+ RuntimeProfile::Counter* _total_groups_counter;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index 6f63324e6d..dc738c5a09 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -321,6 +321,7 @@ std::unique_ptr<BaseScanner> VBrokerScanNode::create_scanner(const TBrokerScanRa
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
}
+ scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
std::unique_ptr<BaseScanner> scanner(scan);
return scanner;
}
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index cb59ae60bc..319d8a3ccb 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -32,7 +32,7 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
int32_t num_of_columns_from_file) {
- return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+ return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file);
}
} // namespace doris::vectorized
diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp
index cbd673d4dc..5b1020224e 100644
--- a/be/test/exec/parquet_scanner_test.cpp
+++ b/be/test/exec/parquet_scanner_test.cpp
@@ -58,6 +58,7 @@ private:
int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
void create_expr_info();
void init_desc_table();
+ void init_filter_expr();
RuntimeState _runtime_state;
ObjectPool _obj_pool;
std::map<std::string, SlotDescriptor*> _slots_map;
@@ -406,10 +407,70 @@ void ParquetScannerTest::create_expr_info() {
_params.__set_src_tuple_id(TUPLE_ID_SRC);
}
+// Attaches the single conjunct `log_time > 1` to the test scan node so the
+// parquet reader has a predicate to push down.
+void ParquetScannerTest::init_filter_expr() {
+ // Wraps one scalar type into a single-node TTypeDesc.
+ auto scalar_type_desc = [](const TScalarType& scalar) {
+ TTypeNode type_node;
+ type_node.__set_type(TTypeNodeType::SCALAR);
+ type_node.__set_scalar_type(scalar);
+ TTypeDesc desc;
+ desc.types.push_back(type_node);
+ return desc;
+ };
+
+ TScalarType bool_scalar;
+ bool_scalar.__set_type(TPrimitiveType::BOOLEAN);
+ bool_scalar.__set_len(5000);
+ TScalarType bigint_scalar;
+ bigint_scalar.__set_type(TPrimitiveType::BIGINT);
+ const TTypeDesc bool_type = scalar_type_desc(bool_scalar);
+ const TTypeDesc int_type = scalar_type_desc(bigint_scalar);
+
+ // Root: BINARY_PRED with opcode GT over two BIGINT children.
+ ::doris::TExprNode gt_node;
+ gt_node.node_type = TExprNodeType::BINARY_PRED;
+ gt_node.__set_opcode(TExprOpcode::GT);
+ gt_node.type = bool_type;
+ gt_node.num_children = 2;
+ gt_node.child_type = TPrimitiveType::BIGINT;
+
+ // Left child: log_time (slot 1 of tuple 0).
+ ::doris::TExprNode column_node;
+ column_node.node_type = TExprNodeType::SLOT_REF;
+ column_node.type = int_type;
+ column_node.slot_ref.slot_id = 1;
+ column_node.slot_ref.tuple_id = 0;
+ column_node.__isset.slot_ref = true;
+ column_node.num_children = 0;
+
+ // Right child: the literal 1.
+ ::doris::TExprNode literal_node;
+ literal_node.node_type = TExprNodeType::INT_LITERAL;
+ literal_node.type = int_type;
+ literal_node.int_literal.value = 1;
+ literal_node.__isset.int_literal = true;
+ literal_node.num_children = 0;
+
+ // TExpr nodes are stored in prefix order: operator first, then its children.
+ ::doris::TExpr predicate;
+ predicate.nodes = {gt_node, column_node, literal_node};
+
+ // push down conjuncts
+ _tnode.__set_conjuncts({predicate});
+}
+
void ParquetScannerTest::init() {
create_expr_info();
init_desc_table();
-
+ init_filter_expr();
// Node Id
_tnode.node_id = 0;
_tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
@@ -419,6 +480,7 @@ void ParquetScannerTest::init() {
_tnode.nullable_tuples.push_back(false);
_tnode.broker_scan_node.tuple_id = 0;
_tnode.__isset.broker_scan_node = true;
+ _tnode.__isset.conjuncts = true;
}
TEST_F(ParquetScannerTest, normal) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org