You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/06 02:44:50 UTC
[incubator-doris] 04/19: [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit af7958ce8a27d34ff7224afb895768e92a9f7806
Author: dataroaring <98...@users.noreply.github.com>
AuthorDate: Sun Apr 3 10:15:45 2022 +0800
[fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
The patch fixes two problems.
1. A memory-order problem when accessing _last_patch_processed_finished and in_flight. _last_patch_processed_finished is actually redundant, so the patch removes it.
2. A synchronization problem in join() on cid.
Fix for #8725.
---
be/src/exec/tablet_sink.cpp | 23 +++++++++++++++++------
be/src/exec/tablet_sink.h | 31 +++++++++++++++++++++----------
2 files changed, 38 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 7cb1c18685..4a46d538ee 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -451,13 +451,24 @@ int NodeChannel::try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& thr
if (!st.ok()) {
return 0;
}
- bool is_finished = true;
- if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 &&
- _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
- auto s = thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this));
+
+ if (!_add_batch_closure->try_set_in_flight()) {
+ return _send_finished ? 0 : 1;
+ }
+
+ // We are sure that try_send_batch is not running
+ if (_pending_batches_num > 0) {
+ auto s = thread_pool_token->submit_func(
+ std::bind(&NodeChannel::try_send_batch, this));
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
+ // clear in flight
+ _add_batch_closure->clear_in_flight();
}
+ // in_flight is cleared in closure::Run
+ } else {
+ // clear in flight
+ _add_batch_closure->clear_in_flight();
}
return _send_finished ? 0 : 1;
}
@@ -487,6 +498,7 @@ void NodeChannel::try_send_batch() {
&compressed_bytes, _tuple_data_buffer_ptr);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+ _add_batch_closure->clear_in_flight();
return;
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -500,6 +512,7 @@ void NodeChannel::try_send_batch() {
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
+ _add_batch_closure->clear_in_flight();
return;
} else {
remain_ms = config::min_load_rpc_timeout_ms;
@@ -531,12 +544,10 @@ void NodeChannel::try_send_batch() {
ReusableClosure<PTabletWriterAddBatchResult>>(
&request, _tuple_data_buffer, _add_batch_closure);
}
- _add_batch_closure->set_in_flight();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
_add_batch_closure);
_next_packet_seq++;
- _last_patch_processed_finished = true;
}
Status NodeChannel::none_of(std::initializer_list<bool> vars) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 242f45a1a6..a6af6c15e7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,22 +100,35 @@ public:
void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
void join() {
- if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
- brpc::Join(cid);
+ // We rely on in_flight to assure one rpc is running,
+ // while cid is not reliable due to memory order.
+ // in_flight is written before getting callid,
+ // so we can not use memory fence to synchronize.
+ while (_packet_in_flight) {
+ // cid here is complicated
+ if (cid != INVALID_BTHREAD_ID) {
+ // actually cid may be the last rpc call id.
+ brpc::Join(cid);
+ }
+ if (_packet_in_flight) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
}
}
// plz follow this order: reset() -> set_in_flight() -> send brpc batch
void reset() {
- join();
- DCHECK(_packet_in_flight == false);
cntl.Reset();
cid = cntl.call_id();
}
- void set_in_flight() {
- DCHECK(_packet_in_flight == false);
- _packet_in_flight = true;
+ bool try_set_in_flight() {
+ bool value = false;
+ return _packet_in_flight.compare_exchange_strong(value, true);
+ }
+
+ void clear_in_flight() {
+ _packet_in_flight = false;
}
bool is_packet_in_flight() { return _packet_in_flight; }
@@ -134,7 +147,7 @@ public:
} else {
success_handler(result, _is_last_rpc);
}
- _packet_in_flight = false;
+ clear_in_flight();
}
brpc::Controller cntl;
@@ -243,8 +256,6 @@ private:
// add batches finished means the last rpc has be response, used to check whether this channel can be closed
std::atomic<bool> _add_batches_finished {false};
- std::atomic<bool> _last_patch_processed_finished {true};
-
bool _eos_is_produced {false}; // only for restricting producer behaviors
std::unique_ptr<RowDescriptor> _row_desc;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org