You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/25 13:37:57 UTC

(doris) 07/09: [fix](move-memtable) all sinks wait stream close for load timeout (#30356)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 90ed663eb00716906aeef90893ab8f170d6a7d8c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Thu Jan 25 19:37:02 2024 +0800

    [fix](move-memtable) all sinks wait stream close for load timeout (#30356)
---
 be/src/common/config.cpp                     | 2 --
 be/src/common/config.h                       | 2 --
 be/src/vec/sink/load_stream_stub.cpp         | 7 -------
 be/src/vec/sink/load_stream_stub.h           | 1 -
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 ++++-
 be/src/vec/sink/writer/vtablet_writer_v2.h   | 1 +
 6 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 894ea80d6d1..355a90ab7f9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -766,8 +766,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
 DEFINE_Bool(share_delta_writers, "true");
 // timeout for open load stream rpc in ms
 DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
-// timeout for load stream close wait in ms
-DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min
 
 // brpc streaming max_buf_size in bytes
 DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 79704aa0e9c..8db84b459b8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -819,8 +819,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 DECLARE_Bool(share_delta_writers);
 // timeout for open load stream rpc in ms
 DECLARE_Int64(open_load_stream_timeout_ms);
-// timeout for load stream close wait in ms
-DECLARE_Int64(close_load_stream_timeout_ms);
 
 // brpc streaming max_buf_size in bytes
 DECLARE_Int64(load_stream_max_buf_size);
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c4b10162299..5751e8308bd 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -316,13 +316,6 @@ Status LoadStreamStub::close_wait(int64_t timeout_ms) {
     if (_is_closed.load()) {
         return _check_cancel();
     }
-    // if there are other sinks remaining, let the last sink handle close wait
-    if (_use_cnt > 0) {
-        return Status::OK();
-    }
-    if (timeout_ms <= 0) {
-        timeout_ms = config::close_load_stream_timeout_ms;
-    }
     DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
     std::unique_lock<bthread::Mutex> lock(_close_mutex);
     if (!_is_closed.load()) {
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index c91f1016d35..3bc6331dc02 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -150,7 +150,6 @@ public:
 
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
-    // if timeout_ms <= 0, will fallback to config::close_load_stream_timeout_ms
     Status close_wait(int64_t timeout_ms = 0);
 
     // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c42f0955a2a..bb4b60a3a86 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -247,6 +247,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
 
 Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
     RETURN_IF_ERROR(_init(state, profile));
+    _timeout_watch.start();
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -550,9 +551,11 @@ Status VTabletWriterV2::close(Status exec_status) {
 
         {
             SCOPED_TIMER(_close_load_timer);
+            auto remain_ms = _state->execution_timeout() * 1000 -
+                             _timeout_watch.elapsed_time() / 1000 / 1000;
             for (const auto& [_, streams] : _streams_for_node) {
                 for (const auto& stream : streams->streams()) {
-                    RETURN_IF_ERROR(stream->close_wait());
+                    RETURN_IF_ERROR(stream->close_wait(remain_ms));
                 }
             }
         }
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 3b96b578d35..460b3acc33f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -186,6 +186,7 @@ private:
     int64_t _number_output_rows = 0;
 
     MonotonicStopWatch _row_distribution_watch;
+    MonotonicStopWatch _timeout_watch;
 
     RuntimeProfile::Counter* _input_rows_counter = nullptr;
     RuntimeProfile::Counter* _output_rows_counter = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org