You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/06 02:44:50 UTC

[incubator-doris] 04/19: [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit af7958ce8a27d34ff7224afb895768e92a9f7806
Author: dataroaring <98...@users.noreply.github.com>
AuthorDate: Sun Apr 3 10:15:45 2022 +0800

    [fix](load) fix concurrent synchronization problem in NodeChannel::try_send_batch (#8728)
    
    The patch fixes two problems.
    1. A memory-ordering problem when accessing _last_patch_processed_finished and in_flight. _last_patch_processed_finished is actually redundant, so the patch removes it.
    2. A synchronization problem on cid in join().
    
    Fix for #8725.
---
 be/src/exec/tablet_sink.cpp | 23 +++++++++++++++++------
 be/src/exec/tablet_sink.h   | 31 +++++++++++++++++++++----------
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 7cb1c18685..4a46d538ee 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -451,13 +451,24 @@ int NodeChannel::try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& thr
     if (!st.ok()) {
         return 0;
     }
-    bool is_finished = true;
-    if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 &&
-        _last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
-        auto s = thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this));
+
+    if (!_add_batch_closure->try_set_in_flight()) {
+        return _send_finished ? 0 : 1;
+    }
+
+    // We are sure that try_send_batch is not running
+    if (_pending_batches_num > 0) {
+        auto s = thread_pool_token->submit_func(
+                std::bind(&NodeChannel::try_send_batch, this));
         if (!s.ok()) {
             _cancel_with_msg("submit send_batch task to send_batch_thread_pool failed");
+            // clear in flight
+            _add_batch_closure->clear_in_flight();
         }
+        // in_flight is cleared in closure::Run
+    } else {
+        // clear in flight
+        _add_batch_closure->clear_in_flight();
     }
     return _send_finished ? 0 : 1;
 }
@@ -487,6 +498,7 @@ void NodeChannel::try_send_batch() {
                                          &compressed_bytes, _tuple_data_buffer_ptr);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+            _add_batch_closure->clear_in_flight();
             return;
         }
         if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
@@ -500,6 +512,7 @@ void NodeChannel::try_send_batch() {
     if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
         if (remain_ms <= 0 && !request.eos()) {
             cancel(fmt::format("{}, err: timeout", channel_info()));
+            _add_batch_closure->clear_in_flight();
             return;
         } else {
             remain_ms = config::min_load_rpc_timeout_ms;
@@ -531,12 +544,10 @@ void NodeChannel::try_send_batch() {
                                               ReusableClosure<PTabletWriterAddBatchResult>>(
                 &request, _tuple_data_buffer, _add_batch_closure);
     }
-    _add_batch_closure->set_in_flight();
     _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
                                    _add_batch_closure);
 
     _next_packet_seq++;
-    _last_patch_processed_finished = true;
 }
 
 Status NodeChannel::none_of(std::initializer_list<bool> vars) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 242f45a1a6..a6af6c15e7 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -100,22 +100,35 @@ public:
     void addSuccessHandler(std::function<void(const T&, bool)> fn) { success_handler = fn; }
 
     void join() {
-        if (cid != INVALID_BTHREAD_ID && _packet_in_flight) {
-            brpc::Join(cid);
+        // We rely on in_flight to assure one rpc is running,
+        // while cid is not reliable due to memory order.
+        // in_flight is written before getting callid,
+        // so we can not use memory fence to synchronize.
+        while (_packet_in_flight) {
+            // cid here is complicated
+            if (cid != INVALID_BTHREAD_ID) {
+                // actually cid may be the last rpc call id.
+                brpc::Join(cid);
+            }
+            if (_packet_in_flight) {
+                SleepFor(MonoDelta::FromMilliseconds(10));
+            }
         }
     }
 
     // plz follow this order: reset() -> set_in_flight() -> send brpc batch
     void reset() {
-        join();
-        DCHECK(_packet_in_flight == false);
         cntl.Reset();
         cid = cntl.call_id();
     }
 
-    void set_in_flight() {
-        DCHECK(_packet_in_flight == false);
-        _packet_in_flight = true;
+    bool try_set_in_flight() {
+        bool value = false;
+        return _packet_in_flight.compare_exchange_strong(value, true);
+    }
+
+    void clear_in_flight() {
+        _packet_in_flight = false;
     }
 
     bool is_packet_in_flight() { return _packet_in_flight; }
@@ -134,7 +147,7 @@ public:
         } else {
             success_handler(result, _is_last_rpc);
         }
-        _packet_in_flight = false;
+        clear_in_flight();
     }
 
     brpc::Controller cntl;
@@ -243,8 +256,6 @@ private:
     // add batches finished means the last rpc has be response, used to check whether this channel can be closed
     std::atomic<bool> _add_batches_finished {false};
 
-    std::atomic<bool> _last_patch_processed_finished {true};
-
     bool _eos_is_produced {false}; // only for restricting producer behaviors
 
     std::unique_ptr<RowDescriptor> _row_desc;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org