You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/10/11 11:47:21 UTC
[doris] branch master updated: [enhancement](memtracker) Add independent and unique scanner mem tracker for each query (#13262)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new df54c6b63a [enhancement](memtracker) Add independent and unique scanner mem tracker for each query (#13262)
df54c6b63a is described below
commit df54c6b63af4570f9f276fd885d94420159e8128
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Oct 11 19:47:12 2022 +0800
[enhancement](memtracker) Add independent and unique scanner mem tracker for each query (#13262)
---
be/src/runtime/memory/mem_tracker_task_pool.cpp | 20 ++++++++++++++++----
be/src/runtime/memory/mem_tracker_task_pool.h | 4 ++++
be/src/runtime/runtime_state.cpp | 7 +++++++
be/src/runtime/runtime_state.h | 6 +++++-
be/src/runtime/thread_context.cpp | 7 ++++---
be/src/runtime/thread_context.h | 24 ++++++++++++------------
be/src/vec/exec/scan/new_es_scan_node.cpp | 7 ++-----
be/src/vec/exec/scan/new_es_scan_node.h | 2 --
be/src/vec/exec/scan/new_es_scanner.cpp | 10 +++-------
be/src/vec/exec/scan/new_es_scanner.h | 4 ++--
be/src/vec/exec/scan/new_file_arrow_scanner.cpp | 16 ++++++----------
be/src/vec/exec/scan/new_file_arrow_scanner.h | 12 ++++++------
be/src/vec/exec/scan/new_file_scan_node.cpp | 13 ++++---------
be/src/vec/exec/scan/new_file_scan_node.h | 1 -
be/src/vec/exec/scan/new_file_scanner.cpp | 8 +++-----
be/src/vec/exec/scan/new_file_scanner.h | 2 +-
be/src/vec/exec/scan/new_file_text_scanner.cpp | 5 ++---
be/src/vec/exec/scan/new_file_text_scanner.h | 4 ++--
be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 5 ++---
be/src/vec/exec/scan/new_jdbc_scan_node.h | 2 --
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 7 ++-----
be/src/vec/exec/scan/new_jdbc_scanner.h | 4 ++--
be/src/vec/exec/scan/new_odbc_scan_node.cpp | 4 +---
be/src/vec/exec/scan/new_odbc_scan_node.h | 1 -
be/src/vec/exec/scan/new_odbc_scanner.cpp | 6 ++----
be/src/vec/exec/scan/new_odbc_scanner.h | 2 +-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 7 +++----
be/src/vec/exec/scan/new_olap_scan_node.h | 2 --
be/src/vec/exec/scan/new_olap_scanner.cpp | 7 ++-----
be/src/vec/exec/scan/new_olap_scanner.h | 2 +-
be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++--
be/src/vec/exec/scan/vfile_scanner.cpp | 6 ++----
be/src/vec/exec/scan/vfile_scanner.h | 2 +-
be/src/vec/exec/scan/vscanner.cpp | 4 +---
be/src/vec/exec/scan/vscanner.h | 3 +--
be/src/vec/exec/volap_scan_node.cpp | 13 ++++++-------
be/src/vec/exec/volap_scan_node.h | 2 --
be/src/vec/exec/volap_scanner.cpp | 9 ++-------
be/src/vec/exec/volap_scanner.h | 4 +---
39 files changed, 113 insertions(+), 137 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 143e7486fa..0eff6c7460 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -47,16 +47,28 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_task_mem_tracker
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_mem_tracker(
const std::string& query_id, int64_t mem_limit) {
- return register_task_mem_tracker_impl(query_id, mem_limit,
- fmt::format("Query#queryId={}", query_id),
+ return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("Query#Id={}", query_id),
+ ExecEnv::GetInstance()->query_pool_mem_tracker());
+}
+
+std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_mem_tracker(
+ const std::string& query_id) {
+ return register_task_mem_tracker_impl("Scanner#" + query_id, -1,
+ fmt::format("Scanner#Query#Id={}", query_id),
ExecEnv::GetInstance()->query_pool_mem_tracker());
}
std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker(
const std::string& load_id, int64_t mem_limit) {
// In load, the query id of the fragment is executed, which is the same as the load id of the load channel.
- return register_task_mem_tracker_impl(load_id, mem_limit,
- fmt::format("Load#queryId={}", load_id),
+ return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("Load#Id={}", load_id),
+ ExecEnv::GetInstance()->load_pool_mem_tracker());
+}
+
+std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem_tracker(
+ const std::string& load_id) {
+ return register_task_mem_tracker_impl("Scanner#" + load_id, -1,
+ fmt::format("Scanner#Load#Id={}", load_id),
ExecEnv::GetInstance()->load_pool_mem_tracker());
}
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
index f8c5039eab..4d3a514f6d 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -43,8 +43,12 @@ public:
const std::shared_ptr<MemTrackerLimiter>& parent);
std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const std::string& query_id,
int64_t mem_limit);
+ std::shared_ptr<MemTrackerLimiter> register_query_scanner_mem_tracker(
+ const std::string& query_id);
std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const std::string& load_id,
int64_t mem_limit);
+ std::shared_ptr<MemTrackerLimiter> register_load_scanner_mem_tracker(
+ const std::string& load_id);
std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string& task_id);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 646ae960ca..764bf1471f 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -233,14 +233,21 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
_query_mem_tracker =
_exec_env->task_pool_mem_tracker_registry()->register_query_mem_tracker(
print_id(query_id), bytes_limit);
+ _scanner_mem_tracker =
+ _exec_env->task_pool_mem_tracker_registry()->register_query_scanner_mem_tracker(
+ print_id(query_id));
} else if (query_type() == TQueryType::LOAD) {
_query_mem_tracker = _exec_env->task_pool_mem_tracker_registry()->register_load_mem_tracker(
print_id(query_id), bytes_limit);
+ _scanner_mem_tracker =
+ _exec_env->task_pool_mem_tracker_registry()->register_load_scanner_mem_tracker(
+ print_id(query_id));
} else {
DCHECK(false);
_query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
}
_query_mem_tracker->enable_reset_zero();
+ _scanner_mem_tracker->enable_reset_zero();
_instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker,
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 1b7042815a..71831667d0 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -124,6 +124,7 @@ public:
ExecEnv* exec_env() { return _exec_env; }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return _query_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return _instance_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> scanner_mem_tracker() { return _scanner_mem_tracker; }
ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
void set_fragment_root_id(PlanNodeId id) {
@@ -400,9 +401,12 @@ private:
// MemTracker that is shared by all fragment instances running on this host.
// The query mem tracker must be released after the _instance_mem_tracker.
std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
-
// Memory usage of this fragment instance
std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker;
+ // Count the memory consumption of Scanner, independent and unique for each query,
+ // this means that scanner memory does not count into query mem tracker,
+ // label is `Scanner#{queryId}`.
+ std::shared_ptr<MemTrackerLimiter> _scanner_mem_tracker;
// put runtime state before _obj_pool, so that it will be deconstructed after
// _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 4a02c45560..432c5be879 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -42,9 +42,10 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
#endif // BE_TEST
DCHECK(runtime_state->instance_mem_tracker());
- thread_context()->attach_task(
- query_to_task_type(runtime_state->query_type()), print_id(runtime_state->query_id()),
- runtime_state->fragment_instance_id(), runtime_state->instance_mem_tracker());
+ thread_context()->attach_task(ThreadContext::query_to_task_type(runtime_state->query_type()),
+ print_id(runtime_state->query_id()),
+ runtime_state->fragment_instance_id(),
+ runtime_state->instance_mem_tracker());
}
AttachTask::~AttachTask() {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 26030ee2e6..46b2e86d44 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -159,6 +159,18 @@ public:
const std::string& thread_id_str() const { return _thread_id; }
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
+ static TaskType query_to_task_type(const TQueryType::type& query_type) {
+ switch (query_type) {
+ case TQueryType::SELECT:
+ return TaskType::QUERY;
+ case TQueryType::LOAD:
+ return TaskType::LOAD;
+ default:
+ DCHECK(false);
+ return TaskType::UNKNOWN;
+ }
+ }
+
std::string get_thread_id() {
std::stringstream ss;
ss << std::this_thread::get_id();
@@ -206,18 +218,6 @@ public:
explicit AttachTask(RuntimeState* runtime_state);
- const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) {
- switch (query_type) {
- case TQueryType::SELECT:
- return ThreadContext::TaskType::QUERY;
- case TQueryType::LOAD:
- return ThreadContext::TaskType::LOAD;
- default:
- DCHECK(false);
- return ThreadContext::TaskType::UNKNOWN;
- }
- }
-
~AttachTask();
};
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 31e439281d..dcad57a418 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -49,7 +49,6 @@ NewEsScanNode::NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const Des
: VScanNode(pool, tnode, descs),
_tuple_id(tnode.es_scan_node.tuple_id),
_tuple_desc(nullptr),
- _scanner_mem_tracker(nullptr),
_es_profile(nullptr) {
_output_tuple_id = tnode.es_scan_node.tuple_id;
}
@@ -78,7 +77,6 @@ Status NewEsScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- _scanner_mem_tracker = std::make_unique<MemTracker>("NewEsScanner");
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
@@ -203,9 +201,8 @@ Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) {
properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(
properties, _column_names, _predicates, _docvalue_context, &doc_value_mode);
- NewEsScanner* scanner =
- new NewEsScanner(_state, this, _limit_per_scanner, _mem_tracker.get(), _tuple_id,
- properties, _docvalue_context, doc_value_mode);
+ NewEsScanner* scanner = new NewEsScanner(_state, this, _limit_per_scanner, _tuple_id,
+ properties, _docvalue_context, doc_value_mode);
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(_state));
diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h
index 88586f7383..55aab31dcc 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.h
+++ b/be/src/vec/exec/scan/new_es_scan_node.h
@@ -59,8 +59,6 @@ private:
std::vector<int> _predicate_to_conjunct;
std::vector<int> _conjunct_to_predicate;
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
-
// Profile
std::unique_ptr<RuntimeProfile> _es_profile;
RuntimeProfile::Counter* _rows_read_counter;
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index 03f4526a23..cfde06ab3c 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -24,11 +24,10 @@ static const std::string NEW_SCANNER_TYPE = "NewEsScanner";
namespace doris::vectorized {
NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit,
- MemTracker* mem_tracker, TupleId tuple_id,
- const std::map<std::string, std::string>& properties,
+ TupleId tuple_id, const std::map<std::string, std::string>& properties,
const std::map<std::string, std::string>& docvalue_context,
bool doc_value_mode)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_is_init(false),
_es_eof(false),
_properties(properties),
@@ -53,8 +52,6 @@ Status NewEsScanner::prepare(RuntimeState* state) {
return Status::InternalError("input pointer is null.");
}
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (nullptr == _tuple_desc) {
return Status::InternalError("Failed to get tuple descriptor, tuple_id={}", _tuple_id);
@@ -83,10 +80,9 @@ Status NewEsScanner::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(_es_reader->open());
- _mem_pool.reset(new MemPool(_mem_tracker));
+ _mem_pool.reset(new MemPool());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h
index 2e776f08c9..be4d50448b 100644
--- a/be/src/vec/exec/scan/new_es_scanner.h
+++ b/be/src/vec/exec/scan/new_es_scanner.h
@@ -28,8 +28,8 @@ class NewEsScanNode;
class NewEsScanner : public VScanner {
public:
- NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, MemTracker* mem_tracker,
- TupleId tuple_id, const std::map<std::string, std::string>& properties,
+ NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, TupleId tuple_id,
+ const std::map<std::string, std::string>& properties,
const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
index 9355e36a4c..77506dafb9 100644
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
+++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
@@ -28,9 +28,9 @@ namespace doris::vectorized {
NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit, const TFileScanRange& scan_range,
- MemTracker* tracker, RuntimeProfile* profile,
+ RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs)
- : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs),
+ : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
_cur_file_reader(nullptr),
_cur_file_eof(false),
_batch(nullptr),
@@ -39,7 +39,6 @@ NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* p
Status NewFileArrowScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(NewFileScanner::open(state));
// SCOPED_TIMER(_parent->_reader_init_timer);
- // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
return Status::OK();
@@ -239,10 +238,9 @@ Status NewFileArrowScanner::_open_next_reader() {
NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent,
int64_t limit, const TFileScanRange& scan_range,
- MemTracker* tracker, RuntimeProfile* profile,
+ RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs)
- : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile,
- pre_filter_texprs) {
+ : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {
// _init_profiles(profile);
}
@@ -254,11 +252,9 @@ ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(
}
NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile,
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs)
- : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile,
- pre_filter_texprs) {}
+ : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {}
ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(
const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h
index 89e76f6623..281373a70d 100644
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.h
+++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h
@@ -28,8 +28,8 @@ namespace doris::vectorized {
class NewFileArrowScanner : public NewFileScanner {
public:
NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs);
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ const std::vector<TExpr>& pre_filter_texprs);
Status open(RuntimeState* state) override;
protected:
@@ -59,8 +59,8 @@ private:
class NewFileParquetScanner final : public NewFileArrowScanner {
public:
NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs);
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ const std::vector<TExpr>& pre_filter_texprs);
~NewFileParquetScanner() override = default;
@@ -75,8 +75,8 @@ protected:
class NewFileORCScanner final : public NewFileArrowScanner {
public:
NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs);
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ const std::vector<TExpr>& pre_filter_texprs);
~NewFileORCScanner() override = default;
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 19efa0a555..039082ab1c 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -39,7 +39,6 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
- _scanner_mem_tracker = std::make_unique<MemTracker>("NewFileScanners");
return Status::OK();
}
@@ -105,26 +104,22 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
VScanner* scanner = nullptr;
if (config::enable_new_file_scanner) {
- scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range,
- _scanner_mem_tracker.get(), runtime_profile());
+ scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
} else {
switch (scan_range.params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range,
- _scanner_mem_tracker.get(), runtime_profile(),
- std::vector<TExpr>());
+ runtime_profile(), std::vector<TExpr>());
break;
case TFileFormatType::FORMAT_ORC:
scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range,
- _scanner_mem_tracker.get(), runtime_profile(),
- std::vector<TExpr>());
+ runtime_profile(), std::vector<TExpr>());
break;
default:
scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range,
- _scanner_mem_tracker.get(), runtime_profile(),
- std::vector<TExpr>());
+ runtime_profile(), std::vector<TExpr>());
break;
}
((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h
index 5e08d05ae1..53b11e408d 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -41,6 +41,5 @@ private:
private:
std::vector<TScanRangeParams> _scan_ranges;
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp
index 6acfe2810f..6e511fe10f 100644
--- a/be/src/vec/exec/scan/new_file_scanner.cpp
+++ b/be/src/vec/exec/scan/new_file_scanner.cpp
@@ -35,9 +35,9 @@
namespace doris::vectorized {
NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ const std::vector<TExpr>& pre_filter_texprs)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
@@ -53,8 +53,6 @@ Status NewFileScanner::open(RuntimeState* state) {
}
Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-
if (vconjunct_ctx_ptr != nullptr) {
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h
index e8534416d9..50423bd3e6 100644
--- a/be/src/vec/exec/scan/new_file_scanner.h
+++ b/be/src/vec/exec/scan/new_file_scanner.h
@@ -30,7 +30,7 @@ class NewFileScanNode;
class NewFileScanner : public VScanner {
public:
NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile,
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp
index a1e2cc9b56..1222d1d6d9 100644
--- a/be/src/vec/exec/scan/new_file_text_scanner.cpp
+++ b/be/src/vec/exec/scan/new_file_text_scanner.cpp
@@ -25,10 +25,9 @@
namespace doris::vectorized {
NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile,
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
const std::vector<TExpr>& pre_filter_texprs)
- : NewFileScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs),
+ : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_line_reader_eof(false),
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h
index ab12389450..ccb92a44c3 100644
--- a/be/src/vec/exec/scan/new_file_text_scanner.h
+++ b/be/src/vec/exec/scan/new_file_text_scanner.h
@@ -29,8 +29,8 @@ namespace doris::vectorized {
class NewFileTextScanner : public NewFileScanner {
public:
NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile, const std::vector<TExpr>& pre_filter_texprs);
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ const std::vector<TExpr>& pre_filter_texprs);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
index da76859915..5f209a09c2 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp
@@ -38,7 +38,6 @@ Status NewJdbcScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VNewJdbcScanNode::Prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- _scanner_mem_tracker = std::make_unique<MemTracker>("NewJdbcScanners");
return Status::OK();
}
@@ -51,8 +50,8 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
if (_eos == true) {
return Status::OK();
}
- NewJdbcScanner* scanner = new NewJdbcScanner(
- _state, this, _limit_per_scanner, _scanner_mem_tracker.get(), _tuple_id, _query_string);
+ NewJdbcScanner* scanner =
+ new NewJdbcScanner(_state, this, _limit_per_scanner, _tuple_id, _query_string);
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(_state));
scanners->push_back(static_cast<VScanner*>(scanner));
diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.h b/be/src/vec/exec/scan/new_jdbc_scan_node.h
index 287522fc0d..7463e55daf 100644
--- a/be/src/vec/exec/scan/new_jdbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_jdbc_scan_node.h
@@ -38,8 +38,6 @@ private:
std::string _table_name;
TupleId _tuple_id;
std::string _query_string;
-
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index c571714e94..61c9ff53c5 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -21,8 +21,8 @@
namespace doris::vectorized {
NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
- MemTracker* mem_tracker, TupleId tuple_id, std::string query_string)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+ TupleId tuple_id, std::string query_string)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_is_init(false),
_jdbc_eos(false),
_tuple_id(tuple_id),
@@ -39,8 +39,6 @@ Status NewJdbcScanner::prepare(RuntimeState* state) {
return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare.");
}
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
@@ -83,7 +81,6 @@ Status NewJdbcScanner::open(RuntimeState* state) {
}
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(_jdbc_connector->open());
RETURN_IF_ERROR(_jdbc_connector->query());
return Status::OK();
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
index 75dabcacfa..984e239c2d 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -26,8 +26,8 @@ namespace doris {
namespace vectorized {
class NewJdbcScanner : public VScanner {
public:
- NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
- MemTracker* mem_tracker, TupleId tuple_id, std::string query_string);
+ NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, TupleId tuple_id,
+ std::string query_string);
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
index 48043dd22f..571566b88a 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -39,7 +39,6 @@ Status NewOdbcScanNode::prepare(RuntimeState* state) {
VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
RETURN_IF_ERROR(VScanNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- _scanner_mem_tracker = std::make_unique<MemTracker>("NewOdbcScanner");
return Status::OK();
}
@@ -52,8 +51,7 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
if (_eos == true) {
return Status::OK();
}
- NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner,
- _scanner_mem_tracker.get(), _odbc_scan_node);
+ NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner, _odbc_scan_node);
_scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(_state));
scanners->push_back(static_cast<VScanner*>(scanner));
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h
index 40d2bdd4bd..03d18ce58f 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -37,6 +37,5 @@ protected:
private:
std::string _table_name;
TOdbcScanNode _odbc_scan_node;
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
index ee383ac636..a69ecb1f32 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -26,8 +26,8 @@ static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
namespace doris::vectorized {
NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
- MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+ const TOdbcScanNode& odbc_scan_node)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_is_init(false),
_odbc_eof(false),
_table_name(odbc_scan_node.table_name),
@@ -47,7 +47,6 @@ Status NewOdbcScanner::prepare(RuntimeState* state) {
return Status::InternalError("input pointer is null.");
}
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -89,7 +88,6 @@ Status NewOdbcScanner::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(VScanner::open(state));
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
RETURN_IF_ERROR(_odbc_connector->open());
RETURN_IF_ERROR(_odbc_connector->query());
// check materialize slot num
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h
index 34cedb8095..0b8d28ed99 100644
--- a/be/src/vec/exec/scan/new_odbc_scanner.h
+++ b/be/src/vec/exec/scan/new_odbc_scanner.h
@@ -26,7 +26,7 @@ namespace doris::vectorized {
class NewOdbcScanner : public VScanner {
public:
NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
- MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node);
+ const TOdbcScanNode& odbc_scan_node);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 8242abef77..3fdf0770af 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -38,7 +38,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
Status NewOlapScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- _scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
return Status::OK();
}
@@ -307,9 +306,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
scanner_ranges.push_back((*ranges)[i].get());
}
- NewOlapScanner* scanner = new NewOlapScanner(
- _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation,
- _need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
+ NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner,
+ _olap_scan_node.is_preaggregation,
+ _need_agg_finalize, *scan_range);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool.add(scanner);
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h
index faea367089..2315d67047 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -57,8 +57,6 @@ private:
OlapScanKeys _scan_keys;
std::vector<TCondition> _olap_filters;
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
-
private:
std::unique_ptr<RuntimeProfile> _segment_profile;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ea07536a76..ec33346b04 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -25,8 +25,8 @@ namespace doris::vectorized {
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit,
bool aggregation, bool need_agg_finalize,
- const TPaloScanRange& scan_range, MemTracker* tracker)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
+ const TPaloScanRange& scan_range)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
_version(-1) {
@@ -38,8 +38,6 @@ Status NewOlapScanner::prepare(
VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
-
if (vconjunct_ctx_ptr != nullptr) {
// Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
@@ -114,7 +112,6 @@ Status NewOlapScanner::prepare(
Status NewOlapScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto res = _tablet_reader->init(_tablet_reader_params);
if (!res.ok()) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 41b2888879..6b07438bf1 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -34,7 +34,7 @@ class NewOlapScanNode;
class NewOlapScanner : public VScanner {
public:
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
- bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker);
+ bool need_agg_finalize, const TPaloScanRange& scan_range);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 9787ecae5d..b561fca483 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -185,8 +185,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
VScanner* scanner) {
INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(),
ctx->scan_span(), "VScanner::scan");
- SCOPED_ATTACH_TASK(scanner->runtime_state());
-
+ SCOPED_ATTACH_TASK(scanner->runtime_state()->scanner_mem_tracker(),
+ ThreadContext::query_to_task_type(scanner->runtime_state()->query_type()),
+ print_id(scanner->runtime_state()->query_id()),
+ scanner->runtime_state()->fragment_instance_id());
Thread::set_self_name("_scanner_scan");
scanner->update_wait_worker_timer();
// Do not use ScopedTimer. There is no guarantee that, the counter
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6c798c77c4..ef10257735 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -37,9 +37,8 @@
namespace doris::vectorized {
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker,
- RuntimeProfile* profile)
- : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
+ const TFileScanRange& scan_range, RuntimeProfile* profile)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
@@ -52,7 +51,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
Status VFileScanner::prepare(
VExprContext** vconjunct_ctx_ptr,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
_colname_to_value_range = colname_to_value_range;
_get_block_timer = ADD_TIMER(_parent->_scanner_profile, "FileScannerGetBlockTime");
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index cbb4a95707..2bd019b955 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -43,7 +43,7 @@ struct ScannerCounter {
class VFileScanner : public VScanner {
public:
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, MemTracker* tracker, RuntimeProfile* profile);
+ const TFileScanRange& scan_range, RuntimeProfile* profile);
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index f92e291e33..fdda015c20 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -21,11 +21,10 @@
namespace doris::vectorized {
-VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker)
+VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit)
: _state(state),
_parent(parent),
_limit(limit),
- _mem_tracker(mem_tracker),
_input_tuple_desc(parent->input_tuple_desc()),
_output_tuple_desc(parent->output_tuple_desc()) {
_real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
@@ -36,7 +35,6 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTra
Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
if (!block->mem_reuse()) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index d5d8025256..49b9c780bd 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -30,7 +30,7 @@ class VScanNode;
class VScanner {
public:
- VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker);
+ VScanner(RuntimeState* state, VScanNode* parent, int64_t limit);
virtual ~VScanner() {}
@@ -117,7 +117,6 @@ protected:
VScanNode* _parent;
// Set if scan node has sort limit info
int64_t _limit = -1;
- MemTracker* _mem_tracker;
const TupleDescriptor* _input_tuple_desc = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 26925a78fe..0b7ba331af 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -209,8 +209,6 @@ Status VOlapScanNode::prepare(RuntimeState* state) {
_init_counter(state);
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
- _scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
-
if (_tuple_desc == nullptr) {
// TODO: make sure we print all available diagnostic output to our error log
return Status::InternalError("Failed to get tuple descriptor.");
@@ -389,8 +387,10 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
}
void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
- // SCOPED_ATTACH_TASK(_runtime_state); // TODO Recorded on an independent tracker
- SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+ SCOPED_ATTACH_TASK(_runtime_state->scanner_mem_tracker(),
+ ThreadContext::query_to_task_type(_runtime_state->query_type()),
+ print_id(_runtime_state->query_id()),
+ _runtime_state->fragment_instance_id());
Thread::set_self_name("volap_scanner");
int64_t wait_time = scanner->update_wait_worker_timer();
// Do not use ScopedTimer. There is no guarantee that, the counter
@@ -892,9 +892,8 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
++j, ++i) {
scanner_ranges.push_back((*ranges)[i].get());
}
- VOlapScanner* scanner =
- new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
- _need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
+ VOlapScanner* scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
+ _need_agg_finalize, *scan_range);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
_scanner_pool.add(scanner);
diff --git a/be/src/vec/exec/volap_scan_node.h b/be/src/vec/exec/volap_scan_node.h
index 44b3829402..51f4ed97c0 100644
--- a/be/src/vec/exec/volap_scan_node.h
+++ b/be/src/vec/exec/volap_scan_node.h
@@ -221,8 +221,6 @@ private:
TResourceInfo* _resource_info;
int64_t _buffered_bytes;
- // Count the memory consumption of Rowset Reader and Tablet Reader in OlapScanner.
- std::unique_ptr<MemTracker> _scanner_mem_tracker;
EvalConjunctsFn _eval_conjuncts_fn;
// the max num of scan keys of this scan request.
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index 5761705a84..202f54bcb2 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -28,8 +28,7 @@
namespace doris::vectorized {
VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, bool aggregation,
- bool need_agg_finalize, const TPaloScanRange& scan_range,
- MemTracker* tracker)
+ bool need_agg_finalize, const TPaloScanRange& scan_range)
: _runtime_state(runtime_state),
_parent(parent),
_tuple_desc(parent->_tuple_desc),
@@ -37,8 +36,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b
_is_open(false),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
- _version(-1),
- _mem_tracker(tracker) {
+ _version(-1) {
_tablet_schema = std::make_shared<TabletSchema>();
}
@@ -47,7 +45,6 @@ Status VOlapScanner::prepare(
const std::vector<TCondition>& filters,
const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>& bloom_filters,
const std::vector<FunctionFilter>& function_filters) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
set_tablet_reader();
// set limit to reduce end of rowset and segment mem use
_tablet_reader->set_batch_size(
@@ -119,7 +116,6 @@ Status VOlapScanner::prepare(
Status VOlapScanner::open() {
SCOPED_TIMER(_parent->_reader_init_timer);
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
_runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
@@ -325,7 +321,6 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
if (!block->mem_reuse()) {
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index d3cd791b63..3bc286eb40 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -36,7 +36,7 @@ class VOlapScanNode;
class VOlapScanner {
public:
VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, bool aggregation,
- bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker);
+ bool need_agg_finalize, const TPaloScanRange& scan_range);
virtual ~VOlapScanner() = default;
Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
@@ -141,8 +141,6 @@ private:
MonotonicStopWatch _watcher;
- MemTracker* _mem_tracker;
-
VExprContext* _vconjunct_ctx = nullptr;
bool _need_to_close = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org