You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/09 01:02:41 UTC

[doris] 01/02: [fix](sink) fix OlapTableSink early close causes load failure #21545

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c27558e235288b721b249318dbbe3ec72d835a0e
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Jul 7 14:03:54 2023 +0800

    [fix](sink) fix OlapTableSink early close causes load failure #21545
---
 be/src/vec/sink/vtablet_sink.cpp | 73 ++++++++++++++++++++++++++++------------
 be/src/vec/sink/vtablet_sink.h   |  5 ++-
 2 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 0d1d260608..be7183428a 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -107,7 +107,7 @@ public:
 
     void Run() override {
         SCOPED_TRACK_MEMORY_TO_UNKNOWN();
-        if (cntl.Failed()) {
+        auto open_partition_failed = [this]() {
             std::stringstream ss;
             ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode())
                << ", error_text=" << this->cntl.ErrorText();
@@ -117,6 +117,14 @@ public:
                                           fmt::format("{}, open failed, err: {}",
                                                       vnode_channel->channel_info(), ss.str()),
                                           -1);
+        };
+        if (cntl.Failed()) {
+            open_partition_failed();
+        } else {
+            Status status(result.status());
+            if (!status.ok()) {
+                open_partition_failed();
+            }
         }
         done = true;
     }
@@ -903,14 +911,12 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     request.release_id();
 }
 
-bool VNodeChannel::is_rpc_done() const {
+bool VNodeChannel::is_send_data_rpc_done() const {
     if (_add_block_closure != nullptr) {
-        return (_add_batches_finished ||
-                (_cancelled && !_add_block_closure->is_packet_in_flight())) &&
-               open_partition_finished();
+        return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight());
     } else {
         // such as, canceled before open_wait new closure.
-        return (_add_batches_finished || _cancelled) && open_partition_finished();
+        return _add_batches_finished || _cancelled;
     }
 }
 
@@ -1122,6 +1128,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
     }
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc));
+    _prepare = true;
     return Status::OK();
 }
 
@@ -1505,6 +1512,22 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
     if (_try_close) {
         return;
     }
+
+    if (config::enable_lazy_open_partition && !_open_partition_done) {
+        // open_partition_finished must be before mark_close
+        bool open_partition_done = true;
+        for (const auto& index_channel : _channels) {
+            index_channel->for_each_node_channel(
+                    [&open_partition_done](const std::shared_ptr<VNodeChannel>& ch) {
+                        open_partition_done &= ch->open_partition_finished();
+                    });
+        }
+        if (!open_partition_done) {
+            return;
+        }
+        _open_partition_done = true;
+    }
+
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
     if (status.ok()) {
@@ -1540,11 +1563,14 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
 }
 
 bool VOlapTableSink::is_close_done() {
+    if (config::enable_lazy_open_partition && !_open_partition_done) {
+        return false;
+    }
     bool close_done = true;
     for (const auto& index_channel : _channels) {
         index_channel->for_each_node_channel(
                 [&close_done](const std::shared_ptr<VNodeChannel>& ch) {
-                    close_done &= ch->is_rpc_done();
+                    close_done &= ch->is_send_data_rpc_done();
                 });
     }
     return close_done;
@@ -1554,15 +1580,29 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return _close_status;
     }
-    try_close(state, exec_status);
+    if (!_prepare) {
+        DCHECK(!exec_status.ok());
+        _cancel_all_channel(exec_status);
+        DataSink::close(state, exec_status);
+        _close_status = exec_status;
+        return _close_status;
+    }
+
     SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(_profile->total_time_counter());
+
+    if (config::enable_lazy_open_partition) {
+        for (const auto& index_channel : _channels) {
+            index_channel->for_each_node_channel(
+                    [](const std::shared_ptr<VNodeChannel>& ch) { ch->open_partition_wait(); });
+        }
+        _open_partition_done = true;
+    }
+
+    try_close(state, exec_status);
     // If _close_status is not ok, all nodes have been canceled in try_close.
     if (_close_status.ok()) {
-        DCHECK(exec_status.ok());
         auto status = Status::OK();
-        // only if status is ok can we call this _profile->total_time_counter().
-        // if status is not ok, this sink may not be prepared, so that _profile is null
-        SCOPED_TIMER(_profile->total_time_counter());
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
         int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
@@ -1571,15 +1611,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
                 num_node_channels = 0;
         VNodeChannelStat channel_stat;
         {
-            if (config::enable_lazy_open_partition) {
-                for (const auto& index_channel : _channels) {
-                    index_channel->for_each_node_channel(
-                            [](const std::shared_ptr<VNodeChannel>& ch) {
-                                ch->open_partition_wait();
-                            });
-                }
-            }
-
             for (const auto& index_channel : _channels) {
                 if (!status.ok()) {
                     break;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index a3a6a91a0c..265ff47dcc 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -250,7 +250,7 @@ public:
     // 2. just cancel()
     void mark_close();
 
-    bool is_rpc_done() const;
+    bool is_send_data_rpc_done() const;
 
     bool is_closed() const { return _is_closed; }
     bool is_cancelled() const { return _cancelled; }
@@ -647,6 +647,9 @@ private:
     // Save the status of try_close() and close() method
     Status _close_status;
     bool _try_close = false;
+    bool _prepare = false;
+
+    std::atomic<bool> _open_partition_done {false};
 
     // User can change this config at runtime, avoid it being modified during query or loading process.
     bool _transfer_large_data_by_brpc = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org