You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/07 16:12:56 UTC

[doris] 02/13: [improvement](sink) reuse rows buffer in mysql_result_writer (#20482)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit aff0907778ae9a5e0b627f48327d93d50921c56f
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Wed Jun 7 10:09:32 2023 +0800

    [improvement](sink) reuse rows buffer in mysql_result_writer (#20482)
    
    Allocating a new rows buffer for every block has a non-negligible performance cost,
    so the rows buffer is now kept as a member and reused across blocks.
    
    In a test with 1.7M rows in total, AppendBatchTime dropped from 500 ms to 280 ms.
---
 be/src/util/mysql_row_buffer.cpp         | 20 +++++++++++
 be/src/util/mysql_row_buffer.h           |  2 ++
 be/src/vec/sink/vmysql_result_writer.cpp | 61 +++++++++++++++++++-------------
 be/src/vec/sink/vmysql_result_writer.h   |  3 ++
 4 files changed, 61 insertions(+), 25 deletions(-)

diff --git a/be/src/util/mysql_row_buffer.cpp b/be/src/util/mysql_row_buffer.cpp
index f408cb31a1..f346c2d43c 100644
--- a/be/src/util/mysql_row_buffer.cpp
+++ b/be/src/util/mysql_row_buffer.cpp
@@ -80,6 +80,25 @@ MysqlRowBuffer<is_binary_format>::MysqlRowBuffer()
           _dynamic_mode(0),
           _len_pos(0) {}
 
+template <bool is_binary_format>
+MysqlRowBuffer<is_binary_format>::MysqlRowBuffer(MysqlRowBuffer<is_binary_format>&& other) {
+    if (other._buf == other._default_buf) {
+        auto other_length = other.length();
+        memcpy(_default_buf, other._buf, other_length);
+        _buf = _default_buf;
+        _pos = _buf + other_length;
+    } else {
+        _buf = other._buf;
+        other._buf = other._default_buf;
+        _pos = other._pos;
+    }
+    _buf_size = other._buf_size;
+    _dynamic_mode = other._dynamic_mode;
+    _field_count = other._field_count;
+    _field_pos = other._field_pos;
+    _len_pos = other._len_pos;
+}
+
 template <bool is_binary_format>
 void MysqlRowBuffer<is_binary_format>::start_binary_row(uint32_t num_cols) {
     assert(is_binary_format);
@@ -94,6 +113,7 @@ template <bool is_binary_format>
 MysqlRowBuffer<is_binary_format>::~MysqlRowBuffer() {
     if (_buf != _default_buf) {
         delete[] _buf;
+        _buf = _default_buf;
     }
 }
 
diff --git a/be/src/util/mysql_row_buffer.h b/be/src/util/mysql_row_buffer.h
index 2df739450f..d0e91e766d 100644
--- a/be/src/util/mysql_row_buffer.h
+++ b/be/src/util/mysql_row_buffer.h
@@ -56,6 +56,8 @@ public:
     MysqlRowBuffer();
     ~MysqlRowBuffer();
 
+    MysqlRowBuffer(MysqlRowBuffer&& other);
+
     void reset() { _pos = _buf; }
 
     // Prepare for binary row buffer
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index 6e3e34bcc6..019c0556c4 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -98,6 +98,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
     _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
     _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
     _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
+    _copy_buffer_timer = ADD_CHILD_TIMER(_parent_profile, "CopyBufferTime", "AppendBatchTime");
     _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
     _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
 }
@@ -605,43 +606,53 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
     Block block;
     RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                        input_block, &block));
-    auto num_rows = block.rows();
-    std::vector<MysqlRowBuffer<is_binary_format>> rows_buffer;
-    rows_buffer.resize(num_rows);
-    if constexpr (is_binary_format) {
-        for (MysqlRowBuffer<is_binary_format>& buf : rows_buffer) {
-            buf.start_binary_row(_output_vexpr_ctxs.size());
-        }
-    }
 
     // convert one batch
     auto result = std::make_unique<TFetchDataResult>();
-    for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
-        const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
-        auto type_ptr = block.get_by_position(i).type;
+    auto num_rows = block.rows();
 
-        DCHECK(num_rows == block.get_by_position(i).column->size())
-                << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
-                               block.get_by_position(i).column->size());
+    {
+        SCOPED_TIMER(_convert_tuple_timer);
+        if (_rows_buffer.size() < num_rows) {
+            _rows_buffer.resize(num_rows);
+        }
 
-        RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
-                *column_ptr, output_object_data(), rows_buffer, 0, 0, num_rows, col_const));
+        for (size_t i = 0; i != num_rows; ++i) {
+            _rows_buffer[i].reset();
+            if constexpr (is_binary_format) {
+                _rows_buffer[i].start_binary_row(_output_vexpr_ctxs.size());
+            }
+        }
 
-        if (!status) {
-            LOG(WARNING) << "convert row to mysql result failed. block_struct="
-                         << block.dump_structure();
-            break;
+        for (int i = 0; status.ok() && i < _output_vexpr_ctxs.size(); ++i) {
+            const auto& [column_ptr, col_const] = unpack_if_const(block.get_by_position(i).column);
+            auto type_ptr = block.get_by_position(i).type;
+
+            DCHECK(num_rows == block.get_by_position(i).column->size())
+                    << fmt::format("block's rows({}) != column{}'s size({})", num_rows, i,
+                                   block.get_by_position(i).column->size());
+
+            RETURN_IF_ERROR(type_ptr->get_serde()->write_column_to_mysql(
+                    *column_ptr, output_object_data(), _rows_buffer, 0, 0, num_rows, col_const));
+
+            if (!status) {
+                LOG(WARNING) << "convert row to mysql result failed. block_struct="
+                             << block.dump_structure();
+                break;
+            }
         }
     }
 
     uint64_t bytes_sent = 0;
     // copy MysqlRowBuffer to Thrift
-    result->result_batch.rows.resize(num_rows);
-    for (int i = 0; i < num_rows; ++i) {
-        result->result_batch.rows[i].append(rows_buffer[i].buf(), rows_buffer[i].length());
-        bytes_sent += rows_buffer[i].length();
+    {
+        SCOPED_TIMER(_copy_buffer_timer);
+        result->result_batch.rows.resize(num_rows);
+        for (int i = 0; i < num_rows; ++i) {
+            result->result_batch.rows[i].append(_rows_buffer[i].buf(), _rows_buffer[i].length());
+            bytes_sent += _rows_buffer[i].length();
+        }
     }
-
     if (status) {
         SCOPED_TIMER(_result_send_timer);
         // If this is a dry run task, no need to send data block
diff --git a/be/src/vec/sink/vmysql_result_writer.h b/be/src/vec/sink/vmysql_result_writer.h
index 0e0b4d9313..3b6b8579d7 100644
--- a/be/src/vec/sink/vmysql_result_writer.h
+++ b/be/src/vec/sink/vmysql_result_writer.h
@@ -69,6 +69,7 @@ private:
     BufferControlBlock* _sinker;
 
     const VExprContextSPtrs& _output_vexpr_ctxs;
+    std::vector<MysqlRowBuffer<is_binary_format>> _rows_buffer;
 
     RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
     // total time cost on append batch operation
@@ -77,6 +78,8 @@ private:
     RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
     // file write timer, child timer of _append_row_batch_timer
     RuntimeProfile::Counter* _result_send_timer = nullptr;
+    // timer of copying buffer to thrift
+    RuntimeProfile::Counter* _copy_buffer_timer = nullptr;
     // number of sent rows
     RuntimeProfile::Counter* _sent_rows_counter = nullptr;
     // size of sent data


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org