You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/04/12 02:17:16 UTC

[incubator-doris] branch master updated: [fix][mem tracker] Fix MemTracker null pointer in vectorized (#8925)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 66d2f4e1fd [fix][mem tracker] Fix MemTracker null pointer in vectorized (#8925)
66d2f4e1fd is described below

commit 66d2f4e1fdc490536d383e0878ed2e3ad011bd6c
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Apr 12 10:17:10 2022 +0800

    [fix][mem tracker] Fix MemTracker null pointer in vectorized (#8925)
    
    Fix the null pointer in ThreadMemTrackerMgr::update_tracker, plus a few related details.
    
    Issue Number: close #8920
---
 be/src/exec/exchange_node.cpp             |  1 +
 be/src/runtime/data_stream_recvr.h        |  1 +
 be/src/runtime/fold_constant_executor.cpp |  4 ++--
 be/src/runtime/thread_context.h           |  2 +-
 be/src/runtime/thread_mem_tracker_mgr.h   | 13 ++++++++-----
 be/src/vec/exec/vexchange_node.cpp        |  5 +++++
 be/src/vec/runtime/vdata_stream_recvr.h   |  1 +
 be/src/vec/sink/vtablet_sink.cpp          |  3 ++-
 8 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 083e40518e..d79170f912 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -212,6 +212,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));
 
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
     while ((_num_rows_skipped < _offset)) {
         _num_rows_skipped += output_batch->num_rows();
diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h
index 3b1edac081..28a4a9dadf 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -100,6 +100,7 @@ public:
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
     PlanNodeId dest_node_id() const { return _dest_node_id; }
     const RowDescriptor& row_desc() const { return _row_desc; }
+    const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }
 
     void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) {
         _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp
index 9b5300f16a..274750b8b3 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -44,7 +44,6 @@ TUniqueId FoldConstantExecutor::_dummy_id;
 
 Status FoldConstantExecutor::fold_constant_expr(
         const TFoldConstantParams& params, PConstantExprResult* response) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const auto& expr_map = params.expr_map;
     auto expr_result_map = response->mutable_expr_result_map();
 
@@ -54,6 +53,7 @@ Status FoldConstantExecutor::fold_constant_expr(
     if (UNLIKELY(!status.ok())) {
         return status;
     }
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     for (const auto& m : expr_map) {
         PExprResultMap pexpr_result_map;
@@ -108,9 +108,9 @@ Status FoldConstantExecutor::fold_constant_vexpr(
     // init
     Status status = _init(query_globals);
     if (UNLIKELY(!status.ok())) {
-        LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
         return status;
     }
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     for (const auto& m : expr_map) {
         PExprResultMap pexpr_result_map;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index a7c8261930..4d9d60078c 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -156,7 +156,7 @@ private:
 // The func provided by pthread and std::thread doesn't help either.
 //
 // So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by
-// Thread-scopedthread local + Class-scoped thread local.
+// Thread-scoped thread local + Class-scoped thread local.
 //
 // This may look very trick, but it's the best way I can find.
 //
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index a709a536bf..79c152c243 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -65,6 +65,7 @@ public:
         _mem_trackers[0] = MemTracker::get_process_tracker();
         _untracked_mems[0] = 0;
         _tracker_id = 0;
+        _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
         start_thread_mem_tracker = true;
     }
     ~ThreadMemTrackerMgr() {
@@ -75,7 +76,8 @@ public:
     void clear_untracked_mems() {
         for (const auto& untracked_mem : _untracked_mems) {
             if (untracked_mem.second != 0) {
-                DCHECK(_mem_trackers[untracked_mem.first]) << ", label: " << _mem_tracker_labels[untracked_mem.first];
+                DCHECK(_mem_trackers[untracked_mem.first])
+                        << ", label: " << _mem_tracker_labels[untracked_mem.first];
                 if (_mem_trackers[untracked_mem.first]) {
                     _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
                 } else {
@@ -195,7 +197,7 @@ inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTrac
     _untracked_mems[_tracker_id] += _untracked_mem;
     _untracked_mem = 0;
     std::swap(_tracker_id, _temp_tracker_id);
-    DCHECK(_mem_trackers[_tracker_id]);
+    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
     return _temp_tracker_id; // old tracker_id
 }
 
@@ -204,7 +206,8 @@ inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
         _untracked_mems[_tracker_id] += _untracked_mem;
         _untracked_mem = 0;
         _tracker_id = tracker_id;
-        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end())
+                << ", label: " << _mem_tracker_labels[_tracker_id];
         DCHECK(_mem_trackers[_tracker_id]);
     }
 }
@@ -217,14 +220,14 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
         DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        // Allocating memory inside the hook causes the TCMalloc hook to be entered again, leading to infinite recursion.
+        // All memory allocated in mem_tracker.consume/try_consume must be freed in time to avoid tracking misses.
         start_thread_mem_tracker = false;
         // When switching to the current tracker last time, the remaining untracked memory.
         if (_untracked_mems[_tracker_id] != 0) {
             _untracked_mem += _untracked_mems[_tracker_id];
             _untracked_mems[_tracker_id] = 0;
         }
-        // Avoid getting stuck in infinite loop if there is memory allocation in noncache_consume.
-        // For example: GC function when try_consume; mem_limit_exceeded.
         noncache_consume();
         start_thread_mem_tracker = true;
     }
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index 91b107904d..ea4e61e7fe 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -19,6 +19,7 @@
 
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 #include "vec/runtime/vdata_stream_recvr.h"
 
@@ -48,6 +49,7 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status VExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     DCHECK_GT(_num_senders, 0);
     _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr());
     _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
@@ -63,6 +65,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
 }
 Status VExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
 
     if (_is_merging) {
@@ -80,6 +83,8 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
 
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_TIMER(runtime_profile()->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
         if (_num_rows_returned + block->rows() < _limit) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 7e74944712..9c18cb6baa 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -73,6 +73,7 @@ public:
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
     PlanNodeId dest_node_id() const { return _dest_node_id; }
     const RowDescriptor& row_desc() const { return _row_desc; }
+    const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }
 
     void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) {
         _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 58bb03c69c..0804cde38f 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -43,10 +43,11 @@ Status VOlapTableSink::init(const TDataSink& sink) {
 }
 
 Status VOlapTableSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OlapTableSink::prepare(state));
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _input_row_desc,
                                                _expr_mem_tracker));
-    return OlapTableSink::prepare(state);
+    return Status::OK();
 }
 
 Status VOlapTableSink::open(RuntimeState* state) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org