You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/24 15:04:05 UTC

[doris] branch master updated: [Bug](pipeline) access map may cause coredump in sink buffer (#21108)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 601120db04 [Bug](pipeline) access map may cause coredump in sink buffer (#21108)
601120db04 is described below

commit 601120db04325547cced54e12d38c6994755434d
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Sat Jun 24 23:03:59 2023 +0800

    [Bug](pipeline) access map may cause coredump in sink buffer (#21108)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 25 ++++++++-----------------
 be/src/pipeline/exec/exchange_sink_buffer.h   |  6 +++---
 2 files changed, 11 insertions(+), 20 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index c0f0e921e9..e8b3f76fda 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -59,11 +59,8 @@ ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
 
 void ExchangeSinkBuffer::close() {
     for (const auto& pair : _instance_to_request) {
-        if (pair.second) {
-            pair.second->release_finst_id();
-            pair.second->release_query_id();
-            delete pair.second;
-        }
+        pair.second->release_finst_id();
+        pair.second->release_query_id();
     }
     _instance_to_request.clear();
 }
@@ -104,10 +101,10 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     PUniqueId finst_id;
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
-    _instance_to_finst_id[low_id] = finst_id;
     _instance_to_sending_by_pipeline[low_id] = true;
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_time[low_id] = 0;
+    _construct_request(low_id, finst_id);
 }
 
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -173,10 +170,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
         auto& request = q.front();
-        if (!_instance_to_request[id]) {
-            _construct_request(id);
-        }
-        auto brpc_request = _instance_to_request[id];
+        auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
         if (request.block) {
@@ -220,10 +214,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     } else if (!broadcast_q.empty()) {
         // If we have data to shuffle which is broadcasted
         auto& request = broadcast_q.front();
-        if (!_instance_to_request[id]) {
-            _construct_request(id);
-        }
-        auto brpc_request = _instance_to_request[id];
+        auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
         if (request.block_holder->get_block()) {
@@ -272,9 +263,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     return Status::OK();
 }
 
-void ExchangeSinkBuffer::_construct_request(InstanceLoId id) {
-    _instance_to_request[id] = new PTransmitDataParams();
-    _instance_to_request[id]->set_allocated_finst_id(&_instance_to_finst_id[id]);
+void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) {
+    _instance_to_request[id] = std::make_unique<PTransmitDataParams>();
+    _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id);
     _instance_to_request[id]->set_allocated_query_id(&_query_id);
 
     _instance_to_request[id]->set_node_id(_dest_node_id);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index ccc5e2afff..dcea246f91 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -179,9 +179,9 @@ private:
             _instance_to_broadcast_package_queue;
     using PackageSeq = int64_t;
     // must init zero
+    // TODO: merge all of these flat_hash_maps into a single struct
     phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
-    phmap::flat_hash_map<InstanceLoId, PTransmitDataParams*> _instance_to_request;
-    phmap::flat_hash_map<InstanceLoId, PUniqueId> _instance_to_finst_id;
+    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> _instance_to_request;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
@@ -197,7 +197,7 @@ private:
 
     Status _send_rpc(InstanceLoId);
     // must hold the _instance_to_package_queue_mutex[id] mutex to opera
-    void _construct_request(InstanceLoId id);
+    void _construct_request(InstanceLoId id, PUniqueId);
     inline void _ended(InstanceLoId id);
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org