You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/25 13:37:57 UTC
(doris) 07/09: [fix](move-memtable) all sinks wait stream close for load timeout (#30356)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 90ed663eb00716906aeef90893ab8f170d6a7d8c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Thu Jan 25 19:37:02 2024 +0800
[fix](move-memtable) all sinks wait stream close for load timeout (#30356)
---
be/src/common/config.cpp | 2 --
be/src/common/config.h | 2 --
be/src/vec/sink/load_stream_stub.cpp | 7 -------
be/src/vec/sink/load_stream_stub.h | 1 -
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 ++++-
be/src/vec/sink/writer/vtablet_writer_v2.h | 1 +
6 files changed, 5 insertions(+), 13 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 894ea80d6d1..355a90ab7f9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -766,8 +766,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
DEFINE_Bool(share_delta_writers, "true");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
-// timeout for load stream close wait in ms
-DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 79704aa0e9c..8db84b459b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -819,8 +819,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
DECLARE_Bool(share_delta_writers);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
-// timeout for load stream close wait in ms
-DECLARE_Int64(close_load_stream_timeout_ms);
// brpc streaming max_buf_size in bytes
DECLARE_Int64(load_stream_max_buf_size);
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c4b10162299..5751e8308bd 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -316,13 +316,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
if (_is_closed.load()) {
return _check_cancel();
}
- // if there are other sinks remaining, let the last sink handle close wait
- if (_use_cnt > 0) {
- return Status::OK();
- }
- if (timeout_ms <= 0) {
- timeout_ms = config::close_load_stream_timeout_ms;
- }
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
if (!_is_closed.load()) {
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index c91f1016d35..3bc6331dc02 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -150,7 +150,6 @@ public:
// wait remote to close stream,
// remote will close stream when it receives CLOSE_LOAD
- // if timeout_ms <= 0, will fallback to config::close_load_stream_timeout_ms
Status close_wait(int64_t timeout_ms = 0);
// cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c42f0955a2a..bb4b60a3a86 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -247,6 +247,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
RETURN_IF_ERROR(_init(state, profile));
+ _timeout_watch.start();
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -550,9 +551,11 @@ Status VTabletWriterV2::close(Status exec_status) {
{
SCOPED_TIMER(_close_load_timer);
+ auto remain_ms = _state->execution_timeout() * 1000 -
+ _timeout_watch.elapsed_time() / 1000 / 1000;
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : streams->streams()) {
- RETURN_IF_ERROR(stream->close_wait());
+ RETURN_IF_ERROR(stream->close_wait(remain_ms));
}
}
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 3b96b578d35..460b3acc33f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -186,6 +186,7 @@ private:
int64_t _number_output_rows = 0;
MonotonicStopWatch _row_distribution_watch;
+ MonotonicStopWatch _timeout_watch;
RuntimeProfile::Counter* _input_rows_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org