You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/31 03:53:38 UTC
[incubator-doris] branch master updated: [Opt][VecLoad] Opt the vec stream load performance (#9772)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7199102d7c [Opt][VecLoad] Opt the vec stream load performance (#9772)
7199102d7c is described below
commit 7199102d7cf421d245245229a8cb003a6089ec55
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue May 31 11:53:32 2022 +0800
[Opt][VecLoad] Opt the vec stream load performance (#9772)
Co-authored-by: lihaopeng <li...@baidu.com>
---
be/src/exec/text_converter.h | 4 ++++
be/src/exec/text_converter.hpp | 18 ++++++++++++++++++
be/src/olap/delta_writer.cpp | 13 +------------
be/src/olap/memtable.cpp | 13 +++++++++----
be/src/olap/memtable.h | 2 +-
be/src/vec/exec/vbroker_scanner.cpp | 18 +++---------------
be/src/vec/exec/vbroker_scanner.h | 3 ---
7 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index 7d957234a7..0f0f871e19 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -48,6 +48,10 @@ public:
bool write_slot(const SlotDescriptor* slot_desc, Tuple* tuple, const char* data, int len,
bool copy_string, bool need_escape, MemPool* pool);
+ void write_string_column(const SlotDescriptor* slot_desc,
+ vectorized::MutableColumnPtr* column_ptr, const char* data,
+ size_t len);
+
bool write_column(const SlotDescriptor* slot_desc, vectorized::MutableColumnPtr* column_ptr,
const char* data, size_t len, bool copy_string, bool need_escape);
diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 14667c35c0..0d4c5af788 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -166,6 +166,24 @@ inline bool TextConverter::write_slot(const SlotDescriptor* slot_desc, Tuple* tu
return true;
}
+inline void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
+ vectorized::MutableColumnPtr* column_ptr,
+ const char* data, size_t len) { // Appends the text field [data, data + len) to *column_ptr as one string row; returns nothing, so no conversion failure is reported (unlike write_column, which returns bool).
+ vectorized::IColumn* col_ptr = column_ptr->get(); // default target: the column itself (used as-is for non-nullable slots)
+ // NULL detection is done only for nullable slots: the two-byte text "\N" or len == SQL_NULL_DATA means NULL. For a non-nullable slot such input falls through and is stored as an ordinary string.
+ if (LIKELY(slot_desc->is_nullable())) {
+ auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get()); // unchecked cast -- assumes a nullable slot is always backed by a ColumnNullable; TODO confirm with callers
+ if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) { // NOTE(review): len is size_t, so SQL_NULL_DATA is compared after conversion to size_t -- confirm its definition
+ nullable_column->insert_data(nullptr, 0); // presumably appends a NULL row (null-map entry plus nested default) -- verify against ColumnNullable::insert_data
+ return;
+ } else {
+ nullable_column->get_null_map_data().push_back(0); // presumably 0 marks this row as not NULL; the value itself is appended to the nested column below
+ col_ptr = &nullable_column->get_nested_column(); // redirect the final insert to the nested (value) column
+ }
+ }
+ reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(data, len); // unchecked cast -- the caller must guarantee the target (or nested) column is a ColumnString
+}
+
inline bool TextConverter::write_column(const SlotDescriptor* slot_desc,
vectorized::MutableColumnPtr* column_ptr, const char* data,
size_t len, bool copy_string, bool need_escape) {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index fe0ffeddbb..132a79a390 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -190,18 +190,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- int start = 0, end = 0;
- const size_t num_rows = row_idxs.size();
- for (; start < num_rows;) {
- auto count = end + 1 - start;
- if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
- _mem_table->insert(block, row_idxs[start], count);
- start += count;
- end = start;
- } else {
- end++;
- }
- }
+ _mem_table->insert(block, row_idxs);
if (_mem_table->need_to_agg()) {
_mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 77b0313004..08c92c3404 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -114,7 +114,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+void MemTable::insert(const vectorized::Block* block, const std::vector<int>& row_idxs) {
if (_is_first_insertion) {
_is_first_insertion = false;
auto cloneBlock = block->clone_without_columns();
@@ -125,8 +125,9 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
_init_agg_functions(block);
}
}
+ auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
- _input_mutable_block.add_rows(block, row_pos, num_rows);
+ _input_mutable_block.add_rows(block, row_idxs.data(), row_idxs.data() + num_rows);
size_t input_size = block->allocated_bytes() * num_rows / block->rows();
_mem_usage += input_size;
_mem_tracker->consume(input_size);
@@ -245,11 +246,15 @@ template <bool is_final>
void MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
- // TODO: should try to insert data by column, not by row. to opt the code
if (_keys_type == KeysType::DUP_KEYS) {
+ std::vector<int> row_pos_vec;
+ DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
+ row_pos_vec.reserve(in_block.rows());
for (it.SeekToFirst(); it.Valid(); it.Next()) {
- _output_mutable_block.add_row(&in_block, it.key()->_row_pos);
+ row_pos_vec.emplace_back(it.key()->_row_pos);
}
+ _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+ row_pos_vec.data() + in_block.rows());
} else {
size_t idx = 0;
for (it.SeekToFirst(); it.Valid(); it.Next()) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c73b39b39d..c594f77679 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -52,7 +52,7 @@ public:
inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
// insert the rows of block whose row indexes are listed in row_idxs
- void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+ void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
void shrink_memtable_by_agg();
diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp
index 3006dba788..41d0dcac1d 100644
--- a/be/src/vec/exec/vbroker_scanner.cpp
+++ b/be/src/vec/exec/vbroker_scanner.cpp
@@ -20,11 +20,11 @@
#include <fmt/format.h>
#include <iostream>
-#include <sstream>
#include "exec/exec_node.h"
#include "exec/plain_text_line_reader.h"
#include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
#include "exprs/expr_context.h"
#include "util/utf8_check.h"
@@ -111,22 +111,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
continue;
}
- RETURN_IF_ERROR(_write_text_column(value.data, value.size, src_slot_desc,
- &columns[dest_index], _state));
+ _text_converter->write_string_column(src_slot_desc, &columns[dest_index], value.data,
+ value.size);
}
return Status::OK();
}
-
-Status VBrokerScanner::_write_text_column(char* value, int value_length, SlotDescriptor* slot,
- vectorized::MutableColumnPtr* column_ptr,
- RuntimeState* state) {
- if (!_text_converter->write_column(slot, column_ptr, value, value_length, true, false)) {
- std::stringstream ss;
- ss << "Fail to convert text value:'" << value << "' to " << slot->type() << " on column:`"
- << slot->col_name() + "`";
- return Status::InternalError(ss.str());
- }
- return Status::OK();
-}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h
index 11d8b494fa..cbd00f859a 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -39,9 +39,6 @@ public:
private:
std::unique_ptr<TextConverter> _text_converter;
- Status _write_text_column(char* value, int length, SlotDescriptor* slot,
- MutableColumnPtr* column_ptr, RuntimeState* state);
-
Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org