You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/20 03:43:08 UTC

[incubator-doris] branch master updated: [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fa677b59c [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)
8fa677b59c is described below

commit 8fa677b59cc1567c0c81ef2fdbd25c1d319fe8db
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Fri May 20 11:43:03 2022 +0800

    [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner (#9666)
    
    * [Refactor][Bug-Fix][Load Vec] Refactor code of basescanner and vjson/vparquet/vbroker scanner
    1. Fix the vjson scanner not supporting `range_from_file_path`.
    2. Fix a core dump in the vjson/vbroker scanners when the src and dest slots differ in nullability.
    3. Fix vparquet `filter_block` failing when the reference count of a column is not 1.
    4. Refactor the scanner code to simplify it.
    
    It only changes the vectorized load path, not the original row-based load.
    
    Co-authored-by: lihaopeng <li...@baidu.com>
---
 be/src/exec/base_scanner.cpp                       | 184 +++++++++++++++++----
 be/src/exec/base_scanner.h                         |  30 ++--
 be/src/exec/broker_scanner.cpp                     |   5 +-
 be/src/exec/broker_scanner.h                       |   4 -
 be/src/exec/json_scanner.cpp                       |   5 +-
 be/src/exec/json_scanner.h                         |   4 -
 be/src/exec/orc_scanner.cpp                        |   5 +-
 be/src/exec/orc_scanner.h                          |   4 -
 be/src/exec/parquet_scanner.cpp                    |  15 +-
 be/src/exec/parquet_scanner.h                      |   5 -
 be/src/vec/core/block.cpp                          |  18 +-
 be/src/vec/core/block.h                            |   3 +
 be/src/vec/data_types/data_type_nullable.cpp       |   2 +-
 be/src/vec/exec/vbroker_scanner.cpp                | 125 +-------------
 be/src/vec/exec/vbroker_scanner.h                  |   5 -
 be/src/vec/exec/vjson_scanner.cpp                  |  12 +-
 be/src/vec/exec/vparquet_scanner.cpp               | 122 ++------------
 be/src/vec/exec/vparquet_scanner.h                 |   6 +-
 be/test/vec/exec/vbroker_scanner_test.cpp          |  37 +++--
 be/test/vec/exec/vjson_scanner_test.cpp            |  29 ++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   2 +
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   4 +
 22 files changed, 268 insertions(+), 358 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index c4e1b5c056..e06b52de2f 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -28,14 +28,20 @@
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
+#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 
 BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
                          const TBrokerScanRangeParams& params,
+                         const std::vector<TBrokerRangeDesc>& ranges,
+                         const std::vector<TNetworkAddress>& broker_addresses,
                          const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
         : _state(state),
           _params(params),
+          _ranges(ranges),
+          _broker_addresses(broker_addresses),
+          _next_range(0),
           _counter(counter),
           _src_tuple(nullptr),
           _src_tuple_row(nullptr),
@@ -71,6 +77,22 @@ Status BaseScanner::open() {
     _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
     _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
     _materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");
+
+    DCHECK(!_ranges.empty());
+    const auto& range = _ranges[0];
+    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
+                                        ? implicit_cast<int>(range.num_of_columns_from_file)
+                                        : implicit_cast<int>(_src_slot_descs.size());
+
+    // check consistency
+    if (range.__isset.num_of_columns_from_file) {
+        int size = range.columns_from_path.size();
+        for (const auto& r : _ranges) {
+            if (r.columns_from_path.size() != size) {
+                return Status::InternalError("ranges have different number of columns.");
+            }
+        }
+    }
     return Status::OK();
 }
 
@@ -272,59 +294,135 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
         }
         void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
         RawValue::write(value, slot, slot_desc->type(), mem_pool);
-        continue;
     }
     _success = true;
     return Status::OK();
 }
 
-Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
+Status BaseScanner::_filter_src_block() {
+    auto origin_column_num = _src_block.columns();
     // filter block
     if (!_vpre_filter_ctxs.empty()) {
         for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
-            auto old_rows = temp_block->rows();
-            RETURN_IF_ERROR(
-                    vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
-            _counter->num_rows_unselected += old_rows - temp_block->rows();
+            auto old_rows = _src_block.rows();
+            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
+                                                                   origin_column_num));
+            _counter->num_rows_unselected += old_rows - _src_block.rows();
         }
     }
     return Status::OK();
 }
 
-Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
+Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
     // Do vectorized expr here
-    Status status;
-    if (!_dest_vexpr_ctx.empty()) {
-        *output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
-                _dest_vexpr_ctx, *temp_block, status);
-        if (UNLIKELY(output_block->rows() == 0)) {
-            return status;
+    int ctx_idx = 0;
+    size_t rows = _src_block.rows();
+    auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
+    auto& filter_map = filter_column->get_data();
+
+    for (auto slot_desc : _dest_tuple_desc->slots()) {
+        if (!slot_desc->is_materialized()) {
+            continue;
+        }
+        int dest_index = ctx_idx++;
+
+        auto* ctx = _dest_vexpr_ctx[dest_index];
+        int result_column_id = -1;
+        // PT1 => dest primitive type
+        RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
+        auto column_ptr = _src_block.get_by_position(result_column_id).column;
+
+        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
+        // is likely to be nullable
+        if (LIKELY(column_ptr->is_nullable())) {
+            auto nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            for (int i = 0; i < rows; ++i) {
+                if (filter_map[i] && nullable_column->is_null_at(i)) {
+                    if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
+                        !_src_block.get_by_position(dest_index).column->is_null_at(i)) {
+                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                                [&]() -> std::string {
+                                    return _src_block.dump_one_line(i, _num_of_columns_from_file);
+                                },
+                                [&]() -> std::string {
+                                    auto raw_value =
+                                            _src_block.get_by_position(ctx_idx).column->get_data_at(
+                                                    i);
+                                    std::string raw_string = raw_value.to_string();
+                                    fmt::memory_buffer error_msg;
+                                    fmt::format_to(error_msg,
+                                                   "column({}) value is incorrect while strict "
+                                                   "mode is {}, "
+                                                   "src value is {}",
+                                                   slot_desc->col_name(), _strict_mode, raw_string);
+                                    return fmt::to_string(error_msg);
+                                },
+                                &_scanner_eof));
+                        filter_map[i] = false;
+                    } else if (!slot_desc->is_nullable()) {
+                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
+                                [&]() -> std::string {
+                                    return _src_block.dump_one_line(i, _num_of_columns_from_file);
+                                },
+                                [&]() -> std::string {
+                                    fmt::memory_buffer error_msg;
+                                    fmt::format_to(error_msg,
+                                                   "column({}) values is null while columns is not "
+                                                   "nullable",
+                                                   slot_desc->col_name());
+                                    return fmt::to_string(error_msg);
+                                },
+                                &_scanner_eof));
+                        filter_map[i] = false;
+                    }
+                }
+            }
+            if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
+        } else if (slot_desc->is_nullable()) {
+            column_ptr = vectorized::make_nullable(column_ptr);
         }
+        dest_block->insert(vectorized::ColumnWithTypeAndName(
+                std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
     }
 
+    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
+    _src_block.clear();
+
+    size_t dest_size = dest_block->columns();
+    // do filter
+    dest_block->insert(vectorized::ColumnWithTypeAndName(
+            std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
+            "filter column"));
+    RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
+    _counter->num_rows_filtered += rows - dest_block->rows();
+
     return Status::OK();
 }
 
-Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
-                                    std::vector<vectorized::MutableColumnPtr>& columns) {
-    if (columns.empty() || columns[0]->size() == 0) {
-        return Status::OK();
-    }
-
-    std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
-    auto n_columns = 0;
-    for (const auto slot_desc : _src_slot_descs) {
-        temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                             slot_desc->get_data_type_ptr(),
-                                                             slot_desc->col_name()));
+// TODO: opt the reuse of src_block or dest_block column. some case we have to
+// shallow copy the column of src_block to dest block
+Status BaseScanner::_init_src_block() {
+    DCHECK(_src_block.columns() == 0);
+    for (auto i = 0; i < _num_of_columns_from_file; ++i) {
+        SlotDescriptor* slot_desc = _src_slot_descs[i];
+        if (slot_desc == nullptr) {
+            continue;
+        }
+        auto data_type = slot_desc->get_data_type_ptr();
+        _src_block.insert(vectorized::ColumnWithTypeAndName(
+                data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
     }
 
-    RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size()));
+    return Status::OK();
+}
 
-    if (_dest_vexpr_ctx.empty()) {
-        *dest_block = *temp_block;
-    } else {
-        RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
+Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
+    *eof = _scanner_eof;
+    _fill_columns_from_path();
+    if (LIKELY(_src_block.rows() > 0)) {
+        RETURN_IF_ERROR(BaseScanner::_filter_src_block());
+        RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block));
     }
 
     return Status::OK();
@@ -337,7 +435,7 @@ void BaseScanner::fill_slots_of_columns_from_path(
         auto slot_desc = _src_slot_descs.at(i + start);
         _src_tuple->set_not_null(slot_desc->null_indicator_offset());
         void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
-        StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
+        auto* str_slot = reinterpret_cast<StringValue*>(slot);
         const std::string& column_from_path = columns_from_path[i];
         str_slot->ptr = const_cast<char*>(column_from_path.c_str());
         str_slot->len = column_from_path.size();
@@ -360,4 +458,28 @@ void BaseScanner::close() {
     }
 }
 
+void BaseScanner::_fill_columns_from_path() {
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+    if (range.__isset.num_of_columns_from_file) {
+        size_t start = range.num_of_columns_from_file;
+        size_t rows = _src_block.rows();
+
+        for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
+            auto slot_desc = _src_slot_descs.at(i + start);
+            if (slot_desc == nullptr) continue;
+            auto is_nullable = slot_desc->is_nullable();
+            auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
+                                                                                      is_nullable);
+            auto data_column = data_type->create_column();
+            const std::string& column_from_path = range.columns_from_path[i];
+            for (size_t j = 0; j < rows; ++j) {
+                data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
+                                         column_from_path.size());
+            }
+            _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type,
+                                                                slot_desc->col_name()));
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 2fccc62db8..1c2ce211b5 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -53,7 +53,10 @@ struct ScannerCounter {
 class BaseScanner {
 public:
     BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
+                const std::vector<TBrokerRangeDesc>& ranges,
+                const std::vector<TNetworkAddress>& broker_addresses,
                 const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
+
     virtual ~BaseScanner() {
         Expr::close(_dest_expr_ctx, _state);
         if (_state->enable_vectorized_exec()) {
@@ -77,21 +80,22 @@ public:
     virtual void close() = 0;
     Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
 
-    Status fill_dest_block(vectorized::Block* dest_block,
-                           std::vector<vectorized::MutableColumnPtr>& columns);
-
     void fill_slots_of_columns_from_path(int start,
                                          const std::vector<std::string>& columns_from_path);
 
     void free_expr_local_allocations();
 
-    Status filter_block(vectorized::Block* temp_block, size_t slot_num);
-
-    Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);
-
 protected:
+    Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
+    virtual Status _init_src_block();
+
     RuntimeState* _state;
     const TBrokerScanRangeParams& _params;
+
+    //const TBrokerScanRangeParams& _params;
+    const std::vector<TBrokerRangeDesc>& _ranges;
+    const std::vector<TNetworkAddress>& _broker_addresses;
+    int _next_range;
     // used for process stat
     ScannerCounter* _counter;
 
@@ -109,9 +113,6 @@ protected:
     // Dest tuple descriptor and dest expr context
     const TupleDescriptor* _dest_tuple_desc;
     std::vector<ExprContext*> _dest_expr_ctx;
-    // for vectorized
-    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
-    std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
     // the map values of dest slot id to src slot desc
     // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
     std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
@@ -135,7 +136,16 @@ protected:
     bool _success = false;
     bool _scanner_eof = false;
 
+    // for vectorized load
+    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+    std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
+    vectorized::Block _src_block;
+    int _num_of_columns_from_file;
+
 private:
+    Status _filter_src_block();
+    void _fill_columns_from_path();
+    Status _materialize_dest_block(vectorized::Block* output_block);
     Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
 };
 
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index d9453fecf0..2c21fd3d54 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -48,13 +48,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                              const std::vector<TBrokerRangeDesc>& ranges,
                              const std::vector<TNetworkAddress>& broker_addresses,
                              const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
-          _ranges(ranges),
-          _broker_addresses(broker_addresses),
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
           _cur_file_reader(nullptr),
           _cur_line_reader(nullptr),
           _cur_decompressor(nullptr),
-          _next_range(0),
           _cur_line_reader_eof(false),
           _skip_lines(0) {
     if (params.__isset.column_separator_length && params.column_separator_length > 1) {
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 2b6ac2d302..f10ce68518 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -100,9 +100,6 @@ private:
     Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
 
 protected:
-    const std::vector<TBrokerRangeDesc>& _ranges;
-    const std::vector<TNetworkAddress>& _broker_addresses;
-
     std::string _value_separator;
     std::string _line_delimiter;
     TFileFormatType::type _file_format_type;
@@ -113,7 +110,6 @@ protected:
     FileReader* _cur_file_reader;
     LineReader* _cur_line_reader;
     Decompressor* _cur_decompressor;
-    int _next_range;
     bool _cur_line_reader_eof;
 
     // When we fetch range start from 0, header_type="csv_with_names" skip first line
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index a23ce44b03..0be3d4c089 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -40,13 +40,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
                          const std::vector<TBrokerRangeDesc>& ranges,
                          const std::vector<TNetworkAddress>& broker_addresses,
                          const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
-          _ranges(ranges),
-          _broker_addresses(broker_addresses),
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
           _cur_file_reader(nullptr),
           _cur_line_reader(nullptr),
           _cur_json_reader(nullptr),
-          _next_range(0),
           _cur_reader_eof(false),
           _read_json_by_line(false) {
     if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 276b2dd077..ab2f479e60 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -78,9 +78,6 @@ protected:
                             bool& num_as_string, bool& fuzzy_parse);
 
 protected:
-    const std::vector<TBrokerRangeDesc>& _ranges;
-    const std::vector<TNetworkAddress>& _broker_addresses;
-
     std::string _jsonpath;
     std::string _jsonpath_file;
 
@@ -91,7 +88,6 @@ protected:
     FileReader* _cur_file_reader;
     LineReader* _cur_line_reader;
     JsonReader* _cur_json_reader;
-    int _next_range;
     bool _cur_reader_eof;
     bool _read_json_by_line;
 
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index fdda223c1a..138eb729ef 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -120,11 +120,8 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile,
                        const std::vector<TBrokerRangeDesc>& ranges,
                        const std::vector<TNetworkAddress>& broker_addresses,
                        const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
-          _ranges(ranges),
-          _broker_addresses(broker_addresses),
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
           // _splittable(params.splittable),
-          _next_range(0),
           _cur_file_eof(true),
           _total_groups(0),
           _current_group(0),
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index c13e331fcd..7ee4ab0b61 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -47,11 +47,7 @@ private:
     Status open_next_reader();
 
 private:
-    const std::vector<TBrokerRangeDesc>& _ranges;
-    const std::vector<TNetworkAddress>& _broker_addresses;
-
     // Reader
-    int _next_range;
     bool _cur_file_eof;
 
     // orc file reader object
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 3295dc4bc7..c6cb02e8c2 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -25,9 +25,6 @@
 #include "exec/parquet_reader.h"
 #include "exec/s3_reader.h"
 #include "exec/text_converter.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/raw_value.h"
 #include "runtime/stream_load/load_stream_mgr.h"
@@ -41,12 +38,9 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                                const std::vector<TBrokerRangeDesc>& ranges,
                                const std::vector<TNetworkAddress>& broker_addresses,
                                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
-          _ranges(ranges),
-          _broker_addresses(broker_addresses),
+        : BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
           // _splittable(params.splittable),
           _cur_file_reader(nullptr),
-          _next_range(0),
           _cur_file_eof(false) {}
 
 ParquetScanner::~ParquetScanner() {
@@ -83,11 +77,8 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
         RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
         break; // break always
     }
-    if (_scanner_eof) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
+
+    *eof = _scanner_eof;
     return Status::OK();
 }
 
diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h
index 535d3fe6c5..d66802dd95 100644
--- a/be/src/exec/parquet_scanner.h
+++ b/be/src/exec/parquet_scanner.h
@@ -74,13 +74,8 @@ protected:
     Status open_next_reader();
 
 protected:
-    //const TBrokerScanRangeParams& _params;
-    const std::vector<TBrokerRangeDesc>& _ranges;
-    const std::vector<TNetworkAddress>& _broker_addresses;
-
     // Reader
     ParquetReaderWrap* _cur_file_reader;
-    int _next_range;
     bool _cur_file_eof; // is read over?
 
     // used to hold current StreamLoadPipe
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8d5880531a..aa482fcfbf 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -353,8 +353,8 @@ std::string Block::dump_names() const {
 std::string Block::dump_data(size_t begin, size_t row_limit) const {
     std::vector<std::string> headers;
     std::vector<size_t> headers_size;
-    for (auto it = data.begin(); it != data.end(); ++it) {
-        std::string s = fmt::format("{}({})", it->name, it->type->get_name());
+    for (const auto& it : data) {
+        std::string s = fmt::format("{}({})", it.name, it.type->get_name());
         headers_size.push_back(s.size() > 15 ? s.size() : 15);
         headers.emplace_back(s);
     }
@@ -402,6 +402,20 @@ std::string Block::dump_data(size_t begin, size_t row_limit) const {
     return out.str();
 }
 
+std::string Block::dump_one_line(size_t row, int column_end) const {
+    assert(column_end < columns());
+    fmt::memory_buffer line;
+    for (int i = 0; i < column_end; ++i) {
+        if (LIKELY(i != 0)) {
+            // TODO: need more effective function of to string. now the impl is slow
+            fmt::format_to(line, " {}", data[i].to_string(row));
+        } else {
+            fmt::format_to(line, "{}", data[i].to_string(row));
+        }
+    }
+    return fmt::to_string(line);
+}
+
 std::string Block::dump_structure() const {
     // WriteBufferFromOwnString out;
     std::stringstream out;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 375ef6906f..729f531291 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -242,6 +242,9 @@ public:
     /** Get block data in string. */
     std::string dump_data(size_t begin = 0, size_t row_limit = 100) const;
 
+    /** Get one line data from block, only use in load data */
+    std::string dump_one_line(size_t row, int column_end) const;
+
     static Status filter_block(Block* block, int filter_conlumn_id, int column_to_keep);
 
     static void erase_useless_column(Block* block, int column_to_keep) {
diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp
index 5c3730b860..9e63fa5e24 100644
--- a/be/src/vec/data_types/data_type_nullable.cpp
+++ b/be/src/vec/data_types/data_type_nullable.cpp
@@ -47,7 +47,7 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c
             assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get());
 
     if (col.is_null_at(row_num)) {
-        return "\\N";
+        return "NULL";
     } else {
         return nested_data_type->to_string(col.get_nested_column(), row_num);
     }
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index dea6d55da5..3006dba788 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -44,18 +44,14 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
     _text_converter.reset(new (std::nothrow) TextConverter('\\'));
 }
 
-VBrokerScanner::~VBrokerScanner() {}
+VBrokerScanner::~VBrokerScanner() = default;
 
 Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
     SCOPED_TIMER(_read_timer);
+    RETURN_IF_ERROR(_init_src_block());
 
     const int batch_size = _state->batch_size();
-    // Get batch lines
-    int slot_num = _src_slot_descs.size();
-    std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-    for (int i = 0; i < slot_num; i++) {
-        columns[i] = _src_slot_descs[i]->get_empty_mutable_column();
-    }
+    auto columns = _src_block.mutate_columns();
 
     while (columns[0]->size() < batch_size && !_scanner_eof) {
         if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
@@ -85,51 +81,8 @@ Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
             }
         }
     }
-    if (_scanner_eof) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return _fill_dest_block(output_block, columns);
-}
-
-Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) {
-    if (columns.empty() || columns[0]->size() == 0) {
-        return Status::OK();
-    }
-
-    std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block());
-    auto n_columns = 0;
-    for (const auto slot_desc : _src_slot_descs) {
-        tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                slot_desc->get_data_type_ptr(),
-                                                slot_desc->col_name()));
-    }
-    auto old_rows = tmp_block->rows();
-    // filter
-    if (!_vpre_filter_ctxs.empty()) {
-        for (auto vexpr_ctx : _vpre_filter_ctxs) {
-            RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(),
-                                                       _dest_tuple_desc->slots().size()));
-            _counter->num_rows_unselected += old_rows - tmp_block->rows();
-            old_rows = tmp_block->rows();
-        }
-    }
-
-    Status status;
-    // expr
-    if (!_dest_vexpr_ctx.empty()) {
-        *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
-                _dest_vexpr_ctx, *tmp_block, status);
-        if (UNLIKELY(dest_block->rows() == 0)) {
-            _success = false;
-            return status;
-        }
-    } else {
-        *dest_block = *tmp_block;
-    }
 
-    return status;
+    return _fill_dest_block(output_block, eof);
 }
 
 Status VBrokerScanner::_fill_dest_columns(const Slice& line,
@@ -151,57 +104,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
 
         const Slice& value = _split_values[i];
         if (is_null(value)) {
-            // If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero
-            if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) &&
-                !_src_tuple->is_null(
-                        _src_slot_descs_order_by_dest[dest_index]->null_indicator_offset())) {
-                RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                        [&]() -> std::string {
-                            return _src_tuple_row->to_string(*(_row_desc.get()));
-                        },
-                        [&]() -> std::string {
-                            // Type of the slot is must be Varchar in _src_tuple.
-                            StringValue* raw_value = _src_tuple->get_string_slot(
-                                    _src_slot_descs_order_by_dest[dest_index]->tuple_offset());
-                            std::string raw_string;
-                            if (raw_value != nullptr) { //is not null then get raw value
-                                raw_string = raw_value->to_string();
-                            }
-                            fmt::memory_buffer error_msg;
-                            fmt::format_to(error_msg,
-                                           "column({}) value is incorrect while strict mode is {}, "
-                                           "src value is {}",
-                                           src_slot_desc->col_name(), _strict_mode, raw_string);
-                            return error_msg.data();
-                        },
-                        &_scanner_eof));
-                _counter->num_rows_filtered++;
-                _success = false;
-                return Status::OK();
-            }
-
-            if (!src_slot_desc->is_nullable()) {
-                RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                        [&]() -> std::string {
-                            return _src_tuple_row->to_string(*(_row_desc.get()));
-                        },
-                        [&]() -> std::string {
-                            fmt::memory_buffer error_msg;
-                            fmt::format_to(
-                                    error_msg,
-                                    "column({}) values is null while columns is not nullable",
-                                    src_slot_desc->col_name());
-                            return error_msg.data();
-                        },
-                        &_scanner_eof));
-                _counter->num_rows_filtered++;
-                _success = false;
-                return Status::OK();
-            }
             // nullable
             auto* nullable_column =
                     reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
-            nullable_column->insert_data(nullptr, 0);
+            nullable_column->insert_default();
             continue;
         }
 
@@ -209,27 +115,6 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
                                            &columns[dest_index], _state));
     }
 
-    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
-    if (range.__isset.num_of_columns_from_file) {
-        RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file,
-                                                range.columns_from_path, columns));
-    }
-
-    return Status::OK();
-}
-
-Status VBrokerScanner::_fill_columns_from_path(int start,
-                                               const std::vector<std::string>& columns_from_path,
-                                               std::vector<MutableColumnPtr>& columns) {
-    // values of columns from path can not be null
-    for (int i = 0; i < columns_from_path.size(); ++i) {
-        int dest_index = i + start;
-        auto slot_desc = _src_slot_descs.at(dest_index);
-        const std::string& column_from_path = columns_from_path[i];
-        RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()),
-                                           column_from_path.size(), slot_desc, &columns[dest_index],
-                                           _state));
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 57c4004728..11d8b494fa 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -42,11 +42,6 @@ private:
     Status _write_text_column(char* value, int length, SlotDescriptor* slot,
                               MutableColumnPtr* column_ptr, RuntimeState* state);
 
-    Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns);
-
     Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
-
-    Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path,
-                                   std::vector<MutableColumnPtr>& columns);
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index b7645fecf3..e456e6dfc8 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -46,14 +46,10 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile,
 
 Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
     SCOPED_TIMER(_read_timer);
+    RETURN_IF_ERROR(_init_src_block());
     const int batch_size = _state->batch_size();
-    size_t slot_num = _src_slot_descs.size();
-    std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-    auto string_type = make_nullable(std::make_shared<DataTypeString>());
-    for (int i = 0; i < slot_num; i++) {
-        columns[i] = string_type->create_column();
-    }
 
+    auto columns = _src_block.mutate_columns();
     // Get one line
     while (columns[0]->size() < batch_size && !_scanner_eof) {
         if (_cur_file_reader == nullptr || _cur_reader_eof) {
@@ -83,10 +79,8 @@ Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) {
 
     COUNTER_UPDATE(_rows_read_counter, columns[0]->size());
     SCOPED_TIMER(_materialize_timer);
-    RETURN_IF_ERROR(BaseScanner::fill_dest_block(output_block, columns));
 
-    *eof = _scanner_eof;
-    return Status::OK();
+    return _fill_dest_block(output_block, eof);
 }
 
 Status VJsonScanner::open_next_reader() {
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index 6a891850a7..037bc15028 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -36,29 +36,15 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
         : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
                          counter),
           _batch(nullptr),
-          _arrow_batch_cur_idx(0),
-          _num_of_columns_from_file(0) {}
-VParquetScanner::~VParquetScanner() {}
+          _arrow_batch_cur_idx(0) {}
+
+VParquetScanner::~VParquetScanner() = default;
 
 Status VParquetScanner::open() {
     RETURN_IF_ERROR(ParquetScanner::open());
     if (_ranges.empty()) {
         return Status::OK();
     }
-    auto range = _ranges[0];
-    _num_of_columns_from_file = range.__isset.num_of_columns_from_file
-                                        ? implicit_cast<int>(range.num_of_columns_from_file)
-                                        : implicit_cast<int>(_src_slot_descs.size());
-
-    // check consistency
-    if (range.__isset.num_of_columns_from_file) {
-        int size = range.columns_from_path.size();
-        for (const auto& r : _ranges) {
-            if (r.columns_from_path.size() != size) {
-                return Status::InternalError("ranges have different number of columns.");
-            }
-        }
-    }
     return Status::OK();
 }
 
@@ -99,9 +85,9 @@ Status VParquetScanner::_init_arrow_batch_if_necessary() {
     return status;
 }
 
-Status VParquetScanner::_init_src_block(Block* block) {
+Status VParquetScanner::_init_src_block() {
     size_t batch_pos = 0;
-    block->clear();
+    _src_block.clear();
     for (auto i = 0; i < _num_of_columns_from_file; ++i) {
         SlotDescriptor* slot_desc = _src_slot_descs[i];
         if (slot_desc == nullptr) {
@@ -118,7 +104,7 @@ Status VParquetScanner::_init_src_block(Block* block) {
                     fmt::format("Not support arrow type:{}", array->type()->name()));
         }
         MutableColumnPtr data_column = data_type->create_column();
-        block->insert(
+        _src_block.insert(
                 ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
     }
     return Status::OK();
@@ -150,15 +136,15 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
             return Status::OK();
         }
     }
-    Block src_block;
-    RETURN_IF_ERROR(_init_src_block(&src_block));
+
+    RETURN_IF_ERROR(_init_src_block());
     // convert arrow batch to block until reach the batch_size
     while (!_scanner_eof) {
         // cast arrow type to PT0 and append it to src block
         // for example: arrow::Type::INT16 => TYPE_SMALLINT
-        RETURN_IF_ERROR(_append_batch_to_src_block(&src_block));
+        RETURN_IF_ERROR(_append_batch_to_src_block(&_src_block));
         // finalize the src block if full
-        if (src_block.rows() >= _state->batch_size()) {
+        if (_src_block.rows() >= _state->batch_size()) {
             break;
         }
         auto status = _next_arrow_batch();
@@ -173,94 +159,14 @@ Status VParquetScanner::get_next(vectorized::Block* block, bool* eof) {
         _cur_file_eof = true;
         break;
     }
-    COUNTER_UPDATE(_rows_read_counter, src_block.rows());
+    COUNTER_UPDATE(_rows_read_counter, _src_block.rows());
     SCOPED_TIMER(_materialize_timer);
     // cast PT0 => PT1
     // for example: TYPE_SMALLINT => TYPE_VARCHAR
-    RETURN_IF_ERROR(_cast_src_block(&src_block));
-    // range of current file
-    _fill_columns_from_path(&src_block);
-    RETURN_IF_ERROR(_eval_conjunts(&src_block));
-    // materialize, src block => dest columns
-    RETURN_IF_ERROR(_materialize_block(&src_block, block));
-    *eof = _scanner_eof;
-    return Status::OK();
-}
-
-// eval conjuncts, for example: t1 > 1
-Status VParquetScanner::_eval_conjunts(Block* block) {
-    for (auto& vctx : _vpre_filter_ctxs) {
-        size_t orig_rows = block->rows();
-        RETURN_IF_ERROR(VExprContext::filter_block(vctx, block, block->columns()));
-        _counter->num_rows_unselected += orig_rows - block->rows();
-    }
-    return Status::OK();
-}
+    RETURN_IF_ERROR(_cast_src_block(&_src_block));
 
-void VParquetScanner::_fill_columns_from_path(Block* block) {
-    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
-    if (range.__isset.num_of_columns_from_file) {
-        size_t start = range.num_of_columns_from_file;
-        size_t rows = block->rows();
-        for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
-            auto slot_desc = _src_slot_descs.at(i + start);
-            if (slot_desc == nullptr) continue;
-            auto is_nullable = slot_desc->is_nullable();
-            DataTypePtr data_type =
-                    DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, is_nullable);
-            MutableColumnPtr data_column = data_type->create_column();
-            const std::string& column_from_path = range.columns_from_path[i];
-            for (size_t i = 0; i < rows; ++i) {
-                data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
-                                         column_from_path.size());
-            }
-            block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
-                                                slot_desc->col_name()));
-        }
-    }
-}
-
-Status VParquetScanner::_materialize_block(Block* block, Block* dest_block) {
-    int ctx_idx = 0;
-    size_t orig_rows = block->rows();
-    auto filter_column = ColumnUInt8::create(orig_rows, 1);
-    for (auto slot_desc : _dest_tuple_desc->slots()) {
-        if (!slot_desc->is_materialized()) {
-            continue;
-        }
-        int dest_index = ctx_idx++;
-
-        VExprContext* ctx = _dest_vexpr_ctx[dest_index];
-        int result_column_id = 0;
-        // PT1 => dest primitive type
-        RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
-        ColumnPtr& ptr = block->safe_get_by_position(result_column_id).column;
-        if (!slot_desc->is_nullable()) {
-            if (auto* nullable_column = check_and_get_column<ColumnNullable>(*ptr)) {
-                if (nullable_column->has_null()) {
-                    // fill filter if src has null value and dest column is not nullable
-                    IColumn::Filter& filter = assert_cast<ColumnUInt8&>(*filter_column).get_data();
-                    const ColumnPtr& null_column_ptr = nullable_column->get_null_map_column_ptr();
-                    const auto& column_data =
-                            assert_cast<const ColumnUInt8&>(*null_column_ptr).get_data();
-                    for (size_t i = 0; i < null_column_ptr->size(); ++i) {
-                        filter[i] &= !column_data[i];
-                    }
-                }
-                ptr = nullable_column->get_nested_column_ptr();
-            }
-        }
-        dest_block->insert(vectorized::ColumnWithTypeAndName(
-                std::move(ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
-    }
-    size_t dest_size = dest_block->columns();
-    // do filter
-    dest_block->insert(vectorized::ColumnWithTypeAndName(
-            std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
-            "filter column"));
-    RETURN_IF_ERROR(Block::filter_block(dest_block, dest_size, dest_size));
-    _counter->num_rows_filtered += orig_rows - dest_block->rows();
-    return Status::OK();
+    // materialize, src block => dest columns
+    return _fill_dest_block(block, eof);
 }
 
 // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h
index 31749248d5..72ac280989 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -54,17 +54,13 @@ public:
 private:
     Status _next_arrow_batch();
     Status _init_arrow_batch_if_necessary();
-    Status _init_src_block(Block* block);
+    Status _init_src_block() override;
     Status _append_batch_to_src_block(Block* block);
     Status _cast_src_block(Block* block);
-    Status _eval_conjunts(Block* block);
-    Status _materialize_block(Block* block, Block* dest_block);
-    void _fill_columns_from_path(Block* block);
 
 private:
     std::shared_ptr<arrow::RecordBatch> _batch;
     size_t _arrow_batch_cur_idx;
-    int _num_of_columns_from_file;
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 1a39c1f8e0..713aefc4a7 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -41,6 +41,13 @@ public:
         init();
         _profile = _runtime_state.runtime_profile();
         _runtime_state._instance_mem_tracker.reset(new MemTracker());
+
+        TUniqueId unique_id;
+        TQueryOptions query_options;
+        query_options.__set_enable_vectorized_engine(true);
+        TQueryGlobals query_globals;
+
+        _runtime_state.init(unique_id, query_options, query_globals, nullptr);
     }
     void init();
 
@@ -370,17 +377,17 @@ TEST_F(VBrokerScannerTest, normal) {
     auto columns = block->get_columns();
     ASSERT_EQ(columns.size(), 3);
 
-    ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
-    ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4");
-    ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8");
+    ASSERT_EQ(columns[0]->get_int(0), 1);
+    ASSERT_EQ(columns[0]->get_int(1), 4);
+    ASSERT_EQ(columns[0]->get_int(2), 8);
 
-    ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
-    ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5");
-    ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9");
+    ASSERT_EQ(columns[1]->get_int(0), 2);
+    ASSERT_EQ(columns[1]->get_int(1), 5);
+    ASSERT_EQ(columns[1]->get_int(2), 9);
 
-    ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
-    ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6");
-    ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10");
+    ASSERT_EQ(columns[2]->get_int(0), 3);
+    ASSERT_EQ(columns[2]->get_int(1), 6);
+    ASSERT_EQ(columns[2]->get_int(2), 10);
 }
 
 TEST_F(VBrokerScannerTest, normal2) {
@@ -413,14 +420,14 @@ TEST_F(VBrokerScannerTest, normal2) {
     auto columns = block->get_columns();
     ASSERT_EQ(columns.size(), 3);
 
-    ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
-    ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3");
+    ASSERT_EQ(columns[0]->get_int(0), 1);
+    ASSERT_EQ(columns[0]->get_int(1), 3);
 
-    ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
-    ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4");
+    ASSERT_EQ(columns[1]->get_int(0), 2);
+    ASSERT_EQ(columns[1]->get_int(1), 4);
 
-    ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
-    ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5");
+    ASSERT_EQ(columns[2]->get_int(0), 3);
+    ASSERT_EQ(columns[2]->get_int(1), 5);
 }
 
 TEST_F(VBrokerScannerTest, normal5) {
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index c96772a011..393ff80f16 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -47,7 +47,13 @@ public:
     VJsonScannerTest() : _runtime_state(TQueryGlobals()) {
         init();
         _runtime_state._instance_mem_tracker.reset(new MemTracker());
-        _runtime_state._exec_env = ExecEnv::GetInstance();
+
+        TUniqueId unique_id;
+        TQueryOptions query_options;
+        query_options.__set_enable_vectorized_engine(true);
+        TQueryGlobals query_globals;
+
+        _runtime_state.init(unique_id, query_options, query_globals, nullptr);
     }
     void init();
     static void SetUpTestCase() {
@@ -391,7 +397,7 @@ void VJsonScannerTest::create_expr_info() {
             TTypeNode node;
             node.__set_type(TTypeNodeType::SCALAR);
             TScalarType scalar_type;
-            scalar_type.__set_type(TPrimitiveType::BIGINT);
+            scalar_type.__set_type(TPrimitiveType::DOUBLE);
             node.__set_scalar_type(scalar_type);
             int_type.types.push_back(node);
         }
@@ -553,6 +559,7 @@ TEST_F(VJsonScannerTest, simple_array_json) {
         range.format_type = TFileFormatType::FORMAT_JSON;
         range.strip_outer_array = true;
         range.__isset.strip_outer_array = true;
+        range.__set_num_as_string(true);
         range.splittable = true;
         range.path = "./be/test/exec/test_data/json_scanner/test_simple2.json";
         range.file_type = TFileType::FILE_LOCAL;
@@ -583,9 +590,9 @@ TEST_F(VJsonScannerTest, simple_array_json) {
     ASSERT_EQ(columns[3].to_string(0), "8.950000");
     ASSERT_EQ(columns[3].to_string(1), "12.990000");
     ASSERT_EQ(columns[4].to_string(0), "1234");
-    ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424.000000");
-    ASSERT_EQ(columns[5].to_string(0), "1234.123400");
-    ASSERT_EQ(columns[5].to_string(1), "10000000000000.001953");
+    ASSERT_EQ(columns[4].to_string(1), "1180591620717411303424");
+    ASSERT_EQ(columns[5].to_string(0), "1234.123400000");
+    ASSERT_EQ(columns[5].to_string(1), "9999999999999.999999000");
 
     block.clear();
     status = scan_node.get_next(&_runtime_state, &block, &eof);
@@ -753,12 +760,12 @@ TEST_F(VJsonScannerTest, use_jsonpaths_mismatch) {
 
     auto columns = block.get_columns_with_type_and_name();
     ASSERT_EQ(columns.size(), 6);
-    ASSERT_EQ(columns[0].to_string(0), "\\N");
-    ASSERT_EQ(columns[0].to_string(1), "\\N");
-    ASSERT_EQ(columns[1].to_string(0), "\\N");
-    ASSERT_EQ(columns[1].to_string(1), "\\N");
-    ASSERT_EQ(columns[2].to_string(0), "\\N");
-    ASSERT_EQ(columns[2].to_string(1), "\\N");
+    ASSERT_EQ(columns[0].to_string(0), "NULL");
+    ASSERT_EQ(columns[0].to_string(1), "NULL");
+    ASSERT_EQ(columns[1].to_string(0), "NULL");
+    ASSERT_EQ(columns[1].to_string(1), "NULL");
+    ASSERT_EQ(columns[2].to_string(0), "NULL");
+    ASSERT_EQ(columns[2].to_string(1), "NULL");
     block.clear();
     scan_node.close(&_runtime_state);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 303f080976..ae8aafa9cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
@@ -130,6 +131,7 @@ public class LoadLoadingTask extends LoadTask {
                 planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance);
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
+        curCoordinator.setExecVecEngine(Config.enable_vectorized_load);
         /*
          * For broker load job, user only need to set mem limit by 'exec_mem_limit' property.
          * And the variable 'load_mem_limit' does not make any effect.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index d74b17f490..78fa7a23b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
@@ -125,6 +126,9 @@ public class LoadingTaskPlanner {
         scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
+        if (Config.enable_vectorized_load) {
+            scanNode.convertToVectoriezd();
+        }
         scanNodes.add(scanNode);
         descTable.computeStatAndMemLayout();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org