You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/31 15:39:10 UTC

[doris] 12/20: [fix](vresultsink) BufferControlBlock may block all fragment handle threads (#16231)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eadd33a6094aead5de4d95010a8f654d0fe18018
Author: chenlinzhong <49...@qq.com>
AuthorDate: Mon Jan 30 16:53:21 2023 +0800

    [fix](vresultsink) BufferControlBlock may block all fragment handle threads  (#16231)
    
    BufferControlBlock may block all fragment handle threads, leaving them unable to do any work
    
    Modifications include:
    
    BufferControlBlock is cancelled after the max timeout
    StmtExecutor notifies the BE to cancel the fragment when an unexpected exception occurs
    For more details, see issue #16203
---
 be/src/runtime/result_buffer_mgr.cpp                           | 10 +++++++++-
 be/src/runtime/result_buffer_mgr.h                             |  3 ++-
 be/src/runtime/result_file_sink.cpp                            |  2 +-
 be/src/runtime/result_sink.cpp                                 |  2 +-
 be/src/runtime/runtime_state.h                                 |  1 +
 be/src/vec/sink/vresult_file_sink.cpp                          |  3 ++-
 be/src/vec/sink/vresult_sink.cpp                               |  5 +++--
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java  |  6 +++++-
 fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java |  6 ++++++
 9 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index ce589dd745..b9d6cc7234 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -58,7 +58,8 @@ Status ResultBufferMgr::init() {
 }
 
 Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size,
-                                      std::shared_ptr<BufferControlBlock>* sender) {
+                                      std::shared_ptr<BufferControlBlock>* sender,
+                                      int query_timeout) {
     *sender = find_control_block(query_id);
     if (*sender != nullptr) {
         LOG(WARNING) << "already have buffer control block for this instance " << query_id;
@@ -70,6 +71,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
     {
         std::lock_guard<std::mutex> l(_lock);
         _buffer_map.insert(std::make_pair(query_id, control_block));
+        // The BufferControlBlock should be cancelled once max_timeout is reached:
+        // when max_timeout is exceeded, FE will return a timeout to the client anyway;
+        // otherwise, in some cases, it may block all fragment handle threads.
+        // For details see issue https://github.com/apache/doris/issues/16203
+        // Add an extra 5s to avoid corner cases.
+        int64_t max_timeout = time(nullptr) + query_timeout + 5;
+        cancel_at_time(max_timeout, query_id);
     }
     *sender = control_block;
     return Status::OK();
diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h
index 26a07bd90c..cc16e771a0 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -47,7 +47,8 @@ public:
     // the returned sender do not need release
     // sender is not used when call cancel or unregister
     Status create_sender(const TUniqueId& query_id, int buffer_size,
-                         std::shared_ptr<BufferControlBlock>* sender);
+                         std::shared_ptr<BufferControlBlock>* sender,
+                         int query_timeout);
     // fetch data, used by RPC
     Status fetch_data(const TUniqueId& fragment_id, TFetchDataResult* result);
 
diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp
index cd3e61659a..e294def28f 100644
--- a/be/src/runtime/result_file_sink.cpp
+++ b/be/src/runtime/result_file_sink.cpp
@@ -100,7 +100,7 @@ Status ResultFileSink::prepare(RuntimeState* state) {
     if (_is_top_sink) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), _buf_size, &_sender));
+                state->fragment_instance_id(), _buf_size, &_sender, state->query_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) FileResultWriter(
                 _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index de393e9e11..a19c44ea7c 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -67,7 +67,7 @@ Status ResultSink::prepare(RuntimeState* state) {
 
     // create sender
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
-                                                                   _buf_size, &_sender));
+                                                                   _buf_size, &_sender, state->query_timeout()));
 
     // create writer based on sink type
     switch (_sink_type) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index ca6dc42c4e..cdbfeff5fa 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -97,6 +97,7 @@ public:
         return _query_options.abort_on_default_limit_exceeded;
     }
     int max_errors() const { return _query_options.max_errors; }
+    int query_timeout() const { return _query_options.query_timeout; }
     int max_io_buffers() const { return _query_options.max_io_buffers; }
     int num_scanner_threads() const { return _query_options.num_scanner_threads; }
     TQueryType::type query_type() const { return _query_options.query_type; }
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 7bfc8b4c8a..948dfce4d0 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -103,7 +103,8 @@ Status VResultFileSink::prepare(RuntimeState* state) {
     if (_is_top_sink) {
         // create sender
         RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-                state->fragment_instance_id(), _buf_size, &_sender));
+                state->fragment_instance_id(), _buf_size, &_sender,
+                state->query_timeout()));
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 77e64a8959..5d3fe0ec9a 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -61,8 +61,9 @@ Status VResultSink::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(prepare_exprs(state));
 
     // create sender
-    RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
-                                                                   _buf_size, &_sender));
+    RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+            state->fragment_instance_id(), _buf_size, &_sender,
+            state->query_timeout()));
 
     // create writer based on sink type
     switch (_sink_type) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7d32530eb6..40b085505d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -940,6 +940,10 @@ public class Coordinator {
     // fragment,
     // if any, as well as all plan fragments on remote nodes.
     public void cancel() {
+        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+    }
+
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
         lock();
         try {
             if (!queryStatus.ok()) {
@@ -949,7 +953,7 @@ public class Coordinator {
                 queryStatus.setStatus(Status.CANCELLED);
             }
             LOG.warn("cancel execution of query, this is outside invoke");
-            cancelInternal(Types.PPlanFragmentCancelReason.USER_CANCEL);
+            cancelInternal(cancelReason);
         } finally {
             unlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 30c5e1dbe4..55e6832794 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -100,6 +100,7 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
 import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -1205,6 +1206,11 @@ public class StmtExecutor implements ProfileWriter {
             context.getState().setEof();
             plannerProfile.setQueryFetchResultFinishTime();
         } catch (Exception e) {
+            // Notify all BEs to cancel the running fragments;
+            // otherwise, in some cases, all fragment handle threads may be blocked.
+            // For details see issue https://github.com/apache/doris/issues/16203
+            LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage());
+            coord.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
             fetchResultSpan.recordException(e);
             throw e;
         } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org