You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/09 01:02:41 UTC
[doris] 01/02: [fix](sink) fix OlapTableSink early close causes load failure #21545
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit c27558e235288b721b249318dbbe3ec72d835a0e
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Jul 7 14:03:54 2023 +0800
[fix](sink) fix OlapTableSink early close causes load failure #21545
---
be/src/vec/sink/vtablet_sink.cpp | 73 ++++++++++++++++++++++++++++------------
be/src/vec/sink/vtablet_sink.h | 5 ++-
2 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 0d1d260608..be7183428a 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -107,7 +107,7 @@ public:
void Run() override {
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- if (cntl.Failed()) {
+ auto open_partition_failed = [this]() {
std::stringstream ss;
ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode())
<< ", error_text=" << this->cntl.ErrorText();
@@ -117,6 +117,14 @@ public:
fmt::format("{}, open failed, err: {}",
vnode_channel->channel_info(), ss.str()),
-1);
+ };
+ if (cntl.Failed()) {
+ open_partition_failed();
+ } else {
+ Status status(result.status());
+ if (!status.ok()) {
+ open_partition_failed();
+ }
}
done = true;
}
@@ -903,14 +911,12 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
request.release_id();
}
-bool VNodeChannel::is_rpc_done() const {
+bool VNodeChannel::is_send_data_rpc_done() const {
if (_add_block_closure != nullptr) {
- return (_add_batches_finished ||
- (_cancelled && !_add_block_closure->is_packet_in_flight())) &&
- open_partition_finished();
+ return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight());
} else {
// such as, canceled before open_wait new closure.
- return (_add_batches_finished || _cancelled) && open_partition_finished();
+ return _add_batches_finished || _cancelled;
}
}
@@ -1122,6 +1128,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
}
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc));
+ _prepare = true;
return Status::OK();
}
@@ -1505,6 +1512,22 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
if (_try_close) {
return;
}
+
+ if (config::enable_lazy_open_partition && !_open_partition_done) {
+ // open_partition_finished must be before mark_close
+ bool open_partition_done = true;
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [&open_partition_done](const std::shared_ptr<VNodeChannel>& ch) {
+ open_partition_done &= ch->open_partition_finished();
+ });
+ }
+ if (!open_partition_done) {
+ return;
+ }
+ _open_partition_done = true;
+ }
+
SCOPED_TIMER(_close_timer);
Status status = exec_status;
if (status.ok()) {
@@ -1540,11 +1563,14 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
}
bool VOlapTableSink::is_close_done() {
+ if (config::enable_lazy_open_partition && !_open_partition_done) {
+ return false;
+ }
bool close_done = true;
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel(
[&close_done](const std::shared_ptr<VNodeChannel>& ch) {
- close_done &= ch->is_rpc_done();
+ close_done &= ch->is_send_data_rpc_done();
});
}
return close_done;
@@ -1554,15 +1580,29 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return _close_status;
}
- try_close(state, exec_status);
+ if (!_prepare) {
+ DCHECK(!exec_status.ok());
+ _cancel_all_channel(exec_status);
+ DataSink::close(state, exec_status);
+ _close_status = exec_status;
+ return _close_status;
+ }
+
SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(_profile->total_time_counter());
+
+ if (config::enable_lazy_open_partition) {
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [](const std::shared_ptr<VNodeChannel>& ch) { ch->open_partition_wait(); });
+ }
+ _open_partition_done = true;
+ }
+
+ try_close(state, exec_status);
// If _close_status is not ok, all nodes have been canceled in try_close.
if (_close_status.ok()) {
- DCHECK(exec_status.ok());
auto status = Status::OK();
- // only if status is ok can we call this _profile->total_time_counter().
- // if status is not ok, this sink may not be prepared, so that _profile is null
- SCOPED_TIMER(_profile->total_time_counter());
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, actual_consume_ns = 0,
@@ -1571,15 +1611,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
num_node_channels = 0;
VNodeChannelStat channel_stat;
{
- if (config::enable_lazy_open_partition) {
- for (const auto& index_channel : _channels) {
- index_channel->for_each_node_channel(
- [](const std::shared_ptr<VNodeChannel>& ch) {
- ch->open_partition_wait();
- });
- }
- }
-
for (const auto& index_channel : _channels) {
if (!status.ok()) {
break;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index a3a6a91a0c..265ff47dcc 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -250,7 +250,7 @@ public:
// 2. just cancel()
void mark_close();
- bool is_rpc_done() const;
+ bool is_send_data_rpc_done() const;
bool is_closed() const { return _is_closed; }
bool is_cancelled() const { return _cancelled; }
@@ -647,6 +647,9 @@ private:
// Save the status of try_close() and close() method
Status _close_status;
bool _try_close = false;
+ bool _prepare = false;
+
+ std::atomic<bool> _open_partition_done {false};
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org