You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/31 03:53:38 UTC

[incubator-doris] branch master updated: [Opt][VecLoad] Opt the vec stream load performance (#9772)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7199102d7c [Opt][VecLoad] Opt the vec stream load performance (#9772)
7199102d7c is described below

commit 7199102d7cf421d245245229a8cb003a6089ec55
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue May 31 11:53:32 2022 +0800

    [Opt][VecLoad] Opt the vec stream load performance (#9772)
    
    Co-authored-by: lihaopeng <li...@baidu.com>
---
 be/src/exec/text_converter.h        |  4 ++++
 be/src/exec/text_converter.hpp      | 18 ++++++++++++++++++
 be/src/olap/delta_writer.cpp        | 13 +------------
 be/src/olap/memtable.cpp            | 13 +++++++++----
 be/src/olap/memtable.h              |  2 +-
 be/src/vec/exec/vbroker_scanner.cpp | 18 +++---------------
 be/src/vec/exec/vbroker_scanner.h   |  3 ---
 7 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index 7d957234a7..0f0f871e19 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -48,6 +48,10 @@ public:
     bool write_slot(const SlotDescriptor* slot_desc, Tuple* tuple, const char* data, int len,
                     bool copy_string, bool need_escape, MemPool* pool);
 
+    void write_string_column(const SlotDescriptor* slot_desc,
+                             vectorized::MutableColumnPtr* column_ptr, const char* data,
+                             size_t len); // raw text appended as-is; "\N" -> NULL if nullable
+
     bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr,
                       const char* data, size_t len, bool copy_string, bool need_escape);
 
diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 14667c35c0..0d4c5af788 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -166,6 +166,24 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu
     return true;
 }
 
+// Append raw text to a string column as-is; for nullable slots "\N" is stored as NULL.
+inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
+                                               vectorized::MutableColumnPtr* column_ptr,
+                                               const char* data, size_t len) {
+    vectorized::IColumn* col_ptr = column_ptr->get();
+    // \N means it's NULL
+    if (LIKELY(slot_desc->is_nullable())) {
+        auto* nullable_column = static_cast<vectorized::ColumnNullable*>(col_ptr);
+        if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
+            nullable_column->insert_data(nullptr, 0);
+            return;
+        }
+        nullable_column->get_null_map_data().push_back(0);
+        col_ptr = &nullable_column->get_nested_column();
+    }
+    static_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len);
+}
+
 inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
                                         vectorized::MutableColumnPtr* column_ptr, const char* data,
                                         size_t len, bool copy_string, bool need_escape) {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index fe0ffeddbb..132a79a390 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -190,18 +190,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    int start = 0, end = 0;
-    const size_t num_rows = row_idxs.size();
-    for (; start < num_rows;) {
-        auto count = end + 1 - start;
-        if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
-            _mem_table->insert(block, row_idxs[start], count);
-            start += count;
-            end = start;
-        } else {
-            end++;
-        }
-    }
+    if (!row_idxs.empty()) _mem_table->insert(block, row_idxs); // empty selection: no-op
 
     if (_mem_table->need_to_agg()) {
         _mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 77b0313004..08c92c3404 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -114,7 +114,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+void MemTable::insert(const vectorized::Block* block, const std::vector<int>& row_idxs) {
     if (_is_first_insertion) {
         _is_first_insertion = false;
         auto cloneBlock = block->clone_without_columns();
@@ -125,8 +125,9 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
             _init_agg_functions(block);
         }
     }
+    auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    _input_mutable_block.add_rows(block, row_pos, num_rows);
+    _input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() + num_rows);
     size_t input_size = block->allocated_bytes() * num_rows / block->rows();
     _mem_usage += input_size;
     _mem_tracker->consume(input_size);
@@ -245,11 +246,15 @@ template <bool is_final>
 void MemTable::_collect_vskiplist_results() {
     VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
-    // TODO: should try to insert data by column, not by row. to opt the code
     if (_keys_type == KeysType::DUP_KEYS) {
+        std::vector<int> row_pos_vec; // row positions in skiplist order, copied in one batch
+        DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
+        row_pos_vec.reserve(in_block.rows());
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
-            _output_mutable_block.add_row(&in_block, it.key()->_row_pos);
+            row_pos_vec.emplace_back(it.key()->_row_pos);
         }
+        _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+                                       row_pos_vec.data() + row_pos_vec.size());
     } else {
         size_t idx = 0;
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c73b39b39d..c594f77679 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -52,7 +52,7 @@ public:
 
     inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
-    // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+    // insert the rows of block at the positions listed in row_idxs (need not be contiguous)
+    void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
     void shrink_memtable_by_agg();
 
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 3006dba788..41d0dcac1d 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -20,11 +20,11 @@
 #include <fmt/format.h>
 
 #include <iostream>
-#include <sstream>
 
 #include "exec/exec_node.h"
 #include "exec/plain_text_line_reader.h"
 #include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
 #include "exprs/expr_context.h"
 #include "util/utf8_check.h"
 
@@ -111,22 +111,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
             continue;
         }
 
-        RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
-                                           &columns[dest_index], _state));
+        _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data,
+                                             value.size); // assumes string src column; never fails
     }
 
     return Status::OK();
 }
-
-Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
-                                          vectorized::MutableColumnPtr* column_ptr,
-                                          RuntimeState* state) {
-    if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
-        std::stringstream ss;
-        ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
-           << slot->col_name() + "`";
-        return Status::InternalError(ss.str());
-    }
-    return Status::OK();
-}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 11d8b494fa..cbd00f859a 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -39,9 +39,6 @@ public:
 private:
     std::unique_ptr<TextConverter> _text_converter;
 
-    Status _write_text_column(char* value, int length, SlotDescriptor* slot,
-                              MutableColumnPtr* column_ptr, RuntimeState* state);
-
     Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
 };
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org