You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/03 02:16:00 UTC
[incubator-doris] branch master updated: [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6cc8762 [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
6cc8762 is described below
commit 6cc8762ce76845bccdd2630122f0fdbcfcf5d207
Author: dataroaring <98...@users.noreply.github.com>
AuthorDate: Sun Apr 3 10:15:45 2022 +0800
[fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
The patch fixes two problems.
1. A memory-ordering problem when accessing _last_patch_processed_finished and in_flight. _last_patch_processed_finished is actually redundant, so the patch removes it.
2. A synchronization problem in join() on cid: join() now waits on the in-flight flag instead of relying on cid alone.
Fix for #8725.
---
be/src/exec/tablet_sink.cpp | 19 +++++++++++++++----
be/src/exec/tablet_sink.h | 31 +++++++++++++++++++++----------
2 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index bc616b4..2cfbbb6 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -457,14 +457,24 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state,
if (!st.ok()) {
return 0;
}
- bool is_finished = true;
- if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 &&
- _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
+
+ if (!_add_batch_closure->try_set_in_flight()) {
+ return _send_finished ? 0 : 1;
+ }
+
+ // We are sure that try_send_batch is not running
+ if (_pending_batches_num > 0) {
auto s = thread_pool_token->submit_func(
std::bind(&NodeChannel::try_send_batch, this, state));
if (!s.ok()) {
_cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
+ // clear in flight
+ _add_batch_closure->clear_in_flight();
}
+ // in_flight is cleared in closure::Run
+ } else {
+ // clear in flight
+ _add_batch_closure->clear_in_flight();
}
return _send_finished ? 0 : 1;
}
@@ -495,6 +505,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
&compressed_bytes, _tuple_data_buffer_ptr);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+ _add_batch_closure->clear_in_flight();
return;
}
if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -508,6 +519,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
+ _add_batch_closure->clear_in_flight();
return;
} else {
remain_ms = config::min_load_rpc_timeout_ms;
@@ -544,7 +556,6 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
_add_batch_closure);
_next_packet_seq++;
- _last_patch_processed_finished = true;
}
Status NodeChannel::none_of(std::initializer_list<bool> vars) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 6749602..cf5ad40 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,22 +100,35 @@ public:
void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
void join() {
- if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
- brpc::Join(cid);
+ // We rely on in_flight to assure one rpc is running,
+ // while cid is not reliable due to memory order.
+ // in_flight is written before getting callid,
+ // so we can not use memory fence to synchronize.
+ while (_packet_in_flight) {
+ // cid here is complicated
+ if (cid != INVALID_BTHREAD_ID) {
+ // actually cid may be the last rpc call id.
+ brpc::Join(cid);
+ }
+ if (_packet_in_flight) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
}
}
// plz follow this order: reset() -> set_in_flight() -> send brpc batch
void reset() {
- join();
- DCHECK(_packet_in_flight == false);
cntl.Reset();
cid = cntl.call_id();
}
- void set_in_flight() {
- DCHECK(_packet_in_flight == false);
- _packet_in_flight = true;
+ bool try_set_in_flight() {
+ bool value = false;
+ return _packet_in_flight.compare_exchange_strong(value, true);
+ }
+
+ void clear_in_flight() {
+ _packet_in_flight = false;
}
bool is_packet_in_flight() { return _packet_in_flight; }
@@ -134,7 +147,7 @@ public:
} else {
success_handler(result, _is_last_rpc);
}
- _packet_in_flight = false;
+ clear_in_flight();
}
brpc::Controller cntl;
@@ -245,8 +258,6 @@ private:
// add batches finished means the last rpc has be response, used to check whether this channel can be closed
std::atomic<bool> _add_batches_finished {false};
- std::atomic<bool> _last_patch_processed_finished {true};
-
bool _eos_is_produced {false}; // only for restricting producer behaviors
std::unique_ptr<RowDescriptor> _row_desc;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org