You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/09 03:24:12 UTC
[incubator-doris] branch master updated: [feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new eec1dfde3a [feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314)
eec1dfde3a is described below
commit eec1dfde3a6b93eccc28fb1c0b58233987a2b856
Author: xiepengcheng01 <10...@users.noreply.github.com>
AuthorDate: Mon May 9 11:24:07 2022 +0800
[feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314)
Co-authored-by: xiepengcheng01 <xi...@xafj-palo-rpm64.xafj.baidu.com>
---
be/src/exec/base_scanner.cpp | 41 ++++-
be/src/exec/base_scanner.h | 15 +-
be/src/exec/broker_scanner.cpp | 18 +-
be/src/exec/broker_scanner.h | 6 +-
be/src/vec/exec/vbroker_scan_node.cpp | 188 ++++++++++----------
be/src/vec/exec/vbroker_scan_node.h | 1 +
be/src/vec/exec/vbroker_scanner.cpp | 263 ++++++++++++----------------
be/src/vec/exec/vbroker_scanner.h | 17 +-
be/test/exec/multi_bytes_separator_test.cpp | 9 +-
be/test/vec/exec/vbroker_scan_node_test.cpp | 72 ++++----
be/test/vec/exec/vbroker_scanner_test.cpp | 66 +++----
11 files changed, 350 insertions(+), 346 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index ca5b08831f..8621cf75f8 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -107,10 +107,18 @@ Status BaseScanner::init_expr_ctxes() {
// preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
if (!_pre_filter_texprs.empty()) {
- RETURN_IF_ERROR(
- Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
- RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
- RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
+ if (_state->enable_vectorized_exec()) {
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
+ _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
+ _mem_tracker));
+ RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
+ } else {
+ RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
+ &_pre_filter_ctxs));
+ RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
+ RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
+ }
}
// Construct dest slots information
@@ -133,11 +141,22 @@ Status BaseScanner::init_expr_ctxes() {
<< ", name=" << slot_desc->col_name();
return Status::InternalError(ss.str());
}
- ExprContext* ctx = nullptr;
- RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
- RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
- RETURN_IF_ERROR(ctx->open(_state));
- _dest_expr_ctx.emplace_back(ctx);
+
+ if (_state->enable_vectorized_exec()) {
+ vectorized::VExprContext* ctx = nullptr;
+ RETURN_IF_ERROR(
+ vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+ RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
+ RETURN_IF_ERROR(ctx->open(_state));
+ _dest_vexpr_ctx.emplace_back(ctx);
+ } else {
+ ExprContext* ctx = nullptr;
+ RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+ RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
+ RETURN_IF_ERROR(ctx->open(_state));
+ _dest_expr_ctx.emplace_back(ctx);
+ }
+
if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -284,6 +303,10 @@ void BaseScanner::close() {
if (!_pre_filter_ctxs.empty()) {
Expr::close(_pre_filter_ctxs, _state);
}
+
+ if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
+ vectorized::VExpr::close(_vpre_filter_ctxs, _state);
+ }
}
} // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 13285ab6aa..d98e39e663 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -20,6 +20,7 @@
#include "common/status.h"
#include "exprs/expr.h"
+#include "vec/exprs/vexpr.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"
@@ -52,7 +53,12 @@ class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
- virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
+ virtual ~BaseScanner() {
+ Expr::close(_dest_expr_ctx, _state);
+ if (_state->enable_vectorized_exec()) {
+ vectorized::VExpr::close(_dest_vexpr_ctx, _state);
+ }
+ };
virtual Status init_expr_ctxes();
// Open this scanner, will initialize information need to
@@ -62,8 +68,8 @@ public:
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) = 0;
// Get next block
- virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) {
- return Status::NotSupported("Not Implemented get next");
+ virtual Status get_next(vectorized::Block* block, bool* eof) {
+ return Status::NotSupported("Not Implemented get block");
}
// Close this scanner
@@ -95,6 +101,9 @@ protected:
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
+ // for vectorized
+ std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+ std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index cda19d8611..d35e8428a4 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -468,8 +468,7 @@ Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool*
return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}
-// Convert one row to this tuple
-Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
+Status BrokerScanner::_line_split_to_values(const Slice& line) {
bool is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
if (!is_proto_format && !validate_utf8(line.data, line.size)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
@@ -546,6 +545,17 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
}
}
+ _success = true;
+ return Status::OK();
+}
+
+// Convert one row to this tuple
+Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
+ RETURN_IF_ERROR(_line_split_to_values(line));
+ if (!_success) {
+ return Status::OK();
+ }
+
for (int i = 0; i < _split_values.size(); ++i) {
auto slot_desc = _src_slot_descs[i];
const Slice& value = _split_values[i];
@@ -560,11 +570,11 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
str_slot->len = value.size;
}
+ const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
- fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);
+ fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
}
- _success = true;
return Status::OK();
}
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 344d3789ae..2b6ac2d302 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -65,8 +65,8 @@ public:
virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
bool* fill_tuple) override;
- Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) override {
- return Status::NotSupported("Not Implemented get columns");
+ Status get_next(vectorized::Block* block, bool* eof) override {
+ return Status::NotSupported("Not Implemented get block");
}
// Close this scanner
@@ -78,6 +78,8 @@ protected:
Status _line_to_src_tuple(const Slice& line);
+ Status _line_split_to_values(const Slice& line);
+
private:
Status open_file_reader();
Status create_decompressor(TFileFormatType::type type);
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index d12f3e8d85..09338a1959 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
return Status::OK();
}
- std::shared_ptr<vectorized::Block> scanner_block;
- {
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- while (_process_status.ok() && !_runtime_state->is_cancelled() &&
- _num_running_scanners > 0 && _block_queue.empty()) {
- SCOPED_TIMER(_wait_scanner_timer);
- _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
- }
- if (!_process_status.ok()) {
- // Some scanner process failed.
- return _process_status;
+ const int batch_size = _runtime_state->batch_size();
+ while (true) {
+ std::shared_ptr<vectorized::Block> scanner_block;
+ {
+ std::unique_lock<std::mutex> l(_batch_queue_lock);
+ while (_process_status.ok() && !_runtime_state->is_cancelled() &&
+ _num_running_scanners > 0 && _block_queue.empty()) {
+ SCOPED_TIMER(_wait_scanner_timer);
+ _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
+ }
+ if (!_process_status.ok()) {
+ // Some scanner process failed.
+ return _process_status;
+ }
+ if (_runtime_state->is_cancelled()) {
+ if (update_status(Status::Cancelled("Cancelled"))) {
+ _queue_writer_cond.notify_all();
+ }
+ return _process_status;
+ }
+ if (!_block_queue.empty()) {
+ scanner_block = _block_queue.front();
+ _block_queue.pop_front();
+ }
}
- if (_runtime_state->is_cancelled()) {
- if (update_status(Status::Cancelled("Cancelled"))) {
- _queue_writer_cond.notify_all();
+
+ // All scanners have finished, and all queued blocks have been read
+ if (!scanner_block) {
+ if (_mutable_block && !_mutable_block->empty()) {
+ *block = _mutable_block->to_block();
+ reached_limit(block, eos);
+ LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
}
- return _process_status;
+ _scan_finished.store(true);
+ *eos = true;
+ return Status::OK();
}
- if (!_block_queue.empty()) {
- scanner_block = _block_queue.front();
- _block_queue.pop_front();
+ // notify one scanner
+ _queue_writer_cond.notify_one();
+
+ if (UNLIKELY(!_mutable_block)) {
+ _mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
}
- }
- // All scanner has been finished, and all cached batch has been read
- if (scanner_block == nullptr) {
- _scan_finished.store(true);
- *eos = true;
- return Status::OK();
+ if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
+ // merge scanner_block into _mutable_block
+ _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+ continue;
+ } else {
+ if (_mutable_block->empty()) {
+ // directly use scanner_block
+ *block = *scanner_block;
+ } else {
+ // Output the accumulated _mutable_block first, then refill _mutable_block with scanner_block for the next call.
+ *block = _mutable_block->to_block();
+ _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
+ _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+ }
+ break;
+ }
}
- // notify one scanner
- _queue_writer_cond.notify_one();
-
- reached_limit(scanner_block.get(), eos);
- *block = *scanner_block;
-
+ reached_limit(block, eos);
if (*eos) {
_scan_finished.store(true);
_queue_writer_cond.notify_all();
@@ -120,75 +146,53 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
RETURN_IF_ERROR(scanner->open());
bool scanner_eof = false;
-
- const int batch_size = _runtime_state->batch_size();
- size_t slot_num = _tuple_desc->slots().size();
-
while (!scanner_eof) {
- std::shared_ptr<vectorized::Block> block(new vectorized::Block());
- std::vector<vectorized::MutableColumnPtr> columns(slot_num);
- for (int i = 0; i < slot_num; i++) {
- columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+ RETURN_IF_CANCELLED(_runtime_state);
+ // If we have finished all works
+ if (_scan_finished.load() || !_process_status.ok()) {
+ return Status::OK();
}
- while (columns[0]->size() < batch_size && !scanner_eof) {
- RETURN_IF_CANCELLED(_runtime_state);
- // If we have finished all works
- if (_scan_finished.load()) {
- return Status::OK();
- }
-
- RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
- if (scanner_eof) {
- break;
- }
+ std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+ RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+ if (block->rows() == 0) {
+ continue;
+ }
+ auto old_rows = block->rows();
+ RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
+ _tuple_desc->slots().size()));
+ counter->num_rows_unselected += old_rows - block->rows();
+ if (block->rows() == 0) {
+ continue;
}
- if (!columns[0]->empty()) {
- auto n_columns = 0;
- for (const auto slot_desc : _tuple_desc->slots()) {
- block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
- }
-
- auto old_rows = block->rows();
-
- RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
- _tuple_desc->slots().size()));
-
- counter->num_rows_unselected += old_rows - block->rows();
-
- std::unique_lock<std::mutex> l(_batch_queue_lock);
- while (_process_status.ok() && !_scan_finished.load() &&
- !_runtime_state->is_cancelled() &&
- // stop pushing more batch if
- // 1. too many batches in queue, or
- // 2. at least one batch in queue and memory exceed limit.
- (_block_queue.size() >= _max_buffered_batches ||
- (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
- _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
- }
- // Process already set failed, so we just return OK
- if (!_process_status.ok()) {
- return Status::OK();
- }
- // Scan already finished, just return
- if (_scan_finished.load()) {
- return Status::OK();
- }
- // Runtime state is canceled, just return cancel
- if (_runtime_state->is_cancelled()) {
- return Status::Cancelled("Cancelled");
- }
- // Queue size Must be smaller than _max_buffered_batches
- _block_queue.push_back(block);
-
- // Notify reader to
- _queue_reader_cond.notify_one();
+ std::unique_lock<std::mutex> l(_batch_queue_lock);
+ while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
+ // stop pushing more batch if
+ // 1. too many batches in queue, or
+ // 2. at least one batch in queue and memory exceed limit.
+ (_block_queue.size() >= _max_buffered_batches ||
+ (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
+ _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
}
- }
+ // Process already set failed, so we just return OK
+ if (!_process_status.ok()) {
+ return Status::OK();
+ }
+ // Scan already finished, just return
+ if (_scan_finished.load()) {
+ return Status::OK();
+ }
+ // Runtime state is canceled, just return cancel
+ if (_runtime_state->is_cancelled()) {
+ return Status::Cancelled("Cancelled");
+ }
+ // Queue size Must be smaller than _max_buffered_batches
+ _block_queue.push_back(block);
+ // Notify the reader that a new block has been pushed to the queue
+ _queue_reader_cond.notify_one();
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index 9a3b2fe362..4ccebed5fb 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -51,6 +51,7 @@ private:
Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* counter);
std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
+ std::unique_ptr<MutableBlock> _mutable_block;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 28e2f24c22..39302099a6 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -21,25 +21,41 @@
#include <iostream>
#include <sstream>
+#include "exec/text_converter.h"
#include "exec/exec_node.h"
#include "exprs/expr_context.h"
#include "exec/plain_text_line_reader.h"
+#include "util/utf8_check.h"
namespace doris::vectorized {
+
+bool is_null(const Slice& slice) {
+ return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
+}
+
VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
- counter) {}
+ counter) {
+ _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+}
-Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+VBrokerScanner::~VBrokerScanner() {}
+
+Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
SCOPED_TIMER(_read_timer);
const int batch_size = _state->batch_size();
+ // Get batch lines
+ int slot_num = _src_slot_descs.size();
+ std::vector<vectorized::MutableColumnPtr> columns(slot_num);
+ for (int i = 0; i < slot_num; i++) {
+ columns[i] = _src_slot_descs[i]->get_empty_mutable_column();
+ }
- // Get one line
while (columns[0]->size() < batch_size && !_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
RETURN_IF_ERROR(open_next_reader());
@@ -62,7 +78,7 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo
{
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), columns));
+ RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns));
if (_success) {
free_expr_local_allocations();
}
@@ -73,53 +89,67 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo
} else {
*eof = false;
}
- return Status::OK();
+ return _fill_dest_block(output_block, columns);
}
-Status VBrokerScanner::_convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns) {
- RETURN_IF_ERROR(_line_to_src_tuple(line));
- if (!_success) {
- // If not success, which means we met an invalid row, return.
+Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) {
+ if (columns.empty() || columns[0]->size() == 0) {
return Status::OK();
}
- return _fill_dest_columns(columns);
+ std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block());
+ auto n_columns = 0;
+ for (const auto slot_desc : _src_slot_descs) {
+ tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
+ slot_desc->get_data_type_ptr(),
+ slot_desc->col_name()));
+ }
+ auto old_rows = tmp_block->rows();
+ // filter
+ if (!_vpre_filter_ctxs.empty()) {
+ for (auto vexpr_ctx : _vpre_filter_ctxs) {
+ RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(),
+ _dest_tuple_desc->slots().size()));
+ _counter->num_rows_unselected += old_rows - tmp_block->rows();
+ old_rows = tmp_block->rows();
+ }
+ }
+
+ Status status;
+ // expr
+ if (!_dest_vexpr_ctx.empty()) {
+ *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+ _dest_vexpr_ctx, *tmp_block, status);
+ if (UNLIKELY(dest_block->rows() == 0)) {
+ _success = false;
+ return status;
+ }
+ } else {
+ *dest_block = *tmp_block;
+ }
+
+ return status;
}
-Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns) {
- // filter src tuple by preceding filter first
- if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
- _counter->num_rows_unselected++;
- _success = false;
+Status VBrokerScanner::_fill_dest_columns(const Slice& line,
+ std::vector<MutableColumnPtr>& columns) {
+ RETURN_IF_ERROR(_line_split_to_values(line));
+ if (!_success) {
+ // If not successful, we met an invalid row; return without filling the columns.
return Status::OK();
}
- // convert and fill dest tuple
- int ctx_idx = 0;
- for (auto slot_desc : _dest_tuple_desc->slots()) {
- if (!slot_desc->is_materialized()) {
+
+ int idx = 0;
+ for (int i = 0; i < _split_values.size(); ++i) {
+ int dest_index = idx++;
+
+ auto src_slot_desc = _src_slot_descs[i];
+ if (!src_slot_desc->is_materialized()) {
continue;
}
- int dest_index = ctx_idx++;
- auto* column_ptr = columns[dest_index].get();
-
- ExprContext* ctx = _dest_expr_ctx[dest_index];
- void* value = ctx->get_value(_src_tuple_row);
- if (value == nullptr) {
- // Only when the expr return value is null, we will check the error message.
- std::string expr_error = ctx->get_error_msg();
- if (!expr_error.empty()) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return _src_tuple_row->to_string(*(_row_desc.get()));
- },
- [&]() -> std::string { return expr_error; }, &_scanner_eof));
- _counter->num_rows_filtered++;
- // The ctx is reused, so must clear the error state and message.
- ctx->clear_error_msg();
- _success = false;
- return Status::OK();
- }
+ const Slice& value = _split_values[i];
+ if (is_null(value)) {
// If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) &&
!_src_tuple->is_null(
@@ -140,7 +170,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
fmt::format_to(error_msg,
"column({}) value is incorrect while strict mode is {}, "
"src value is {}",
- slot_desc->col_name(), _strict_mode, raw_string);
+ src_slot_desc->col_name(), _strict_mode, raw_string);
return error_msg.data();
},
&_scanner_eof));
@@ -148,7 +178,8 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
_success = false;
return Status::OK();
}
- if (!slot_desc->is_nullable()) {
+
+ if (!src_slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_tuple_row->to_string(*(_row_desc.get()));
@@ -158,7 +189,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
fmt::format_to(
error_msg,
"column({}) values is null while columns is not nullable",
- slot_desc->col_name());
+ src_slot_desc->col_name());
return error_msg.data();
},
&_scanner_eof));
@@ -166,124 +197,50 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
_success = false;
return Status::OK();
}
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+ // nullable
+ auto* nullable_column =
+ reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
nullable_column->insert_data(nullptr, 0);
continue;
}
- if (slot_desc->is_nullable()) {
- auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
- nullable_column->get_null_map_data().push_back(0);
- column_ptr = &nullable_column->get_nested_column();
- }
- char* value_ptr = (char*)value;
- switch (slot_desc->type().type) {
- case TYPE_BOOLEAN: {
- assert_cast<ColumnVector<UInt8>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_TINYINT: {
- assert_cast<ColumnVector<Int8>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_SMALLINT: {
- assert_cast<ColumnVector<Int16>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_INT: {
- assert_cast<ColumnVector<Int32>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_BIGINT: {
- assert_cast<ColumnVector<Int64>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_LARGEINT: {
- assert_cast<ColumnVector<Int128>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_FLOAT: {
- assert_cast<ColumnVector<Float32>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_DOUBLE: {
- assert_cast<ColumnVector<Float64>*>(column_ptr)->insert_data(value_ptr, 0);
- break;
- }
- case TYPE_CHAR: {
- Slice* slice = reinterpret_cast<Slice*>(value_ptr);
- assert_cast<ColumnString*>(column_ptr)
- ->insert_data(slice->data, strnlen(slice->data, slice->size));
- break;
- }
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- Slice* slice = reinterpret_cast<Slice*>(value_ptr);
- assert_cast<ColumnString*>(column_ptr)->insert_data(slice->data, slice->size);
- break;
- }
- case TYPE_OBJECT: {
- Slice* slice = reinterpret_cast<Slice*>(value_ptr);
- // insert_default()
- auto* target_column = assert_cast<ColumnBitmap*>(column_ptr);
- target_column->insert_default();
- BitmapValue* pvalue = nullptr;
- int pos = target_column->size() - 1;
- pvalue = &target_column->get_element(pos);
+ RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
+ &columns[dest_index], _state));
+ }
- if (slice->size != 0) {
- BitmapValue value;
- value.deserialize(slice->data);
- *pvalue = std::move(value);
- } else {
- *pvalue = std::move(*reinterpret_cast<BitmapValue*>(slice->data));
- }
- break;
- }
- case TYPE_HLL: {
- Slice* slice = reinterpret_cast<Slice*>(value_ptr);
- auto* target_column = assert_cast<ColumnHLL*>(column_ptr);
+ const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+ if (range.__isset.num_of_columns_from_file) {
+ RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file,
+ range.columns_from_path, columns));
+ }
- target_column->insert_default();
- HyperLogLog* pvalue = nullptr;
- int pos = target_column->size() - 1;
- pvalue = &target_column->get_element(pos);
- if (slice->size != 0) {
- HyperLogLog value;
- value.deserialize(*slice);
- *pvalue = std::move(value);
- } else {
- *pvalue = std::move(*reinterpret_cast<HyperLogLog*>(slice->data));
- }
- break;
- }
- case TYPE_DECIMALV2: {
- assert_cast<ColumnDecimal<Decimal128>*>(column_ptr)
- ->insert_data(reinterpret_cast<char*>(value_ptr), 0);
- break;
- }
- case TYPE_DATETIME: {
- DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr);
- VecDateTimeValue date;
- date.convert_dt_to_vec_dt(&value);
- assert_cast<ColumnVector<Int64>*>(column_ptr)
- ->insert_data(reinterpret_cast<char*>(&date), 0);
- break;
- }
- case TYPE_DATE: {
- DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr);
- VecDateTimeValue date;
- date.convert_dt_to_vec_dt(&value);
- assert_cast<ColumnVector<Int64>*>(column_ptr)
- ->insert_data(reinterpret_cast<char*>(&date), 0);
- break;
- }
- default: {
- break;
- }
- }
+ return Status::OK();
+}
+
+Status VBrokerScanner::_fill_columns_from_path(int start,
+ const std::vector<std::string>& columns_from_path,
+ std::vector<MutableColumnPtr>& columns) {
+ // values of columns from path can not be null
+ for (int i = 0; i < columns_from_path.size(); ++i) {
+ int dest_index = i + start;
+ auto slot_desc = _src_slot_descs.at(dest_index);
+ const std::string& column_from_path = columns_from_path[i];
+ RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()),
+ column_from_path.size(), slot_desc, &columns[dest_index],
+ _state));
+ }
+ return Status::OK();
+}
+
+Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
+ vectorized::MutableColumnPtr* column_ptr,
+ RuntimeState* state) {
+ if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
+ std::stringstream ss;
+ ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
+ << slot->col_name() + "`";
+ return Status::InternalError(ss.str());
}
- _success = true;
return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 469e38d4e8..57c4004728 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -27,17 +27,26 @@ public:
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
- ~VBrokerScanner() override = default;
+ ~VBrokerScanner();
virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof,
bool* fill_tuple) override {
return Status::NotSupported("Not Implemented get next");
}
- virtual Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) override;
+ Status get_next(Block* block, bool* eof) override;
private:
- Status _convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns);
- Status _fill_dest_columns(std::vector<MutableColumnPtr>& columns);
+ std::unique_ptr<TextConverter> _text_converter;
+
+ Status _write_text_column(char* value, int length, SlotDescriptor* slot,
+ MutableColumnPtr* column_ptr, RuntimeState* state);
+
+ Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns);
+
+ Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
+
+ Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path,
+ std::vector<MutableColumnPtr>& columns);
};
} // namespace doris::vectorized
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index 9a99c354bb..3712f6b141 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -37,7 +37,10 @@ namespace doris {
class MultiBytesSeparatorTest : public testing::Test {
public:
- MultiBytesSeparatorTest() {}
+ MultiBytesSeparatorTest() : _runtime_state(TQueryGlobals()) {}
+
+private:
+ RuntimeState _runtime_state;
protected:
virtual void SetUp() {}
@@ -56,8 +59,8 @@ TEST_F(MultiBytesSeparatorTest, normal) {
const std::vector<TBrokerRangeDesc> ranges;
const std::vector<TNetworkAddress> broker_addresses;
const std::vector<TExpr> pre_filter_texprs;
- BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_texprs,
- nullptr);
+ BrokerScanner scanner(&_runtime_state, nullptr, params, ranges, broker_addresses,
+ pre_filter_texprs, nullptr);
#define private public
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index c0c091d9d6..a0d11d3ac9 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -46,6 +46,7 @@ public:
VBrokerScanNodeTest() : _runtime_state(TQueryGlobals()) {
init();
_runtime_state._instance_mem_tracker.reset(new MemTracker());
+ _runtime_state._query_options.enable_vectorized_engine = true;
}
void init();
static void SetUpTestCase() {
@@ -277,7 +278,7 @@ void VBrokerScanNodeTest::init_desc_table() {
type.types.push_back(node);
}
slot_desc.slotType = type;
- slot_desc.columnPos = 1;
+ slot_desc.columnPos = 2;
slot_desc.byteOffset = 32;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
@@ -304,7 +305,7 @@ void VBrokerScanNodeTest::init_desc_table() {
type.types.push_back(node);
}
slot_desc.slotType = type;
- slot_desc.columnPos = 1;
+ slot_desc.columnPos = 3;
slot_desc.byteOffset = 48;
slot_desc.nullIndicatorByte = 0;
slot_desc.nullIndicatorBit = -1;
@@ -466,37 +467,31 @@ TEST_F(VBrokerScanNodeTest, normal) {
doris::vectorized::Block block;
bool eos = false;
status = scan_node.get_next(&_runtime_state, &block, &eos);
- ASSERT_EQ(3, block.rows());
+ ASSERT_EQ(4, block.rows());
ASSERT_EQ(4, block.columns());
- ASSERT_FALSE(eos);
-
- auto columns = block.get_columns();
- ASSERT_EQ(columns[0]->get_int(0), 1);
- ASSERT_EQ(columns[0]->get_int(1), 4);
- ASSERT_EQ(columns[0]->get_int(2), 8);
-
- ASSERT_EQ(columns[1]->get_int(0), 2);
- ASSERT_EQ(columns[1]->get_int(1), 5);
- ASSERT_EQ(columns[1]->get_int(2), 9);
+ ASSERT_TRUE(eos);
- ASSERT_EQ(columns[2]->get_int(0), 3);
- ASSERT_EQ(columns[2]->get_int(1), 6);
- ASSERT_EQ(columns[2]->get_int(2), 10);
+ auto columns = block.get_columns_with_type_and_name();
+ ASSERT_EQ(columns.size(), 4);
+ ASSERT_EQ(columns[0].to_string(0), "1");
+ ASSERT_EQ(columns[0].to_string(1), "4");
+ ASSERT_EQ(columns[0].to_string(2), "8");
+ ASSERT_EQ(columns[0].to_string(3), "4");
- ASSERT_EQ(columns[3]->get_int(0), 1);
- ASSERT_EQ(columns[3]->get_int(1), 1);
- ASSERT_EQ(columns[3]->get_int(2), 1);
+ ASSERT_EQ(columns[1].to_string(0), "2");
+ ASSERT_EQ(columns[1].to_string(1), "5");
+ ASSERT_EQ(columns[1].to_string(2), "9");
+ ASSERT_EQ(columns[1].to_string(3), "5");
- block.clear();
- status = scan_node.get_next(&_runtime_state, &block, &eos);
- ASSERT_EQ(1, block.rows());
- ASSERT_FALSE(eos);
+ ASSERT_EQ(columns[2].to_string(0), "3");
+ ASSERT_EQ(columns[2].to_string(1), "6");
+ ASSERT_EQ(columns[2].to_string(2), "10");
+ ASSERT_EQ(columns[2].to_string(3), "6");
- columns = block.get_columns();
- ASSERT_EQ(columns[0]->get_int(0), 4);
- ASSERT_EQ(columns[1]->get_int(0), 5);
- ASSERT_EQ(columns[2]->get_int(0), 6);
- ASSERT_EQ(columns[3]->get_int(0), 2);
+ ASSERT_EQ(columns[3].to_string(0), "1");
+ ASSERT_EQ(columns[3].to_string(1), "1");
+ ASSERT_EQ(columns[3].to_string(2), "1");
+ ASSERT_EQ(columns[3].to_string(3), "2");
block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eos);
@@ -610,20 +605,21 @@ TEST_F(VBrokerScanNodeTest, where_binary_pre) {
ASSERT_EQ(2, block.rows());
ASSERT_EQ(4, block.columns());
- auto columns = block.get_columns();
- ASSERT_EQ(columns[0]->get_int(0), 1);
- ASSERT_EQ(columns[0]->get_int(1), 4);
+ auto columns = block.get_columns_with_type_and_name();
+ ASSERT_EQ(columns.size(), 4);
+ ASSERT_EQ(columns[0].to_string(0), "1");
+ ASSERT_EQ(columns[0].to_string(1), "4");
- ASSERT_EQ(columns[1]->get_int(0), 2);
- ASSERT_EQ(columns[1]->get_int(1), 5);
+ ASSERT_EQ(columns[1].to_string(0), "2");
+ ASSERT_EQ(columns[1].to_string(1), "5");
- ASSERT_EQ(columns[2]->get_int(0), 3);
- ASSERT_EQ(columns[2]->get_int(1), 6);
+ ASSERT_EQ(columns[2].to_string(0), "3");
+ ASSERT_EQ(columns[2].to_string(1), "6");
- ASSERT_EQ(columns[3]->get_int(0), 1);
- ASSERT_EQ(columns[3]->get_int(1), 1);
+ ASSERT_EQ(columns[3].to_string(0), "1");
+ ASSERT_EQ(columns[3].to_string(1), "1");
- ASSERT_FALSE(eos);
+ ASSERT_TRUE(eos);
block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eos);
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 2d80401cd8..5ead638bef 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -362,28 +362,25 @@ TEST_F(VBrokerScannerTest, normal) {
auto st = scanner.open();
ASSERT_TRUE(st.ok());
- int slot_count = 3;
- auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
- std::vector<vectorized::MutableColumnPtr> columns(slot_count);
- for (int i = 0; i < slot_count; i++) {
- columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
- }
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
- st = scanner.get_next(columns, &eof);
+ st = scanner.get_next(block.get(), &eof);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(eof);
+ auto columns = block->get_columns();
+ ASSERT_EQ(columns.size(), 3);
- ASSERT_EQ(columns[0]->get_int(0), 1);
- ASSERT_EQ(columns[0]->get_int(1), 4);
- ASSERT_EQ(columns[0]->get_int(2), 8);
+ ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
+ ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4");
+ ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8");
- ASSERT_EQ(columns[1]->get_int(0), 2);
- ASSERT_EQ(columns[1]->get_int(1), 5);
- ASSERT_EQ(columns[1]->get_int(2), 9);
+ ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
+ ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5");
+ ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9");
- ASSERT_EQ(columns[2]->get_int(0), 3);
- ASSERT_EQ(columns[2]->get_int(1), 6);
- ASSERT_EQ(columns[2]->get_int(2), 10);
+ ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
+ ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6");
+ ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10");
}
TEST_F(VBrokerScannerTest, normal2) {
@@ -408,26 +405,22 @@ TEST_F(VBrokerScannerTest, normal2) {
auto st = scanner.open();
ASSERT_TRUE(st.ok());
- int slot_count = 3;
- auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
- std::vector<vectorized::MutableColumnPtr> columns(slot_count);
- for (int i = 0; i < slot_count; i++) {
- columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
- }
-
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
- st = scanner.get_next(columns, &eof);
+ st = scanner.get_next(block.get(), &eof);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(eof);
+ auto columns = block->get_columns();
+ ASSERT_EQ(columns.size(), 3);
- ASSERT_EQ(columns[0]->get_int(0), 1);
- ASSERT_EQ(columns[0]->get_int(1), 3);
+ ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
+ ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3");
- ASSERT_EQ(columns[1]->get_int(0), 2);
- ASSERT_EQ(columns[1]->get_int(1), 4);
+ ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
+ ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4");
- ASSERT_EQ(columns[2]->get_int(0), 3);
- ASSERT_EQ(columns[2]->get_int(1), 5);
+ ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
+ ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5");
}
TEST_F(VBrokerScannerTest, normal5) {
@@ -446,18 +439,15 @@ TEST_F(VBrokerScannerTest, normal5) {
auto st = scanner.open();
ASSERT_TRUE(st.ok());
- int slot_count = 3;
- auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
- std::vector<vectorized::MutableColumnPtr> columns(slot_count);
- for (int i = 0; i < slot_count; i++) {
- columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
- }
+ std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
// end of file
- st = scanner.get_next(columns, &eof);
+ st = scanner.get_next(block.get(), &eof);
ASSERT_TRUE(st.ok());
ASSERT_TRUE(eof);
- ASSERT_EQ(columns[0]->size(), 0);
+ auto columns = block->get_columns();
+ ASSERT_EQ(columns.size(), 0);
}
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org