You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/26 02:15:39 UTC

[incubator-doris] branch master updated: [performance] Improve DeltaWriter's performance. (#7216)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 948a2a7  [performance] Improve DeltaWriter's performance. (#7216)
948a2a7 is described below

commit 948a2a738d44bdbf93721e27fc834d54a1f3cd80
Author: 曹建华 <ca...@bytedance.com>
AuthorDate: Fri Nov 26 10:15:27 2021 +0800

    [performance] Improve DeltaWriter's performance. (#7216)
    
    1. Support batch write for DeltaWriter.
    2. Use mutex instead of SpinLock.
---
 be/src/olap/delta_writer.cpp              | 36 +++++++++++++++++++++++++------
 be/src/olap/delta_writer.h                |  6 ++++--
 be/src/runtime/tablets_channel.cpp        | 29 ++++++++++++++++---------
 be/test/runtime/load_channel_mgr_test.cpp |  8 +++++++
 4 files changed, 61 insertions(+), 18 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3c0d2fc..42712a6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -24,6 +24,8 @@
 #include "olap/schema.h"
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
 
 namespace doris {
 
@@ -153,7 +155,7 @@ OLAPStatus DeltaWriter::init() {
 }
 
 OLAPStatus DeltaWriter::write(Tuple* tuple) {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     if (!_is_init && !_is_cancelled) {
         RETURN_NOT_OK(init());
     }
@@ -179,6 +181,28 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
     return OLAP_SUCCESS;
 }
 
+OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) { // batch overload: inserts the rows of row_batch selected by row_idxs
+    std::lock_guard<std::mutex> l(_lock); // held until return, so the whole batch is inserted under one lock acquisition
+    if (!_is_init && !_is_cancelled) { // not initialized and not cancelled: call init() before writing
+        RETURN_NOT_OK(init());
+    }
+
+    if (_is_cancelled) {
+        return OLAP_ERR_ALREADY_CANCELLED; // returns before the insert loop, so no row of the batch is inserted
+    }
+
+    for (const auto& row_idx : row_idxs) {
+        _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0)); // only tuple 0 of each selected row is inserted
+    }
+
+    if (_mem_table->memory_usage() >= config::write_buffer_size) { // checked once per call, after every selected row has been inserted
+        RETURN_NOT_OK(_flush_memtable_async());
+        _reset_mem_table(); // runs only when _flush_memtable_async() did not return an error
+    }
+
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus DeltaWriter::_flush_memtable_async() {
     if (++_segment_counter > config::max_segment_num_per_rowset) {
         return OLAP_ERR_TOO_MANY_SEGMENTS;
@@ -187,7 +211,7 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
 }
 
 OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     if (!_is_init) {
         // This writer is not initialized before flushing. Do nothing
         // But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED,
@@ -220,7 +244,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
 }
 
 OLAPStatus DeltaWriter::wait_flush() {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     if (!_is_init) {
         // return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason
         // as described in flush_memtable_and_wait()
@@ -240,7 +264,7 @@ void DeltaWriter::_reset_mem_table() {
 }
 
 OLAPStatus DeltaWriter::close() {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     if (!_is_init && !_is_cancelled) {
         // if this delta writer is not initialized, but close() is called.
         // which means this tablet has no data loaded, but at least one tablet
@@ -260,7 +284,7 @@ OLAPStatus DeltaWriter::close() {
 }
 
 OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() being called";
 
@@ -328,7 +352,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
 }
 
 OLAPStatus DeltaWriter::cancel() {
-    std::lock_guard<SpinLock> l(_lock); 
+    std::lock_guard<std::mutex> l(_lock);
     if (!_is_init || _is_cancelled) {
         return OLAP_SUCCESS;
     }
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 728e0b2..00e0436 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -21,17 +21,18 @@
 #include "gen_cpp/internal_service.pb.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/tablet.h"
-#include "util/spinlock.h"
 
 namespace doris {
 
 class FlushToken;
 class MemTable;
 class MemTracker;
+class RowBatch;
 class Schema;
 class StorageEngine;
 class Tuple;
 class TupleDescriptor;
+class TupleRow;
 class SlotDescriptor;
 
 enum WriteType { LOAD = 1, LOAD_DELETE = 2, DELETE = 3 };
@@ -61,6 +62,7 @@ public:
     OLAPStatus init();
 
     OLAPStatus write(Tuple* tuple);
+    OLAPStatus write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
     // flush the last memtable to flush queue, must call it before close_wait()
     OLAPStatus close();
     // wait for all memtables to be flushed.
@@ -118,7 +120,7 @@ private:
     // The counter of number of segment flushed already.
     int64_t _segment_counter = 0;
 
-    SpinLock _lock;
+    std::mutex _lock;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index f74861f..a49d847 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -100,20 +100,29 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
     }
 
     RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
-
-    // iterator all data
+    std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
     for (int i = 0; i < params.tablet_ids_size(); ++i) {
-        auto tablet_id = params.tablet_ids(i);
-        auto it = _tablet_writers.find(tablet_id);
-        if (it == std::end(_tablet_writers)) {
+        int64_t tablet_id = params.tablet_ids(i);
+        auto it = tablet_to_rowidxs.find(tablet_id);
+        if (it == tablet_to_rowidxs.end()) {
+            tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i });
+        } else {
+            it->second.emplace_back(i);
+        }
+    }
+
+    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+        auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
+        if (tablet_writer_it == _tablet_writers.end()) {
             return Status::InternalError(
-                    strings::Substitute("unknown tablet to append data, tablet=$0", tablet_id));
+                    strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first));
         }
-        auto st = it->second->write(row_batch.get_row(i)->get_tuple(0));
+
+        OLAPStatus st = tablet_writer_it->second->write(&row_batch, tablet_to_rowidxs_it.second);
         if (st != OLAP_SUCCESS) {
-            const std::string& err_msg = strings::Substitute(
-                    "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", it->first,
-                    _txn_id, st);
+            auto err_msg = strings::Substitute(
+                    "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2",
+                    tablet_to_rowidxs_it.first, _txn_id, st);
             LOG(WARNING) << err_msg;
             return Status::InternalError(err_msg);
         }
diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp
index 80e03ad..f8b2374 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -73,6 +73,14 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
     return add_status;
 }
 
+OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) { // test stub of the batch overload: counts rows per tablet; row_batch is never read
+    if (_k_tablet_recorder.find(_req.tablet_id) == std::end(_k_tablet_recorder)) {
+        _k_tablet_recorder[_req.tablet_id] = 0; // no entry yet for this tablet id: start its counter at zero
+    }
+    _k_tablet_recorder[_req.tablet_id] += row_idxs.size(); // add one count per row index in this call
+    return add_status; // add_status is defined outside this hunk
+}
+
 OLAPStatus DeltaWriter::close() {
     return OLAP_SUCCESS;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org