You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/09 03:24:12 UTC

[incubator-doris] branch master updated: [feature] (vec) instead of converting line to src tuple for stream load in vectorized. (#9314)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eec1dfde3a [feature] (vec)  instead of converting line to src tuple for stream load in vectorized. (#9314)
eec1dfde3a is described below

commit eec1dfde3a6b93eccc28fb1c0b58233987a2b856
Author: xiepengcheng01 <10...@users.noreply.github.com>
AuthorDate: Mon May 9 11:24:07 2022 +0800

    [feature] (vec)  instead of converting line to src tuple for stream load in vectorized. (#9314)
    
    Co-authored-by: xiepengcheng01 <xi...@xafj-palo-rpm64.xafj.baidu.com>
---
 be/src/exec/base_scanner.cpp                |  41 ++++-
 be/src/exec/base_scanner.h                  |  15 +-
 be/src/exec/broker_scanner.cpp              |  18 +-
 be/src/exec/broker_scanner.h                |   6 +-
 be/src/vec/exec/vbroker_scan_node.cpp       | 188 ++++++++++----------
 be/src/vec/exec/vbroker_scan_node.h         |   1 +
 be/src/vec/exec/vbroker_scanner.cpp         | 263 ++++++++++++----------------
 be/src/vec/exec/vbroker_scanner.h           |  17 +-
 be/test/exec/multi_bytes_separator_test.cpp |   9 +-
 be/test/vec/exec/vbroker_scan_node_test.cpp |  72 ++++----
 be/test/vec/exec/vbroker_scanner_test.cpp   |  66 +++----
 11 files changed, 350 insertions(+), 346 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index ca5b08831f..8621cf75f8 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -107,10 +107,18 @@ Status BaseScanner::init_expr_ctxes() {
 
     // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
     if (!_pre_filter_texprs.empty()) {
-        RETURN_IF_ERROR(
-                Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
-        RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
-        RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
+        if (_state->enable_vectorized_exec()) {
+            RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
+                    _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs));
+            RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc,
+                                                       _mem_tracker));
+            RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state));
+        } else {
+            RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs,
+                                                    &_pre_filter_ctxs));
+            RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
+            RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
+        }
     }
 
     // Construct dest slots information
@@ -133,11 +141,22 @@ Status BaseScanner::init_expr_ctxes() {
                << ", name=" << slot_desc->col_name();
             return Status::InternalError(ss.str());
         }
-        ExprContext* ctx = nullptr;
-        RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
-        RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
-        RETURN_IF_ERROR(ctx->open(_state));
-        _dest_expr_ctx.emplace_back(ctx);
+
+        if (_state->enable_vectorized_exec()) {
+            vectorized::VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+            RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
+            RETURN_IF_ERROR(ctx->open(_state));
+            _dest_vexpr_ctx.emplace_back(ctx);
+        } else {
+            ExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(Expr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+            RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
+            RETURN_IF_ERROR(ctx->open(_state));
+            _dest_expr_ctx.emplace_back(ctx);
+        }
+
         if (has_slot_id_map) {
             auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
             if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -284,6 +303,10 @@ void BaseScanner::close() {
     if (!_pre_filter_ctxs.empty()) {
         Expr::close(_pre_filter_ctxs, _state);
     }
+
+    if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) {
+        vectorized::VExpr::close(_vpre_filter_ctxs, _state);
+    }
 }
 
 } // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 13285ab6aa..d98e39e663 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -20,6 +20,7 @@
 
 #include "common/status.h"
 #include "exprs/expr.h"
+#include "vec/exprs/vexpr.h"
 #include "runtime/tuple.h"
 #include "util/runtime_profile.h"
 
@@ -52,7 +53,12 @@ class BaseScanner {
 public:
     BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
                 const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-    virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
+    virtual ~BaseScanner() {
+        Expr::close(_dest_expr_ctx, _state);
+        if (_state->enable_vectorized_exec()) {
+            vectorized::VExpr::close(_dest_vexpr_ctx, _state);
+        }
+    };
 
     virtual Status init_expr_ctxes();
     // Open this scanner, will initialize information need to
@@ -62,8 +68,8 @@ public:
     virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) = 0;
 
     // Get next block
-    virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) {
-        return Status::NotSupported("Not Implemented get next");
+    virtual Status get_next(vectorized::Block* block, bool* eof) {
+        return Status::NotSupported("Not Implemented get block");
     }
 
     // Close this scanner
@@ -95,6 +101,9 @@ protected:
     // Dest tuple descriptor and dest expr context
     const TupleDescriptor* _dest_tuple_desc;
     std::vector<ExprContext*> _dest_expr_ctx;
+    // for vectorized
+    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
+    std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
     // the map values of dest slot id to src slot desc
     // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
     std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index cda19d8611..d35e8428a4 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -468,8 +468,7 @@ Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool*
     return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
 }
 
-// Convert one row to this tuple
-Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
+Status BrokerScanner::_line_split_to_values(const Slice& line) {
     bool is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     if (!is_proto_format && !validate_utf8(line.data, line.size)) {
         RETURN_IF_ERROR(_state->append_error_msg_to_file(
@@ -546,6 +545,17 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
         }
     }
 
+    _success = true;
+    return Status::OK();
+}
+
+// Convert one row to this tuple
+Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
+    RETURN_IF_ERROR(_line_split_to_values(line));
+    if (!_success) {
+        return Status::OK();
+    }
+
     for (int i = 0; i < _split_values.size(); ++i) {
         auto slot_desc = _src_slot_descs[i];
         const Slice& value = _split_values[i];
@@ -560,11 +570,11 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
         str_slot->len = value.size;
     }
 
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.num_of_columns_from_file) {
-        fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);
+        fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
     }
 
-    _success = true;
     return Status::OK();
 }
 
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 344d3789ae..2b6ac2d302 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -65,8 +65,8 @@ public:
     virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
                             bool* fill_tuple) override;
 
-    Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* eof) override {
-        return Status::NotSupported("Not Implemented get columns");
+    Status get_next(vectorized::Block* block, bool* eof) override {
+        return Status::NotSupported("Not Implemented get block");
     }
 
     // Close this scanner
@@ -78,6 +78,8 @@ protected:
 
     Status _line_to_src_tuple(const Slice& line);
 
+    Status _line_split_to_values(const Slice& line);
+
 private:
     Status open_file_reader();
     Status create_decompressor(TFileFormatType::type type);
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index d12f3e8d85..09338a1959 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
         return Status::OK();
     }
 
-    std::shared_ptr<vectorized::Block> scanner_block;
-    {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        while (_process_status.ok() && !_runtime_state->is_cancelled() &&
-               _num_running_scanners > 0 && _block_queue.empty()) {
-            SCOPED_TIMER(_wait_scanner_timer);
-            _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
-        }
-        if (!_process_status.ok()) {
-            // Some scanner process failed.
-            return _process_status;
+    const int batch_size = _runtime_state->batch_size();
+    while (true) {
+        std::shared_ptr<vectorized::Block> scanner_block;
+        {
+            std::unique_lock<std::mutex> l(_batch_queue_lock);
+            while (_process_status.ok() && !_runtime_state->is_cancelled() &&
+                   _num_running_scanners > 0 && _block_queue.empty()) {
+                SCOPED_TIMER(_wait_scanner_timer);
+                _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
+            }
+            if (!_process_status.ok()) {
+                // Some scanner process failed.
+                return _process_status;
+            }
+            if (_runtime_state->is_cancelled()) {
+                if (update_status(Status::Cancelled("Cancelled"))) {
+                    _queue_writer_cond.notify_all();
+                }
+                return _process_status;
+            }
+            if (!_block_queue.empty()) {
+                scanner_block = _block_queue.front();
+                _block_queue.pop_front();
+            }
         }
-        if (_runtime_state->is_cancelled()) {
-            if (update_status(Status::Cancelled("Cancelled"))) {
-                _queue_writer_cond.notify_all();
+
+        // All scanner has been finished, and all cached batch has been read
+        if (!scanner_block) {
+            if (_mutable_block && !_mutable_block->empty()) {
+                *block = _mutable_block->to_block();
+                reached_limit(block, eos);
+                LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
             }
-            return _process_status;
+            _scan_finished.store(true);
+            *eos = true;
+            return Status::OK();
         }
-        if (!_block_queue.empty()) {
-            scanner_block = _block_queue.front();
-            _block_queue.pop_front();
+        // notify one scanner
+        _queue_writer_cond.notify_one();
+
+        if (UNLIKELY(!_mutable_block)) {
+            _mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
         }
-    }
 
-    // All scanner has been finished, and all cached batch has been read
-    if (scanner_block == nullptr) {
-        _scan_finished.store(true);
-        *eos = true;
-        return Status::OK();
+        if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
+            // merge scanner_block into _mutable_block
+            _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+            continue;
+        } else {
+            if (_mutable_block->empty()) {
+                // directly use scanner_block
+                *block = *scanner_block;
+            } else {
+                // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
+                *block = _mutable_block->to_block();
+                _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
+                _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
+            }
+            break;
+        }
     }
 
-    // notify one scanner
-    _queue_writer_cond.notify_one();
-
-    reached_limit(scanner_block.get(), eos);
-    *block = *scanner_block;
-
+    reached_limit(block, eos);
     if (*eos) {
         _scan_finished.store(true);
         _queue_writer_cond.notify_all();
@@ -120,75 +146,53 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
     std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
     RETURN_IF_ERROR(scanner->open());
     bool scanner_eof = false;
-
-    const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
-
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
-
-            RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
-            if (scanner_eof) {
-                break;
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
+                                                   _tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
         }
 
-        if (!columns[0]->empty()) {
-            auto n_columns = 0;
-            for (const auto slot_desc : _tuple_desc->slots()) {
-                block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                    slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
-            }
-
-            auto old_rows = block->rows();
-
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
-                                                       _tuple_desc->slots().size()));
-
-            counter->num_rows_unselected += old_rows - block->rows();
-
-            std::unique_lock<std::mutex> l(_batch_queue_lock);
-            while (_process_status.ok() && !_scan_finished.load() &&
-                   !_runtime_state->is_cancelled() &&
-                   // stop pushing more batch if
-                   // 1. too many batches in queue, or
-                   // 2. at least one batch in queue and memory exceed limit.
-                   (_block_queue.size() >= _max_buffered_batches ||
-                    (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
-                _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
-            }
-            // Process already set failed, so we just return OK
-            if (!_process_status.ok()) {
-                return Status::OK();
-            }
-            // Scan already finished, just return
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
-            // Runtime state is canceled, just return cancel
-            if (_runtime_state->is_cancelled()) {
-                return Status::Cancelled("Cancelled");
-            }
-            // Queue size Must be smaller than _max_buffered_batches
-            _block_queue.push_back(block);
-
-            // Notify reader to
-            _queue_reader_cond.notify_one();
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
+               // stop pushing more batch if
+               // 1. too many batches in queue, or
+               // 2. at least one batch in queue and memory exceed limit.
+               (_block_queue.size() >= _max_buffered_batches ||
+                (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
+            _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
         }
-    }
+        // Process already set failed, so we just return OK
+        if (!_process_status.ok()) {
+            return Status::OK();
+        }
+        // Scan already finished, just return
+        if (_scan_finished.load()) {
+            return Status::OK();
+        }
+        // Runtime state is canceled, just return cancel
+        if (_runtime_state->is_cancelled()) {
+            return Status::Cancelled("Cancelled");
+        }
+        // Queue size Must be smaller than _max_buffered_batches
+        _block_queue.push_back(block);
 
+        // Notify reader to
+        _queue_reader_cond.notify_one();
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vbroker_scan_node.h b/be/src/vec/exec/vbroker_scan_node.h
index 9a3b2fe362..4ccebed5fb 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -51,6 +51,7 @@ private:
     Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* counter);
 
     std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
+    std::unique_ptr<MutableBlock> _mutable_block;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 28e2f24c22..39302099a6 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -21,25 +21,41 @@
 #include <iostream>
 #include <sstream>
 
+#include "exec/text_converter.h"
 #include "exec/exec_node.h"
 #include "exprs/expr_context.h"
 #include "exec/plain_text_line_reader.h"
+#include "util/utf8_check.h"
 
 namespace doris::vectorized {
+
+bool is_null(const Slice& slice) {
+    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
+}
+
 VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                                const TBrokerScanRangeParams& params,
                                const std::vector<TBrokerRangeDesc>& ranges,
                                const std::vector<TNetworkAddress>& broker_addresses,
                                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
         : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
-                        counter) {}
+                        counter) {
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+}
 
-Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+VBrokerScanner::~VBrokerScanner() {}
+
+Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
     SCOPED_TIMER(_read_timer);
 
     const int batch_size = _state->batch_size();
+    // Get batch lines
+    int slot_num = _src_slot_descs.size();
+    std::vector<vectorized::MutableColumnPtr> columns(slot_num);
+    for (int i = 0; i < slot_num; i++) {
+        columns[i] = _src_slot_descs[i]->get_empty_mutable_column();
+    }
 
-    // Get one line
     while (columns[0]->size() < batch_size && !_scanner_eof) {
         if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
             RETURN_IF_ERROR(open_next_reader());
@@ -62,7 +78,7 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo
         {
             COUNTER_UPDATE(_rows_read_counter, 1);
             SCOPED_TIMER(_materialize_timer);
-            RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), columns));
+            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns));
             if (_success) {
                 free_expr_local_allocations();
             }
@@ -73,53 +89,67 @@ Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eo
     } else {
         *eof = false;
     }
-    return Status::OK();
+    return _fill_dest_block(output_block, columns);
 }
 
-Status VBrokerScanner::_convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns) {
-    RETURN_IF_ERROR(_line_to_src_tuple(line));
-    if (!_success) {
-        // If not success, which means we met an invalid row, return.
+Status VBrokerScanner::_fill_dest_block(Block* dest_block, std::vector<MutableColumnPtr>& columns) {
+    if (columns.empty() || columns[0]->size() == 0) {
         return Status::OK();
     }
 
-    return _fill_dest_columns(columns);
+    std::unique_ptr<vectorized::Block> tmp_block(new vectorized::Block());
+    auto n_columns = 0;
+    for (const auto slot_desc : _src_slot_descs) {
+        tmp_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
+                                                slot_desc->get_data_type_ptr(),
+                                                slot_desc->col_name()));
+    }
+    auto old_rows = tmp_block->rows();
+    // filter
+    if (!_vpre_filter_ctxs.empty()) {
+        for (auto vexpr_ctx : _vpre_filter_ctxs) {
+            RETURN_IF_ERROR(VExprContext::filter_block(vexpr_ctx, tmp_block.get(),
+                                                       _dest_tuple_desc->slots().size()));
+            _counter->num_rows_unselected += old_rows - tmp_block->rows();
+            old_rows = tmp_block->rows();
+        }
+    }
+
+    Status status;
+    // expr
+    if (!_dest_vexpr_ctx.empty()) {
+        *dest_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
+                _dest_vexpr_ctx, *tmp_block, status);
+        if (UNLIKELY(dest_block->rows() == 0)) {
+            _success = false;
+            return status;
+        }
+    } else {
+        *dest_block = *tmp_block;
+    }
+
+    return status;
 }
 
-Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns) {
-    // filter src tuple by preceding filter first
-    if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
-        _counter->num_rows_unselected++;
-        _success = false;
+Status VBrokerScanner::_fill_dest_columns(const Slice& line,
+                                          std::vector<MutableColumnPtr>& columns) {
+    RETURN_IF_ERROR(_line_split_to_values(line));
+    if (!_success) {
+        // If not success, which means we met an invalid row, return.
         return Status::OK();
     }
-    // convert and fill dest tuple
-    int ctx_idx = 0;
-    for (auto slot_desc : _dest_tuple_desc->slots()) {
-        if (!slot_desc->is_materialized()) {
+
+    int idx = 0;
+    for (int i = 0; i < _split_values.size(); ++i) {
+        int dest_index = idx++;
+
+        auto src_slot_desc = _src_slot_descs[i];
+        if (!src_slot_desc->is_materialized()) {
             continue;
         }
 
-        int dest_index = ctx_idx++;
-        auto* column_ptr = columns[dest_index].get();
-
-        ExprContext* ctx = _dest_expr_ctx[dest_index];
-        void* value = ctx->get_value(_src_tuple_row);
-        if (value == nullptr) {
-            // Only when the expr return value is null, we will check the error message.
-            std::string expr_error = ctx->get_error_msg();
-            if (!expr_error.empty()) {
-                RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                        [&]() -> std::string {
-                            return _src_tuple_row->to_string(*(_row_desc.get()));
-                        },
-                        [&]() -> std::string { return expr_error; }, &_scanner_eof));
-                _counter->num_rows_filtered++;
-                // The ctx is reused, so must clear the error state and message.
-                ctx->clear_error_msg();
-                _success = false;
-                return Status::OK();
-            }
+        const Slice& value = _split_values[i];
+        if (is_null(value)) {
             // If _strict_mode is false, _src_slot_descs_order_by_dest size could be zero
             if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index] != nullptr) &&
                 !_src_tuple->is_null(
@@ -140,7 +170,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
                             fmt::format_to(error_msg,
                                            "column({}) value is incorrect while strict mode is {}, "
                                            "src value is {}",
-                                           slot_desc->col_name(), _strict_mode, raw_string);
+                                           src_slot_desc->col_name(), _strict_mode, raw_string);
                             return error_msg.data();
                         },
                         &_scanner_eof));
@@ -148,7 +178,8 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
                 _success = false;
                 return Status::OK();
             }
-            if (!slot_desc->is_nullable()) {
+
+            if (!src_slot_desc->is_nullable()) {
                 RETURN_IF_ERROR(_state->append_error_msg_to_file(
                         [&]() -> std::string {
                             return _src_tuple_row->to_string(*(_row_desc.get()));
@@ -158,7 +189,7 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
                             fmt::format_to(
                                     error_msg,
                                     "column({}) values is null while columns is not nullable",
-                                    slot_desc->col_name());
+                                    src_slot_desc->col_name());
                             return error_msg.data();
                         },
                         &_scanner_eof));
@@ -166,124 +197,50 @@ Status VBrokerScanner::_fill_dest_columns(std::vector<MutableColumnPtr>& columns
                 _success = false;
                 return Status::OK();
             }
-            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
+            // nullable
+            auto* nullable_column =
+                    reinterpret_cast<vectorized::ColumnNullable*>(columns[dest_index].get());
             nullable_column->insert_data(nullptr, 0);
             continue;
         }
-        if (slot_desc->is_nullable()) {
-            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
-            nullable_column->get_null_map_data().push_back(0);
-            column_ptr = &nullable_column->get_nested_column();
-        }
-        char* value_ptr = (char*)value;
-        switch (slot_desc->type().type) {
-        case TYPE_BOOLEAN: {
-            assert_cast<ColumnVector<UInt8>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_TINYINT: {
-            assert_cast<ColumnVector<Int8>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_SMALLINT: {
-            assert_cast<ColumnVector<Int16>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_INT: {
-            assert_cast<ColumnVector<Int32>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_BIGINT: {
-            assert_cast<ColumnVector<Int64>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_LARGEINT: {
-            assert_cast<ColumnVector<Int128>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_FLOAT: {
-            assert_cast<ColumnVector<Float32>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_DOUBLE: {
-            assert_cast<ColumnVector<Float64>*>(column_ptr)->insert_data(value_ptr, 0);
-            break;
-        }
-        case TYPE_CHAR: {
-            Slice* slice = reinterpret_cast<Slice*>(value_ptr);
-            assert_cast<ColumnString*>(column_ptr)
-                    ->insert_data(slice->data, strnlen(slice->data, slice->size));
-            break;
-        }
-        case TYPE_VARCHAR:
-        case TYPE_STRING: {
-            Slice* slice = reinterpret_cast<Slice*>(value_ptr);
-            assert_cast<ColumnString*>(column_ptr)->insert_data(slice->data, slice->size);
-            break;
-        }
-        case TYPE_OBJECT: {
-            Slice* slice = reinterpret_cast<Slice*>(value_ptr);
-            // insert_default()
-            auto* target_column = assert_cast<ColumnBitmap*>(column_ptr);
 
-            target_column->insert_default();
-            BitmapValue* pvalue = nullptr;
-            int pos = target_column->size() - 1;
-            pvalue = &target_column->get_element(pos);
+        RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
+                                           &columns[dest_index], _state));
+    }
 
-            if (slice->size != 0) {
-                BitmapValue value;
-                value.deserialize(slice->data);
-                *pvalue = std::move(value);
-            } else {
-                *pvalue = std::move(*reinterpret_cast<BitmapValue*>(slice->data));
-            }
-            break;
-        }
-        case TYPE_HLL: {
-            Slice* slice = reinterpret_cast<Slice*>(value_ptr);
-            auto* target_column = assert_cast<ColumnHLL*>(column_ptr);
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
+    if (range.__isset.num_of_columns_from_file) {
+        RETURN_IF_ERROR(_fill_columns_from_path(range.num_of_columns_from_file,
+                                                range.columns_from_path, columns));
+    }
 
-            target_column->insert_default();
-            HyperLogLog* pvalue = nullptr;
-            int pos = target_column->size() - 1;
-            pvalue = &target_column->get_element(pos);
-            if (slice->size != 0) {
-                HyperLogLog value;
-                value.deserialize(*slice);
-                *pvalue = std::move(value);
-            } else {
-                *pvalue = std::move(*reinterpret_cast<HyperLogLog*>(slice->data));
-            }
-            break;
-        }
-        case TYPE_DECIMALV2: {
-            assert_cast<ColumnDecimal<Decimal128>*>(column_ptr)
-                    ->insert_data(reinterpret_cast<char*>(value_ptr), 0);
-            break;
-        }
-        case TYPE_DATETIME: {
-            DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr);
-            VecDateTimeValue date;
-            date.convert_dt_to_vec_dt(&value);
-            assert_cast<ColumnVector<Int64>*>(column_ptr)
-                    ->insert_data(reinterpret_cast<char*>(&date), 0);
-            break;
-        }
-        case TYPE_DATE: {
-            DateTimeValue value = *reinterpret_cast<DateTimeValue*>(value_ptr);
-            VecDateTimeValue date;
-            date.convert_dt_to_vec_dt(&value);
-            assert_cast<ColumnVector<Int64>*>(column_ptr)
-                    ->insert_data(reinterpret_cast<char*>(&date), 0);
-            break;
-        }
-        default: {
-            break;
-        }
-        }
+    return Status::OK();
+}
+
+Status VBrokerScanner::_fill_columns_from_path(int start,
+                                               const std::vector<std::string>& columns_from_path,
+                                               std::vector<MutableColumnPtr>& columns) {
+    // values of columns from path can not be null
+    for (int i = 0; i < columns_from_path.size(); ++i) {
+        int dest_index = i + start;
+        auto slot_desc = _src_slot_descs.at(dest_index);
+        const std::string& column_from_path = columns_from_path[i];
+        RETURN_IF_ERROR(_write_text_column(const_cast<char*>(column_from_path.c_str()),
+                                           column_from_path.size(), slot_desc, &columns[dest_index],
+                                           _state));
+    }
+    return Status::OK();
+}
+
+Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
+                                          vectorized::MutableColumnPtr* column_ptr,
+                                          RuntimeState* state) {
+    if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
+        std::stringstream ss;
+        ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
+           << slot->col_name() + "`";
+        return Status::InternalError(ss.str());
     }
-    _success = true;
     return Status::OK();
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 469e38d4e8..57c4004728 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -27,17 +27,26 @@ public:
                    const std::vector<TBrokerRangeDesc>& ranges,
                    const std::vector<TNetworkAddress>& broker_addresses,
                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-    ~VBrokerScanner() override = default;
+    ~VBrokerScanner();
 
     virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof,
                             bool* fill_tuple) override {
         return Status::NotSupported("Not Implemented get next");
     }
 
-    virtual Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) override;
+    Status get_next(Block* block, bool* eof) override;
 
 private:
-    Status _convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& columns);
-    Status _fill_dest_columns(std::vector<MutableColumnPtr>& columns);
+    std::unique_ptr<TextConverter> _text_converter;
+
+    Status _write_text_column(char* value, int length, SlotDescriptor* slot,
+                              MutableColumnPtr* column_ptr, RuntimeState* state);
+
+    Status _fill_dest_block(Block* block, std::vector<MutableColumnPtr>& columns);
+
+    Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
+
+    Status _fill_columns_from_path(int start, const std::vector<std::string>& columns_from_path,
+                                   std::vector<MutableColumnPtr>& columns);
 };
 } // namespace doris::vectorized
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index 9a99c354bb..3712f6b141 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -37,7 +37,10 @@ namespace doris {
 
 class MultiBytesSeparatorTest : public testing::Test {
 public:
-    MultiBytesSeparatorTest() {}
+    MultiBytesSeparatorTest() : _runtime_state(TQueryGlobals()) {}
+
+private:
+    RuntimeState _runtime_state;
 
 protected:
     virtual void SetUp() {}
@@ -56,8 +59,8 @@ TEST_F(MultiBytesSeparatorTest, normal) {
     const std::vector<TBrokerRangeDesc> ranges;
     const std::vector<TNetworkAddress> broker_addresses;
     const std::vector<TExpr> pre_filter_texprs;
-    BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_texprs,
-                          nullptr);
+    BrokerScanner scanner(&_runtime_state, nullptr, params, ranges, broker_addresses,
+                          pre_filter_texprs, nullptr);
 
 #define private public
 
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index c0c091d9d6..a0d11d3ac9 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -46,6 +46,7 @@ public:
     VBrokerScanNodeTest() : _runtime_state(TQueryGlobals()) {
         init();
         _runtime_state._instance_mem_tracker.reset(new MemTracker());
+        _runtime_state._query_options.enable_vectorized_engine = true;
     }
     void init();
     static void SetUpTestCase() {
@@ -277,7 +278,7 @@ void VBrokerScanNodeTest::init_desc_table() {
             type.types.push_back(node);
         }
         slot_desc.slotType = type;
-        slot_desc.columnPos = 1;
+        slot_desc.columnPos = 2;
         slot_desc.byteOffset = 32;
         slot_desc.nullIndicatorByte = 0;
         slot_desc.nullIndicatorBit = -1;
@@ -304,7 +305,7 @@ void VBrokerScanNodeTest::init_desc_table() {
             type.types.push_back(node);
         }
         slot_desc.slotType = type;
-        slot_desc.columnPos = 1;
+        slot_desc.columnPos = 3;
         slot_desc.byteOffset = 48;
         slot_desc.nullIndicatorByte = 0;
         slot_desc.nullIndicatorBit = -1;
@@ -466,37 +467,31 @@ TEST_F(VBrokerScanNodeTest, normal) {
     doris::vectorized::Block block;
     bool eos = false;
     status = scan_node.get_next(&_runtime_state, &block, &eos);
-    ASSERT_EQ(3, block.rows());
+    ASSERT_EQ(4, block.rows());
     ASSERT_EQ(4, block.columns());
-    ASSERT_FALSE(eos);
-
-    auto columns = block.get_columns();
-    ASSERT_EQ(columns[0]->get_int(0), 1);
-    ASSERT_EQ(columns[0]->get_int(1), 4);
-    ASSERT_EQ(columns[0]->get_int(2), 8);
-
-    ASSERT_EQ(columns[1]->get_int(0), 2);
-    ASSERT_EQ(columns[1]->get_int(1), 5);
-    ASSERT_EQ(columns[1]->get_int(2), 9);
+    ASSERT_TRUE(eos);
 
-    ASSERT_EQ(columns[2]->get_int(0), 3);
-    ASSERT_EQ(columns[2]->get_int(1), 6);
-    ASSERT_EQ(columns[2]->get_int(2), 10);
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 4);
+    ASSERT_EQ(columns[0].to_string(0), "1");
+    ASSERT_EQ(columns[0].to_string(1), "4");
+    ASSERT_EQ(columns[0].to_string(2), "8");
+    ASSERT_EQ(columns[0].to_string(3), "4");
 
-    ASSERT_EQ(columns[3]->get_int(0), 1);
-    ASSERT_EQ(columns[3]->get_int(1), 1);
-    ASSERT_EQ(columns[3]->get_int(2), 1);
+    ASSERT_EQ(columns[1].to_string(0), "2");
+    ASSERT_EQ(columns[1].to_string(1), "5");
+    ASSERT_EQ(columns[1].to_string(2), "9");
+    ASSERT_EQ(columns[1].to_string(3), "5");
 
-    block.clear();
-    status = scan_node.get_next(&_runtime_state, &block, &eos);
-    ASSERT_EQ(1, block.rows());
-    ASSERT_FALSE(eos);
+    ASSERT_EQ(columns[2].to_string(0), "3");
+    ASSERT_EQ(columns[2].to_string(1), "6");
+    ASSERT_EQ(columns[2].to_string(2), "10");
+    ASSERT_EQ(columns[2].to_string(3), "6");
 
-    columns = block.get_columns();
-    ASSERT_EQ(columns[0]->get_int(0), 4);
-    ASSERT_EQ(columns[1]->get_int(0), 5);
-    ASSERT_EQ(columns[2]->get_int(0), 6);
-    ASSERT_EQ(columns[3]->get_int(0), 2);
+    ASSERT_EQ(columns[3].to_string(0), "1");
+    ASSERT_EQ(columns[3].to_string(1), "1");
+    ASSERT_EQ(columns[3].to_string(2), "1");
+    ASSERT_EQ(columns[3].to_string(3), "2");
 
     block.clear();
     status = scan_node.get_next(&_runtime_state, &block, &eos);
@@ -610,20 +605,21 @@ TEST_F(VBrokerScanNodeTest, where_binary_pre) {
     ASSERT_EQ(2, block.rows());
     ASSERT_EQ(4, block.columns());
 
-    auto columns = block.get_columns();
-    ASSERT_EQ(columns[0]->get_int(0), 1);
-    ASSERT_EQ(columns[0]->get_int(1), 4);
+    auto columns = block.get_columns_with_type_and_name();
+    ASSERT_EQ(columns.size(), 4);
+    ASSERT_EQ(columns[0].to_string(0), "1");
+    ASSERT_EQ(columns[0].to_string(1), "4");
 
-    ASSERT_EQ(columns[1]->get_int(0), 2);
-    ASSERT_EQ(columns[1]->get_int(1), 5);
+    ASSERT_EQ(columns[1].to_string(0), "2");
+    ASSERT_EQ(columns[1].to_string(1), "5");
 
-    ASSERT_EQ(columns[2]->get_int(0), 3);
-    ASSERT_EQ(columns[2]->get_int(1), 6);
+    ASSERT_EQ(columns[2].to_string(0), "3");
+    ASSERT_EQ(columns[2].to_string(1), "6");
 
-    ASSERT_EQ(columns[3]->get_int(0), 1);
-    ASSERT_EQ(columns[3]->get_int(1), 1);
+    ASSERT_EQ(columns[3].to_string(0), "1");
+    ASSERT_EQ(columns[3].to_string(1), "1");
 
-    ASSERT_FALSE(eos);
+    ASSERT_TRUE(eos);
 
     block.clear();
     status = scan_node.get_next(&_runtime_state, &block, &eos);
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 2d80401cd8..5ead638bef 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -362,28 +362,25 @@ TEST_F(VBrokerScannerTest, normal) {
     auto st = scanner.open();
     ASSERT_TRUE(st.ok());
 
-    int slot_count = 3;
-    auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
-    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
-    for (int i = 0; i < slot_count; i++) {
-        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
-    }
+    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
     bool eof = false;
-    st = scanner.get_next(columns, &eof);
+    st = scanner.get_next(block.get(), &eof);
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(eof);
+    auto columns = block->get_columns();
+    ASSERT_EQ(columns.size(), 3);
 
-    ASSERT_EQ(columns[0]->get_int(0), 1);
-    ASSERT_EQ(columns[0]->get_int(1), 4);
-    ASSERT_EQ(columns[0]->get_int(2), 8);
+    ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
+    ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "4");
+    ASSERT_EQ(columns[0]->get_data_at(2).to_string(), "8");
 
-    ASSERT_EQ(columns[1]->get_int(0), 2);
-    ASSERT_EQ(columns[1]->get_int(1), 5);
-    ASSERT_EQ(columns[1]->get_int(2), 9);
+    ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
+    ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "5");
+    ASSERT_EQ(columns[1]->get_data_at(2).to_string(), "9");
 
-    ASSERT_EQ(columns[2]->get_int(0), 3);
-    ASSERT_EQ(columns[2]->get_int(1), 6);
-    ASSERT_EQ(columns[2]->get_int(2), 10);
+    ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
+    ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "6");
+    ASSERT_EQ(columns[2]->get_data_at(2).to_string(), "10");
 }
 
 TEST_F(VBrokerScannerTest, normal2) {
@@ -408,26 +405,22 @@ TEST_F(VBrokerScannerTest, normal2) {
     auto st = scanner.open();
     ASSERT_TRUE(st.ok());
 
-    int slot_count = 3;
-    auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
-    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
-    for (int i = 0; i < slot_count; i++) {
-        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
-    }
-
+    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
     bool eof = false;
-    st = scanner.get_next(columns, &eof);
+    st = scanner.get_next(block.get(), &eof);
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(eof);
+    auto columns = block->get_columns();
+    ASSERT_EQ(columns.size(), 3);
 
-    ASSERT_EQ(columns[0]->get_int(0), 1);
-    ASSERT_EQ(columns[0]->get_int(1), 3);
+    ASSERT_EQ(columns[0]->get_data_at(0).to_string(), "1");
+    ASSERT_EQ(columns[0]->get_data_at(1).to_string(), "3");
 
-    ASSERT_EQ(columns[1]->get_int(0), 2);
-    ASSERT_EQ(columns[1]->get_int(1), 4);
+    ASSERT_EQ(columns[1]->get_data_at(0).to_string(), "2");
+    ASSERT_EQ(columns[1]->get_data_at(1).to_string(), "4");
 
-    ASSERT_EQ(columns[2]->get_int(0), 3);
-    ASSERT_EQ(columns[2]->get_int(1), 5);
+    ASSERT_EQ(columns[2]->get_data_at(0).to_string(), "3");
+    ASSERT_EQ(columns[2]->get_data_at(1).to_string(), "5");
 }
 
 TEST_F(VBrokerScannerTest, normal5) {
@@ -446,18 +439,15 @@ TEST_F(VBrokerScannerTest, normal5) {
     auto st = scanner.open();
     ASSERT_TRUE(st.ok());
 
-    int slot_count = 3;
-    auto tuple_desc = _desc_tbl->get_tuple_descriptor(_dst_tuple_id);
-    std::vector<vectorized::MutableColumnPtr> columns(slot_count);
-    for (int i = 0; i < slot_count; i++) {
-        columns[i] = tuple_desc->slots()[i]->get_empty_mutable_column();
-    }
+    std::unique_ptr<vectorized::Block> block(new vectorized::Block());
     bool eof = false;
     // end of file
-    st = scanner.get_next(columns, &eof);
+    st = scanner.get_next(block.get(), &eof);
     ASSERT_TRUE(st.ok());
     ASSERT_TRUE(eof);
-    ASSERT_EQ(columns[0]->size(), 0);
+    auto columns = block->get_columns();
+    ASSERT_EQ(columns.size(), 0);
 }
+
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org