You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/29 02:04:03 UTC

[doris] branch master updated: [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f8cfe5e579 [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)
f8cfe5e579 is described below

commit f8cfe5e57959d4352d7593f900dd70229ceb7525
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 29 10:03:57 2023 +0800

    [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)
    
    add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 7 +++++--
 be/src/util/ref_count_closure.h               | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e8b3f76fda..0326929e5c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -62,6 +62,8 @@ void ExchangeSinkBuffer::close() {
         pair.second->release_finst_id();
         pair.second->release_query_id();
     }
+    _instance_to_broadcast_package_queue.clear();
+    _instance_to_package_queue.clear();
     _instance_to_request.clear();
 }
 
@@ -146,7 +148,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
             send_now = true;
             _instance_to_sending_by_pipeline[ins_id.lo] = false;
         }
-        _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request));
+        _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
     }
     if (send_now) {
         RETURN_IF_ERROR(_send_rpc(ins_id.lo));
@@ -158,6 +160,8 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
 
+    DCHECK(_instance_to_sending_by_pipeline[id] == false);
+
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
@@ -257,7 +261,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         broadcast_q.pop();
     } else {
         _instance_to_sending_by_pipeline[id] = true;
-        return Status::OK();
     }
 
     return Status::OK();
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index fe6efa7615..d2fbd2fd14 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -31,7 +31,7 @@ template <typename T>
 class RefCountClosure : public google::protobuf::Closure {
 public:
     RefCountClosure() : _refs(0) {}
-    ~RefCountClosure() {}
+    ~RefCountClosure() override = default;
 
     void ref() { _refs.fetch_add(1); }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org