You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by as...@apache.org on 2023/06/16 05:11:43 UTC
[doris] branch master updated: [Opt](orc-reader) Optimize orc reader by dict filtering. (#20806)
This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b7a50a09fe [Opt](orc-reader) Optimize orc reader by dict filtering. (#20806)
b7a50a09fe is described below
commit b7a50a09febc5c24cca38cd02bede48252938837
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Fri Jun 16 13:11:37 2023 +0800
[Opt](orc-reader) Optimize orc reader by dict filtering. (#20806)
Optimize the ORC reader by dictionary filtering. It is similar to #17594.
Test result
**ssb-flat-100**: (3 nodes)
| Query | before opt | after opt |
| ------------- |:-------------:| ---------:|
Q1.1 | 1.239 | 1.145
Q1.2 | 1.254 | 1.128
Q1.3 | 1.931 | 1.644
Q2.1 | 1.359 | 1.006
Q2.2 | 1.229 | 0.674
Q2.3 | 0.934 | 0.427
Q3.1 | 2.226 | 1.712
Q3.2 | 2.042 | 1.562
Q3.3 | 1.631 | 1.021
Q3.4 | 1.618 | 0.732
Q4.1 | 2.294 | 1.858
Q4.2 | 2.511 | 1.961
Q4.3 | 1.736 | 1.446
total | 22.004 | 16.316
---
be/src/apache-orc | 2 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 698 ++++++++++++++++++++-
be/src/vec/exec/format/orc/vorc_reader.h | 81 ++-
.../format/table/transactional_hive_reader.cpp | 14 +-
.../exec/format/table/transactional_hive_reader.h | 5 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 12 +-
6 files changed, 773 insertions(+), 39 deletions(-)
diff --git a/be/src/apache-orc b/be/src/apache-orc
index 380df03331..a4e67d732e 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit 380df03331c12fa4095dd2613eb5f08ad541eb3e
+Subproject commit a4e67d732e9acf3acb45e85c4cfe84d630e71ec1
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 9fd4bc57d9..f1d6fbc7dd 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -38,6 +38,8 @@
#include "cctz/time_zone.h"
#include "common/exception.h"
#include "exec/olap_utils.h"
+#include "exprs/create_predicate_function.h"
+#include "exprs/hybrid_set.h"
#include "gutil/casts.h"
#include "gutil/strings/substitute.h"
#include "io/fs/buffered_reader.h"
@@ -71,7 +73,10 @@
#include "vec/data_types/data_type_struct.h"
#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exprs/vbloom_predicate.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vin_predicate.h"
+#include "vec/exprs/vliteral.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/runtime/vdatetime_value.h"
@@ -87,12 +92,14 @@ enum class FileCachePolicy : uint8_t;
namespace doris::vectorized {
+// TODO: determine a suitable threshold by benchmarking; the current value never disables the rewrite.
+static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
+
#define FOR_FLAT_ORC_COLUMNS(M) \
M(TypeIndex::Int8, Int8, orc::LongVectorBatch) \
M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch) \
M(TypeIndex::Int16, Int16, orc::LongVectorBatch) \
M(TypeIndex::UInt16, UInt16, orc::LongVectorBatch) \
- M(TypeIndex::Int32, Int32, orc::LongVectorBatch) \
M(TypeIndex::UInt32, UInt32, orc::LongVectorBatch) \
M(TypeIndex::Int64, Int64, orc::LongVectorBatch) \
M(TypeIndex::UInt64, UInt64, orc::LongVectorBatch) \
@@ -160,13 +167,9 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
}
OrcReader::~OrcReader() {
- close();
-}
-
-void OrcReader::close() {
- if (!_closed) {
- _collect_profile_on_close();
- _closed = true;
+ _collect_profile_on_close();
+ if (_obj_pool && _obj_pool.get()) {
+ _obj_pool->clear();
}
}
@@ -232,12 +235,24 @@ Status OrcReader::_create_file_reader() {
Status OrcReader::init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- const VExprContextSPtrs& conjuncts, bool is_acid) {
+ const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
+ const RowDescriptor* row_descriptor,
+ const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+ const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
_column_names = column_names;
_colname_to_value_range = colname_to_value_range;
_text_converter.reset(new TextConverter('\\'));
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
+ _tuple_descriptor = tuple_descriptor;
+ _row_descriptor = row_descriptor;
+ _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
+ _text_converter.reset(new TextConverter('\\'));
+ if (not_single_slot_filter_conjuncts) {
+ _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
+ not_single_slot_filter_conjuncts->end());
+ }
+ _obj_pool = std::make_shared<ObjectPool>();
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_create_file_reader());
RETURN_IF_ERROR(_init_read_columns());
@@ -735,7 +750,12 @@ Status OrcReader::set_fill_columns(
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this));
}
try {
- _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get());
+ _row_reader_options.setEnableLazyDecoding(true);
+ if (!_lazy_read_ctx.conjuncts.empty()) {
+ _string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
+ }
+ _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get(),
+ _string_dict_filter.get());
_batch = _row_reader->createRowBatch(_batch_size);
} catch (std::exception& e) {
return Status::InternalError("Failed to create orc row reader. reason = {}", e.what());
@@ -743,6 +763,22 @@ Status OrcReader::set_fill_columns(
auto& selected_type = _row_reader->getSelectedType();
int idx = 0;
_init_select_types(selected_type, idx);
+
+ if (!_slot_id_to_filter_conjuncts) {
+ return Status::OK();
+ }
+
+ // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
+ // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
+ for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+ auto& [value, slot_desc] = kv.second;
+ auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
+ if (iter != _slot_id_to_filter_conjuncts->end()) {
+ for (auto& ctx : iter->second) {
+ _filter_conjuncts.push_back(ctx);
+ }
+ }
+ }
return Status::OK();
}
@@ -937,11 +973,26 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb,
size_t num_values) {
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
- const static std::string empty_string;
- auto* data = down_cast<orc::StringVectorBatch*>(cvb);
+ auto* data = down_cast<orc::EncodedStringVectorBatch*>(cvb);
if (data == nullptr) {
return Status::InternalError("Wrong data type for colum '{}'", col_name);
}
+ if (data->isEncoded) {
+ return _decode_string_dict_encoded_column<is_filter>(col_name, data_column, type_kind, data,
+ num_values);
+ } else {
+ return _decode_string_non_dict_encoded_column<is_filter>(col_name, data_column, type_kind,
+ data, num_values);
+ }
+}
+
+template <bool is_filter>
+Status OrcReader::_decode_string_non_dict_encoded_column(const std::string& col_name,
+ const MutableColumnPtr& data_column,
+ const orc::TypeKind& type_kind,
+ orc::EncodedStringVectorBatch* cvb,
+ size_t num_values) {
+ const static std::string empty_string;
std::vector<StringRef> string_values;
string_values.reserve(num_values);
if (type_kind == orc::TypeKind::CHAR) {
@@ -949,8 +1000,8 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
if (cvb->hasNulls) {
for (int i = 0; i < num_values; ++i) {
if (cvb->notNull[i]) {
- string_values.emplace_back(data->data[i],
- trim_right(data->data[i], data->length[i]));
+ string_values.emplace_back(cvb->data[i],
+ trim_right(cvb->data[i], cvb->length[i]));
} else {
// Orc doesn't fill null values in new batch, but the former batch has been release.
// Other types like int/long/timestamp... are flat types without pointer in them,
@@ -960,22 +1011,21 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
}
} else {
for (int i = 0; i < num_values; ++i) {
- string_values.emplace_back(data->data[i],
- trim_right(data->data[i], data->length[i]));
+ string_values.emplace_back(cvb->data[i], trim_right(cvb->data[i], cvb->length[i]));
}
}
} else {
if (cvb->hasNulls) {
for (int i = 0; i < num_values; ++i) {
if (cvb->notNull[i]) {
- string_values.emplace_back(data->data[i], data->length[i]);
+ string_values.emplace_back(cvb->data[i], cvb->length[i]);
} else {
string_values.emplace_back(empty_string.data(), 0);
}
}
} else {
for (int i = 0; i < num_values; ++i) {
- string_values.emplace_back(data->data[i], data->length[i]);
+ string_values.emplace_back(cvb->data[i], cvb->length[i]);
}
}
}
@@ -983,6 +1033,128 @@ Status OrcReader::_decode_string_column(const std::string& col_name,
return Status::OK();
}
+// Decodes a dictionary-encoded ORC string batch into `data_column`.
+//
+// For every row the dictionary code stored in `cvb->index` is resolved through
+// `cvb->dictionary`. CHAR values are right-trimmed to strip padding characters.
+//
+// Exactly one entry is appended per input row so that the resulting column stays
+// aligned with `num_values`:
+//   - null rows get an empty placeholder (ORC leaves their slots unspecified);
+//   - when `is_filter` is true, rows rejected by `_filter` also get an empty
+//     placeholder instead of being skipped, and their dictionary lookup is avoided.
+//
+// @param col_name     column name (unused here, kept for signature parity).
+// @param data_column  destination column, receives `num_values` strings.
+// @param type_kind    ORC type of the column; CHAR triggers right-trimming.
+// @param cvb          dictionary-encoded batch to read from.
+// @param num_values   number of rows in the batch.
+// @return Status::OK() always.
+template <bool is_filter>
+Status OrcReader::_decode_string_dict_encoded_column(const std::string& col_name,
+                                                     const MutableColumnPtr& data_column,
+                                                     const orc::TypeKind& type_kind,
+                                                     orc::EncodedStringVectorBatch* cvb,
+                                                     size_t num_values) {
+    const static std::string empty_string;
+    std::vector<StringRef> string_values;
+    string_values.reserve(num_values);
+    // Longest decoded value; forwarded to insert_many_strings_overflow().
+    size_t max_value_length = 0;
+    const UInt8* __restrict filter_data = nullptr;
+    if constexpr (is_filter) {
+        filter_data = _filter->data();
+    }
+    const bool is_char = (type_kind == orc::TypeKind::CHAR);
+    const bool has_nulls = cvb->hasNulls;
+    const auto* dict_codes = cvb->index.data();
+
+    for (size_t i = 0; i < num_values; ++i) {
+        if (has_nulls && !cvb->notNull[i]) {
+            // Orc doesn't fill null values in new batch, but the former batch has been released,
+            // so never dereference the slot of a null row.
+            string_values.emplace_back(empty_string.data(), 0);
+            continue;
+        }
+        if constexpr (is_filter) {
+            if (!filter_data[i]) {
+                // Keep row alignment for filtered-out rows without paying for the lookup.
+                string_values.emplace_back(empty_string.data(), 0);
+                continue;
+            }
+        }
+        char* val_ptr = nullptr;
+        int64_t length = 0;
+        cvb->dictionary->getValueByIndex(dict_codes[i], val_ptr, length);
+        if (is_char) {
+            // Possibly there are some zero padding characters in CHAR type, strip them off.
+            length = trim_right(val_ptr, length);
+        }
+        if (length > 0 && static_cast<size_t>(length) > max_value_length) {
+            max_value_length = static_cast<size_t>(length);
+        }
+        string_values.emplace_back(val_ptr, length);
+    }
+    // data() is well-defined for an empty vector, unlike &string_values[0].
+    data_column->insert_many_strings_overflow(string_values.data(), string_values.size(),
+                                              max_value_length);
+    return Status::OK();
+}
+
+// Decodes an ORC batch into a Doris Int32 column.
+//
+// Two batch layouts are accepted:
+//   - orc::LongVectorBatch: a regular integer column, handled by _decode_flat_column;
+//   - orc::EncodedStringVectorBatch: the `index` buffer of the batch is appended
+//     to the column, narrowed to Int32.
+//
+// @param col_name     column name, forwarded to _decode_flat_column.
+// @param data_column  destination column; must be a ColumnVector<Int32>.
+// @param cvb          source ORC batch.
+// @param num_values   number of rows to append.
+// @return Status::OK() on success, InternalError for any other batch type.
+template <bool is_filter>
+Status OrcReader::_decode_int32_column(const std::string& col_name,
+                                       const MutableColumnPtr& data_column,
+                                       orc::ColumnVectorBatch* cvb, size_t num_values) {
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+    if (dynamic_cast<orc::LongVectorBatch*>(cvb) != nullptr) {
+        return _decode_flat_column<Int32, orc::LongVectorBatch>(col_name, data_column, cvb,
+                                                                num_values);
+    }
+    // A single checked downcast replaces the former dynamic_cast + static_cast pair
+    // and its unreachable null check.
+    if (auto* encoded_batch = dynamic_cast<orc::EncodedStringVectorBatch*>(cvb)) {
+        const auto* dict_codes = encoded_batch->index.data();
+        auto& column_data = static_cast<ColumnVector<Int32>&>(*data_column).get_data();
+        const auto origin_size = column_data.size();
+        column_data.resize(origin_size + num_values);
+        for (size_t i = 0; i < num_values; ++i) {
+            column_data[origin_size + i] = static_cast<Int32>(dict_codes[i]);
+        }
+        return Status::OK();
+    }
+    DCHECK(false) << "Bad ColumnVectorBatch type.";
+    return Status::InternalError("Bad ColumnVectorBatch type.");
+}
+
Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
ColumnArray::Offsets64& doris_offsets,
orc::DataBuffer<int64_t>& orc_offsets,
@@ -1045,6 +1217,8 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
return _decode_flat_column<CppType, OrcColumnType>(col_name, data_column, cvb, num_values);
FOR_FLAT_ORC_COLUMNS(DISPATCH)
#undef DISPATCH
+ case TypeIndex::Int32:
+ return _decode_int32_column<is_filter>(col_name, data_column, cvb, num_values);
case TypeIndex::Decimal32:
return _decode_decimal_column<Int32, is_filter>(col_name, data_column, data_type, cvb,
num_values);
@@ -1227,6 +1401,23 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
}
}
+ for (auto& dict_filter_cols : _dict_filter_cols) {
+ MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
+ size_t pos = block->get_position_by_name(dict_filter_cols.first);
+ auto& column_with_type_and_name = block->get_by_position(pos);
+ auto& column_type = column_with_type_and_name.type;
+ if (column_type->is_nullable()) {
+ block->get_by_position(pos).type =
+ std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ block->replace_by_position(
+ pos, ColumnNullable::create(std::move(dict_col_ptr),
+ ColumnUInt8::create(dict_col_ptr->size(), 0)));
+ } else {
+ block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
+ block->replace_by_position(pos, std::move(dict_col_ptr));
+ }
+ }
+
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);
@@ -1257,8 +1448,13 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
}
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
- for (auto& conjunct : _lazy_read_ctx.conjuncts) {
- filter_conjuncts.push_back(conjunct);
+ filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
+ _filter_conjuncts.end());
+ for (auto& conjunct : _dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
+ }
+ for (auto& conjunct : _non_dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
@@ -1274,6 +1470,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
}
Block::erase_useless_column(block, column_to_keep);
}
+ _convert_dict_cols_to_string_cols(block, &batch_vec);
}
return Status::OK();
}
@@ -1319,6 +1516,22 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
Block* block = (Block*)arg;
size_t origin_column_num = block->columns();
+ for (auto& dict_filter_cols : _dict_filter_cols) {
+ MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create();
+ size_t pos = block->get_position_by_name(dict_filter_cols.first);
+ auto& column_with_type_and_name = block->get_by_position(pos);
+ auto& column_type = column_with_type_and_name.type;
+ if (column_type->is_nullable()) {
+ block->get_by_position(pos).type =
+ std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
+ block->replace_by_position(
+ pos, ColumnNullable::create(std::move(dict_col_ptr),
+ ColumnUInt8::create(dict_col_ptr->size(), 0)));
+ } else {
+ block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
+ block->replace_by_position(pos, std::move(dict_col_ptr));
+ }
+ }
std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, &data, 0);
std::vector<string> col_names;
@@ -1356,11 +1569,20 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
auto* __restrict result_filter_data = _filter->data();
bool can_filter_all = false;
VExprContextSPtrs filter_conjuncts;
- for (auto& conjunct : _lazy_read_ctx.conjuncts) {
- filter_conjuncts.push_back(conjunct);
+ filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
+ _filter_conjuncts.end());
+ for (auto& conjunct : _dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
+ }
+ for (auto& conjunct : _non_dict_filter_conjuncts) {
+ filter_conjuncts.emplace_back(conjunct);
+ }
+ std::vector<IColumn::Filter*> filters;
+ if (_delete_rows_filter_ptr) {
+ filters.push_back(_delete_rows_filter_ptr.get());
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
- filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all));
+ filter_conjuncts, &filters, block, _filter.get(), &can_filter_all));
if (_lazy_read_ctx.resize_first_column) {
block->get_by_position(0).column->assume_mutable()->clear();
@@ -1386,9 +1608,439 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
new_size += result_filter_data[i] ? 1 : 0;
}
data.numElements = new_size;
+ if (data.numElements > 0) {
+ _convert_dict_cols_to_string_cols(block, &batch_vec);
+ } else {
+ _convert_dict_cols_to_string_cols(block, nullptr);
+ }
+ return Status::OK();
+}
+
+// Selects the predicate columns whose conjuncts will be evaluated on the string
+// dictionary and reports their file column names through `column_names`.
+//
+// All per-call dictionary-filter state (_obj_pool, _dict_filter_cols,
+// _dict_filter_conjuncts, _non_dict_filter_conjuncts) is reset first. Conjuncts of
+// predicate columns that cannot be dictionary-filtered are collected into
+// _non_dict_filter_conjuncts.
+//
+// @param current_strip_information  not used by this implementation.
+// @param column_names               output: file column names to load dictionaries for.
+// @return Status::OK() always.
+Status OrcReader::fill_dict_filter_column_names(
+        std::unique_ptr<orc::StripeInformation> current_strip_information,
+        std::list<std::string>& column_names) {
+    // Without single-slot conjuncts there is nothing to evaluate on a dictionary.
+    if (_slot_id_to_filter_conjuncts == nullptr) {
+        return Status::OK();
+    }
+    _obj_pool->clear();
+    _dict_filter_cols.clear();
+    _dict_filter_conjuncts.clear();
+    _non_dict_filter_conjuncts.clear();
+
+    const std::list<string>& names = _lazy_read_ctx.predicate_columns.first;
+    const std::vector<int>& slot_ids = _lazy_read_ctx.predicate_columns.second;
+    // `names` and `slot_ids` are parallel sequences; walk them in lock step.
+    int ordinal = 0;
+    for (auto& name : names) {
+        const int slot_id = slot_ids[ordinal];
+        ++ordinal;
+        if (_can_filter_by_dict(slot_id)) {
+            _dict_filter_cols.emplace_back(name, slot_id);
+            column_names.emplace_back(_col_name_to_file_col_name[name]);
+            continue;
+        }
+        const auto conjuncts_it = _slot_id_to_filter_conjuncts->find(slot_id);
+        if (conjuncts_it == _slot_id_to_filter_conjuncts->end()) {
+            continue;
+        }
+        _non_dict_filter_conjuncts.insert(_non_dict_filter_conjuncts.end(),
+                                          conjuncts_it->second.begin(),
+                                          conjuncts_it->second.end());
+    }
+    return Status::OK();
+}
+
+// Returns true when the conjuncts bound to `slot_id` may be evaluated on the
+// string dictionary instead of on decoded rows.
+//
+// This requires that
+//   - the slot exists in _tuple_descriptor and has a string type,
+//   - at least one single-slot conjunct is registered for it, and
+//   - none of those conjuncts is rooted at is_null_pred / is_not_null_pred.
+bool OrcReader::_can_filter_by_dict(int slot_id) {
+    SlotDescriptor* slot = nullptr;
+    for (auto* each : _tuple_descriptor->slots()) {
+        if (each->id() == slot_id) {
+            slot = each;
+            break;
+        }
+    }
+    // An unknown slot id must not be dereferenced; treat it as not filterable.
+    if (slot == nullptr || !slot->type().is_string_type()) {
+        return false;
+    }
+
+    const auto conjuncts_it = _slot_id_to_filter_conjuncts->find(slot_id);
+    if (conjuncts_it == _slot_id_to_filter_conjuncts->end()) {
+        return false;
+    }
+
+    // TODO: check expr like 'a > 10 is null', 'a > 10' should be able to be filtered by dict.
+    for (auto& ctx : conjuncts_it->second) {
+        const auto& root_expr = ctx->root();
+        if (root_expr->node_type() != TExprNodeType::FUNCTION_CALL) {
+            continue;
+        }
+        const std::string& function_name = root_expr->fn().name.function_name;
+        if (function_name == "is_null_pred" || function_name == "is_not_null_pred") {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Evaluates the single-slot conjuncts of every dictionary-filter column on the
+// column's dictionary values and rewrites them into predicates on dictionary codes.
+//
+// For each entry of _dict_filter_cols:
+//   - if no dictionary was supplied for the column, the column is removed from
+//     _dict_filter_cols and its conjuncts are moved to _non_dict_filter_conjuncts;
+//   - otherwise the dictionary values are filtered with the conjuncts; when no value
+//     survives, `*is_stripe_filtered` is set to true and the function returns;
+//   - the surviving values are mapped back to their codes and handed to
+//     _rewrite_dict_conjuncts.
+//
+// @param file_column_name_to_dict_map  dictionaries keyed by file column name.
+// @param is_stripe_filtered            output: true when a column's dictionary has no
+//                                      value satisfying its conjuncts.
+// @return OK on success; NotFound when a dictionary-filter slot has no registered
+//         conjuncts; InternalError when the slot is not part of the materialized
+//         tuple; any error raised while executing or rewriting the conjuncts.
+Status OrcReader::on_string_dicts_loaded(
+        std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
+        bool* is_stripe_filtered) {
+    *is_stripe_filtered = false;
+    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
+        std::string& dict_filter_col_name = it->first;
+        int slot_id = it->second;
+
+        // Collect the conjuncts registered for this slot.
+        VExprContextSPtrs ctxs;
+        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
+        if (iter != _slot_id_to_filter_conjuncts->end()) {
+            for (auto& ctx : iter->second) {
+                ctxs.push_back(ctx);
+            }
+        } else {
+            std::stringstream msg;
+            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
+            return Status::NotFound(msg.str());
+        }
+
+        // No dictionary for this column (the stripe is not dict encoded): fall back to
+        // evaluating its conjuncts on decoded rows.
+        auto file_column_name_to_dict_map_iter =
+                file_column_name_to_dict_map.find(_col_name_to_file_col_name[dict_filter_col_name]);
+        if (file_column_name_to_dict_map_iter == file_column_name_to_dict_map.end()) {
+            it = _dict_filter_cols.erase(it);
+            for (auto& ctx : ctxs) {
+                _non_dict_filter_conjuncts.emplace_back(ctx);
+            }
+            continue;
+        }
+
+        // 1. Get dictionary values to a string column.
+        MutableColumnPtr dict_value_column = ColumnString::create();
+        orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second;
+
+        std::vector<StringRef> dict_values;
+        std::unordered_map<StringRef, int64_t> dict_value_to_code;
+        size_t max_value_length = 0;
+        // The offset buffer holds one more entry than there are values; guard the
+        // subtraction so an empty buffer does not wrap around.
+        const uint64_t offset_count = dict->dictionaryOffset.size();
+        const uint64_t dictionaryCount = offset_count > 0 ? offset_count - 1 : 0;
+        dict_values.reserve(dictionaryCount);
+        for (uint64_t i = 0; i < dictionaryCount; ++i) {
+            char* val_ptr = nullptr;
+            int64_t length = 0;
+            dict->getValueByIndex(i, val_ptr, length);
+            StringRef dict_value(val_ptr, length);
+            if (length > 0 && static_cast<size_t>(length) > max_value_length) {
+                max_value_length = static_cast<size_t>(length);
+            }
+            dict_values.emplace_back(dict_value);
+            dict_value_to_code[dict_value] = i;
+        }
+        // data() is well-defined for an empty vector, unlike &dict_values[0].
+        dict_value_column->insert_many_strings_overflow(dict_values.data(), dict_values.size(),
+                                                        max_value_length);
+        // Captured before dict_value_column is moved into the temp block.
+        size_t dict_value_column_size = dict_value_column->size();
+
+        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
+        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
+        Block temp_block;
+        int dict_pos = -1;
+        int index = 0;
+        for (const auto slot_desc : _tuple_descriptor->slots()) {
+            if (!slot_desc->need_materialize()) {
+                // should be ignored from reading
+                continue;
+            }
+            if (slot_desc->id() == slot_id) {
+                auto data_type = slot_desc->get_data_type_ptr();
+                if (data_type->is_nullable()) {
+                    temp_block.insert(
+                            {ColumnNullable::create(std::move(dict_value_column),
+                                                    ColumnUInt8::create(dict_value_column_size, 0)),
+                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
+                             ""});
+                } else {
+                    temp_block.insert(
+                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
+                }
+                dict_pos = index;
+
+            } else {
+                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                        slot_desc->get_data_type_ptr(),
+                                                        slot_desc->col_name()));
+            }
+            ++index;
+        }
+        // The slot must have been placed in the temp block; otherwise dict_pos would be
+        // used as an out-of-range column index below.
+        if (dict_pos < 0) {
+            return Status::InternalError(
+                    "Dict filter slot id {} is not materialized in tuple descriptor", slot_id);
+        }
+
+        // 2.2 Execute conjuncts and filter block.
+        std::vector<uint32_t> columns_to_filter(1, dict_pos);
+        int column_to_keep = temp_block.columns();
+        if (dict_pos != 0) {
+            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
+            // The following process may be tricky and time-consuming, but we have no other way.
+            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
+        }
+        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)));
+        if (dict_pos != 0) {
+            // We have to clean the first column to insert right data.
+            temp_block.get_by_position(0).column->assume_mutable()->clear();
+        }
+
+        // Check some conditions.
+        ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
+        // If dict_column->size() == 0, can filter this stripe.
+        if (dict_column->size() == 0) {
+            *is_stripe_filtered = true;
+            return Status::OK();
+        }
+
+        // About Performance: if dict_column size is too large, it will generate a large IN filter.
+        if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+            it = _dict_filter_cols.erase(it);
+            for (auto& ctx : ctxs) {
+                _non_dict_filter_conjuncts.emplace_back(ctx);
+            }
+            continue;
+        }
+
+        // 3. Get dict codes.
+        std::vector<int32_t> dict_codes;
+        if (dict_column->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    static_cast<const ColumnNullable*>(dict_column.get());
+            const ColumnString* nested_column = static_cast<const ColumnString*>(
+                    nullable_column->get_nested_column_ptr().get());
+            for (size_t i = 0; i < nested_column->size(); ++i) {
+                StringRef dict_value = nested_column->get_data_at(i);
+                dict_codes.emplace_back(dict_value_to_code[dict_value]);
+            }
+        } else {
+            for (size_t i = 0; i < dict_column->size(); ++i) {
+                StringRef dict_value = dict_column->get_data_at(i);
+                dict_codes.emplace_back(dict_value_to_code[dict_value]);
+            }
+        }
+
+        // 4. Rewrite conjuncts. A failure to prepare/open the rewritten predicate must
+        // not be swallowed, otherwise the column would be read as codes without a filter.
+        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
+        ++it;
+    }
+    return Status::OK();
+}
+
+// Builds a predicate on dictionary codes for `slot_id` and appends it to
+// _dict_filter_conjuncts.
+//
+// A single code produces `slot = code` (function "eq"); several codes produce a
+// VDirectInPredicate backed by a hybrid set holding all codes.
+//
+// @param dict_codes   dictionary codes to match.
+// @param slot_id      id of the slot the predicate refers to.
+// @param is_nullable  nullability applied to the "eq" expression and its literal.
+// @return OK on success; InternalError when `slot_id` is unknown to
+//         _tuple_descriptor; any error from preparing or opening the new expression.
+Status OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
+                                          bool is_nullable) {
+    // Resolve the slot once up front; both predicate shapes reference it.
+    SlotDescriptor* slot = nullptr;
+    for (auto* each : _tuple_descriptor->slots()) {
+        if (each->id() == slot_id) {
+            slot = each;
+            break;
+        }
+    }
+    if (slot == nullptr) {
+        return Status::InternalError("Slot id {} not found in tuple descriptor", slot_id);
+    }
+
+    VExprSPtr root;
+    if (dict_codes.size() == 1) {
+        {
+            TFunction fn;
+            TFunctionName fn_name;
+            fn_name.__set_db_name("");
+            fn_name.__set_function_name("eq");
+            fn.__set_name(fn_name);
+            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+            std::vector<TTypeDesc> arg_types;
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            fn.__set_arg_types(arg_types);
+            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_has_var_args(false);
+
+            TExprNode texpr_node;
+            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+            texpr_node.__set_opcode(TExprOpcode::EQ);
+            texpr_node.__set_vector_opcode(TExprOpcode::EQ);
+            texpr_node.__set_fn(fn);
+            texpr_node.__set_child_type(TPrimitiveType::INT);
+            texpr_node.__set_num_children(2);
+            texpr_node.__set_is_nullable(is_nullable);
+            root = VectorizedFnCall::create_shared(texpr_node);
+        }
+        // First child: the slot reference.
+        root->add_child(VSlotRef::create_shared(slot));
+        {
+            // Second child: the single dictionary code as an INT literal.
+            TExprNode texpr_node;
+            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+            texpr_node.__set_type(create_type_desc(TYPE_INT));
+            TIntLiteral int_literal;
+            int_literal.__set_value(dict_codes[0]);
+            texpr_node.__set_int_literal(int_literal);
+            texpr_node.__set_is_nullable(is_nullable);
+            root->add_child(VLiteral::create_shared(texpr_node));
+        }
+    } else {
+        {
+            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(TExprOpcode::FILTER_IN);
+            // VdirectInPredicate assume is_nullable = false.
+            node.__set_is_nullable(false);
+
+            root = vectorized::VDirectInPredicate::create_shared(node);
+            std::shared_ptr<HybridSetBase> hybrid_set(
+                    create_set(PrimitiveType::TYPE_INT, dict_codes.size()));
+            for (auto& code : dict_codes) {
+                hybrid_set->insert(&code);
+            }
+            static_cast<vectorized::VDirectInPredicate*>(root.get())->set_filter(hybrid_set);
+        }
+        root->add_child(VSlotRef::create_shared(slot));
+    }
+    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
+    _dict_filter_conjuncts.emplace_back(rewritten_conjunct_ctx);
return Status::OK();
}
+// Replaces every dictionary-filter column of `block` (currently holding Int32
+// dictionary codes) with a string column and restores its string data type.
+//
+// @param block      block whose dictionary-filter columns are converted in place.
+// @param batch_vec  ORC batches indexed through _colname_to_idx, used to resolve
+//                   codes to strings; when nullptr an empty string column is
+//                   installed instead.
+// @return OK, or InternalError when a column is missing from _colname_to_idx.
+Status OrcReader::_convert_dict_cols_to_string_cols(
+        Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
+    for (auto& dict_filter_col : _dict_filter_cols) {
+        const size_t pos = block->get_position_by_name(dict_filter_col.first);
+        ColumnWithTypeAndName& entry = block->get_by_position(pos);
+        const ColumnPtr& column = entry.column;
+        const auto orc_col_idx = _colname_to_idx.find(dict_filter_col.first);
+        if (orc_col_idx == _colname_to_idx.end()) {
+            return Status::InternalError("Wrong read column '{}' in orc file",
+                                         dict_filter_col.first);
+        }
+
+        // Shared by the nullable and non-nullable paths: resolve codes to strings, or
+        // produce an empty column when no batch is available.
+        auto to_string_column = [&](const ColumnInt32* codes) -> MutableColumnPtr {
+            if (batch_vec == nullptr) {
+                return ColumnString::create();
+            }
+            return _convert_dict_column_to_string_column(codes,
+                                                         (*batch_vec)[orc_col_idx->second],
+                                                         _col_orc_type[orc_col_idx->second]);
+        };
+
+        if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
+            const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+            const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
+            DCHECK(dict_column);
+
+            MutableColumnPtr string_column = to_string_column(dict_column);
+
+            entry.type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+            // The existing null map is carried over to the new nullable column.
+            block->replace_by_position(
+                    pos, ColumnNullable::create(std::move(string_column),
+                                                nullable_column->get_null_map_column_ptr()));
+        } else {
+            const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
+            MutableColumnPtr string_column = to_string_column(dict_column);
+
+            entry.type = std::make_shared<DataTypeString>();
+            block->replace_by_position(pos, std::move(string_column));
+        }
+    }
+    return Status::OK();
+}
+
+// Resolves a column of dictionary codes to a new string column using the
+// dictionary of `cvb`.
+//
+// Rows that `cvb` marks as null get an empty placeholder string. CHAR values are
+// right-trimmed to strip padding characters.
+//
+// @param dict_column      dictionary codes, one per output row.
+// @param cvb              batch providing the dictionary and the null flags; it is
+//                         treated as an orc::EncodedStringVectorBatch.
+// @param orc_column_type  ORC type of the column; CHAR triggers right-trimming.
+// @return a new string column with dict_column->size() rows.
+MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
+        const ColumnInt32* dict_column, orc::ColumnVectorBatch* cvb,
+        const orc::Type* orc_column_type) {
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+    auto res = ColumnString::create();
+    const static std::string empty_string;
+    auto* encoded_string_vector_batch = static_cast<orc::EncodedStringVectorBatch*>(cvb);
+    DCHECK(encoded_string_vector_batch);
+    const size_t num_values = dict_column->size();
+    const int* dict_data = dict_column->get_data().data();
+    std::vector<StringRef> string_values;
+    string_values.reserve(num_values);
+    // Longest decoded value; forwarded to insert_many_strings_overflow().
+    size_t max_value_length = 0;
+    const bool is_char = (orc_column_type->getKind() == orc::TypeKind::CHAR);
+    const bool has_nulls = cvb->hasNulls;
+
+    for (size_t i = 0; i < num_values; ++i) {
+        // NOTE(review): notNull is indexed by the row position in dict_column, which
+        // presumably requires dict_column to still be row-aligned with cvb - confirm
+        // for callers that filter the block before converting.
+        if (has_nulls && !cvb->notNull[i]) {
+            // Orc doesn't fill null values in new batch, but the former batch has been released.
+            // Other types like int/long/timestamp... are flat types without pointer in them,
+            // so other types do not need to be handled separately like string.
+            string_values.emplace_back(empty_string.data(), 0);
+            continue;
+        }
+        char* val_ptr = nullptr;
+        int64_t length = 0;
+        encoded_string_vector_batch->dictionary->getValueByIndex(dict_data[i], val_ptr, length);
+        if (is_char) {
+            // Possibly there are some zero padding characters in CHAR type, strip them off.
+            length = trim_right(val_ptr, length);
+        }
+        if (length > 0 && static_cast<size_t>(length) > max_value_length) {
+            max_value_length = static_cast<size_t>(length);
+        }
+        string_values.emplace_back(val_ptr, length);
+    }
+    // data() is well-defined for an empty vector (e.g. a block fully filtered out),
+    // unlike &string_values[0].
+    res->insert_many_strings_overflow(string_values.data(), num_values, max_value_length);
+    return res;
+}
+
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 2a62bfb93c..de7a8d182f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -143,7 +143,10 @@ public:
Status init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- const VExprContextSPtrs& conjuncts, bool is_acid);
+ const VExprContextSPtrs& conjuncts, bool is_acid,
+ const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
+ const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+ const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -167,8 +170,6 @@ public:
void _build_delete_row_filter(const Block* block, size_t rows);
- void close();
-
int64_t size() const;
std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;
@@ -178,12 +179,20 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
- Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg);
-
void set_delete_rows(const TransactionalHiveReader::AcidRowIDSet* delete_rows) {
_delete_rows = delete_rows;
}
+ Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg);
+
+ Status fill_dict_filter_column_names(
+ std::unique_ptr<orc::StripeInformation> current_strip_information,
+ std::list<std::string>& column_names);
+
+ Status on_string_dicts_loaded(
+ std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map,
+ bool* is_stripe_filtered);
+
private:
struct OrcProfile {
RuntimeProfile::Counter* read_time;
@@ -209,6 +218,27 @@ private:
OrcReader* orcReader;
};
+ // Adapter that lets the ORC library call back into OrcReader for dictionary
+ // filtering. The orc::StringDictFilter hooks return void, so a non-OK Status
+ // coming back from OrcReader cannot be returned to the ORC library directly.
+ // Instead of discarding it, the first failure is retained and can be
+ // inspected through status().
+ class StringDictFilterImpl : public orc::StringDictFilter {
+ public:
+     explicit StringDictFilterImpl(OrcReader* orc_reader) : _orc_reader(orc_reader) {}
+     ~StringDictFilterImpl() override = default;
+
+     // Forwards to OrcReader::fill_dict_filter_column_names and records its Status.
+     void fillDictFilterColumnNames(
+             std::unique_ptr<orc::StripeInformation> current_strip_information,
+             std::list<std::string>& column_names) const override {
+         _record(_orc_reader->fill_dict_filter_column_names(
+                 std::move(current_strip_information), column_names));
+     }
+
+     // Forwards to OrcReader::on_string_dicts_loaded and records its Status.
+     void onStringDictsLoaded(
+             std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map,
+             bool* is_stripe_filtered) const override {
+         _record(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
+                                                     is_stripe_filtered));
+     }
+
+     // First non-OK Status returned by either callback, or OK if none has failed.
+     const Status& status() const { return _status; }
+
+ private:
+     // Keeps only the first failure so that a later error does not overwrite it.
+     void _record(Status st) const {
+         if (!st.ok() && _status.ok()) {
+             _status = std::move(st);
+         }
+     }
+
+     OrcReader* _orc_reader;
+     // mutable because the orc::StringDictFilter hooks are declared const.
+     mutable Status _status = Status::OK();
+ };
+
// Create inner orc file,
// return EOF if file is empty
// return ERROR if an error is encountered.
@@ -344,6 +374,10 @@ private:
return Status::OK();
}
+ template <bool is_filter>
+ Status _decode_int32_column(const std::string& col_name, const MutableColumnPtr& data_column,
+ orc::ColumnVectorBatch* cvb, size_t num_values);
+
template <typename DecimalPrimitiveType, bool is_filter>
Status _decode_decimal_column(const std::string& col_name, const MutableColumnPtr& data_column,
const DataTypePtr& data_type, orc::ColumnVectorBatch* cvb,
@@ -410,6 +444,20 @@ private:
const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb,
size_t num_values);
+ template <bool is_filter>
+ Status _decode_string_non_dict_encoded_column(const std::string& col_name,
+ const MutableColumnPtr& data_column,
+ const orc::TypeKind& type_kind,
+ orc::EncodedStringVectorBatch* cvb,
+ size_t num_values);
+
+ template <bool is_filter>
+ Status _decode_string_dict_encoded_column(const std::string& col_name,
+ const MutableColumnPtr& data_column,
+ const orc::TypeKind& type_kind,
+ orc::EncodedStringVectorBatch* cvb,
+ size_t num_values);
+
Status _fill_doris_array_offsets(const std::string& col_name,
ColumnArray::Offsets64& doris_offsets,
orc::DataBuffer<int64_t>& orc_offsets, size_t num_values,
@@ -419,6 +467,17 @@ private:
void _collect_profile_on_close();
+ bool _can_filter_by_dict(int slot_id);
+
+ Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable);
+
+ Status _convert_dict_cols_to_string_cols(Block* block,
+ const std::vector<orc::ColumnVectorBatch*>* batch_vec);
+
+ MutableColumnPtr _convert_dict_column_to_string_column(const ColumnInt32* dict_column,
+ orc::ColumnVectorBatch* cvb,
+ const orc::Type* orc_column_typ);
+
private:
RuntimeProfile* _profile = nullptr;
RuntimeState* _state = nullptr;
@@ -449,7 +508,6 @@ private:
std::unique_ptr<ORCFileInputStream> _file_input_stream;
Statistics _statistics;
OrcProfile _orc_profile;
- bool _closed = false;
std::unique_ptr<orc::ColumnVectorBatch> _batch;
std::unique_ptr<orc::Reader> _reader;
@@ -473,6 +531,17 @@ private:
std::unique_ptr<TextConverter> _text_converter = nullptr;
const TransactionalHiveReader::AcidRowIDSet* _delete_rows = nullptr;
std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr;
+
+ const TupleDescriptor* _tuple_descriptor;
+ const RowDescriptor* _row_descriptor;
+ const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts;
+ VExprContextSPtrs _dict_filter_conjuncts;
+ VExprContextSPtrs _non_dict_filter_conjuncts;
+ VExprContextSPtrs _filter_conjuncts;
+ // std::pair<col_name, slot_id>
+ std::vector<std::pair<std::string, int>> _dict_filter_cols;
+ std::shared_ptr<ObjectPool> _obj_pool;
+ std::unique_ptr<orc::StringDictFilter> _string_dict_filter;
};
class ORCFileInputStream : public orc::InputStream {
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index a2ae767d56..79341f40aa 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -57,12 +57,17 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
Status TransactionalHiveReader::init_reader(
const std::vector<std::string>& column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- const VExprContextSPtrs& conjuncts) {
+ const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+ const RowDescriptor* row_descriptor,
+ const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+ const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
OrcReader* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
_col_names.insert(_col_names.end(), column_names.begin(), column_names.end());
_col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
- Status status = orc_reader->init_reader(&_col_names, colname_to_value_range, conjuncts, true);
+ Status status = orc_reader->init_reader(
+ &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor,
+ not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
return status;
}
@@ -132,8 +137,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) {
OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
_state->timezone(), _io_ctx, false);
- RETURN_IF_ERROR(delete_reader.init_reader(
- &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, nullptr, {}, false));
+ RETURN_IF_ERROR(
+ delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE,
+ nullptr, {}, false, nullptr, nullptr, nullptr, nullptr));
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
index b19102a366..5b3609e47f 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -106,7 +106,10 @@ public:
Status init_reader(
const std::vector<std::string>& column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- const VExprContextSPtrs& conjuncts);
+ const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+ const RowDescriptor* row_descriptor,
+ const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+ const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
private:
struct TransactionalHiveProfile {
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 535dac3a68..b0a2d2e327 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -673,13 +673,17 @@ Status VFileScanner::_get_next_reader() {
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
_state, _params, range,
_io_ctx.get());
- init_status = tran_orc_reader->init_reader(_file_col_names, _colname_to_value_range,
- _push_down_conjuncts);
+ init_status = tran_orc_reader->init_reader(
+ _file_col_names, _colname_to_value_range, _push_down_conjuncts,
+ _real_tuple_desc, _default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
_cur_reader = std::move(tran_orc_reader);
} else {
- init_status = orc_reader->init_reader(&_file_col_names, _colname_to_value_range,
- _push_down_conjuncts, false);
+ init_status = orc_reader->init_reader(
+ &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
+ _real_tuple_desc, _default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
_cur_reader = std::move(orc_reader);
}
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org