You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/20 03:43:08 UTC
[incubator-doris] branch master updated: [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8fa677b59c [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)
8fa677b59c is described below
commit 8fa677b59cc1567c0c81ef2fdbd25c1d319fe8db
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Fri May 20 11:43:03 2022 +0800
[Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)
* [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner
1. fix bug: vjson scanner did not support `range_from_file_path`
2. fix bug: vjson/vbroker scanner core dumps when src and dest slot nullability differ
3. fix bug: vparquet filter_block fails when the reference count of a column is not 1
4. refactor code to simplify all the scanner code
It only changes the vectorized load path, not the original row-based load.
Co-authored-by: lihaopeng <li...@baidu.com>
---
be/src/exec/base_scanner.cpp | 184 +++++++++++++++++----
be/src/exec/base_scanner.h | 30 ++--
be/src/exec/broker_scanner.cpp | 5 +-
be/src/exec/broker_scanner.h | 4 -
be/src/exec/json_scanner.cpp | 5 +-
be/src/exec/json_scanner.h | 4 -
be/src/exec/orc_scanner.cpp | 5 +-
be/src/exec/orc_scanner.h | 4 -
be/src/exec/parquet_scanner.cpp | 15 +-
be/src/exec/parquet_scanner.h | 5 -
be/src/vec/core/block.cpp | 18 +-
be/src/vec/core/block.h | 3 +
be/src/vec/data_types/data_type_nullable.cpp | 2 +-
be/src/vec/exec/vbroker_scanner.cpp | 125 +-------------
be/src/vec/exec/vbroker_scanner.h | 5 -
be/src/vec/exec/vjson_scanner.cpp | 12 +-
be/src/vec/exec/vparquet_scanner.cpp | 122 ++------------
be/src/vec/exec/vparquet_scanner.h | 6 +-
be/test/vec/exec/vbroker_scanner_test.cpp | 37 +++--
be/test/vec/exec/vjson_scanner_test.cpp | 29 ++--
.../apache/doris/load/loadv2/LoadLoadingTask.java | 2 +
.../doris/load/loadv2/LoadingTaskPlanner.java | 4 +
22 files changed, 268 insertions(+), 358 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index c4e1b5c056..e06b52de2f 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -28,14 +28,20 @@
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
+#include "vec/data_types/data_type_factory.hpp"
namespace doris {
BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: _state(state),
_params(params),
+ _ranges(ranges),
+ _broker_addresses(broker_addresses),
+ _next_range(0),
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
@@ -71,6 +77,22 @@ Status BaseScanner::open() {
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
_materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");
+
+ DCHECK(!_ranges.empty());
+ const auto& range = _ranges[0];
+ _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+ ? implicit_cast<int>(range.num_of_columns_from_file)
+ : implicit_cast<int>(_src_slot_descs.size());
+
+ // check consistency
+ if (range.__isset.num_of_columns_from_file) {
+ int size = range.columns_from_path.size();
+ for (const auto& r : _ranges) {
+ if (r.columns_from_path.size() != size) {
+ return Status::InternalError("ranges have different number of columns.");
+ }
+ }
+ }
return Status::OK();
}
@@ -272,59 +294,135 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
}
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
- continue;
}
_success = true;
return Status::OK();
}
-Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
+Status BaseScanner::_filter_src_block() {
+ auto origin_column_num = _src_block.columns();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
- auto old_rows = temp_block->rows();
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
- _counter->num_rows_unselected += old_rows - temp_block->rows();
+ auto old_rows = _src_block.rows();
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
+ origin_column_num));
+ _counter->num_rows_unselected += old_rows - _src_block.rows();
}
}
return Status::OK();
}
-Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
+Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// Do vectorized expr here
- Status status;
- if (!_dest_vexpr_ctx.empty()) {
- *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
- _dest_vexpr_ctx, *temp_block, status);
- if (UNLIKELY(output_block->rows() == 0)) {
- return status;
+ int ctx_idx = 0;
+ size_t rows = _src_block.rows();
+ auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+ auto& filter_map = filter_column->get_data();
+
+ for (auto slot_desc : _dest_tuple_desc->slots()) {
+ if (!slot_desc->is_materialized()) {
+ continue;
+ }
+ int dest_index = ctx_idx++;
+
+ auto* ctx = _dest_vexpr_ctx[dest_index];
+ int result_column_id = -1;
+ // PT1 => dest primitive type
+ RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
+ auto column_ptr = _src_block.get_by_position(result_column_id).column;
+
+ // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
+ // is likely to be nullable
+ if (LIKELY(column_ptr->is_nullable())) {
+ auto nullable_column =
+ reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+ for (int i = 0; i < rows; ++i) {
+ if (filter_map[i] && nullable_column->is_null_at(i)) {
+ if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
+ !_src_block.get_by_position(dest_index).column->is_null_at(i)) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i, _num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ auto raw_value =
+ _src_block.get_by_position(ctx_idx).column->get_data_at(
+ i);
+ std::string raw_string = raw_value.to_string();
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) value is incorrect while strict "
+ "mode is {}, "
+ "src value is {}",
+ slot_desc->col_name(), _strict_mode, raw_string);
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ } else if (!slot_desc->is_nullable()) {
+ RETURN_IF_ERROR(_state->append_error_msg_to_file(
+ [&]() -> std::string {
+ return _src_block.dump_one_line(i, _num_of_columns_from_file);
+ },
+ [&]() -> std::string {
+ fmt::memory_buffer error_msg;
+ fmt::format_to(error_msg,
+ "column({}) values is null while columns is not "
+ "nullable",
+ slot_desc->col_name());
+ return fmt::to_string(error_msg);
+ },
+ &_scanner_eof));
+ filter_map[i] = false;
+ }
+ }
+ }
+ if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
+ } else if (slot_desc->is_nullable()) {
+ column_ptr = vectorized::make_nullable(column_ptr);
}
+ dest_block->insert(vectorized::ColumnWithTypeAndName(
+ std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
+ // after do the dest block insert operation, clear _src_block to remove the reference of origin column
+ _src_block.clear();
+
+ size_t dest_size = dest_block->columns();
+ // do filter
+ dest_block->insert(vectorized::ColumnWithTypeAndName(
+ std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
+ "filter column"));
+ RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
+ _counter->num_rows_filtered += rows - dest_block->rows();
+
return Status::OK();
}
-Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
- std::vector<vectorized::MutableColumnPtr>& columns) {
- if (columns.empty() || columns[0]->size() == 0) {
- return Status::OK();
- }
-
- std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
- auto n_columns = 0;
- for (const auto slot_desc : _src_slot_descs) {
- temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
+// TODO: opt the reuse of src_block or dest_block column. some case we have to
+// shallow copy the column of src_block to dest block
+Status BaseScanner::_init_src_block() {
+ DCHECK(_src_block.columns() == 0);
+ for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+ SlotDescriptor* slot_desc = _src_slot_descs[i];
+ if (slot_desc == nullptr) {
+ continue;
+ }
+ auto data_type = slot_desc->get_data_type_ptr();
+ _src_block.insert(vectorized::ColumnWithTypeAndName(
+ data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}
- RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size()));
+ return Status::OK();
+}
- if (_dest_vexpr_ctx.empty()) {
- *dest_block = *temp_block;
- } else {
- RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
+Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
+ *eof = _scanner_eof;
+ _fill_columns_from_path();
+ if (LIKELY(_src_block.rows() > 0)) {
+ RETURN_IF_ERROR(BaseScanner::_filter_src_block());
+ RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block));
}
return Status::OK();
@@ -337,7 +435,7 @@ void BaseScanner::fill_slots_of_columns_from_path(
auto slot_desc = _src_slot_descs.at(i + start);
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
- StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
+ auto* str_slot = reinterpret_cast<StringValue*>(slot);
const std::string& column_from_path = columns_from_path[i];
str_slot->ptr = const_cast<char*>(column_from_path.c_str());
str_slot->len = column_from_path.size();
@@ -360,4 +458,28 @@ void BaseScanner::close() {
}
}
+void BaseScanner::_fill_columns_from_path() {
+ const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+ if (range.__isset.num_of_columns_from_file) {
+ size_t start = range.num_of_columns_from_file;
+ size_t rows = _src_block.rows();
+
+ for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
+ auto slot_desc = _src_slot_descs.at(i + start);
+ if (slot_desc == nullptr) continue;
+ auto is_nullable = slot_desc->is_nullable();
+ auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
+ is_nullable);
+ auto data_column = data_type->create_column();
+ const std::string& column_from_path = range.columns_from_path[i];
+ for (size_t j = 0; j < rows; ++j) {
+ data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
+ column_from_path.size());
+ }
+ _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type,
+ slot_desc->col_name()));
+ }
+ }
+}
+
} // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 2fccc62db8..1c2ce211b5 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -53,7 +53,10 @@ struct ScannerCounter {
class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+ const std::vector<TBrokerRangeDesc>& ranges,
+ const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
virtual ~BaseScanner() {
Expr::close(_dest_expr_ctx, _state);
if (_state->enable_vectorized_exec()) {
@@ -77,21 +80,22 @@ public:
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
- Status fill_dest_block(vectorized::Block* dest_block,
- std::vector<vectorized::MutableColumnPtr>& columns);
-
void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);
void free_expr_local_allocations();
- Status filter_block(vectorized::Block* temp_block, size_t slot_num);
-
- Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);
-
protected:
+ Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
+ virtual Status _init_src_block();
+
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
+
+ //const TBrokerScanRangeParams& _params;
+ const std::vector<TBrokerRangeDesc>& _ranges;
+ const std::vector<TNetworkAddress>& _broker_addresses;
+ int _next_range;
// used for process stat
ScannerCounter* _counter;
@@ -109,9 +113,6 @@ protected:
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
- // for vectorized
- std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
- std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
@@ -135,7 +136,16 @@ protected:
bool _success = false;
bool _scanner_eof = false;
+ // for vectorized load
+ std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+ std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
+ vectorized::Block _src_block;
+ int _num_of_columns_from_file;
+
private:
+ Status _filter_src_block();
+ void _fill_columns_from_path();
+ Status _materialize_dest_block(vectorized::Block* output_block);
Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index d9453fecf0..2c21fd3d54 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -48,13 +48,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
- : BaseScanner(state, profile, params, pre_filter_texprs, counter),
- _ranges(ranges),
- _broker_addresses(broker_addresses),
+ : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
- _next_range(0),
_cur_line_reader_eof(false),
_skip_lines(0) {
if (params.__isset.column_separator_length && params.column_separator_length > 1) {
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 2b6ac2d302..f10ce68518 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -100,9 +100,6 @@ private:
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
protected:
- const std::vector<TBrokerRangeDesc>& _ranges;
- const std::vector<TNetworkAddress>& _broker_addresses;
-
std::string _value_separator;
std::string _line_delimiter;
TFileFormatType::type _file_format_type;
@@ -113,7 +110,6 @@ protected:
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
Decompressor* _cur_decompressor;
- int _next_range;
bool _cur_line_reader_eof;
// When we fetch range start from 0, header_type="csv_with_names" skip first line
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index a23ce44b03..0be3d4c089 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -40,13 +40,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
- : BaseScanner(state, profile, params, pre_filter_texprs, counter),
- _ranges(ranges),
- _broker_addresses(broker_addresses),
+ : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_json_reader(nullptr),
- _next_range(0),
_cur_reader_eof(false),
_read_json_by_line(false) {
if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 276b2dd077..ab2f479e60 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -78,9 +78,6 @@ protected:
bool& num_as_string, bool& fuzzy_parse);
protected:
- const std::vector<TBrokerRangeDesc>& _ranges;
- const std::vector<TNetworkAddress>& _broker_addresses;
-
std::string _jsonpath;
std::string _jsonpath_file;
@@ -91,7 +88,6 @@ protected:
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
JsonReader* _cur_json_reader;
- int _next_range;
bool _cur_reader_eof;
bool _read_json_by_line;
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index fdda223c1a..138eb729ef 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -120,11 +120,8 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
- : BaseScanner(state, profile, params, pre_filter_texprs, counter),
- _ranges(ranges),
- _broker_addresses(broker_addresses),
+ : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
// _splittable(params.splittable),
- _next_range(0),
_cur_file_eof(true),
_total_groups(0),
_current_group(0),
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index c13e331fcd..7ee4ab0b61 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -47,11 +47,7 @@ private:
Status open_next_reader();
private:
- const std::vector<TBrokerRangeDesc>& _ranges;
- const std::vector<TNetworkAddress>& _broker_addresses;
-
// Reader
- int _next_range;
bool _cur_file_eof;
// orc file reader object
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 3295dc4bc7..c6cb02e8c2 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -25,9 +25,6 @@
#include "exec/parquet_reader.h"
#include "exec/s3_reader.h"
#include "exec/text_converter.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
@@ -41,12 +38,9 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
- : BaseScanner(state, profile, params, pre_filter_texprs, counter),
- _ranges(ranges),
- _broker_addresses(broker_addresses),
+ : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
// _splittable(params.splittable),
_cur_file_reader(nullptr),
- _next_range(0),
_cur_file_eof(false) {}
ParquetScanner::~ParquetScanner() {
@@ -83,11 +77,8 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
- if (_scanner_eof) {
- *eof = true;
- } else {
- *eof = false;
- }
+
+ *eof = _scanner_eof;
return Status::OK();
}
diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h
index 535d3fe6c5..d66802dd95 100644
--- a/be/src/exec/parquet_scanner.h
+++ b/be/src/exec/parquet_scanner.h
@@ -74,13 +74,8 @@ protected:
Status open_next_reader();
protected:
- //const TBrokerScanRangeParams& _params;
- const std::vector<TBrokerRangeDesc>& _ranges;
- const std::vector<TNetworkAddress>& _broker_addresses;
-
// Reader
ParquetReaderWrap* _cur_file_reader;
- int _next_range;
bool _cur_file_eof; // is read over?
// used to hold current StreamLoadPipe
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8d5880531a..aa482fcfbf 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -353,8 +353,8 @@ std::string Block::dump_names() const {
std::string Block::dump_data(size_t begin, size_t row_limit) const {
std::vector<std::string> headers;
std::vector<size_t> headers_size;
- for (auto it = data.begin(); it != data.end(); ++it) {
- std::string s = fmt::format("{}({})", it->name, it->type->get_name());
+ for (const auto& it : data) {
+ std::string s = fmt::format("{}({})", it.name, it.type->get_name());
headers_size.push_back(s.size() > 15 ? s.size() : 15);
headers.emplace_back(s);
}
@@ -402,6 +402,20 @@ std::string Block::dump_data(size_t begin, size_t row_limit) const {
return out.str();
}
+std::string Block::dump_one_line(size_t row, int column_end) const {
+ assert(column_end < columns());
+ fmt::memory_buffer line;
+ for (int i = 0; i < column_end; ++i) {
+ if (LIKELY(i != 0)) {
+ // TODO: need more effective function of to string. now the impl is slow
+ fmt::format_to(line, " {}", data[i].to_string(row));
+ } else {
+ fmt::format_to(line, "{}", data[i].to_string(row));
+ }
+ }
+ return fmt::to_string(line);
+}
+
std::string Block::dump_structure() const {
// WriteBufferFromOwnString out;
std::stringstream out;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 375ef6906f..729f531291 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -242,6 +242,9 @@ public:
/** Get block data in string. */
std::string dump_data(size_t begin = 0, size_t row_limit = 100) const;
+ /** Get one line data from block, only use in load data */
+ std::string dump_one_line(size_t row, int column_end) const;
+
static Status filter_block(Block* block, int filter_conlumn_id, int column_to_keep);
static void erase_useless_column(Block* block, int column_to_keep) {
diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp
index 5c3730b860..9e63fa5e24 100644
--- a/be/src/vec/data_types/data_type_nullable.cpp
+++ b/be/src/vec/data_types/data_type_nullable.cpp
@@ -47,7 +47,7 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c
assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get());
if (col.is_null_at(row_num)) {
- return "\\N";
+ return "NULL";
} else {
return nested_data_type->to_string(col.get_nested_column(), row_num);
}
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index dea6d55da5..3006dba788 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -44,18 +44,14 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
_text_converter.reset(new (std::nothrow) TextConverter('\\'));
}
-VBrokerScanner::~VBrokerScanner() {}
+VBrokerScanner::~VBrokerScanner() = default;
Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
SCOPED_TIMER(_read_timer);
+ RETURN_IF_ERROR(_init_src_block());
const int batch_size = _state->batch_size();
- // Get batch lines
- int slot_num = _src_slot_descs.size();
- std::vector<vectorized::MutableColumnPtr> columns(slot_num);
- for (int i = 0; i < slot_num; i++) {
- columns[i] = _src_slot_descs[i]->get_empty_mutable_column();
- }
+ auto columns = _src_block.mutate_columns();
while (columns[0]->size() < batch_size && !_scanner_eof) {
if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
@@ -85,51 +81,8 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
}
}
}
- if (_scanner_eof) {
- *eof = true;
- } else {
- *eof = false;
- }
- return _fill_dest_block(output_block, columns);
-}
-
-Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) {
- if (columns.empty() || columns[0]->size() == 0) {
- return Status::OK();
- }
-
- std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block());
- auto n_columns = 0;
- for (const auto slot_desc : _src_slot_descs) {
- tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
- }
- auto old_rows = tmp_block->rows();
- // filter
- if (!_vpre_filter_ctxs.empty()) {
- for (auto vexpr_ctx : _vpre_filter_ctxs) {
- RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(),
- _dest_tuple_desc->slots().size()));
- _counter->num_rows_unselected += old_rows - tmp_block->rows();
- old_rows = tmp_block->rows();
- }
- }
-
- Status status;
- // expr
- if (!_dest_vexpr_ctx.empty()) {
- *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
- _dest_vexpr_ctx, *tmp_block, status);
- if (UNLIKELY(dest_block->rows() == 0)) {
- _success = false;
- return status;
- }
- } else {
- *dest_block = *tmp_block;
- }
- return status;
+ return _fill_dest_block(output_block, eof);
}
Status VBrokerScanner::_fill_dest_columns(const Slice& line,
@@ -151,57 +104,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
const Slice& value = _split_values[i];
if (is_null(value)) {
- // If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero
- if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) &&
- !_src_tuple->is_null(
- _src_slot_descs_order_by_dest[dest_index]->null_indicator_offset())) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return _src_tuple_row->to_string(*(_row_desc.get()));
- },
- [&]() -> std::string {
- // Type of the slot is must be Varchar in _src_tuple.
- StringValue* raw_value = _src_tuple->get_string_slot(
- _src_slot_descs_order_by_dest[dest_index]->tuple_offset());
- std::string raw_string;
- if (raw_value != nullptr) { //is not null then get raw value
- raw_string = raw_value->to_string();
- }
- fmt::memory_buffer error_msg;
- fmt::format_to(error_msg,
- "column({}) value is incorrect while strict mode is {}, "
- "src value is {}",
- src_slot_desc->col_name(), _strict_mode, raw_string);
- return error_msg.data();
- },
- &_scanner_eof));
- _counter->num_rows_filtered++;
- _success = false;
- return Status::OK();
- }
-
- if (!src_slot_desc->is_nullable()) {
- RETURN_IF_ERROR(_state->append_error_msg_to_file(
- [&]() -> std::string {
- return _src_tuple_row->to_string(*(_row_desc.get()));
- },
- [&]() -> std::string {
- fmt::memory_buffer error_msg;
- fmt::format_to(
- error_msg,
- "column({}) values is null while columns is not nullable",
- src_slot_desc->col_name());
- return error_msg.data();
- },
- &_scanner_eof));
- _counter->num_rows_filtered++;
- _success = false;
- return Status::OK();
- }
// nullable
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
- nullable_column->insert_data(nullptr, 0);
+ nullable_column->insert_default();
continue;
}
@@ -209,27 +115,6 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
&columns[dest_index], _state));
}
- const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
- if (range.__isset.num_of_columns_from_file) {
- RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file,
- range.columns_from_path, columns));
- }
-
- return Status::OK();
-}
-
-Status VBrokerScanner::_fill_columns_from_path(int start,
- const std::vector<std::string>& columns_from_path,
- std::vector<MutableColumnPtr>& columns) {
- // values of columns from path can not be null
- for (int i = 0; i < columns_from_path.size(); ++i) {
- int dest_index = i + start;
- auto slot_desc = _src_slot_descs.at(dest_index);
- const std::string& column_from_path = columns_from_path[i];
- RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()),
- column_from_path.size(), slot_desc, &columns[dest_index],
- _state));
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 57c4004728..11d8b494fa 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -42,11 +42,6 @@ private:
Status _write_text_column(char* value, int length, SlotDescriptor* slot,
MutableColumnPtr* column_ptr, RuntimeState* state);
- Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns);
-
Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
-
- Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path,
- std::vector<MutableColumnPtr>& columns);
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index b7645fecf3..e456e6dfc8 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -46,14 +46,10 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile,
Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
SCOPED_TIMER(_read_timer);
+ RETURN_IF_ERROR(_init_src_block());
const int batch_size = _state->batch_size();
- size_t slot_num = _src_slot_descs.size();
- std::vector<vectorized::MutableColumnPtr> columns(slot_num);
- auto string_type = make_nullable(std::make_shared<DataTypeString>());
- for (int i = 0; i < slot_num; i++) {
- columns[i] = string_type->create_column();
- }
+ auto columns = _src_block.mutate_columns();
// Get one line
while (columns[0]->size() < batch_size && !_scanner_eof) {
if (_cur_file_reader == nullptr || _cur_reader_eof) {
@@ -83,10 +79,8 @@ Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
COUNTER_UPDATE(_rows_read_counter, columns[0]->size());
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns));
- *eof = _scanner_eof;
- return Status::OK();
+ return _fill_dest_block(output_block, eof);
}
Status VJsonScanner::open_next_reader() {
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index 6a891850a7..037bc15028 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -36,29 +36,15 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
: ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
counter),
_batch(nullptr),
- _arrow_batch_cur_idx(0),
- _num_of_columns_from_file(0) {}
-VParquetScanner::~VParquetScanner() {}
+ _arrow_batch_cur_idx(0) {}
+
+VParquetScanner::~VParquetScanner() = default;
Status VParquetScanner::open() {
RETURN_IF_ERROR(ParquetScanner::open());
if (_ranges.empty()) {
return Status::OK();
}
- auto range = _ranges[0];
- _num_of_columns_from_file = range.__isset.num_of_columns_from_file
- ? implicit_cast<int>(range.num_of_columns_from_file)
- : implicit_cast<int>(_src_slot_descs.size());
-
- // check consistency
- if (range.__isset.num_of_columns_from_file) {
- int size = range.columns_from_path.size();
- for (const auto& r : _ranges) {
- if (r.columns_from_path.size() != size) {
- return Status::InternalError("ranges have different number of columns.");
- }
- }
- }
return Status::OK();
}
@@ -99,9 +85,9 @@ Status VParquetScanner::_init_arrow_batch_if_necessary() {
return status;
}
-Status VParquetScanner::_init_src_block(Block* block) {
+Status VParquetScanner::_init_src_block() {
size_t batch_pos = 0;
- block->clear();
+ _src_block.clear();
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
@@ -118,7 +104,7 @@ Status VParquetScanner::_init_src_block(Block* block) {
fmt::format("Not support arrow type:{}", array->type()->name()));
}
MutableColumnPtr data_column = data_type->create_column();
- block->insert(
+ _src_block.insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
return Status::OK();
@@ -150,15 +136,15 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
return Status::OK();
}
}
- Block src_block;
- RETURN_IF_ERROR(_init_src_block(&src_block));
+
+ RETURN_IF_ERROR(_init_src_block());
// convert arrow batch to block until reach the batch_size
while (!_scanner_eof) {
// cast arrow type to PT0 and append it to src block
// for example: arrow::Type::INT16 => TYPE_SMALLINT
- RETURN_IF_ERROR(_append_batch_to_src_block(&src_block));
+ RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block));
// finalize the src block if full
- if (src_block.rows() >= _state->batch_size()) {
+ if (_src_block.rows() >= _state->batch_size()) {
break;
}
auto status = _next_arrow_batch();
@@ -173,94 +159,14 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
_cur_file_eof = true;
break;
}
- COUNTER_UPDATE(_rows_read_counter, src_block.rows());
+ COUNTER_UPDATE(_rows_read_counter, _src_block.rows());
SCOPED_TIMER(_materialize_timer);
// cast PT0 => PT1
// for example: TYPE_SMALLINT => TYPE_VARCHAR
- RETURN_IF_ERROR(_cast_src_block(&src_block));
- // range of current file
- _fill_columns_from_path(&src_block);
- RETURN_IF_ERROR(_eval_conjunts(&src_block));
- // materialize, src block => dest columns
- RETURN_IF_ERROR(_materialize_block(&src_block, block));
- *eof = _scanner_eof;
- return Status::OK();
-}
-
-// eval conjuncts, for example: t1 > 1
-Status VParquetScanner::_eval_conjunts(Block* block) {
- for (auto& vctx : _vpre_filter_ctxs) {
- size_t orig_rows = block->rows();
- RETURN_IF_ERROR(VExprContext::filter_block(vctx, block, block->columns()));
- _counter->num_rows_unselected += orig_rows - block->rows();
- }
- return Status::OK();
-}
+ RETURN_IF_ERROR(_cast_src_block(&_src_block));
-void VParquetScanner::_fill_columns_from_path(Block* block) {
- const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
- if (range.__isset.num_of_columns_from_file) {
- size_t start = range.num_of_columns_from_file;
- size_t rows = block->rows();
- for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
- auto slot_desc = _src_slot_descs.at(i + start);
- if (slot_desc == nullptr) continue;
- auto is_nullable = slot_desc->is_nullable();
- DataTypePtr data_type =
- DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, is_nullable);
- MutableColumnPtr data_column = data_type->create_column();
- const std::string& column_from_path = range.columns_from_path[i];
- for (size_t i = 0; i < rows; ++i) {
- data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
- column_from_path.size());
- }
- block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
- slot_desc->col_name()));
- }
- }
-}
-
-Status VParquetScanner::_materialize_block(Block* block, Block* dest_block) {
- int ctx_idx = 0;
- size_t orig_rows = block->rows();
- auto filter_column = ColumnUInt8::create(orig_rows, 1);
- for (auto slot_desc : _dest_tuple_desc->slots()) {
- if (!slot_desc->is_materialized()) {
- continue;
- }
- int dest_index = ctx_idx++;
-
- VExprContext* ctx = _dest_vexpr_ctx[dest_index];
- int result_column_id = 0;
- // PT1 => dest primitive type
- RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
- ColumnPtr& ptr = block->safe_get_by_position(result_column_id).column;
- if (!slot_desc->is_nullable()) {
- if (auto* nullable_column = check_and_get_column<ColumnNullable>(*ptr)) {
- if (nullable_column->has_null()) {
- // fill filter if src has null value and dest column is not nullable
- IColumn::Filter& filter = assert_cast<ColumnUInt8&>(*filter_column).get_data();
- const ColumnPtr& null_column_ptr = nullable_column->get_null_map_column_ptr();
- const auto& column_data =
- assert_cast<const ColumnUInt8&>(*null_column_ptr).get_data();
- for (size_t i = 0; i < null_column_ptr->size(); ++i) {
- filter[i] &= !column_data[i];
- }
- }
- ptr = nullable_column->get_nested_column_ptr();
- }
- }
- dest_block->insert(vectorized::ColumnWithTypeAndName(
- std::move(ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
- }
- size_t dest_size = dest_block->columns();
- // do filter
- dest_block->insert(vectorized::ColumnWithTypeAndName(
- std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
- "filter column"));
- RETURN_IF_ERROR(Block::filter_block(dest_block, dest_size, dest_size));
- _counter->num_rows_filtered += orig_rows - dest_block->rows();
- return Status::OK();
+ // materialize, src block => dest columns
+ return _fill_dest_block(block, eof);
}
// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 31749248d5..72ac280989 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -54,17 +54,13 @@ public:
private:
Status _next_arrow_batch();
Status _init_arrow_batch_if_necessary();
- Status _init_src_block(Block* block);
+ Status _init_src_block() override;
Status _append_batch_to_src_block(Block* block);
Status _cast_src_block(Block* block);
- Status _eval_conjunts(Block* block);
- Status _materialize_block(Block* block, Block* dest_block);
- void _fill_columns_from_path(Block* block);
private:
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx;
- int _num_of_columns_from_file;
};
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 1a39c1f8e0..713aefc4a7 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -41,6 +41,13 @@ public:
init();
_profile = _runtime_state.runtime_profile();
_runtime_state._instance_mem_tracker.reset(new MemTracker());
+
+ TUniqueId unique_id;
+ TQueryOptions query_options;
+ query_options.__set_enable_vectorized_engine(true);
+ TQueryGlobals query_globals;
+
+ _runtime_state.init(unique_id, query_options, query_globals, nullptr);
}
void init();
@@ -370,17 +377,17 @@ TEST_F(VBrokerScannerTest, normal) {
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);
- ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
- ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4");
- ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8");
+ ASSERT_EQ(columns[0]->get_int(0), 1);
+ ASSERT_EQ(columns[0]->get_int(1), 4);
+ ASSERT_EQ(columns[0]->get_int(2), 8);
- ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
- ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5");
- ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9");
+ ASSERT_EQ(columns[1]->get_int(0), 2);
+ ASSERT_EQ(columns[1]->get_int(1), 5);
+ ASSERT_EQ(columns[1]->get_int(2), 9);
- ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
- ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6");
- ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10");
+ ASSERT_EQ(columns[2]->get_int(0), 3);
+ ASSERT_EQ(columns[2]->get_int(1), 6);
+ ASSERT_EQ(columns[2]->get_int(2), 10);
}
TEST_F(VBrokerScannerTest, normal2) {
@@ -413,14 +420,14 @@ TEST_F(VBrokerScannerTest, normal2) {
auto columns = block->get_columns();
ASSERT_EQ(columns.size(), 3);
- ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
- ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3");
+ ASSERT_EQ(columns[0]->get_int(0), 1);
+ ASSERT_EQ(columns[0]->get_int(1), 3);
- ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
- ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4");
+ ASSERT_EQ(columns[1]->get_int(0), 2);
+ ASSERT_EQ(columns[1]->get_int(1), 4);
- ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
- ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5");
+ ASSERT_EQ(columns[2]->get_int(0), 3);
+ ASSERT_EQ(columns[2]->get_int(1), 5);
}
TEST_F(VBrokerScannerTest, normal5) {
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index c96772a011..393ff80f16 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -47,7 +47,13 @@ public:
VJsonScannerTest() : _runtime_state(TQueryGlobals()) {
init();
_runtime_state._instance_mem_tracker.reset(new MemTracker());
- _runtime_state._exec_env = ExecEnv::GetInstance();
+
+ TUniqueId unique_id;
+ TQueryOptions query_options;
+ query_options.__set_enable_vectorized_engine(true);
+ TQueryGlobals query_globals;
+
+ _runtime_state.init(unique_id, query_options, query_globals, nullptr);
}
void init();
static void SetUpTestCase() {
@@ -391,7 +397,7 @@ void VJsonScannerTest::create_expr_info() {
TTypeNode node;
node.__set_type(TTypeNodeType::SCALAR);
TScalarType scalar_type;
- scalar_type.__set_type(TPrimitiveType::BIGINT);
+ scalar_type.__set_type(TPrimitiveType::DOUBLE);
node.__set_scalar_type(scalar_type);
int_type.types.push_back(node);
}
@@ -553,6 +559,7 @@ TEST_F(VJsonScannerTest, simple_array_json) {
range.format_type = TFileFormatType::FORMAT_JSON;
range.strip_outer_array = true;
range.__isset.strip_outer_array = true;
+ range.__set_num_as_string(true);
range.splittable = true;
range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
range.file_type = TFileType::FILE_LOCAL;
@@ -583,9 +590,9 @@ TEST_F(VJsonScannerTest, simple_array_json) {
ASSERT_EQ(columns[3].to_string(0), "8.950000");
ASSERT_EQ(columns[3].to_string(1), "12.990000");
ASSERT_EQ(columns[4].to_string(0), "1234");
- ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000");
- ASSERT_EQ(columns[5].to_string(0), "1234.123400");
- ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953");
+ ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424");
+ ASSERT_EQ(columns[5].to_string(0), "1234.123400000");
+ ASSERT_EQ(columns[5].to_string(1), "9999999999999.999999000");
block.clear();
status = scan_node.get_next(&_runtime_state, &block, &eof);
@@ -753,12 +760,12 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) {
auto columns = block.get_columns_with_type_and_name();
ASSERT_EQ(columns.size(), 6);
- ASSERT_EQ(columns[0].to_string(0), "\\N");
- ASSERT_EQ(columns[0].to_string(1), "\\N");
- ASSERT_EQ(columns[1].to_string(0), "\\N");
- ASSERT_EQ(columns[1].to_string(1), "\\N");
- ASSERT_EQ(columns[2].to_string(0), "\\N");
- ASSERT_EQ(columns[2].to_string(1), "\\N");
+ ASSERT_EQ(columns[0].to_string(0), "NULL");
+ ASSERT_EQ(columns[0].to_string(1), "NULL");
+ ASSERT_EQ(columns[1].to_string(0), "NULL");
+ ASSERT_EQ(columns[1].to_string(1), "NULL");
+ ASSERT_EQ(columns[2].to_string(0), "NULL");
+ ASSERT_EQ(columns[2].to_string(1), "NULL");
block.clear();
scan_node.close(&_runtime_state);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 303f080976..ae8aafa9cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
@@ -130,6 +131,7 @@ public class LoadLoadingTask extends LoadTask {
planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance);
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
+ curCoordinator.setExecVecEngine(Config.enable_vectorized_load);
/*
* For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
* And the variable 'load_mem_limit' does not make any effect.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index d74b17f490..78fa7a23b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@@ -125,6 +126,9 @@ public class LoadingTaskPlanner {
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
+ if (Config.enable_vectorized_load) {
+ scanNode.convertToVectoriezd();
+ }
scanNodes.add(scanNode);
descTable.computeStatAndMemLayout();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org