You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/19 04:28:27 UTC

[doris] branch master updated: [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate #13401

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2745a88814 [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate #13401
2745a88814 is described below

commit 2745a888143cba313625520438b0616eb35e7aed
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Oct 19 12:28:20 2022 +0800

    [enhancement](memtracker) Fix brpc causing query mem tracker to be inaccurate #13401
---
 be/src/runtime/memory/mem_tracker.cpp           | 10 ++++------
 be/src/runtime/memory/mem_tracker.h             |  6 ++++++
 be/src/runtime/memory/mem_tracker_limiter.h     |  4 ----
 be/src/runtime/memory/mem_tracker_task_pool.cpp | 10 +++++-----
 be/src/service/internal_service.cpp             | 12 ++++--------
 be/src/vec/runtime/vdata_stream_recvr.cpp       |  5 ++++-
 6 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 9c0b006281..0604d538dc 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -23,7 +23,6 @@
 #include <fmt/format.h>
 
 #include "runtime/thread_context.h"
-#include "util/pretty_printer.h"
 #include "util/string_util.h"
 #include "util/time.h"
 
@@ -103,11 +102,10 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
 }
 
 std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
-    return fmt::format(
-            "MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
-            snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
-            snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
-            snapshot.peak_consumption);
+    return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
+                       snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
+                       snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
+                       snapshot.peak_consumption);
 }
 
 static std::unordered_map<std::string, std::shared_ptr<MemTracker>> global_mem_trackers;
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 0b8ead634b..c939d9a62d 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -19,6 +19,7 @@
 // and modified by Doris
 #pragma once
 
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -56,6 +57,11 @@ public:
     static std::shared_ptr<MemTracker> get_global_mem_tracker(const std::string& label);
     static void make_global_mem_tracker_snapshot(std::vector<MemTracker::Snapshot>* snapshots);
 
+    static std::string print_bytes(int64_t bytes) {
+        return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
+                          : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
+    }
+
 public:
     const std::string& label() const { return _label; }
     // Returns the memory consumed in bytes.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 9a40e79b66..f6440c0ace 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -165,10 +165,6 @@ public:
         return msg.str();
     }
 
-    static std::string print_bytes(int64_t bytes) {
-        return PrettyPrinter::print(bytes, TUnit::BYTES);
-    }
-
 private:
     // The following func, for automatic memory tracking and limiting based on system memory allocation.
     friend class ThreadMemTrackerMgr;
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 04f2388515..58d98278c6 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -55,7 +55,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_me
         const std::string& query_id) {
     return register_task_mem_tracker_impl("Scanner#" + query_id, -1,
                                           fmt::format("Scanner#Query#Id={}", query_id),
-                                          ExecEnv::GetInstance()->query_pool_mem_tracker());
+                                          get_task_mem_tracker(query_id));
 }
 
 std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker(
@@ -69,7 +69,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem
         const std::string& load_id) {
     return register_task_mem_tracker_impl("Scanner#" + load_id, -1,
                                           fmt::format("Scanner#Load#Id={}", load_id),
-                                          ExecEnv::GetInstance()->load_pool_mem_tracker());
+                                          get_task_mem_tracker(load_id));
 }
 
 std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
@@ -104,9 +104,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             LOG(INFO) << fmt::format(
                     "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                     "PeakUsed={}",
-                    it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
-                    PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
-                    PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
+                    it->first, MemTracker::print_bytes(it->second->limit()),
+                    MemTracker::print_bytes(it->second->consumption()),
+                    MemTracker::print_bytes(it->second->peak_consumption()));
             expired_task_ids.emplace_back(it->first);
         } else if (config::memory_verbose_track) {
             it->second->print_log_usage("query routine");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index d7f1834843..a81c97bc20 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -132,10 +132,8 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
-        // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
-        transmit_tracker = std::make_shared<MemTrackerLimiter>(
-                -1, fmt::format("QueryTransmit#queryId={}", query_id),
-                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
+        transmit_tracker =
+                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
     } else {
         query_id = "unkown_transmit_data";
         transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
@@ -642,10 +640,8 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
-        // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
-        transmit_tracker = std::make_shared<MemTrackerLimiter>(
-                -1, fmt::format("QueryTransmit#queryId={}", query_id),
-                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
+        transmit_tracker =
+                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
     } else {
         query_id = "unkown_transmit_block";
         transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 0e40404733..e80354af25 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
     }
 
     // _cur_batch must be replaced with the returned batch.
-    _current_block.reset();
+    {
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+        _current_block.reset();
+    }
     *next_block = nullptr;
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org