You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/26 02:15:39 UTC
[incubator-doris] branch master updated: [performance] Improve DeltaWriter's performance. (#7216)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 948a2a7 [performance] Improve DeltaWriter's performance. (#7216)
948a2a7 is described below
commit 948a2a738d44bdbf93721e27fc834d54a1f3cd80
Author: 曹建华 <ca...@bytedance.com>
AuthorDate: Fri Nov 26 10:15:27 2021 +0800
[performance] Improve DeltaWriter's performance. (#7216)
1. Support batch write for DeltaWriter.
2. Use mutex instead of SpinLock.
---
be/src/olap/delta_writer.cpp | 36 +++++++++++++++++++++++++------
be/src/olap/delta_writer.h | 6 ++++--
be/src/runtime/tablets_channel.cpp | 29 ++++++++++++++++---------
be/test/runtime/load_channel_mgr_test.cpp | 8 +++++++
4 files changed, 61 insertions(+), 18 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 3c0d2fc..42712a6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -24,6 +24,8 @@
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
+#include "runtime/row_batch.h"
+#include "runtime/tuple_row.h"
namespace doris {
@@ -153,7 +155,7 @@ OLAPStatus DeltaWriter::init() {
}
OLAPStatus DeltaWriter::write(Tuple* tuple) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_is_init && !_is_cancelled) {
RETURN_NOT_OK(init());
}
@@ -179,6 +181,28 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
return OLAP_SUCCESS;
}
+// Batch variant of write(): inserts the rows of `row_batch` selected by
+// `row_idxs` into the current memtable, taking `_lock` once for the whole call.
+// Returns OLAP_ERR_ALREADY_CANCELLED when the writer is cancelled; failures from
+// init() or _flush_memtable_async() are handed to RETURN_NOT_OK.
+OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) {
+    std::lock_guard<std::mutex> guard(_lock);
+
+    // Initialize on first use, but only while the writer is not cancelled.
+    const bool needs_init = !(_is_init || _is_cancelled);
+    if (needs_init) {
+        RETURN_NOT_OK(init());
+    }
+    if (_is_cancelled) {
+        return OLAP_ERR_ALREADY_CANCELLED;
+    }
+
+    // For every selected row, the tuple at index 0 is what gets inserted.
+    for (int idx : row_idxs) {
+        auto* row = row_batch->get_row(idx);
+        _mem_table->insert(row->get_tuple(0));
+    }
+
+    // The size check happens once per batch, after all rows are inserted.
+    const bool buffer_full = _mem_table->memory_usage() >= config::write_buffer_size;
+    if (buffer_full) {
+        RETURN_NOT_OK(_flush_memtable_async());
+        _reset_mem_table();
+    }
+
+    return OLAP_SUCCESS;
+}
+
OLAPStatus DeltaWriter::_flush_memtable_async() {
if (++_segment_counter > config::max_segment_num_per_rowset) {
return OLAP_ERR_TOO_MANY_SEGMENTS;
@@ -187,7 +211,7 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
}
OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_is_init) {
// This writer is not initialized before flushing. Do nothing
// But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED,
@@ -220,7 +244,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
}
OLAPStatus DeltaWriter::wait_flush() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_is_init) {
// return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason
// as described in flush_memtable_and_wait()
@@ -240,7 +264,7 @@ void DeltaWriter::_reset_mem_table() {
}
OLAPStatus DeltaWriter::close() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_is_init && !_is_cancelled) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
@@ -260,7 +284,7 @@ OLAPStatus DeltaWriter::close() {
}
OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait() being called";
@@ -328,7 +352,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
}
OLAPStatus DeltaWriter::cancel() {
- std::lock_guard<SpinLock> l(_lock);
+ std::lock_guard<std::mutex> l(_lock);
if (!_is_init || _is_cancelled) {
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 728e0b2..00e0436 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -21,17 +21,18 @@
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
-#include "util/spinlock.h"
namespace doris {
class FlushToken;
class MemTable;
class MemTracker;
+class RowBatch;
class Schema;
class StorageEngine;
class Tuple;
class TupleDescriptor;
+class TupleRow;
class SlotDescriptor;
enum WriteType { LOAD = 1, LOAD_DELETE = 2, DELETE = 3 };
@@ -61,6 +62,7 @@ public:
OLAPStatus init();
OLAPStatus write(Tuple* tuple);
+ OLAPStatus write(const RowBatch* row_batch, const std::vector<int>& row_idxs);
// flush the last memtable to flush queue, must call it before close_wait()
OLAPStatus close();
// wait for all memtables to be flushed.
@@ -118,7 +120,7 @@ private:
// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
- SpinLock _lock;
+ std::mutex _lock;
};
} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index f74861f..a49d847 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -100,20 +100,29 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
}
RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
-
- // iterator all data
+ std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
for (int i = 0; i < params.tablet_ids_size(); ++i) {
- auto tablet_id = params.tablet_ids(i);
- auto it = _tablet_writers.find(tablet_id);
- if (it == std::end(_tablet_writers)) {
+ int64_t tablet_id = params.tablet_ids(i);
+ auto it = tablet_to_rowidxs.find(tablet_id);
+ if (it == tablet_to_rowidxs.end()) {
+ tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>{ i });
+ } else {
+ it->second.emplace_back(i);
+ }
+ }
+
+ for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+ auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
+ if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError(
- strings::Substitute("unknown tablet to append data, tablet=$0", tablet_id));
+ strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first));
}
- auto st = it->second->write(row_batch.get_row(i)->get_tuple(0));
+
+ OLAPStatus st = tablet_writer_it->second->write(&row_batch, tablet_to_rowidxs_it.second);
if (st != OLAP_SUCCESS) {
- const std::string& err_msg = strings::Substitute(
- "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", it->first,
- _txn_id, st);
+ auto err_msg = strings::Substitute(
+ "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2",
+ tablet_to_rowidxs_it.first, _txn_id, st);
LOG(WARNING) << err_msg;
return Status::InternalError(err_msg);
}
diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp
index 80e03ad..f8b2374 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -73,6 +73,14 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
return add_status;
}
+// Test stub for the batch write(): adds the number of row indexes passed in this
+// call to the counter kept for this writer's tablet id in _k_tablet_recorder,
+// then returns add_status. `row_batch` itself is not read.
+OLAPStatus DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row_idxs) {
+    const auto tablet_id = _req.tablet_id;
+    const bool seen = _k_tablet_recorder.find(tablet_id) != std::end(_k_tablet_recorder);
+    if (!seen) {
+        // First write for this tablet: start its counter at zero.
+        _k_tablet_recorder[tablet_id] = 0;
+    }
+    auto& counter = _k_tablet_recorder[tablet_id];
+    counter += row_idxs.size();
+    return add_status;
+}
+
OLAPStatus DeltaWriter::close() {
return OLAP_SUCCESS;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org