Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/04/29 08:19:03 UTC

[GitHub] [incubator-doris] xiepengcheng01 opened a new pull request, #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

xiepengcheng01 opened a new pull request, #9314:
URL: https://github.com/apache/incubator-doris/pull/9314

   # Proposed changes
   Support stream load in the vectorized engine without converting each line to a src tuple in the vbroker scanner.
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee merged pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
HappenLee merged PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314




[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#issuecomment-1119526345

   PR approved by at least one committer and no changes requested.




[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#discussion_r862587595


##########
be/src/exec/broker_scanner.cpp:
##########
@@ -560,8 +570,9 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
         str_slot->len = value.size;
     }
 
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.num_of_columns_from_file) {
-        fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path);
+        fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path);
     }
 
     _success = true;

Review Comment:
   `_success` is already set in the function `line_split_to_values`, so there is no need to set it again here.



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
+                                                    _tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
+        }
 
-            RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
-            if (scanner_eof) {
-                break;
-            }
+        // merge block
+        if (_mutable_block.get() == nullptr) {
+            _mutable_block.reset(new MutableBlock(block->clone_empty()));
         }
 
-        if (!columns[0]->empty()) {
-            auto n_columns = 0;
-            for (const auto slot_desc : _tuple_desc->slots()) {
-                block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                    slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+        int row_wait_add = block->rows();
+        int begin = 0;
+        while (row_wait_add > 0) {
+            int row_add = 0;
+            int max_add = batch_size - _mutable_block->rows();
+            if (row_wait_add >= max_add) {
+                row_add = max_add;
+            } else {
+                row_add = row_wait_add;
             }
 
-            auto old_rows = block->rows();
-
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
-                                                       _tuple_desc->slots().size()));
+            _mutable_block->add_rows(block.get(), begin, row_add);

Review Comment:
   This always incurs a copy. It should be optimized to skip the copy operation: merge only if `_mutable_block.size() + block.size() < batch_size`; otherwise return the block directly and do not copy here.
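
The merge-or-forward idea in this comment can be sketched as follows. This is a minimal illustration, not the Doris implementation: `Rows`, `BlockMerger`, `pending`, and `queue` are hypothetical stand-ins for `vectorized::Block`, `_mutable_block`, and `_block_queue`. One assumption goes beyond the comment: the pending buffer is flushed before a large block is forwarded, so that row order is preserved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical stand-in for a block: just a vector of row values.
using Rows = std::vector<int>;

// Accumulates small blocks and forwards large ones without copying rows.
struct BlockMerger {
    std::size_t batch_size;
    Rows pending;            // plays the role of _mutable_block
    std::deque<Rows> queue;  // plays the role of _block_queue

    explicit BlockMerger(std::size_t n) : batch_size(n) { assert(n > 0); }

    void add(Rows block) {
        if (block.empty()) return;
        if (pending.size() + block.size() < batch_size) {
            // Small block: copy its rows into the pending buffer.
            pending.insert(pending.end(), block.begin(), block.end());
            return;
        }
        // Otherwise emit what has accumulated, then hand the incoming
        // block over by move so its rows are never copied.
        flush();
        queue.push_back(std::move(block));
    }

    void flush() {
        if (pending.empty()) return;
        queue.push_back(std::move(pending));
        pending.clear();  // a moved-from vector is valid but unspecified; reset it
    }
};
```

With `batch_size` 4, adding `{1}`, `{2, 3}`, and then `{4, 5}` leaves two queued blocks, `{1, 2, 3}` and `{4, 5}`. The trade-off is that queued blocks may be smaller than `batch_size`, in exchange for copying only the small ones.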



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
+                                                    _tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
+        }
 
-            RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
-            if (scanner_eof) {
-                break;
-            }
+        // merge block
+        if (_mutable_block.get() == nullptr) {
+            _mutable_block.reset(new MutableBlock(block->clone_empty()));
         }
 
-        if (!columns[0]->empty()) {
-            auto n_columns = 0;
-            for (const auto slot_desc : _tuple_desc->slots()) {
-                block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                    slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+        int row_wait_add = block->rows();
+        int begin = 0;
+        while (row_wait_add > 0) {
+            int row_add = 0;
+            int max_add = batch_size - _mutable_block->rows();
+            if (row_wait_add >= max_add) {
+                row_add = max_add;
+            } else {
+                row_add = row_wait_add;
             }
 
-            auto old_rows = block->rows();
-
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
-                                                       _tuple_desc->slots().size()));
+            _mutable_block->add_rows(block.get(), begin, row_add);
+            row_wait_add -= row_add;
+            begin += row_add;
+            if (_mutable_block->rows() >= batch_size) {
+                RETURN_IF_ERROR(push_block_queue());
+            }
+        }
+    }
 
-            counter->num_rows_unselected += old_rows - block->rows();
+    RETURN_IF_ERROR(push_block_queue());
+    return Status::OK();
+}
 
-            std::unique_lock<std::mutex> l(_batch_queue_lock);
-            while (_process_status.ok() && !_scan_finished.load() &&
-                   !_runtime_state->is_cancelled() &&
-                   // stop pushing more batch if
-                   // 1. too many batches in queue, or
-                   // 2. at least one batch in queue and memory exceed limit.
-                   (_block_queue.size() >= _max_buffered_batches ||
-                    (mem_tracker()->any_limit_exceeded() && !_block_queue.empty()))) {
-                _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
-            }
-            // Process already set failed, so we just return OK
-            if (!_process_status.ok()) {
-                return Status::OK();
-            }
-            // Scan already finished, just return
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
-            // Runtime state is canceled, just return cancel
-            if (_runtime_state->is_cancelled()) {
-                return Status::Cancelled("Cancelled");
-            }
-            // Queue size Must be smaller than _max_buffered_batches
-            _block_queue.push_back(block);
+Status VBrokerScanNode::push_block_queue() {
+    if (_mutable_block.get() == nullptr || _mutable_block->rows() == 0) {
+        return Status::OK();
+    }
 
-            // Notify reader to
-            _queue_reader_cond.notify_one();
-        }
+    auto output_block = _mutable_block->to_block();
+    std::shared_ptr<vectorized::Block> block_ptr(new vectorized::Block());
+    block_ptr->swap(std::move(output_block));

Review Comment:
   block_ptr->swap(_mutable_block->to_block());
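
The one-line form suggested here works when `swap` has an overload that accepts an rvalue. The sketch below uses hypothetical `Block` and `MutableBlock` types to show the shape of the call. Whether Doris's `Block::swap` offers such an overload is not shown in this thread, so treat that as an assumption.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical block type with both lvalue and rvalue swap overloads.
struct Block {
    std::vector<int> rows;
    void swap(Block& other) noexcept { rows.swap(other.rows); }
    // The rvalue overload lets a caller pass a temporary directly.
    void swap(Block&& other) noexcept { rows = std::move(other.rows); }
};

// Hypothetical accumulator standing in for _mutable_block.
struct MutableBlock {
    std::vector<int> rows;
    // Moves the accumulated rows into a Block and leaves this buffer empty.
    Block to_block() {
        Block b;
        b.rows = std::move(rows);
        rows.clear();
        return b;
    }
};

std::shared_ptr<Block> take_block(MutableBlock& pending) {
    auto block_ptr = std::make_shared<Block>();
    // The temporary returned by to_block() binds to swap(Block&&),
    // so no named intermediate and no std::move are needed.
    block_ptr->swap(pending.to_block());
    assert(pending.rows.empty());  // to_block() drained the buffer
    return block_ptr;
}
```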



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());

Review Comment:
   Why is this a `shared_ptr` rather than a `unique_ptr`?
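
One way to read this question: the block has a single owner while it is being filled, so `unique_ptr` states the intent more precisely, and it can still be handed to a queue of `shared_ptr` later. The sketch below assumes a hypothetical `Block` type and hypothetical helper names `read_block` and `enqueue`. It is not the Doris code.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

struct Block {  // hypothetical stand-in for vectorized::Block
    std::vector<int> rows;
};

// Producer side: the block has exactly one owner while it is filled.
std::unique_ptr<Block> read_block(int first, int count) {
    assert(count >= 0);
    auto block = std::make_unique<Block>();
    for (int i = 0; i < count; ++i) {
        block->rows.push_back(first + i);
    }
    return block;
}

// Consumer side: the queue stores shared_ptr, as _block_queue does in the diff.
void enqueue(std::deque<std::shared_ptr<Block>>& queue, std::unique_ptr<Block> block) {
    // shared_ptr has a converting constructor from unique_ptr&&,
    // so ownership transfers without copying the block.
    queue.emplace_back(std::move(block));
}
```

If the queue element type stays `shared_ptr`, the conversion happens only at the hand-off point, and the reference-count control block is allocated once there.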



##########
be/src/exec/base_scanner.cpp:
##########
@@ -133,6 +140,15 @@ Status BaseScanner::init_expr_ctxes() {
                << ", name=" << slot_desc->col_name();
             return Status::InternalError(ss.str());
         }
+
+        if (_state->enable_vectorized_exec()) {
+            vectorized::VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
+            RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
+            RETURN_IF_ERROR(ctx->open(_state));
+            _dest_vexpr_ctx.emplace_back(ctx);
+        }

Review Comment:
   else {}





[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#discussion_r866462928


##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
         return Status::OK();
     }
 
-    std::shared_ptr<vectorized::Block> scanner_block;
-    {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        while (_process_status.ok() && !_runtime_state->is_cancelled() &&
-               _num_running_scanners > 0 && _block_queue.empty()) {
-            SCOPED_TIMER(_wait_scanner_timer);
-            _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
-        }
-        if (!_process_status.ok()) {
-            // Some scanner process failed.
-            return _process_status;
+    const int batch_size = _runtime_state->batch_size();
+    while (true) {
+        std::shared_ptr<vectorized::Block> scanner_block;
+        {
+            std::unique_lock<std::mutex> l(_batch_queue_lock);
+            while (_process_status.ok() && !_runtime_state->is_cancelled() &&
+                _num_running_scanners > 0 && _block_queue.empty()) {
+                SCOPED_TIMER(_wait_scanner_timer);
+                _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
+            }
+            if (!_process_status.ok()) {
+                // Some scanner process failed.
+                return _process_status;
+            }
+            if (_runtime_state->is_cancelled()) {
+                if (update_status(Status::Cancelled("Cancelled"))) {
+                    _queue_writer_cond.notify_all();
+                }
+                return _process_status;
+            }
+            if (!_block_queue.empty()) {
+                scanner_block = _block_queue.front();
+                _block_queue.pop_front();
+            }
         }
-        if (_runtime_state->is_cancelled()) {
-            if (update_status(Status::Cancelled("Cancelled"))) {
-                _queue_writer_cond.notify_all();
+
+        // All scanner has been finished, and all cached batch has been read
+        if (scanner_block == nullptr || scanner_block.get() == nullptr) {
+            if (_mutable_block.get() != nullptr && !_mutable_block->empty()) {
+                *block = _mutable_block->to_block();
+                reached_limit(block, eos);
+                LOG_IF(INFO, *eos) << "VBrokerScanNode ReachedLimit.";
             }
-            return _process_status;
+            _scan_finished.store(true);
+            *eos = true;
+            return Status::OK();
         }
-        if (!_block_queue.empty()) {
-            scanner_block = _block_queue.front();
-            _block_queue.pop_front();
+        // notify one scanner
+        _queue_writer_cond.notify_one();
+
+        if (_mutable_block.get() == nullptr) {

Review Comment:
   This case is unlikely; this line should be moved up.



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -61,43 +61,69 @@ Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block,
         return Status::OK();
     }
 
-    std::shared_ptr<vectorized::Block> scanner_block;
-    {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        while (_process_status.ok() && !_runtime_state->is_cancelled() &&
-               _num_running_scanners > 0 && _block_queue.empty()) {
-            SCOPED_TIMER(_wait_scanner_timer);
-            _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
-        }
-        if (!_process_status.ok()) {
-            // Some scanner process failed.
-            return _process_status;
+    const int batch_size = _runtime_state->batch_size();
+    while (true) {
+        std::shared_ptr<vectorized::Block> scanner_block;
+        {
+            std::unique_lock<std::mutex> l(_batch_queue_lock);
+            while (_process_status.ok() && !_runtime_state->is_cancelled() &&
+                _num_running_scanners > 0 && _block_queue.empty()) {
+                SCOPED_TIMER(_wait_scanner_timer);
+                _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
+            }
+            if (!_process_status.ok()) {
+                // Some scanner process failed.
+                return _process_status;
+            }
+            if (_runtime_state->is_cancelled()) {
+                if (update_status(Status::Cancelled("Cancelled"))) {
+                    _queue_writer_cond.notify_all();
+                }
+                return _process_status;
+            }
+            if (!_block_queue.empty()) {
+                scanner_block = _block_queue.front();
+                _block_queue.pop_front();
+            }
         }
-        if (_runtime_state->is_cancelled()) {
-            if (update_status(Status::Cancelled("Cancelled"))) {
-                _queue_writer_cond.notify_all();
+
+        // All scanner has been finished, and all cached batch has been read
+        if (scanner_block == nullptr || scanner_block.get() == nullptr) {

Review Comment:
   `scanner_block == nullptr || scanner_block.get() == nullptr`: only one of these checks is needed.
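
For a `std::shared_ptr`, comparing against `nullptr` is defined in terms of the stored pointer, so the two checks always give the same answer. A small sketch with a hypothetical helper name, `is_null_block`, makes the equivalence explicit:

```cpp
#include <cassert>
#include <memory>

// Returns true when the shared_ptr holds no object.
template <typename T>
bool is_null_block(const std::shared_ptr<T>& p) {
    const bool null_by_comparison = (p == nullptr);
    // operator==(shared_ptr, nullptr_t) is equivalent to !p,
    // which tests p.get() == nullptr, so the two forms agree.
    assert(null_by_comparison == (p.get() == nullptr));
    return null_by_comparison;
}
```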





[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#issuecomment-1119526364

   PR approved by anyone and no changes requested.




[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#issuecomment-1120175753

   PR approved by at least one committer and no changes requested.




[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9314: [feature] (vec) instead of converting line to src tuple for stream load in vectorized.

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#discussion_r862596558


##########
be/src/vec/exec/vbroker_scanner.cpp:
##########
@@ -21,25 +21,40 @@
 #include <iostream>
 #include <sstream>
 
+#include "exec/text_converter.h"
 #include "exec/exec_node.h"
 #include "exprs/expr_context.h"
 #include "exec/plain_text_line_reader.h"
+#include "util/utf8_check.h"
 
 namespace doris::vectorized {
+
+bool is_null(const Slice& slice) {
+    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
+}
+
 VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                                const TBrokerScanRangeParams& params,
                                const std::vector<TBrokerRangeDesc>& ranges,
                                const std::vector<TNetworkAddress>& broker_addresses,
                                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
         : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs,
-                        counter) {}
+                        counter) {
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+}
 
-Status VBrokerScanner::get_next(std::vector<MutableColumnPtr>& columns, bool* eof) {
+Status VBrokerScanner::get_next(Block* output_block, bool* eof) {
     SCOPED_TIMER(_read_timer);
 
     const int batch_size = _state->batch_size();
+    std::shared_ptr<vectorized::Block> tmp_block(std::make_shared<Block>());

Review Comment:
   Why define `tmp_block` here? It should be defined in `_fill_dest_block`.
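
The `is_null` helper added in the hunk above treats the two-byte sequence `\N` as the null marker. The standalone sketch below applies the same test to `std::string_view` instead of Doris's `Slice` and uses it while splitting a line into fields. The names `is_null_field` and `parse_line`, and the single-character separator, are assumptions made for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

// Same test as is_null in the diff: exactly a backslash followed by 'N'.
bool is_null_field(std::string_view field) {
    return field.size() == 2 && field[0] == '\\' && field[1] == 'N';
}

// Splits a line on a single-character separator (an assumption of this
// sketch) and maps each \N field to std::nullopt.
std::vector<std::optional<std::string>> parse_line(std::string_view line, char sep) {
    assert(sep != '\\');  // the separator must not collide with the null marker
    std::vector<std::optional<std::string>> out;
    std::size_t begin = 0;
    while (true) {
        const std::size_t end = line.find(sep, begin);
        const std::size_t len =
                (end == std::string_view::npos) ? std::string_view::npos : end - begin;
        const std::string_view field = line.substr(begin, len);
        if (is_null_field(field)) {
            out.push_back(std::nullopt);
        } else {
            out.emplace_back(std::string(field));
        }
        if (end == std::string_view::npos) break;
        begin = end + 1;
    }
    return out;
}
```

The check is an exact match: a field such as `\n` or `\N` followed by a space is kept as ordinary text.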


