You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/08 01:02:32 UTC
[incubator-doris] branch master updated: [feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 519305cb22 [feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)
519305cb22 is described below
commit 519305cb229c0fa2abbddd4dd814e26dffe6e029
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Apr 8 09:02:26 2022 +0800
[feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)
Based on #8605, separate out the memory usage of each operator from the Query/Load/StorageEngine mem trackers.
---
be/src/common/config.h | 4 ++
be/src/exec/analytic_eval_node.cpp | 3 +
be/src/exec/assert_num_rows_node.cpp | 2 +
be/src/exec/base_scanner.cpp | 19 +++---
be/src/exec/blocking_join_node.cpp | 5 +-
be/src/exec/broker_scan_node.cpp | 4 +-
be/src/exec/cross_join_node.cpp | 3 +-
be/src/exec/csv_scan_node.cpp | 5 +-
be/src/exec/es_http_scan_node.cpp | 4 +-
be/src/exec/es_scan_node.cpp | 3 +
be/src/exec/except_node.cpp | 5 +-
be/src/exec/exchange_node.cpp | 3 +
be/src/exec/exec_node.cpp | 1 -
be/src/exec/exec_node.h | 2 +-
be/src/exec/hash_join_node.cpp | 4 +-
be/src/exec/intersect_node.cpp | 5 +-
be/src/exec/merge_join_node.cpp | 3 +
be/src/exec/merge_node.cpp | 3 +
be/src/exec/mysql_scan_node.cpp | 5 +-
be/src/exec/odbc_scan_node.cpp | 5 +-
be/src/exec/olap_scan_node.cpp | 5 +-
be/src/exec/olap_scanner.cpp | 3 +
be/src/exec/olap_scanner.h | 2 +
be/src/exec/partitioned_aggregation_node.cc | 3 +
be/src/exec/repeat_node.cpp | 3 +
be/src/exec/schema_scan_node.cpp | 3 +
be/src/exec/select_node.cpp | 3 +
be/src/exec/set_operation_node.cpp | 7 +-
be/src/exec/spill_sort_node.cc | 3 +
be/src/exec/table_function_node.cpp | 3 +
be/src/exec/tablet_sink.cpp | 15 +++++
be/src/exec/topn_node.cpp | 3 +
be/src/exec/union_node.cpp | 3 +
be/src/exprs/expr_context.cpp | 6 ++
be/src/olap/base_compaction.cpp | 1 -
be/src/olap/delta_writer.cpp | 8 +--
be/src/olap/lru_cache.cpp | 9 +--
be/src/olap/lru_cache.h | 2 +-
be/src/olap/memtable.cpp | 5 +-
be/src/olap/memtable.h | 3 +-
be/src/olap/memtable_flush_executor.cpp | 2 +
be/src/olap/page_cache.cpp | 2 +
be/src/olap/schema_change.cpp | 21 +-----
be/src/olap/schema_change.h | 2 -
be/src/olap/snapshot_manager.cpp | 4 ++
be/src/olap/snapshot_manager.h | 1 -
be/src/olap/storage_engine.cpp | 5 ++
be/src/olap/storage_engine.h | 1 +
be/src/olap/task/engine_alter_tablet_task.cpp | 2 +
be/src/olap/task/engine_batch_load_task.cpp | 2 +
be/src/olap/task/engine_checksum_task.cpp | 2 +
be/src/olap/task/engine_clone_task.cpp | 2 +
be/src/runtime/data_stream_recvr.cc | 7 ++
be/src/runtime/data_stream_sender.cpp | 7 +-
be/src/runtime/disk_io_mgr.cc | 9 +++
be/src/runtime/fold_constant_executor.cpp | 2 +
be/src/runtime/load_channel.cpp | 4 ++
be/src/runtime/load_channel_mgr.cpp | 5 ++
be/src/runtime/mem_tracker.cpp | 3 +-
be/src/runtime/mem_tracker.h | 4 +-
be/src/runtime/memory/chunk_allocator.cpp | 5 +-
be/src/runtime/runtime_filter_mgr.cpp | 9 ++-
be/src/runtime/tablets_channel.cpp | 5 ++
be/src/runtime/thread_context.h | 78 +++++++++++++++++------
be/src/runtime/thread_mem_tracker_mgr.cpp | 15 +++--
be/src/runtime/thread_mem_tracker_mgr.h | 92 ++++++++++++++++++---------
be/src/vec/exec/join/vhash_join_node.cpp | 5 +-
be/src/vec/exec/vaggregation_node.cpp | 13 ++--
be/src/vec/exec/vanalytic_eval_node.cpp | 3 +
be/src/vec/exec/vblocking_join_node.cpp | 5 +-
be/src/vec/exec/vcross_join_node.cpp | 5 +-
be/src/vec/exec/volap_scan_node.cpp | 3 +-
be/src/vec/exec/volap_scanner.cpp | 1 +
be/src/vec/exec/vset_operation_node.cpp | 5 +-
be/src/vec/exec/vsort_node.cpp | 3 +
be/src/vec/exprs/vexpr_context.cpp | 4 ++
be/src/vec/runtime/vdata_stream_recvr.cpp | 6 ++
be/src/vec/sink/vdata_stream_sender.cpp | 4 ++
be/src/vec/sink/vtablet_sink.cpp | 2 +
79 files changed, 378 insertions(+), 150 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 52f7510c4f..a93229cdf8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -614,6 +614,10 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
// Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook.
CONF_Bool(track_new_delete, "true");
+// If true, switch TLS MemTracker to count more detailed memory,
+// including caches such as ExecNode operators and TabletManager.
+CONF_Bool(memory_verbose_track, "true");
+
// Default level of MemTracker to show in web page
// now MemTracker support two level:
// OVERVIEW: 0
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 3741241bed..d7d8726267 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -140,6 +140,7 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status AnalyticEvalNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
_child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
_curr_tuple_pool.reset(new MemPool(mem_tracker().get()));
@@ -184,6 +185,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
Status AnalyticEvalNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
@@ -812,6 +814,7 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const {
Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
diff --git a/be/src/exec/assert_num_rows_node.cpp b/be/src/exec/assert_num_rows_node.cpp
index 6c84dfc1f0..e101b245ea 100644
--- a/be/src/exec/assert_num_rows_node.cpp
+++ b/be/src/exec/assert_num_rows_node.cpp
@@ -49,6 +49,7 @@ Status AssertNumRowsNode::prepare(RuntimeState* state) {
Status AssertNumRowsNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// ISSUE-3435
RETURN_IF_ERROR(child(0)->open(state));
@@ -58,6 +59,7 @@ Status AssertNumRowsNode::open(RuntimeState* state) {
Status AssertNumRowsNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
output_batch->reset();
child(0)->get_next(state, output_batch, eos);
_num_rows_returned += output_batch->num_rows();
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 92bb10886e..06331023e8 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -39,6 +39,15 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
+#if BE_TEST
+ _mem_tracker(new MemTracker()),
+#else
+ _mem_tracker(MemTracker::create_tracker(
+ -1, state->query_type() == TQueryType::LOAD
+ ? "BaseScanner:" + std::to_string(state->load_job_id())
+ : "BaseScanner:Select")),
+#endif
+ _mem_pool(std::make_unique<MemPool>(_mem_tracker.get())),
_dest_tuple_desc(nullptr),
_pre_filter_texprs(pre_filter_texprs),
_strict_mode(false),
@@ -48,15 +57,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
_read_timer(nullptr),
_materialize_timer(nullptr),
_success(false),
- _scanner_eof(false) {
-#ifndef BE_TEST
- _mem_pool.reset(new MemPool(state->query_type() == TQueryType::LOAD
- ? "BaseScanner:" + std::to_string(state->load_job_id())
- : "BaseScanner:Select"));
-#else
- _mem_pool.reset(new MemPool());
-#endif
-}
+ _scanner_eof(false) {}
Status BaseScanner::open() {
RETURN_IF_ERROR(init_expr_ctxes());
diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index 57196713a2..432606c5c6 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -23,7 +23,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -46,6 +45,7 @@ BlockingJoinNode::~BlockingJoinNode() {
Status BlockingJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -88,8 +88,9 @@ void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Statu
}
Status BlockingJoinNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+ RETURN_IF_ERROR(ExecNode::open(state));
// RETURN_IF_ERROR(Expr::open(_conjuncts, state));
RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 6e156f8d83..e16410e0f5 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -30,7 +30,6 @@
#include "runtime/dpp_sink_internal.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -61,6 +60,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status BrokerScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "BrokerScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// get tuple desc
_runtime_state = state;
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -88,6 +88,7 @@ Status BrokerScanNode::prepare(RuntimeState* state) {
Status BrokerScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
@@ -108,6 +109,7 @@ Status BrokerScanNode::start_scanners() {
Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// check if CANCELLED.
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index 5def58a4cc..3ba307fd0c 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -23,7 +23,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
@@ -35,6 +34,7 @@ CrossJoinNode::CrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Des
Status CrossJoinNode::prepare(RuntimeState* state) {
DCHECK(_join_op == TJoinOp::CROSS_JOIN);
RETURN_IF_ERROR(BlockingJoinNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_batch_pool.reset(new ObjectPool());
return Status::OK();
}
@@ -89,6 +89,7 @@ Status CrossJoinNode::get_next(RuntimeState* state, RowBatch* output_batch, bool
// TOOD(zhaochun)
// RETURN_IF_ERROR(state->check_query_state());
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit() || _eos) {
*eos = true;
diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp
index e262bda786..f8a1a80437 100644
--- a/be/src/exec/csv_scan_node.cpp
+++ b/be/src/exec/csv_scan_node.cpp
@@ -128,6 +128,7 @@ Status CsvScanNode::prepare(RuntimeState* state) {
}
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// add timer
_split_check_timer = ADD_TIMER(_runtime_profile, "split check timer");
@@ -210,6 +211,8 @@ Status CsvScanNode::prepare(RuntimeState* state) {
}
Status CsvScanNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
VLOG_CRITICAL << "CsvScanNode::Open";
@@ -225,7 +228,6 @@ Status CsvScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_csv_scanner->open());
return Status::OK();
@@ -244,6 +246,7 @@ Status CsvScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit()) {
*eos = true;
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index ba91ee02c8..be18743a46 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -30,7 +30,6 @@
#include "runtime/dpp_sink_internal.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/runtime_profile.h"
@@ -68,6 +67,7 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status EsHttpScanNode::prepare(RuntimeState* state) {
VLOG_QUERY << "EsHttpScanNode prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_scanner_profile.reset(new RuntimeProfile("EsHttpScanNode"));
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
@@ -124,6 +124,7 @@ Status EsHttpScanNode::build_conjuncts_list() {
Status EsHttpScanNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
@@ -199,6 +200,7 @@ Status EsHttpScanNode::collect_scanners_status() {
Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (state->is_cancelled()) {
std::unique_lock<std::mutex> l(_batch_queue_lock);
if (update_status(Status::Cancelled("Cancelled"))) {
diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp
index 4ba08bf554..5f7ddb1b53 100644
--- a/be/src/exec/es_scan_node.cpp
+++ b/be/src/exec/es_scan_node.cpp
@@ -67,6 +67,7 @@ Status EsScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "EsScanNode::Prepare";
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
std::stringstream ss;
@@ -85,6 +86,7 @@ Status EsScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// TExtOpenParams.row_schema
@@ -205,6 +207,7 @@ Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// create tuple
MemPool* tuple_pool = row_batch->tuple_data_pool();
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index ec3e451a50..893a8f8d71 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -21,7 +21,6 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
namespace doris {
ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -40,8 +39,9 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
Status ExceptNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table.");
+ RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
@@ -88,6 +88,7 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
*eos = true;
if (reached_limit()) {
return Status::OK();
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 373cff3a9f..083e40518e 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -57,6 +57,7 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status ExchangeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
// TODO: figure out appropriate buffer size
DCHECK_GT(_num_senders, 0);
@@ -75,6 +76,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
Status ExchangeNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
@@ -129,6 +131,7 @@ Status ExchangeNode::fill_input_row_batch(RuntimeState* state) {
Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit()) {
_stream_recvr->transfer_all_resources(output_batch);
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index e3891b6139..2b7855e493 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -57,7 +57,6 @@
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 1644ba5165..665d1c9d6c 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -28,11 +28,11 @@
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "runtime/query_statistics.h"
+#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/blocking_queue.hpp"
#include "util/runtime_profile.h"
#include "util/uid_util.h" // for print_id
-
#include "vec/exprs/vexpr_context.h"
namespace doris {
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 30cd84441c..a4964f45ec 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -30,7 +30,6 @@
#include "runtime/row_batch.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
@@ -96,6 +95,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -220,6 +220,7 @@ Status HashJoinNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state));
RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state));
@@ -306,6 +307,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
// it may cause the memory to exceed the limit.
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit()) {
*eos = true;
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index df0e30de67..a12673a1b5 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -21,7 +21,6 @@
#include "exprs/expr.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
namespace doris {
IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -44,8 +43,9 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(SetOperationNode::open(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table.");
+ RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
@@ -88,6 +88,7 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
*eos = true;
if (reached_limit()) {
return Status::OK();
diff --git a/be/src/exec/merge_join_node.cpp b/be/src/exec/merge_join_node.cpp
index d21a1e61e1..453963f601 100644
--- a/be/src/exec/merge_join_node.cpp
+++ b/be/src/exec/merge_join_node.cpp
@@ -71,6 +71,7 @@ Status MergeJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status MergeJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// build and probe exprs are evaluated in the context of the rows produced by our
// right and left children, respectively
@@ -148,6 +149,7 @@ Status MergeJoinNode::close(RuntimeState* state) {
Status MergeJoinNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(Expr::open(_left_expr_ctxs, state));
@@ -171,6 +173,7 @@ Status MergeJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit() || _eos) {
*eos = true;
diff --git a/be/src/exec/merge_node.cpp b/be/src/exec/merge_node.cpp
index d92dde402f..816993aa1d 100644
--- a/be/src/exec/merge_node.cpp
+++ b/be/src/exec/merge_node.cpp
@@ -60,6 +60,7 @@ Status MergeNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status MergeNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
DCHECK(_tuple_desc != nullptr);
@@ -90,6 +91,7 @@ Status MergeNode::prepare(RuntimeState* state) {
}
Status MergeNode::open(RuntimeState* state) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// Prepare const expr lists.
for (int i = 0; i < _const_result_expr_ctx_lists.size(); ++i) {
@@ -108,6 +110,7 @@ Status MergeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// Create new tuple buffer for row_batch.
int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size();
void* tuple_buffer = row_batch->tuple_data_pool()->allocate(tuple_buffer_size);
diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp
index f83a66fdeb..d395f2bf70 100644
--- a/be/src/exec/mysql_scan_node.cpp
+++ b/be/src/exec/mysql_scan_node.cpp
@@ -53,6 +53,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
}
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -99,6 +100,8 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
}
Status MysqlScanNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
VLOG_CRITICAL << "MysqlScanNode::Open";
@@ -112,7 +115,6 @@ Status MysqlScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_mysql_scanner->open());
RETURN_IF_ERROR(_mysql_scanner->query(_table_name, _columns, _filters, _limit));
@@ -159,6 +161,7 @@ Status MysqlScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// create new tuple buffer for row_batch
int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size();
diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp
index 17945bf0a2..c2783abeac 100644
--- a/be/src/exec/odbc_scan_node.cpp
+++ b/be/src/exec/odbc_scan_node.cpp
@@ -55,6 +55,7 @@ Status OdbcScanNode::prepare(RuntimeState* state) {
}
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -92,6 +93,8 @@ Status OdbcScanNode::prepare(RuntimeState* state) {
}
Status OdbcScanNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
VLOG_CRITICAL << _scan_node_type << "::Open";
@@ -105,7 +108,6 @@ Status OdbcScanNode::open(RuntimeState* state) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(_odbc_scanner->open());
RETURN_IF_ERROR(_odbc_scanner->query());
// check materialize slot num
@@ -140,6 +142,7 @@ Status OdbcScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit()) {
*eos = true;
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 69217280a9..c2d577b948 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -34,7 +34,6 @@
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
-#include "runtime/thread_context.h"
#include "runtime/tuple_row.h"
#include "util/priority_thread_pool.hpp"
#include "util/priority_work_stealing_thread_pool.hpp"
@@ -177,6 +176,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
Status OlapScanNode::prepare(RuntimeState* state) {
init_scan_profile();
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// create scanner profile
// create timer
_tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
@@ -223,6 +223,7 @@ Status OlapScanNode::prepare(RuntimeState* state) {
Status OlapScanNode::open(RuntimeState* state) {
VLOG_CRITICAL << "OlapScanNode::Open";
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
@@ -266,6 +267,7 @@ Status OlapScanNode::open(RuntimeState* state) {
Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// check if Canceled.
if (state->is_cancelled()) {
@@ -1521,6 +1523,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
+ ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
if (UNLIKELY(_transfer_done)) {
_scanner_done = true;
std::unique_lock<std::mutex> l(_scan_batches_lock);
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 37e6bafde2..6619c67aa6 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,6 +60,7 @@ Status OlapScanner::prepare(
const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
bloom_filters) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
// set limit to reduce end of rowset and segment mem use
_tablet_reader->set_batch_size(
@@ -119,6 +120,7 @@ Status OlapScanner::prepare(
Status OlapScanner::open() {
SCOPED_TIMER(_parent->_reader_init_timer);
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) {
_use_pushdown_conjuncts = true;
@@ -274,6 +276,7 @@ Status OlapScanner::_init_return_columns() {
}
Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
// 2. Allocate Row's Tuple buf
uint8_t* tuple_buf =
batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size());
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index a9b89511ea..5c336dffd6 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -93,6 +93,8 @@ public:
const std::vector<SlotDescriptor*>& get_query_slots() const { return _query_slots; }
+ const std::shared_ptr<MemTracker>& mem_tracker() const { return _mem_tracker; }
+
protected:
Status _init_tablet_reader_params(const std::vector<OlapScanRange*>& key_ranges,
const std::vector<TCondition>& filters,
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 59fa6c3293..3108b38d08 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -183,6 +183,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
state_ = state;
mem_pool_.reset(new MemPool(mem_tracker().get()));
@@ -244,6 +245,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
Status PartitionedAggregationNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// Open the child before consuming resources in this node.
RETURN_IF_ERROR(child(0)->open(state));
RETURN_IF_ERROR(ExecNode::open(state));
@@ -341,6 +343,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
}
Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// 1. `!need_finalize` means this aggregation node not the level two aggregation node
// 2. `grouping_exprs_.size() == 0 ` means is not group by
// 3. `child(0)->rows_returned() == 0` mean not data from child
diff --git a/be/src/exec/repeat_node.cpp b/be/src/exec/repeat_node.cpp
index 401b40e318..086ff64a9e 100644
--- a/be/src/exec/repeat_node.cpp
+++ b/be/src/exec/repeat_node.cpp
@@ -44,6 +44,7 @@ RepeatNode::~RepeatNode() {}
Status RepeatNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_runtime_state = state;
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
if (_tuple_desc == nullptr) {
@@ -55,6 +56,7 @@ Status RepeatNode::prepare(RuntimeState* state) {
Status RepeatNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(0)->open(state));
@@ -163,6 +165,7 @@ Status RepeatNode::get_repeated_batch(RowBatch* child_row_batch, int repeat_id_i
Status RepeatNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
DCHECK(_repeat_id_idx >= 0);
for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index fae3a1dd9e..69feae25fd 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -99,6 +99,7 @@ Status SchemaScanNode::prepare(RuntimeState* state) {
}
RETURN_IF_ERROR(ScanNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
// new one mem pool
_tuple_pool.reset(new (std::nothrow) MemPool());
@@ -196,6 +197,7 @@ Status SchemaScanNode::open(RuntimeState* state) {
}
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
@@ -242,6 +244,7 @@ Status SchemaScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit()) {
*eos = true;
diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp
index a6db3a402d..4cef98f504 100644
--- a/be/src/exec/select_node.cpp
+++ b/be/src/exec/select_node.cpp
@@ -33,11 +33,13 @@ SelectNode::SelectNode(ObjectPool* pool, const TPlanNode& tnode, const Descripto
Status SelectNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_child_row_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size()));
return Status::OK();
}
Status SelectNode::open(RuntimeState* state) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(child(0)->open(state));
@@ -48,6 +50,7 @@ Status SelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
if (reached_limit() || (_child_row_idx == _child_row_batch->num_rows() && _child_eos)) {
// we're already done or we exhausted the last child batch and there won't be any
diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp
index 5574d3b90c..4b4ed9f828 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -23,7 +23,6 @@
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
namespace doris {
SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -41,13 +40,14 @@ Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
Status SetOperationNode::prepare(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
DCHECK(_tuple_desc != nullptr);
_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
- SCOPED_TIMER(_runtime_profile->total_time_counter());
for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
expr_mem_tracker()));
@@ -138,8 +138,9 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) {
Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
RETURN_IF_CANCELLED(state);
// open result expr lists.
for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc
index 58802741c0..c71deb5bac 100644
--- a/be/src/exec/spill_sort_node.cc
+++ b/be/src/exec/spill_sort_node.cc
@@ -44,6 +44,7 @@ Status SpillSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status SpillSortNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
expr_mem_tracker()));
// AddExprCtxsToFree(_sort_exec_exprs);
@@ -52,6 +53,7 @@ Status SpillSortNode::prepare(RuntimeState* state) {
Status SpillSortNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
RETURN_IF_CANCELLED(state);
@@ -82,6 +84,7 @@ Status SpillSortNode::open(RuntimeState* state) {
Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT, state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp
index 3b44b436ea..ee225a0610 100644
--- a/be/src/exec/table_function_node.cpp
+++ b/be/src/exec/table_function_node.cpp
@@ -80,6 +80,7 @@ Status TableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) {
Status TableFunctionNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT);
@@ -92,6 +93,7 @@ Status TableFunctionNode::prepare(RuntimeState* state) {
Status TableFunctionNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(ExecNode::open(state));
@@ -186,6 +188,7 @@ bool TableFunctionNode::_roll_table_functions(int last_eos_idx) {
Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
const RowDescriptor& parent_rowdesc = row_batch->row_desc();
const RowDescriptor& child_rowdesc = _children[0]->row_desc();
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 96ffd04bbd..27893cb2a0 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -73,6 +73,7 @@ NodeChannel::~NodeChannel() noexcept {
// no need to set _cancel_msg because the error will be
// returned directly via "TabletSink::prepare()" method.
Status NodeChannel::init(RuntimeState* state) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
_tuple_desc = _parent->_output_tuple_desc;
auto node = _parent->_nodes_info->find_node(_node_id);
if (node == nullptr) {
@@ -115,6 +116,7 @@ Status NodeChannel::init(RuntimeState* state) {
}
void NodeChannel::open() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
PTabletWriterOpenRequest request;
request.set_allocated_id(&_parent->_load_id);
request.set_index_id(_index_channel->_index_id);
@@ -160,6 +162,7 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
Status NodeChannel::open_wait() {
_open_closure->join();
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
if (_open_closure->cntl.Failed()) {
if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
_stub, _node_info.host, _node_info.brpc_port)) {
@@ -249,6 +252,7 @@ Status NodeChannel::open_wait() {
}
Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
// If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
@@ -300,6 +304,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
// Used for vectorized engine.
// TODO(cmy): deprecated, need refactor
Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
// If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
@@ -350,6 +355,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
}
void NodeChannel::mark_close() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
return;
@@ -373,6 +379,7 @@ void NodeChannel::mark_close() {
}
Status NodeChannel::close_wait(RuntimeState* state) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
// set _is_closed to true finally
Defer set_closed {[&]() {
std::lock_guard<std::mutex> l(_closed_lock);
@@ -422,6 +429,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
}
void NodeChannel::cancel(const std::string& cancel_msg) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
// set _is_closed to true finally
Defer set_closed {[&]() {
std::lock_guard<std::mutex> l(_closed_lock);
@@ -574,6 +582,7 @@ Status NodeChannel::none_of(std::initializer_list<bool> vars) {
}
void NodeChannel::clear_all_batches() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
std::lock_guard<std::mutex> lg(_pending_batches_lock);
std::queue<AddBatchReq> empty;
std::swap(_pending_batches, empty);
@@ -583,6 +592,7 @@ void NodeChannel::clear_all_batches() {
IndexChannel::~IndexChannel() {}
Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
for (auto& tablet : tablets) {
auto location = _parent->_location->find_tablet(tablet.tablet_id);
if (location == nullptr) {
@@ -615,6 +625,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
}
void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
auto it = _channels_by_tablet.find(tablet_id);
DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
for (auto channel : it->second) {
@@ -630,6 +641,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
// Used for vectorized engine.
// TODO(cmy): deprecated, need refactor
void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
auto it = _channels_by_tablet.find(tablet_id);
DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
for (auto channel : it->second) {
@@ -760,6 +772,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
MemTracker::create_tracker(-1, "OlapTableSink:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
SCOPED_TIMER(_profile->total_time_counter());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker));
@@ -863,6 +876,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
Status OlapTableSink::open(RuntimeState* state) {
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
@@ -901,6 +915,7 @@ Status OlapTableSink::open(RuntimeState* state) {
Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
SCOPED_TIMER(_profile->total_time_counter());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
int64_t num_rows = input_batch->num_rows();
diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index 9f7cb71fc3..2707fe4990 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -59,6 +59,7 @@ Status TopNNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status TopNNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_pool.reset(new MemPool(mem_tracker().get()));
RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
expr_mem_tracker()));
@@ -75,6 +76,7 @@ Status TopNNode::prepare(RuntimeState* state) {
Status TopNNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("Top n, before open."));
@@ -127,6 +129,7 @@ Status TopNNode::open(RuntimeState* state) {
Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch."));
diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp
index d9c389f823..8049861c8c 100644
--- a/be/src/exec/union_node.cpp
+++ b/be/src/exec/union_node.cpp
@@ -67,6 +67,7 @@ Status UnionNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status UnionNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
DCHECK(_tuple_desc != nullptr);
_materialize_exprs_evaluate_timer =
@@ -93,6 +94,7 @@ Status UnionNode::prepare(RuntimeState* state) {
Status UnionNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
// open const expr lists.
for (const std::vector<ExprContext*>& exprs : _const_expr_lists) {
@@ -231,6 +233,7 @@ Status UnionNode::get_next_const(RuntimeState* state, RowBatch* row_batch) {
Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
// TODO(zc)
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index e3bb2a43b9..779f3dd5d7 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -28,6 +28,7 @@
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "udf/udf_internal.h"
#include "util/debug_util.h"
#include "util/stack_util.h"
@@ -52,6 +53,7 @@ Status ExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc,
const std::shared_ptr<MemTracker>& tracker) {
DCHECK(!_prepared);
_mem_tracker = tracker;
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
DCHECK(_pool.get() == nullptr);
_prepared = true;
_pool.reset(new MemPool(_mem_tracker.get()));
@@ -63,6 +65,7 @@ Status ExprContext::open(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_opened = true;
// Fragment-local state is only initialized for original contexts. Clones inherit the
// original's fragment state and only need to have thread-local state initialized.
@@ -108,6 +111,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx) {
DCHECK(_prepared);
DCHECK(_opened);
DCHECK(*new_ctx == nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
*new_ctx = state->obj_pool()->add(new ExprContext(_root));
(*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
@@ -127,6 +131,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx, Expr* root
DCHECK(_prepared);
DCHECK(_opened);
DCHECK(*new_ctx == nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
*new_ctx = state->obj_pool()->add(new ExprContext(root));
(*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
@@ -143,6 +148,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx, Expr* root
}
void ExprContext::free_local_allocations() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
free_local_allocations(_fn_contexts);
}
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index d379957761..e0de10534f 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -19,7 +19,6 @@
#include "util/doris_metrics.h"
#include "util/trace.h"
-#include "runtime/thread_context.h"
namespace doris {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 74f52f5a9c..df37735ca9 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -250,7 +250,8 @@ OLAPStatus DeltaWriter::wait_flush() {
void DeltaWriter::_reset_mem_table() {
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
- _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()));
+ _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
+ _mem_tracker));
}
OLAPStatus DeltaWriter::close() {
@@ -284,10 +285,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
- // Cannot directly DCHECK_EQ(_mem_tracker->consumption(), 0);
- // In allocate/free of mem_pool, the consume_cache of _mem_tracker will be called,
- // and _untracked_mem must be flushed first.
- MemTracker::memory_leak_check(_mem_tracker.get());
// use rowset meta manager to save meta
_cur_rowset = _rowset_writer->build();
@@ -330,7 +327,6 @@ OLAPStatus DeltaWriter::cancel() {
// cancel and wait all memtables in flush queue to be finished
_flush_token->cancel();
}
- MemTracker::memory_leak_check(_mem_tracker.get());
_is_cancelled = true;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 5d5649907f..0f5ebaa395 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -344,7 +344,7 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
return reinterpret_cast<Cache::Handle*>(e);
}
-void LRUCache::erase(const CacheKey& key, uint32_t hash, MemTracker* tracker) {
+void LRUCache::erase(const CacheKey& key, uint32_t hash) {
LRUHandle* e = nullptr;
bool last_ref = false;
{
@@ -443,15 +443,12 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
: _name(name),
_last_id(1),
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s] = new LRUCache(type);
_shards[s]->set_capacity(per_shard);
}
- // After the lru cache is created in the main thread, the main thread will not switch to the
- // lru cache mem tracker again, so manually clear the untracked mem in tls.
- thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
_entity = DorisMetrics::instance()->metric_registry()->register_entity(
std::string("lru_cache:") + name, {{"name", name}});
@@ -499,7 +496,7 @@ void ShardedLRUCache::release(Handle* handle) {
void ShardedLRUCache::erase(const CacheKey& key) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
- _shards[_shard(hash)]->erase(key, hash, _mem_tracker.get());
+ _shards[_shard(hash)]->erase(key, hash);
}
void* ShardedLRUCache::value(Handle* handle) {
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 9a20843bd3..21e6e9742d 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -316,7 +316,7 @@ public:
CachePriority priority = CachePriority::NORMAL);
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
- void erase(const CacheKey& key, uint32_t hash, MemTracker* tracker);
+ void erase(const CacheKey& key, uint32_t hash);
int64_t prune();
int64_t prune_if(CacheValuePredicate pred);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2ab6ab1b92..7b92121752 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -31,13 +31,14 @@ namespace doris {
MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer)
+ KeysType keys_type, RowsetWriter* rowset_writer,
+ const std::shared_ptr<MemTracker>& parent_tracker)
: _tablet_id(tablet_id),
_schema(schema),
_tablet_schema(tablet_schema),
_slot_descs(slot_descs),
_keys_type(keys_type),
- _mem_tracker(MemTracker::create_tracker(-1, "MemTable")),
+ _mem_tracker(MemTracker::create_tracker(-1, "MemTable", parent_tracker)),
_buffer_mem_pool(new MemPool(_mem_tracker.get())),
_table_mem_pool(new MemPool(_mem_tracker.get())),
_schema_size(_schema->schema_size()),
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index db0e6a12cf..6849bf45f4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -40,7 +40,8 @@ class MemTable {
public:
MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
- KeysType keys_type, RowsetWriter* rowset_writer);
+ KeysType keys_type, RowsetWriter* rowset_writer,
+ const std::shared_ptr<MemTracker>& parent_tracker);
~MemTable();
int64_t tablet_id() const { return _tablet_id; }
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index b63074d282..75ae89136e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -22,6 +22,7 @@
#include "olap/memtable.h"
#include "util/scoped_cleanup.h"
#include "util/time.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -56,6 +57,7 @@ OLAPStatus FlushToken::wait() {
}
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, memtable->mem_tracker());
_stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
SCOPED_CLEANUP({ memtable.reset(); });
// If previous flush has failed, return directly
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 37ef78925b..7f78176164 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -16,6 +16,7 @@
// under the License.
#include "olap/page_cache.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -31,6 +32,7 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
: _index_cache_percentage(index_cache_percentage),
_mem_tracker(MemTracker::create_tracker(capacity, "StoragePageCache", nullptr,
MemTrackerLevel::OVERVIEW)) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (index_cache_percentage == 0) {
_data_page_cache =
std::unique_ptr<Cache>(new_lru_cache("DataPageCache", capacity));
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ee7589be3c..dfc35a0bcd 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -37,7 +37,6 @@
#include "runtime/exec_env.h"
#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
-#include "runtime/thread_context.h"
#include "util/defer_op.h"
using std::deque;
@@ -50,9 +49,6 @@ using std::vector;
namespace doris {
-DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "",
- mem_consumption, Labels({{"type", "schema_change"}}));
-
class RowBlockSorter {
public:
explicit RowBlockSorter(RowBlockAllocator* allocator);
@@ -1386,15 +1382,9 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
return true;
}
-SchemaChangeHandler::SchemaChangeHandler()
- : _mem_tracker(MemTracker::create_tracker(-1, "SchemaChangeHandler", StorageEngine::instance()->schema_change_mem_tracker())) {
- REGISTER_HOOK_METRIC(schema_change_mem_consumption,
- [this]() { return _mem_tracker->consumption(); });
-}
+SchemaChangeHandler::SchemaChangeHandler() = default;
-SchemaChangeHandler::~SchemaChangeHandler() {
- DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
-}
+SchemaChangeHandler::~SchemaChangeHandler() = default;
OLAPStatus SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
@@ -1497,13 +1487,6 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
- // TODO(zxy) switch to tls mem tracker
- auto mem_tracker = MemTracker::create_tracker(
- -1,
- "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" +
- std::to_string(new_tablet->tablet_id()),
- _mem_tracker, MemTrackerLevel::TASK);
-
do {
// get history data to be converted and it will check if there is hold in base tablet
res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2db9a8f189..9b699bb35e 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -242,8 +242,6 @@ private:
virtual ~SchemaChangeHandler();
SchemaChangeHandler(const SchemaChangeHandler&) = delete;
SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
-
- std::shared_ptr<MemTracker> _mem_tracker;
};
using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 4d23cc27ad..797cd59564 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -36,6 +36,7 @@
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
+#include "runtime/thread_context.h"
using std::filesystem::path;
using std::map;
@@ -63,6 +64,7 @@ SnapshotManager* SnapshotManager::instance() {
OLAPStatus SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* snapshot_path,
bool* allow_incremental_clone) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
OLAPStatus res = OLAP_SUCCESS;
if (snapshot_path == nullptr) {
LOG(WARNING) << "output parameter cannot be null";
@@ -90,6 +92,7 @@ OLAPStatus SnapshotManager::make_snapshot(const TSnapshotRequest& request, strin
}
OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// 如果请求的snapshot_path位于root/snapshot文件夹下,则认为是合法的,可以删除
// 否则认为是非法请求,返回错误结果
auto stores = StorageEngine::instance()->get_stores();
@@ -120,6 +123,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
// AlphaRowsetMeta here.
OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id,
const int32_t& schema_hash) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
OLAPStatus res = OLAP_SUCCESS;
// check clone dir existed
if (!FileUtils::check_exist(clone_dir_desc.filepath)) {
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 4846fbba2b..c13a133ab6 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -103,7 +103,6 @@ private:
Mutex _snapshot_mutex;
uint64_t _snapshot_base_id;
- // TODO(zxy) used after
std::shared_ptr<MemTracker> _mem_tracker = nullptr;
}; // SnapshotManager
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 830f1932a1..f56c37c8e1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -85,6 +85,8 @@ namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(compaction_mem_consumption, MetricUnit::BYTES, "",
mem_consumption, Labels({{"type", "compaction"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "",
+ mem_consumption, Labels({{"type", "schema_change"}}));
StorageEngine* StorageEngine::_s_instance = nullptr;
@@ -142,11 +144,14 @@ StorageEngine::StorageEngine(const EngineOptions& options)
REGISTER_HOOK_METRIC(compaction_mem_consumption, [this]() {
return _compaction_mem_tracker->consumption();
});
+ REGISTER_HOOK_METRIC(schema_change_mem_consumption,
+ [this]() { return _schema_change_mem_tracker->consumption(); });
}
StorageEngine::~StorageEngine() {
DEREGISTER_HOOK_METRIC(unused_rowsets_count);
DEREGISTER_HOOK_METRIC(compaction_mem_consumption);
+ DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
_clear();
if (_compaction_thread_pool) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 7bfbb77e5c..4a73c21e79 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -333,6 +333,7 @@ private:
// Count the memory consumption of all SchemaChange tasks.
std::shared_ptr<MemTracker> _schema_change_mem_tracker;
// Count the memory consumption of all EngineCloneTask.
+ // Note: Memory that does not contain make/release snapshots.
std::shared_ptr<MemTracker> _clone_mem_tracker;
// Count the memory consumption of all EngineBatchLoadTask.
std::shared_ptr<MemTracker> _batch_load_mem_tracker;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 4af633c0c6..228940975f 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -19,6 +19,7 @@
#include "olap/schema_change.h"
#include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -35,6 +36,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
}
OLAPStatus EngineAlterTabletTask::execute() {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
auto schema_change_handler = SchemaChangeHandler::instance();
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index ad9e264244..f0fce81bbd 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -37,6 +37,7 @@
#include "olap/tablet.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
+#include "runtime/thread_context.h"
using apache::thrift::ThriftDebugString;
using std::list;
@@ -60,6 +61,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
EngineBatchLoadTask::~EngineBatchLoadTask() {}
OLAPStatus EngineBatchLoadTask::execute() {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
Status status = Status::OK();
if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE ||
_push_req.push_type == TPushType::LOAD_V2) {
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 7b425a12d7..5068b72a29 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -19,6 +19,7 @@
#include "olap/tuple_reader.h"
#include "olap/row.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -34,6 +35,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
}
OLAPStatus EngineChecksumTask::execute() {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
OLAPStatus res = _compute_checksum();
return res;
} // execute
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index b33b1c91c8..8fa5818ea8 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -30,6 +30,7 @@
#include "olap/rowset/rowset_factory.h"
#include "olap/snapshot_manager.h"
#include "runtime/client_cache.h"
+#include "runtime/thread_context.h"
#include "util/thrift_rpc_helper.h"
using std::set;
@@ -63,6 +64,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
OLAPStatus EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone process
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
OLAPStatus st = _do_clone();
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index 837a6eb835..1232cceeb5 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -28,6 +28,7 @@
#include "runtime/data_stream_mgr.h"
#include "runtime/row_batch.h"
#include "runtime/sorted_run_merger.h"
+#include "runtime/thread_context.h"
#include "util/debug_util.h"
#include "util/logging.h"
#include "util/runtime_profile.h"
@@ -362,6 +363,7 @@ void DataStreamRecvr::SenderQueue::close() {
Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
DCHECK(_is_merging);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
// Create the merger that will a single stream of sorted rows.
_merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile, false));
@@ -377,6 +379,7 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than,
uint32_t batch_size) {
DCHECK(_is_merging);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
// Create the merger that will a single stream of sorted rows.
@@ -422,6 +425,7 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
// _child_mergers is not empty, means use parallel merge need transfer resource from
// _sender queue.
// the need transfer resources from child_merger input_row_batch
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (!_child_mergers.empty()) {
_merger->transfer_all_resources(transfer_batch);
} else {
@@ -470,17 +474,20 @@ DataStreamRecvr::DataStreamRecvr(
Status DataStreamRecvr::get_next(RowBatch* output_batch, bool* eos) {
DCHECK(_merger.get() != nullptr);
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
return _merger->get_next(output_batch, eos);
}
void DataStreamRecvr::add_batch(const PRowBatch& batch, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int use_sender_id = _is_merging ? sender_id : 0;
// Add all batches to the same queue if _is_merging is false.
_sender_queues[use_sender_id]->add_batch(batch, be_number, packet_seq, done);
}
void DataStreamRecvr::add_batch(RowBatch* batch, int sender_id, bool use_move) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_batch(batch, use_move);
}
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index cdd3742515..dae7b8a621 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -39,6 +39,7 @@
#include "runtime/raw_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
#include "service/brpc.h"
@@ -388,10 +389,10 @@ Status DataStreamSender::prepare(RuntimeState* state) {
<< "])";
_profile = _pool->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());
- // TODO(zxy) used after
_mem_tracker = MemTracker::create_tracker(
-1, "DataStreamSender:" + print_id(state->fragment_instance_id()),
- state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile);
+ nullptr, MemTrackerLevel::VERBOSE, _profile);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
std::random_device rd;
@@ -432,6 +433,7 @@ DataStreamSender::~DataStreamSender() {
Status DataStreamSender::open(RuntimeState* state) {
DCHECK(state != nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(Expr::open(_partition_expr_ctxs, state));
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->open(state));
@@ -441,6 +443,7 @@ Status DataStreamSender::open(RuntimeState* state) {
Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
SCOPED_TIMER(_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
// Unpartition or _channel size
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index 3c2315b308..f910f96668 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -283,6 +283,7 @@ DiskIoMgr::DiskIoMgr()
{
_mem_tracker =
MemTracker::create_tracker(-1, "DiskIO", nullptr, MemTrackerLevel::OVERVIEW);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t max_buffer_size_scaled = bit_ceil(_max_buffer_size, _min_buffer_size);
_free_buffers.resize(bit_log2(max_buffer_size_scaled) + 1);
int num_local_disks = (config::num_disks == 0 ? DiskInfo::num_disks() : config::num_disks);
@@ -305,6 +306,7 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_s
{
_mem_tracker =
MemTracker::create_tracker(-1, "DiskIO", nullptr, MemTrackerLevel::OVERVIEW);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t max_buffer_size_scaled = bit_ceil(_max_buffer_size, _min_buffer_size);
_free_buffers.resize(bit_log2(max_buffer_size_scaled) + 1);
if (num_local_disks == 0) {
@@ -370,6 +372,7 @@ DiskIoMgr::~DiskIoMgr() {
}
Status DiskIoMgr::init(const int64_t mem_limit) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_mem_tracker->set_limit(mem_limit);
// If we hit the process limit, see if we can reclaim some memory by removing
// previously allocated (but unused) io buffers.
@@ -456,6 +459,7 @@ void DiskIoMgr::unregister_context(RequestContext* reader) {
// is on.
// If wait_for_disks_completion is true, wait for the number of active disks to become 0.
void DiskIoMgr::cancel_context(RequestContext* context, bool wait_for_disks_completion) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
context->cancel(Status::Cancelled("Cancelled"));
if (wait_for_disks_completion) {
@@ -536,6 +540,7 @@ Status DiskIoMgr::add_scan_ranges(RequestContext* reader, const vector<ScanRange
if (ranges.empty()) {
return Status::OK();
}
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// Validate and initialize all ranges
for (int i = 0; i < ranges.size(); ++i) {
@@ -584,6 +589,7 @@ Status DiskIoMgr::add_scan_ranges(RequestContext* reader, const vector<ScanRange
Status DiskIoMgr::get_next_range(RequestContext* reader, ScanRange** range) {
DCHECK(reader != nullptr);
DCHECK(range != nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
*range = nullptr;
Status status = Status::OK();
@@ -638,6 +644,7 @@ Status DiskIoMgr::get_next_range(RequestContext* reader, ScanRange** range) {
Status DiskIoMgr::read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer) {
DCHECK(range != nullptr);
DCHECK(buffer != nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
*buffer = nullptr;
if (range->len() > _max_buffer_size) {
@@ -658,6 +665,7 @@ Status DiskIoMgr::read(RequestContext* reader, ScanRange* range, BufferDescripto
void DiskIoMgr::return_buffer(BufferDescriptor* buffer_desc) {
DCHECK(buffer_desc != nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (!buffer_desc->_status.ok()) {
DCHECK(buffer_desc->_buffer == nullptr);
}
@@ -1168,6 +1176,7 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
Status DiskIoMgr::add_write_range(RequestContext* writer, WriteRange* write_range) {
DCHECK_LE(write_range->len(), _max_buffer_size);
unique_lock<mutex> writer_lock(writer->_lock);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (writer->_state == RequestContext::Cancelled) {
DCHECK(!writer->_status.ok());
diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp
index 6a3e69a02c..9b5300f16a 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -22,6 +22,7 @@
#include "runtime/tuple_row.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "runtime/mem_tracker.h"
#include "exprs/expr_context.h"
#include "exprs/expr.h"
@@ -43,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id;
Status FoldConstantExecutor::fold_constant_expr(
const TFoldConstantParams& params, PConstantExprResult* response) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const auto& expr_map = params.expr_map;
auto expr_result_map = response->mutable_expr_result_map();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 95a50b5ded..766e00919d 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -20,6 +20,7 @@
#include "olap/lru_cache.h"
#include "runtime/mem_tracker.h"
#include "runtime/tablets_channel.h"
+#include "runtime/thread_context.h"
namespace doris {
@@ -42,6 +43,7 @@ LoadChannel::~LoadChannel() {
}
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
@@ -66,6 +68,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
PTabletWriterAddBatchResult* response) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
@@ -152,6 +155,7 @@ bool LoadChannel::is_finished() {
}
Status LoadChannel::cancel() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
it.second->cancel();
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index e5e23f5110..1596903ac3 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -21,6 +21,7 @@
#include "olap/lru_cache.h"
#include "runtime/load_channel.h"
#include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/stopwatch.hpp"
@@ -87,6 +88,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
_mem_tracker = MemTracker::create_tracker(load_mem_limit, "LoadChannelMgr",
MemTracker::get_process_tracker(),
MemTrackerLevel::OVERVIEW);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); });
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
RETURN_IF_ERROR(_start_bg_worker());
@@ -94,6 +96,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
}
Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> channel;
{
@@ -126,6 +129,7 @@ static void dummy_deleter(const CacheKey& key, void* value) {}
Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
PTabletWriterAddBatchResult* response) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(request.id());
// 1. get load channel
std::shared_ptr<LoadChannel> channel;
@@ -208,6 +212,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
}
Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> cancelled_channel;
{
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index b4ee28d9fa..002411c737 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -25,7 +25,6 @@
#include "gutil/once.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"
@@ -119,7 +118,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label,
RuntimeProfile* profile)
: _limit(byte_limit),
_label(label),
- _id(_label + std::to_string(GetCurrentTimeMicros()) + std::to_string(rand())),
+ _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
_parent(parent),
_level(level) {
if (profile == nullptr) {
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 89a1d2e0c7..9ae3ab906a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -402,7 +402,7 @@ public:
return tracker == nullptr ? false : true;
}
- std::string id() { return _id; }
+ int64_t id() { return _id; }
std::string debug_string() {
std::stringstream msg;
@@ -467,7 +467,7 @@ private:
std::string _label;
- std::string _id;
+ int64_t _id;
std::shared_ptr<MemTracker> _parent; // The parent of this tracker.
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 9aec083c6d..5d4c7b34f9 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -119,13 +119,10 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
_arenas(CpuInfo::get_max_num_cores()) {
_mem_tracker =
MemTracker::create_tracker(-1, "ChunkAllocator", nullptr, MemTrackerLevel::OVERVIEW);
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
for (int i = 0; i < _arenas.size(); ++i) {
_arenas[i].reset(new ChunkArena());
}
- // After the ChunkAllocator is created in the main thread, the main thread will not switch to the
- // chunk allocator mem tracker again, so manually clear the untracked mem in tls.
- thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
_chunk_allocator_metric_entity =
DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator");
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 6518391218..a5bb1df0c0 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -85,6 +85,7 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
const TQueryOptions& options, int node_id) {
DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
role != RuntimeFilterRole::CONSUMER);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_tracker);
int32_t key = desc.filter_id;
std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
@@ -112,6 +113,7 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
}
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_tracker);
UpdateRuntimeFilterParams params;
params.request = request;
params.data = data;
@@ -156,7 +158,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
// LOG(INFO) << "entity filter id:" << filter_id;
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id);
cntVal->tracker = MemTracker::create_tracker(
- -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id);
+ -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id,
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
_filter_map.emplace(filter_id, cntVal);
return Status::OK();
}
@@ -166,8 +169,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
const TQueryOptions& query_options) {
_query_id = query_id;
_fragment_instance_id = fragment_instance_id;
- // TODO(zxy) used after
_mem_tracker = MemTracker::create_tracker(-1, "RuntimeFilterMergeControllerEntity", nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
int filter_id = filterid_to_desc.first;
const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
@@ -186,6 +189,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
const char* data) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
{
@@ -197,6 +201,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
return Status::InvalidArgument("unknown filter id");
}
cntVal = iter->second;
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(cntVal->tracker);
MergeRuntimeFilterParams params;
params.data = data;
params.request = request;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index e4e0b78a1b..40687f07a2 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -23,6 +23,7 @@
#include "olap/memtable.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
+#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
namespace doris {
@@ -50,6 +51,7 @@ TabletsChannel::~TabletsChannel() {
}
Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
if (_state == kOpened) {
// Normal case, already open by other sender
@@ -77,6 +79,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
PTabletWriterAddBatchResult* response) {
DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t cur_seq;
{
std::lock_guard<std::mutex> l(_lock);
@@ -202,6 +205,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
}
Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
// TabletsChannel is closed without LoadChannel's lock,
@@ -301,6 +305,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
}
Status TabletsChannel::cancel() {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 7c3beafc61..a7c8261930 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -42,12 +42,21 @@
// tracker again in the short term, can consider manually clear_untracked_mems.
// The query thread will automatically clear_untracked_mems when detach_task.
#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
- auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, false)
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, false)
#define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
- auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true);
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, true);
+#define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<true>(mem_tracker, true)
+// After the non-query thread switches the mem tracker, if the thread will not switch the mem
+// tracker again in the short term, can consider manually clear_untracked_mems.
+// The query thread will automatically clear_untracked_mems when detach_task.
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \
+ auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTrackerEndClear(mem_tracker)
#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
auto VARNAME_LINENUM(witch_tracker_cb) = \
SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
+#define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
namespace doris {
@@ -67,10 +76,11 @@ public:
UNKNOWN = 0,
QUERY = 1,
LOAD = 2,
- COMPACTION = 3
+ COMPACTION = 3,
+ STORAGE = 4
// to be added ...
};
- inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION"};
+ inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", "STORAGE"};
public:
ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) {
@@ -83,7 +93,9 @@ public:
void attach(const TaskType& type, const std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTracker>& mem_tracker) {
- DCHECK(_type == TaskType::UNKNOWN && _task_id == "");
+ DCHECK(_type == TaskType::UNKNOWN && _task_id == "")
+ << ",old tracker label: " << mem_tracker->label()
+ << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label();
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
@@ -177,13 +189,17 @@ public:
explicit AttachTaskThread(const ThreadContext::TaskType& type,
const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
DCHECK(mem_tracker);
+#endif
thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
}
explicit AttachTaskThread(const TQueryType::type& query_type,
const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
DCHECK(mem_tracker);
+#endif
thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(),
mem_tracker);
}
@@ -191,9 +207,11 @@ public:
explicit AttachTaskThread(const TQueryType::type& query_type, const std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
DCHECK(task_id != "");
DCHECK(fragment_instance_id != TUniqueId());
DCHECK(mem_tracker);
+#endif
thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
fragment_instance_id, mem_tracker);
}
@@ -204,10 +222,10 @@ public:
DCHECK(print_id(runtime_state->query_id()) != "");
DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
DCHECK(mem_tracker);
+#endif
thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
print_id(runtime_state->query_id()),
runtime_state->fragment_instance_id(), mem_tracker);
-#endif
}
const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) {
@@ -223,10 +241,8 @@ public:
}
~AttachTaskThread() {
-#ifndef BE_TEST
thread_local_ctx.get()->detach();
DorisMetrics::instance()->attach_task_thread_count->increment(1);
-#endif
}
};
@@ -241,33 +257,55 @@ public:
}
private:
- bool _scope;
+ bool _scope = true;
};
+template <bool Existed>
class SwitchThreadMemTracker {
public:
explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker,
bool in_task = true) {
+ if (config::memory_verbose_track) {
#ifndef BE_TEST
- DCHECK(mem_tracker);
- // The thread tracker must be switched after the attach task, otherwise switching
- // in the main thread will cause the cached tracker not be cleaned up in time.
- DCHECK(in_task == false ||
- thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
- _old_tracker_id =
- thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker(mem_tracker);
+ DCHECK(mem_tracker);
+ // The thread tracker must be switched after the attach task, otherwise switching
+ // in the main thread will cause the cached tracker not be cleaned up in time.
+ DCHECK(in_task == false ||
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
+ if (Existed) {
+ _old_tracker_id =
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<true>(
+ mem_tracker);
+ } else {
+ _old_tracker_id =
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<false>(
+ mem_tracker);
+ }
#endif
+ }
}
~SwitchThreadMemTracker() {
+ if (config::memory_verbose_track) {
#ifndef BE_TEST
- thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
- DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+ DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
#endif
+ }
}
-private:
- std::string _old_tracker_id;
+protected:
+ int64_t _old_tracker_id = 0;
+};
+
+class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
+public:
+ explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<MemTracker>& mem_tracker)
+ : SwitchThreadMemTracker<false>(mem_tracker, false) {}
+
+ ~SwitchThreadMemTrackerEndClear() {
+ thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
+ }
};
class SwitchThreadMemTrackerErrCallBack {
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp
index fd06c17427..06fd521faf 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -37,9 +37,9 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::
_temp_task_mem_tracker =
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
task_id);
- update_tracker(_temp_task_mem_tracker);
+ update_tracker<false>(_temp_task_mem_tracker);
} else {
- update_tracker(mem_tracker);
+ update_tracker<false>(mem_tracker);
}
}
@@ -48,13 +48,15 @@ void ThreadMemTrackerMgr::detach_task() {
_fragment_instance_id = TUniqueId();
_consume_err_cb.init();
clear_untracked_mems();
- _tracker_id = "process";
+ _tracker_id = 0;
// The following memory changes for the two map operations of _untracked_mems and _mem_trackers
// will be re-recorded in _untracked_mem.
_untracked_mems.clear();
- _untracked_mems["process"] = 0;
+ _untracked_mems[0] = 0;
_mem_trackers.clear();
- _mem_trackers["process"] = MemTracker::get_process_tracker();
+ _mem_trackers[0] = MemTracker::get_process_tracker();
+ _mem_tracker_labels.clear();
+ _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
}
void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
@@ -72,7 +74,8 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
- nullptr, "In TCMalloc Hook, " + _consume_err_cb.cancel_msg, mem_usage, st);
+ nullptr, fmt::format("In TCMalloc Hook, {}", _consume_err_cb.cancel_msg), mem_usage,
+ st);
if (_consume_err_cb.cb_func != nullptr) {
_consume_err_cb.cb_func();
}
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 2b581dec79..0066dc0519 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -18,6 +18,7 @@
#pragma once
#include <fmt/format.h>
+#include <parallel_hashmap/phmap.h>
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -61,9 +62,9 @@ inline thread_local bool start_thread_mem_tracker = false;
class ThreadMemTrackerMgr {
public:
ThreadMemTrackerMgr() {
- _mem_trackers["process"] = MemTracker::get_process_tracker();
- _untracked_mems["process"] = 0;
- _tracker_id = "process";
+ _mem_trackers[0] = MemTracker::get_process_tracker();
+ _untracked_mems[0] = 0;
+ _tracker_id = 0;
start_thread_mem_tracker = true;
}
~ThreadMemTrackerMgr() {
@@ -74,11 +75,15 @@ public:
void clear_untracked_mems() {
for (const auto& untracked_mem : _untracked_mems) {
if (untracked_mem.second != 0) {
- DCHECK(_mem_trackers[untracked_mem.first]);
- _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
+ DCHECK(_mem_trackers[untracked_mem.first]) << ", label: " << _mem_tracker_labels[untracked_mem.first];
+ if (_mem_trackers[untracked_mem.first]) {
+ _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
+ } else {
+ MemTracker::get_process_tracker()->consume(untracked_mem.second);
+ }
}
}
- _mem_trackers[_tracker_id]->consume(_untracked_mem);
+ mem_tracker()->consume(_untracked_mem);
_untracked_mem = 0;
}
@@ -91,14 +96,16 @@ public:
// Must be fast enough!
// Thread update_tracker may be called very frequently, adding a memory copy will be slow.
- std::string update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
-
- void update_tracker_id(const std::string& tracker_id) {
- if (tracker_id != _tracker_id) {
- _untracked_mems[_tracker_id] += _untracked_mem;
- _untracked_mem = 0;
- _tracker_id = tracker_id;
- }
+ template <bool Existed>
+ int64_t update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
+ void update_tracker_id(int64_t tracker_id);
+
+ void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
+ DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
+ _mem_trackers[mem_tracker->id()] = mem_tracker;
+ DCHECK(_mem_trackers[mem_tracker->id()]);
+ _untracked_mems[mem_tracker->id()] = 0;
+ _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
}
inline ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg,
@@ -124,8 +131,12 @@ public:
bool is_attach_task() { return _task_id != ""; }
std::shared_ptr<MemTracker> mem_tracker() {
- DCHECK(_mem_trackers[_tracker_id]);
- return _mem_trackers[_tracker_id];
+ DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
+ if (_mem_trackers[_tracker_id]) {
+ return _mem_trackers[_tracker_id];
+ } else {
+ return MemTracker::get_process_tracker();
+ }
}
private:
@@ -146,12 +157,15 @@ private:
// 2. The cost of calling consume for the current untracked mem is huge;
// In order to reduce the cost, during an attach task, the untracked mem of all switched trackers is cached,
// and the untracked mem is consumed only after the upper limit is reached or when the task is detached.
- std::unordered_map<std::string, std::shared_ptr<MemTracker>> _mem_trackers;
- std::string _tracker_id;
- std::unordered_map<std::string, int64_t> _untracked_mems;
+ // NOTE: flat_hash_map, int replaces string as key, all to improve the speed of map find,
+ // the expected speed is increased by more than 10 times.
+ phmap::flat_hash_map<int64_t, std::shared_ptr<MemTracker>> _mem_trackers;
+ int64_t _tracker_id;
+ phmap::flat_hash_map<int64_t, int64_t> _untracked_mems;
+ phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
// Avoid memory allocation in functions and fall into an infinite loop
- std::string _temp_tracker_id;
+ int64_t _temp_tracker_id;
ConsumeErrCallBackInfo _temp_consume_err_cb;
std::shared_ptr<MemTracker> _temp_task_mem_tracker;
@@ -160,23 +174,41 @@ private:
ConsumeErrCallBackInfo _consume_err_cb;
};
-inline std::string ThreadMemTrackerMgr::update_tracker(
- const std::shared_ptr<MemTracker>& mem_tracker) {
+template <bool Existed>
+inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
DCHECK(mem_tracker);
_temp_tracker_id = mem_tracker->id();
if (_temp_tracker_id == _tracker_id) {
return _tracker_id;
}
- if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
- _mem_trackers[_temp_tracker_id] = mem_tracker;
- _untracked_mems[_temp_tracker_id] = 0;
+ if (Existed) {
+ DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end());
+ } else {
+ if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
+ _mem_trackers[_temp_tracker_id] = mem_tracker;
+ DCHECK(_mem_trackers[_temp_tracker_id]);
+ _untracked_mems[_temp_tracker_id] = 0;
+ _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
+ }
}
+
_untracked_mems[_tracker_id] += _untracked_mem;
_untracked_mem = 0;
std::swap(_tracker_id, _temp_tracker_id);
+ DCHECK(_mem_trackers[_tracker_id]);
return _temp_tracker_id; // old tracker_id
}
+inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
+ if (tracker_id != _tracker_id) {
+ _untracked_mems[_tracker_id] += _untracked_mem;
+ _untracked_mem = 0;
+ _tracker_id = tracker_id;
+ DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+ DCHECK(_mem_trackers[_tracker_id]);
+ }
+}
+
inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
_untracked_mem += size;
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
@@ -184,7 +216,8 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
// it will cause tracker->consumption to be temporarily less than 0.
if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
- DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end());
+ DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+ start_thread_mem_tracker = false;
// When switching to the current tracker last time, the remaining untracked memory.
if (_untracked_mems[_tracker_id] != 0) {
_untracked_mem += _untracked_mems[_tracker_id];
@@ -192,19 +225,18 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
}
// Avoid getting stuck in infinite loop if there is memory allocation in noncache_consume.
// For example: GC function when try_consume; mem_limit_exceeded.
- start_thread_mem_tracker = false;
noncache_consume();
start_thread_mem_tracker = true;
}
}
inline void ThreadMemTrackerMgr::noncache_consume() {
- DCHECK(_mem_trackers[_tracker_id]);
- Status st = _mem_trackers[_tracker_id]->try_consume(_untracked_mem);
+ DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
+ Status st = mem_tracker()->try_consume(_untracked_mem);
if (!st) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
- _mem_trackers[_tracker_id]->consume(_untracked_mem);
+ mem_tracker()->consume(_untracked_mem);
exceeded(_untracked_mem, st);
}
_untracked_mem = 0;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 1754acdae5..c222a7a5b5 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -20,7 +20,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_filter_mgr.h"
-#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr.h"
@@ -752,6 +751,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_hash_table_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSetOperationNode:HashTable");
// Build phase
@@ -944,9 +944,10 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
}
Status HashJoinNode::open(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
- SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 0788303f87..0942e24758 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -22,7 +22,6 @@
#include "exec/exec_node.h"
#include "runtime/mem_pool.h"
#include "runtime/row_batch.h"
-#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
@@ -203,15 +202,15 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
}
Status AggregationNode::prepare(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
_data_mem_tracker = MemTracker::create_virtual_tracker(-1, "AggregationNode:Data", mem_tracker());
-
- SCOPED_TIMER(_runtime_profile->total_time_counter());
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
@@ -332,9 +331,10 @@ Status AggregationNode::prepare(RuntimeState* state) {
}
Status AggregationNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::open(state));
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open.");
+ RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
@@ -368,8 +368,9 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
}
Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next.");
if (_is_streaming_preagg) {
bool child_eos = false;
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 4d69716216..6861421e12 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -142,6 +142,7 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status VAnalyticEvalNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
_mem_pool.reset(new MemPool(mem_tracker().get()));
_evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
@@ -208,6 +209,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
Status VAnalyticEvalNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(child(0)->open(state));
@@ -234,6 +236,7 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp
index 4ab17c0f8c..acfec89b26 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -22,7 +22,6 @@
#include "exprs/expr.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
@@ -40,6 +39,7 @@ Status VBlockingJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status VBlockingJoinNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_build_pool.reset(new MemPool(mem_tracker().get()));
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -76,8 +76,9 @@ void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Stat
}
Status VBlockingJoinNode::open(RuntimeState* state) {
- RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+ RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp
index d2fc21d295..01d96d69de 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -23,7 +23,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
@@ -34,6 +33,7 @@ VCrossJoinNode::VCrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const D
Status VCrossJoinNode::prepare(RuntimeState* state) {
DCHECK(_join_op == TJoinOp::CROSS_JOIN);
RETURN_IF_ERROR(VBlockingJoinNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
_block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VCrossJoinNode:Block", mem_tracker());
_num_existing_columns = child(0)->row_desc().num_materialized_slots();
@@ -90,8 +90,9 @@ void VCrossJoinNode::init_get_next(int left_batch_row) {
Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) {
RETURN_IF_CANCELLED(state);
- *eos = false;
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
+ *eos = false;
if (_eos) {
*eos = true;
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index d6109e0541..604dda8e72 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -21,7 +21,6 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_filter_mgr.h"
-#include "runtime/thread_context.h"
#include "util/priority_thread_pool.hpp"
#include "vec/core/block.h"
#include "vec/exec/volap_scanner.h"
@@ -147,6 +146,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
+ ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
int64_t wait_time = scanner->update_wait_worker_timer();
// Do not use ScopedTimer. There is no guarantee that, the counter
// (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`.
@@ -444,6 +444,7 @@ Status VOlapScanNode::close(RuntimeState* state) {
Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
// check if Canceled.
if (state->is_cancelled()) {
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index 3850a8ab0c..c186e9a5c7 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -40,6 +40,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b
Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index bd7856f29e..14cb8dec65 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -17,7 +17,6 @@
#include "vec/exec/vset_operation_node.h"
-#include "runtime/thread_context.h"
#include "util/defer_op.h"
#include "vec/exprs/vexpr.h"
namespace doris {
@@ -115,6 +114,7 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status VSetOperationNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(ExecNode::open(state));
// open result expr lists.
for (const std::vector<VExprContext*>& exprs : _child_expr_lists) {
@@ -125,9 +125,10 @@ Status VSetOperationNode::open(RuntimeState* state) {
}
Status VSetOperationNode::prepare(RuntimeState* state) {
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_hash_table_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSetOperationNode:HashTable");
- SCOPED_TIMER(_runtime_profile->total_time_counter());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4b5197a497..8e963262fe 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -43,6 +43,7 @@ Status VSortNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
_runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true");
RETURN_IF_ERROR(ExecNode::prepare(state));
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSortNode:Block", mem_tracker());
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
expr_mem_tracker()));
@@ -51,6 +52,7 @@ Status VSortNode::prepare(RuntimeState* state) {
Status VSortNode::open(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_CANCELLED(state);
@@ -76,6 +78,7 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
auto status = Status::OK();
if (_sorted_blocks.empty()) {
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 1a8c3f15ee..06d3d2f2be 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -17,6 +17,7 @@
#include "vec/exprs/vexpr_context.h"
+#include "runtime/thread_context.h"
#include "udf/udf_internal.h"
#include "vec/exprs/vexpr.h"
@@ -40,6 +41,7 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state,
const std::shared_ptr<doris::MemTracker>& tracker) {
_prepared = true;
_mem_tracker = tracker;
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_pool.reset(new MemPool(_mem_tracker.get()));
return _root->prepare(state, row_desc, this);
}
@@ -49,6 +51,7 @@ doris::Status VExprContext::open(doris::RuntimeState* state) {
if (_opened) {
return Status::OK();
}
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_opened = true;
// Fragment-local state is only initialized for original contexts. Clones inherit the
// original's fragment state and only need to have thread-local state initialized.
@@ -77,6 +80,7 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
DCHECK(_prepared);
DCHECK(_opened);
DCHECK(*new_ctx == nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
*new_ctx = state->obj_pool()->add(new VExprContext(_root));
(*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 4767127174..180c31838c 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -19,6 +19,7 @@
#include "gen_cpp/data.pb.h"
#include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
@@ -261,6 +262,7 @@ VDataStreamRecvr::VDataStreamRecvr(
_mem_tracker =
MemTracker::create_tracker(-1, "VDataStreamRecvr:" + print_id(_fragment_instance_id),
nullptr, MemTrackerLevel::VERBOSE, _profile);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
_block_mem_tracker = MemTracker::create_virtual_tracker(
-1, "VDataStreamRecvr:block:" + print_id(_fragment_instance_id), _mem_tracker);
@@ -293,6 +295,7 @@ Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& orderin
const std::vector<bool>& nulls_first, size_t batch_size,
int64_t limit, size_t offset) {
DCHECK(_is_merging);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::vector<BlockSupplier> child_block_suppliers;
// Create the merger that will a single stream of sorted rows.
_merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
@@ -308,16 +311,19 @@ Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& orderin
void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
}
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(block, use_move);
}
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
if (!_is_merging) {
Block* res = nullptr;
RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 46df1dc47c..f5882ed38f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -27,6 +27,7 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
#include "util/proto_util.h"
#include "vec/common/sip_hash.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -343,6 +344,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_mem_tracker = MemTracker::create_tracker(
-1, "VDataStreamSender:" + print_id(state->fragment_instance_id()),
state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
std::random_device rd;
@@ -376,6 +378,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
Status VDataStreamSender::open(RuntimeState* state) {
DCHECK(state != nullptr);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->open(state));
@@ -389,6 +392,7 @@ Status VDataStreamSender::send(RuntimeState* state, RowBatch* batch) {
Status VDataStreamSender::send(RuntimeState* state, Block* block) {
SCOPED_TIMER(_profile->total_time_counter());
+ SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
// 1. serialize depends on it is not local exchange
// 2. send block
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e8d3cc0f5f..58bb03c69c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -17,6 +17,7 @@
#include "vec/sink/vtablet_sink.h"
+#include "runtime/thread_context.h"
#include "util/doris_metrics.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
@@ -55,6 +56,7 @@ Status VOlapTableSink::open(RuntimeState* state) {
}
Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
Status status = Status::OK();
auto rows = input_block->rows();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org