You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:12:56 UTC
[doris] 02/13: [improvement](sink) reuse rows buffer in mysql_result_writer (#20482)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit aff0907778ae9a5e0b627f48327d93d50921c56f
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 7 10:09:32 2023 +0800
[improvement](sink) reuse rows buffer in mysql_result_writer (#20482)
Creating a new rows buffer for every block has a non-negligible performance cost,
so the rows buffer is now reused across blocks.
In a test with a total of 1.7M rows, AppendBatchTime dropped from 500ms to 280ms.
---
be/src/util/mysql_row_buffer.cpp | 20 +++++++++++
be/src/util/mysql_row_buffer.h | 2 ++
be/src/vec/sink/vmysql_result_writer.cpp | 61 +++++++++++++++++++-------------
be/src/vec/sink/vmysql_result_writer.h | 3 ++
4 files changed, 61 insertions(+), 25 deletions(-)
diff --git a/be/src/util/mysql_row_buffer.cpp b/be/src/util/mysql_row_buffer.cpp
index f408cb31a1..f346c2d43c 100644
--- a/be/src/util/mysql_row_buffer.cpp
+++ b/be/src/util/mysql_row_buffer.cpp
@@ -80,6 +80,25 @@ MysqlRowBuffer<is_binary_format>::MysqlRowBuffer()
_dynamic_mode(0),
_len_pos(0) {}
+template <bool is_binary_format>
+MysqlRowBuffer<is_binary_format>::MysqlRowBuffer(MysqlRowBuffer<is_binary_format>&& other) {
+ if (other._buf == other._default_buf) {
+ auto other_length = other.length();
+ memcpy(_default_buf, other._buf, other_length);
+ _buf = _default_buf;
+ _pos = _buf + other_length;
+ } else {
+ _buf = other._buf;
+ other._buf = other._default_buf;
+ _pos = other._pos;
+ }
+ _buf_size = other._buf_size;
+ _dynamic_mode = other._dynamic_mode;
+ _field_count = other._field_count;
+ _field_pos = other._field_pos;
+ _len_pos = other._len_pos;
+}
+
template <bool is_binary_format>
void MysqlRowBuffer<is_binary_format>::start_binary_row(uint32_t num_cols) {
assert(is_binary_format);
@@ -94,6 +113,7 @@ template <bool is_binary_format>
MysqlRowBuffer<is_binary_format>::~MysqlRowBuffer() {
if (_buf != _default_buf) {
delete[] _buf;
+ _buf = _default_buf;
}
}
diff --git a/be/src/util/mysql_row_buffer.h b/be/src/util/mysql_row_buffer.h
index 2df739450f..d0e91e766d 100644
--- a/be/src/util/mysql_row_buffer.h
+++ b/be/src/util/mysql_row_buffer.h
@@ -56,6 +56,8 @@ public:
MysqlRowBuffer();
~MysqlRowBuffer();
+ MysqlRowBuffer(MysqlRowBuffer&& other);
+
void reset() { _pos = _buf; }
// Prepare for binary row buffer
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 6e3e34bcc6..019c0556c4 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -98,6 +98,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
_append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
_convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
_result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
+ _copy_buffer_timer = ADD_CHILD_TIMER(_parent_profile, "CopyBufferTime", "AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
}
@@ -605,43 +606,53 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
Block block;
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
- auto num_rows = block.rows();
- std::vector<MysqlRowBuffer<is_binary_format>> rows_buffer;
- rows_buffer.resize(num_rows);
- if constexpr (is_binary_format) {
- for (MysqlRowBuffer<is_binary_format>& buf : rows_buffer) {
- buf.start_binary_row(_output_vexpr_ctxs.size());
- }
- }
// convert one batch
auto result = std::make_unique<TFetchDataResult>();
- for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
- const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
- auto type_ptr = block.get_by_position(i).type;
+ auto num_rows = block.rows();
- DCHECK(num_rows == block.get_by_position(i).column->size())
- << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
- block.get_by_position(i).column->size());
+ {
+ SCOPED_TIMER(_convert_tuple_timer);
+ if (_rows_buffer.size() < num_rows) {
+ _rows_buffer.resize(num_rows);
+ }
- RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
- *column_ptr, output_object_data(), rows_buffer, 0, 0, num_rows, col_const));
+ for (size_t i = 0; i != num_rows; ++i) {
+ _rows_buffer[i].reset();
+ if constexpr (is_binary_format) {
+ _rows_buffer[i].start_binary_row(_output_vexpr_ctxs.size());
+ }
+ }
- if (!status) {
- LOG(WARNING) << "convert row to mysql result failed. block_struct="
- << block.dump_structure();
- break;
+ for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
+ const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
+ auto type_ptr = block.get_by_position(i).type;
+
+ DCHECK(num_rows == block.get_by_position(i).column->size())
+ << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
+ block.get_by_position(i).column->size());
+
+ RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
+ *column_ptr, output_object_data(), _rows_buffer, 0, 0, num_rows, col_const));
+
+ if (!status) {
+ LOG(WARNING) << "convert row to mysql result failed. block_struct="
+ << block.dump_structure();
+ break;
+ }
}
}
uint64_t bytes_sent = 0;
// copy MysqlRowBuffer to Thrift
- result->result_batch.rows.resize(num_rows);
- for (int i = 0; i < num_rows; ++i) {
- result->result_batch.rows[i].append(rows_buffer[i].buf(), rows_buffer[i].length());
- bytes_sent += rows_buffer[i].length();
+ {
+ SCOPED_TIMER(_copy_buffer_timer);
+ result->result_batch.rows.resize(num_rows);
+ for (int i = 0; i < num_rows; ++i) {
+ result->result_batch.rows[i].append(_rows_buffer[i].buf(), _rows_buffer[i].length());
+ bytes_sent += _rows_buffer[i].length();
+ }
}
-
if (status) {
SCOPED_TIMER(_result_send_timer);
// If this is a dry run task, no need to send data block
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 0e0b4d9313..3b6b8579d7 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -69,6 +69,7 @@ private:
BufferControlBlock* _sinker;
const VExprContextSPtrs& _output_vexpr_ctxs;
+ std::vector<MysqlRowBuffer<is_binary_format>> _rows_buffer;
RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
// total time cost on append batch operation
@@ -77,6 +78,8 @@ private:
RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
// file write timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _result_send_timer = nullptr;
+ // timer of copying buffer to thrift
+ RuntimeProfile::Counter* _copy_buffer_timer = nullptr;
// number of sent rows
RuntimeProfile::Counter* _sent_rows_counter = nullptr;
// size of sent data
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org