You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2023/06/29 02:04:03 UTC
[doris] branch master updated: [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)
This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f8cfe5e579 [Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)
f8cfe5e579 is described below
commit f8cfe5e57959d4352d7593f900dd70229ceb7525
Author: Pxl <px...@qq.com>
AuthorDate: Thu Jun 29 10:03:57 2023 +0800
[Bug](pipeline) add DCHECK for _instance_to_sending_by_pipeline = false on _send_rpc (#21169)
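Add a DCHECK at the start of _send_rpc asserting that _instance_to_sending_by_pipeline[id] is false. This commit also clears the broadcast and regular package queues in close(), passes the request to emplace without std::move in add_block, removes a redundant `return Status::OK()` in _send_rpc, and declares the RefCountClosure destructor as `override = default`.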
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 7 +++++--
be/src/util/ref_count_closure.h | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e8b3f76fda..0326929e5c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -62,6 +62,8 @@ void ExchangeSinkBuffer::close() {
pair.second->release_finst_id();
pair.second->release_query_id();
}
+ _instance_to_broadcast_package_queue.clear();
+ _instance_to_package_queue.clear();
_instance_to_request.clear();
}
@@ -146,7 +148,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
send_now = true;
_instance_to_sending_by_pipeline[ins_id.lo] = false;
}
- _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request));
+ _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
}
if (send_now) {
RETURN_IF_ERROR(_send_rpc(ins_id.lo));
@@ -158,6 +160,8 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+ DCHECK(_instance_to_sending_by_pipeline[id] == false);
+
std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
@@ -257,7 +261,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
broadcast_q.pop();
} else {
_instance_to_sending_by_pipeline[id] = true;
- return Status::OK();
}
return Status::OK();
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index fe6efa7615..d2fbd2fd14 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -31,7 +31,7 @@ template <typename T>
class RefCountClosure : public google::protobuf::Closure {
public:
RefCountClosure() : _refs(0) {}
- ~RefCountClosure() {}
+ ~RefCountClosure() override = default;
void ref() { _refs.fetch_add(1); }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org