You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/24 15:04:05 UTC
[doris] branch master updated: [Bug](pipeline) access map may cause coredump in sink buffer (#21108)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 601120db04 [Bug](pipeline) access map may cause coredump in sink buffer (#21108)
601120db04 is described below
commit 601120db04325547cced54e12d38c6994755434d
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sat Jun 24 23:03:59 2023 +0800
[Bug](pipeline) access map may cause coredump in sink buffer (#21108)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 25 ++++++++-----------------
be/src/pipeline/exec/exchange_sink_buffer.h | 6 +++---
2 files changed, 11 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index c0f0e921e9..e8b3f76fda 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -59,11 +59,8 @@ ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
void ExchangeSinkBuffer::close() {
for (const auto& pair : _instance_to_request) {
- if (pair.second) {
- pair.second->release_finst_id();
- pair.second->release_query_id();
- delete pair.second;
- }
+ pair.second->release_finst_id();
+ pair.second->release_query_id();
}
_instance_to_request.clear();
}
@@ -104,10 +101,10 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
PUniqueId finst_id;
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
- _instance_to_finst_id[low_id] = finst_id;
_instance_to_sending_by_pipeline[low_id] = true;
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
+ _construct_request(low_id, finst_id);
}
Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -173,10 +170,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
- if (!_instance_to_request[id]) {
- _construct_request(id);
- }
- auto brpc_request = _instance_to_request[id];
+ auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
if (request.block) {
@@ -220,10 +214,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
- if (!_instance_to_request[id]) {
- _construct_request(id);
- }
- auto brpc_request = _instance_to_request[id];
+ auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
if (request.block_holder->get_block()) {
@@ -272,9 +263,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
return Status::OK();
}
-void ExchangeSinkBuffer::_construct_request(InstanceLoId id) {
- _instance_to_request[id] = new PTransmitDataParams();
- _instance_to_request[id]->set_allocated_finst_id(&_instance_to_finst_id[id]);
+void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
+ _instance_to_request[id] = std::make_unique<PTransmitDataParams>();
+ _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
_instance_to_request[id]->set_allocated_query_id(&_query_id);
_instance_to_request[id]->set_node_id(_dest_node_id);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index ccc5e2afff..dcea246f91 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -179,9 +179,9 @@ private:
_instance_to_broadcast_package_queue;
using PackageSeq = int64_t;
// must init zero
+ // TODO: make all flat_hash_map to a STRUT
phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
- phmap::flat_hash_map<InstanceLoId, PTransmitDataParams*> _instance_to_request;
- phmap::flat_hash_map<InstanceLoId, PUniqueId> _instance_to_finst_id;
+ phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> _instance_to_request;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
@@ -197,7 +197,7 @@ private:
Status _send_rpc(InstanceLoId);
// must hold the _instance_to_package_queue_mutex[id] mutex to opera
- void _construct_request(InstanceLoId id);
+ void _construct_request(InstanceLoId id, PUniqueId);
inline void _ended(InstanceLoId id);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org