You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/25 00:56:26 UTC
[incubator-doris] branch master updated: [fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ca05d1ee01 [fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661)
ca05d1ee01 is described below
commit ca05d1ee017f491e896ffdc41dea2b70f2a4de0b
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed May 25 08:56:17 2022 +0800
[fix](memory tracker) Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile (#9661)
1. Fix the LRU cache MemTracker consumption value becoming negative.
2. Fix the compaction MemTracker not tracking any memory.
3. Add the USE_MEM_TRACKER compile option.
4. Make sure the malloc/free hook is never stopped.
---
be/CMakeLists.txt | 8 ++++++++
be/src/agent/task_worker_pool.cpp | 2 ++
be/src/exec/json_scanner.h | 4 ++++
be/src/gutil/strings/numbers.cc | 25 +++++++++++++++---------
be/src/olap/compaction.cpp | 13 ++++++++++---
be/src/olap/compaction.h | 2 ++
be/src/olap/lru_cache.cpp | 20 ++++++++-----------
be/src/olap/lru_cache.h | 8 +++++++-
be/src/olap/olap_server.cpp | 2 ++
be/src/olap/tablet.cpp | 9 +++++++++
be/src/olap/tablet.h | 2 ++
be/src/runtime/load_channel.cpp | 2 +-
be/src/runtime/mem_tracker.cpp | 2 +-
be/src/runtime/mem_tracker.h | 14 ++++++++++++++
be/src/runtime/result_sink.cpp | 7 +++----
be/src/runtime/thread_context.cpp | 34 ++++++++++++++++++++++++++++-----
be/src/runtime/thread_context.h | 29 ++++++++++++++++++++++------
be/src/runtime/thread_mem_tracker_mgr.h | 28 ++++++++++++++++++++-------
be/src/service/doris_main.cpp | 2 ++
be/src/vec/sink/result_sink.cpp | 3 +++
be/test/olap/lru_cache_test.cpp | 21 ++++++++++----------
build.sh | 5 +++++
run-be-ut.sh | 1 +
23 files changed, 183 insertions(+), 60 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 76edc3ba27..7c53258326 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -438,6 +438,14 @@ if (WITH_LZO)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO")
endif()
+# Enable the memory tracker, which allows BE to limit the memory used by tasks such as query, load,
+# and compaction, and to observe BE memory usage through be_ip:http_port/MemTracker.
+# Building with `USE_MEM_TRACKER=OFF sh build.sh` turns off the memory tracker,
+# which brings about a 2% performance improvement and may be useful for a performance POC.
+if (USE_MEM_TRACKER)
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_MEM_TRACKER")
+endif()
+
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
endif()
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 0ef2325d2f..0a30aad20a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1609,6 +1609,8 @@ void TaskWorkerPool::_random_sleep(int second) {
}
void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
+ StorageEngine::instance()->compaction_mem_tracker());
while (_is_work) {
TAgentTaskRequest agent_task_req;
TCompactionReq compaction_req;
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index ab2f479e60..5dc61c18cb 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -64,6 +64,10 @@ public:
// Get next tuple
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override;
+ Status get_next(vectorized::Block* block, bool* eof) override {
+ return Status::NotSupported("Not Implemented get block");
+ }
+
// Close this scanner
void close() override;
diff --git a/be/src/gutil/strings/numbers.cc b/be/src/gutil/strings/numbers.cc
index 568b0ce2a4..46be289fb8 100644
--- a/be/src/gutil/strings/numbers.cc
+++ b/be/src/gutil/strings/numbers.cc
@@ -1442,7 +1442,6 @@ char* SimpleItoaWithCommas(__int128_t i, char* buffer, int32_t buffer_size) {
return p;
}
-
// ----------------------------------------------------------------------
// ItoaKMGT()
// Description: converts an integer to a string
@@ -1480,7 +1479,7 @@ string ItoaKMGT(int64 i) {
}
string AccurateItoaKMGT(int64 i) {
- const char *sign = "";
+ const char* sign = "";
if (i < 0) {
// We lose some accuracy if the caller passes LONG_LONG_MIN, but
// that's OK as this function is only for human readability
@@ -1489,31 +1488,39 @@ string AccurateItoaKMGT(int64 i) {
i = -i;
}
- string ret = std::to_string(i) + " : " + StringPrintf("%s", sign);
+ string ret = std::to_string(i) + " = " + StringPrintf("%s", sign);
int64 val;
if ((val = (i >> 40)) > 1) {
- ret += StringPrintf("%" PRId64 "%s", val, "T");
+ ret += StringPrintf("%" PRId64
+ "%s"
+ " + ",
+ val, "T");
i = i - (val << 40);
}
if ((val = (i >> 30)) > 1) {
- ret += StringPrintf(" %" PRId64 "%s", val, "G");
+ ret += StringPrintf("%" PRId64
+ "%s"
+ " + ",
+ val, "G");
i = i - (val << 30);
}
if ((val = (i >> 20)) > 1) {
- ret += StringPrintf(" %" PRId64 "%s", val, "M");
+ ret += StringPrintf("%" PRId64
+ "%s"
+ " + ",
+ val, "M");
i = i - (val << 20);
}
if ((val = (i >> 10)) > 1) {
- ret += StringPrintf(" %" PRId64 "%s", val, "K");
+ ret += StringPrintf("%" PRId64 "%s", val, "K");
i = i - (val << 10);
} else {
- ret += StringPrintf(" %" PRId64 "%s", i, "K");
+ ret += StringPrintf("%" PRId64 "%s", i, "K");
}
return ret;
}
-
// DEPRECATED(wadetregaskis).
// These are non-inline because some BUILD files turn on -Wformat-non-literal.
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index fdc07fc725..14569881a7 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -26,11 +26,18 @@ using std::vector;
namespace doris {
Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
- : _mem_tracker(MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)),
- _tablet(tablet),
+ : _tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
- _state(CompactionState::INITED) {}
+ _state(CompactionState::INITED) {
+#ifndef BE_TEST
+ _mem_tracker = MemTracker::create_tracker(-1, label,
+ StorageEngine::instance()->compaction_mem_tracker(),
+ MemTrackerLevel::INSTANCE);
+#else
+ _mem_tracker = MemTracker::get_process_tracker();
+#endif
+}
Compaction::~Compaction() {}
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 27c33dcc5a..7306bbacea 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -53,6 +53,8 @@ public:
Status execute_compact();
virtual Status execute_compact_impl() = 0;
+ std::shared_ptr<MemTracker>& get_mem_tracker() { return _mem_tracker; }
+
protected:
virtual Status pick_rowsets_to_compact() = 0;
virtual std::string compaction_name() const = 0;
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 584d7dccb9..90b2e4ddf5 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -283,7 +283,7 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {
Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
- CachePriority priority) {
+ CachePriority priority, MemTracker* tracker) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
@@ -296,7 +296,12 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
e->next = e->prev = nullptr;
e->in_cache = true;
e->priority = priority;
+ e->mem_tracker = tracker;
memcpy(e->key_data, key.data(), key.size());
+ // The memory of the parameter value should be recorded in the tls mem tracker,
+ // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
+ if (tracker)
+ tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(tracker, e->total_size);
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard<std::mutex> l(_mutex);
@@ -433,7 +438,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
: _name(name),
_last_id(1),
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s] = new LRUCache(type);
@@ -452,7 +456,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
}
ShardedLRUCache::~ShardedLRUCache() {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (int s = 0; s < kNumShards; s++) {
delete _shards[s];
}
@@ -463,12 +466,9 @@ ShardedLRUCache::~ShardedLRUCache() {
Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority) {
- // The memory of the parameter value should be recorded in the tls mem tracker,
- // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
- tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge);
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
- return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
+ return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority,
+ _mem_tracker.get());
}
Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
@@ -477,13 +477,11 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
}
void ShardedLRUCache::release(Handle* handle) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
_shards[_shard(h->hash)]->release(handle);
}
void ShardedLRUCache::erase(const CacheKey& key) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
_shards[_shard(hash)]->erase(key, hash);
}
@@ -502,7 +500,6 @@ uint64_t ShardedLRUCache::new_id() {
}
int64_t ShardedLRUCache::prune() {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune();
@@ -511,7 +508,6 @@ int64_t ShardedLRUCache::prune() {
}
int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune_if(pred);
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index b6b1f95754..6fb87744f4 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -15,6 +15,7 @@
#include "olap/olap_common.h"
#include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
#include "util/metrics.h"
#include "util/slice.h"
@@ -236,6 +237,7 @@ typedef struct LRUHandle {
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
CachePriority priority = CachePriority::NORMAL;
+ MemTracker* mem_tracker;
char key_data[1]; // Beginning of key
CacheKey key() const {
@@ -250,6 +252,9 @@ typedef struct LRUHandle {
void free() {
(*deleter)(key(), value);
+ if (mem_tracker)
+ mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
+ total_size);
::free(this);
}
@@ -308,7 +313,8 @@ public:
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
- CachePriority priority = CachePriority::NORMAL);
+ CachePriority priority = CachePriority::NORMAL,
+ MemTracker* tracker = nullptr);
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index f7437693b1..f4732cbe04 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -564,6 +564,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
auto st = _compaction_thread_pool->submit_func([=]() {
+ SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
+ tablet->get_compaction_mem_tracker(compaction_type));
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index cb073a8558..668c9ce67f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1504,6 +1504,7 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load
void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = tablet_uid();
+
context.tablet_id = tablet_id();
context.partition_id = partition_id();
context.tablet_schema_hash = schema_hash();
@@ -1522,4 +1523,12 @@ Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* r
return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset);
}
+std::shared_ptr<MemTracker>& Tablet::get_compaction_mem_tracker(CompactionType compaction_type) {
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
+ return _cumulative_compaction->get_mem_tracker();
+ } else {
+ return _base_compaction->get_mem_tracker();
+ }
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1874afa4e8..3fb3144af1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -253,6 +253,8 @@ public:
return _cumulative_compaction_policy;
}
+ std::shared_ptr<MemTracker>& get_compaction_mem_tracker(CompactionType compaction_type);
+
inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 85f2c5da3f..55038e2742 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
}
Status LoadChannel::cancel() {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
it.second->cancel();
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index bc2ae3efc1..359b0f5368 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -178,7 +178,7 @@ void MemTracker::init_virtual() {
MemTracker::~MemTracker() {
consume(_untracked_mem.exchange(0)); // before memory_leak_check
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
- if (_label == "Process") GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER();
+ if (_label == "Process") STOP_THREAD_LOCAL_MEM_TRACKER(false);
if (!_virtual && config::memory_leak_detection) MemTracker::memory_leak_check(this);
if (!_virtual && parent()) {
// Do not call release on the parent tracker to avoid repeated releases.
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 3d4eb740d2..c21d6d3db5 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -122,6 +122,7 @@ public:
// Increases consumption of this tracker and its ancestors by 'bytes'.
void consume(int64_t bytes) {
+#ifdef USE_MEM_TRACKER
if (bytes <= 0) {
release(-bytes);
return;
@@ -129,6 +130,7 @@ public:
for (auto& tracker : _all_trackers) {
tracker->_consumption->add(bytes);
}
+#endif
}
// Increases consumption of this tracker and its ancestors by 'bytes' only if
@@ -136,6 +138,7 @@ public:
// no MemTrackers are updated. Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
Status try_consume(int64_t bytes) {
+#ifdef USE_MEM_TRACKER
if (bytes <= 0) {
release(-bytes);
return Status::OK();
@@ -166,11 +169,13 @@ public:
}
// Everyone succeeded, return.
DCHECK_EQ(i, -1);
+#endif
return Status::OK();
}
// Decreases consumption of this tracker and its ancestors by 'bytes'.
void release(int64_t bytes) {
+#ifdef USE_MEM_TRACKER
if (bytes < 0) {
consume(-bytes);
return;
@@ -181,6 +186,7 @@ public:
for (auto& tracker : _all_trackers) {
tracker->_consumption->add(-bytes);
}
+#endif
}
static void batch_consume(int64_t bytes,
@@ -247,22 +253,26 @@ public:
// ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption
// against the limit recorded in one of its ancestors already happened.
void consume_local(int64_t bytes, MemTracker* end_tracker) {
+#ifdef USE_MEM_TRACKER
DCHECK(end_tracker);
if (bytes == 0) return;
for (auto& tracker : _all_trackers) {
if (tracker == end_tracker) return;
tracker->_consumption->add(bytes);
}
+#endif
}
// up to (but not including) end_tracker.
void release_local(int64_t bytes, MemTracker* end_tracker) {
+#ifdef USE_MEM_TRACKER
DCHECK(end_tracker);
if (bytes == 0) return;
for (auto& tracker : _all_trackers) {
if (tracker == end_tracker) return;
tracker->_consumption->add(-bytes);
}
+#endif
}
// Transfer 'bytes' of consumption from this tracker to 'dst'.
@@ -273,6 +283,7 @@ public:
WARN_UNUSED_RESULT
Status try_transfer_to(MemTracker* dst, int64_t bytes) {
+#ifdef USE_MEM_TRACKER
if (id() == dst->id()) return Status::OK();
// Must release first, then consume
release_cache(bytes);
@@ -281,14 +292,17 @@ public:
consume_cache(bytes);
return st;
}
+#endif
return Status::OK();
}
// Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated.
void transfer_to(MemTracker* dst, int64_t bytes) {
+#ifdef USE_MEM_TRACKER
if (id() == dst->id()) return;
release_cache(bytes);
dst->consume_cache(bytes);
+#endif
}
// Returns true if a valid limit of this tracker or one of its ancestors is exceeded.
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index cefe5d2181..7e67a8bfcb 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -88,10 +88,9 @@ Status ResultSink::open(RuntimeState* state) {
}
Status ResultSink::send(RuntimeState* state, RowBatch* batch) {
- // The memory consumption in the process of sending the results is not recorded in the query memory.
- // 1. Avoid the query being cancelled when the memory limit is reached after the query result comes out.
- // 2. If record this memory, also need to record on the receiving end, need to consider the life cycle of MemTracker.
- SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER();
+ // Memory consumed while sending the results is not checked against the query memory limit.
+ // This avoids cancelling the query when the memory limit is reached after the query result is produced.
+ STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
return _writer->append_row_batch(batch);
}
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index fade4fd51e..b08642bf12 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -36,7 +36,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const st
const TUniqueId& fragment_instance_id,
const std::shared_ptr<doris::MemTracker>& mem_tracker) {
DCHECK(task_id != "");
+#ifdef USE_MEM_TRACKER
tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker);
+#endif
}
AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
@@ -44,7 +46,9 @@ AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
#ifndef BE_TEST
DCHECK(mem_tracker);
#endif
+#ifdef USE_MEM_TRACKER
tls_ctx()->attach(type, "", TUniqueId(), mem_tracker);
+#endif
}
AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
@@ -52,7 +56,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
#ifndef BE_TEST
DCHECK(mem_tracker);
#endif
+#ifdef USE_MEM_TRACKER
tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker);
+#endif
}
AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
@@ -64,7 +70,9 @@ AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
DCHECK(fragment_instance_id != TUniqueId());
DCHECK(mem_tracker);
#endif
+#ifdef USE_MEM_TRACKER
tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker);
+#endif
}
AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
@@ -74,19 +82,24 @@ AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
DCHECK(mem_tracker);
#endif
+#ifdef USE_MEM_TRACKER
tls_ctx()->attach(query_to_task_type(runtime_state->query_type()),
print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(),
mem_tracker);
+#endif
}
AttachTaskThread::~AttachTaskThread() {
+#ifdef USE_MEM_TRACKER
tls_ctx()->detach();
DorisMetrics::instance()->attach_task_thread_count->increment(1);
+#endif
}
template <bool Existed>
SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) {
+#ifdef USE_MEM_TRACKER
if (config::memory_verbose_track) {
#ifndef BE_TEST
DCHECK(mem_tracker);
@@ -100,41 +113,49 @@ SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
_old_tracker_id =
tls_ctx()->_thread_mem_tracker_mgr->update_tracker<false>(mem_tracker);
}
-#endif
+#endif // BE_TEST
#ifndef NDEBUG
tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1;
-#endif
+#endif // NDEBUG
}
+#endif // USE_MEM_TRACKER
}
template <bool Existed>
SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
+#ifdef USE_MEM_TRACKER
if (config::memory_verbose_track) {
#ifndef NDEBUG
tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1;
DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
-#endif
+#endif // NDEBUG
#ifndef BE_TEST
tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
-#endif
+#endif // BE_TEST
}
+#endif // USE_MEM_TRACKER
}
SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) {
+#ifdef USE_MEM_TRACKER
DCHECK(action_type != std::string());
_old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
action_type, cancel_work, err_call_back_func);
+#endif
}
SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() {
+#ifdef USE_MEM_TRACKER
tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
#ifndef NDEBUG
DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
#endif
+#endif // USE_MEM_TRACKER
}
SwitchBthread::SwitchBthread() {
+#ifdef USE_MEM_TRACKER
tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
// First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
if (tls == nullptr) {
@@ -148,16 +169,19 @@ SwitchBthread::SwitchBthread() {
}
tls->_thread_mem_tracker_mgr->init();
tls->set_type(ThreadContext::TaskType::BRPC);
+#endif
}
SwitchBthread::~SwitchBthread() {
+#ifdef USE_MEM_TRACKER
DCHECK(tls != nullptr);
tls->_thread_mem_tracker_mgr->clear_untracked_mems();
tls->_thread_mem_tracker_mgr->init();
tls->set_type(ThreadContext::TaskType::UNKNOWN);
#ifndef NDEBUG
DorisMetrics::instance()->switch_bthread_count->increment(1);
-#endif
+#endif // NDEBUG
+#endif // USE_MEM_TRACKER
}
template class SwitchThreadMemTracker<true>;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 8dc8f5267e..0572c2b08d 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -35,10 +35,8 @@
// Be careful to stop the thread mem tracker, because the actual order of malloc and free memory
// may be different from the order of execution of instructions, which will cause the position of
// the memory track to be unexpected.
-#define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
- auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true)
-#define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
- auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false)
+#define STOP_THREAD_LOCAL_MEM_TRACKER(scope) \
+ auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(scope)
// Switch thread mem tracker during task execution.
// After the non-query thread switches the mem tracker, if the thread will not switch the mem
// tracker again in the short term, can consider manually clear_untracked_mems.
@@ -80,9 +78,11 @@
#define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \
- doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size)
+ doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(size)
#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \
- doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-size)
+ doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_try_consume(-size)
+#define STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER() \
+ auto VARNAME_LINENUM(switch_bthread) = StopCheckLimitThreadMemTracker()
namespace doris {
@@ -281,7 +281,9 @@ public:
~SwitchThreadMemTracker();
protected:
+#ifdef USE_MEM_TRACKER
int64_t _old_tracker_id = 0;
+#endif
};
class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
@@ -303,7 +305,9 @@ public:
~SwitchThreadMemTrackerErrCallBack();
private:
+#ifdef USE_MEM_TRACKER
ConsumeErrCallBackInfo _old_tracker_cb;
+#endif
};
class SwitchBthread {
@@ -313,7 +317,20 @@ public:
~SwitchBthread();
private:
+#ifdef USE_MEM_TRACKER
ThreadContext* tls;
+#endif
+};
+
+class StopCheckLimitThreadMemTracker {
+public:
+ explicit StopCheckLimitThreadMemTracker() {
+ tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(false);
+ }
+
+ ~StopCheckLimitThreadMemTracker() {
+ tls_ctx()->_thread_mem_tracker_mgr->update_check_limit(true);
+ }
};
} // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 84033ff4e7..754e231747 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -63,6 +63,10 @@ public:
~ThreadMemTrackerMgr() {
clear_untracked_mems();
+ _consume_err_cb.init();
+ _mem_trackers.clear();
+ _untracked_mems.clear();
+ _mem_tracker_labels.clear();
start_thread_mem_tracker = false;
}
@@ -107,12 +111,14 @@ public:
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
void cache_consume(int64_t size);
- void noncache_consume(int64_t size);
+ void noncache_try_consume(int64_t size);
bool is_attach_task() { return _task_id != ""; }
std::shared_ptr<MemTracker> mem_tracker();
+ void update_check_limit(bool check_limit) { _check_limit = check_limit; }
+
int64_t switch_count = 0;
std::string print_debug_string() {
@@ -163,6 +169,8 @@ private:
// we can confirm the tracker label that was added through _mem_tracker_labels.
// Because for performance, all map keys are tracker id.
phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
+ // If true, call memtracker try_consume, otherwise call consume.
+ bool _check_limit;
int64_t _tracker_id;
// Avoid memory allocation in functions.
@@ -184,6 +192,7 @@ inline void ThreadMemTrackerMgr::init() {
_untracked_mems[0] = 0;
_mem_tracker_labels.clear();
_mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
+ _check_limit = true;
}
inline void ThreadMemTrackerMgr::clear_untracked_mems() {
@@ -244,21 +253,26 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
- // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion.
- // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses.
- start_thread_mem_tracker = false;
// When switching to the current tracker last time, the remaining untracked memory.
if (_untracked_mems[_tracker_id] != 0) {
_untracked_mem += _untracked_mems[_tracker_id];
_untracked_mems[_tracker_id] = 0;
}
- noncache_consume(_untracked_mem);
+ // Allocating memory inside the hook causes the TCMalloc hook to be entered again,
+ // which would recurse infinitely. So temporary memory allocated inside mem_tracker.try_consume
+ // and mem_limit_exceeded is recorded by calling consume directly, without a limit check.
+ if (_check_limit) {
+ _check_limit = false;
+ noncache_try_consume(_untracked_mem);
+ _check_limit = true;
+ } else {
+ mem_tracker()->consume(_untracked_mem);
+ }
_untracked_mem = 0;
- start_thread_mem_tracker = true;
}
}
-inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) {
+inline void ThreadMemTrackerMgr::noncache_try_consume(int64_t size) {
Status st = mem_tracker()->try_consume(size);
if (!st) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index dd8804a56d..cf486b2c98 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -332,9 +332,11 @@ int main(int argc, char** argv) {
fprintf(stderr, "Failed to change TCMalloc total thread cache size.\n");
return -1;
}
+#ifdef USE_MEM_TRACKER
if (doris::config::track_new_delete) {
init_hook();
}
+#endif // USE_MEM_TRACKER
#endif
std::vector<doris::StorePath> paths;
diff --git a/be/src/vec/sink/result_sink.cpp b/be/src/vec/sink/result_sink.cpp
index 9441fec8b9..ce3ac086cc 100644
--- a/be/src/vec/sink/result_sink.cpp
+++ b/be/src/vec/sink/result_sink.cpp
@@ -87,6 +87,9 @@ Status VResultSink::send(RuntimeState* state, RowBatch* batch) {
}
Status VResultSink::send(RuntimeState* state, Block* block) {
+ // Memory consumed while sending the results is not checked against the query memory limit.
+ // This avoids cancelling the query when the memory limit is reached after the query result is produced.
+ STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
return _writer->append_block(*block);
}
diff --git a/be/test/olap/lru_cache_test.cpp b/be/test/olap/lru_cache_test.cpp
index a23a608f39..b7e18cc01e 100644
--- a/be/test/olap/lru_cache_test.cpp
+++ b/be/test/olap/lru_cache_test.cpp
@@ -226,45 +226,44 @@ static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value,
TEST_F(CacheTest, Usage) {
LRUCache cache(LRUCacheType::SIZE);
- cache.set_capacity(1050);
+ cache.set_capacity(1040);
// The lru usage is handle_size + charge.
- // handle_size = sizeof(handle) - 1 + key size = 88 - 1 + 3 = 90
+ // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98
CacheKey key1("100");
insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
- ASSERT_EQ(190, cache.get_usage()); // 100 + 90
+ ASSERT_EQ(198, cache.get_usage()); // 100 + 98
CacheKey key2("200");
insert_LRUCache(cache, key2, 200, CachePriority::DURABLE);
- ASSERT_EQ(480, cache.get_usage()); // 190 + 290(d)
+ ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE
CacheKey key3("300");
insert_LRUCache(cache, key3, 300, CachePriority::NORMAL);
- ASSERT_EQ(870, cache.get_usage()); // 190 + 290(d) + 390
+ ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398
CacheKey key4("400");
insert_LRUCache(cache, key4, 400, CachePriority::NORMAL);
- ASSERT_EQ(780, cache.get_usage()); // 290(d) + 490
+ ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398
CacheKey key5("500");
insert_LRUCache(cache, key5, 500, CachePriority::NORMAL);
- ASSERT_EQ(880, cache.get_usage()); // 290(d) + 590
+ ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498
CacheKey key6("600");
insert_LRUCache(cache, key6, 600, CachePriority::NORMAL);
- ASSERT_EQ(980, cache.get_usage()); // 290(d) + 690
+ ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 598
CacheKey key7("950");
insert_LRUCache(cache, key7, 950, CachePriority::DURABLE);
- ASSERT_EQ(1040, cache.get_usage()); // 1040(d)
+ ASSERT_EQ(0, cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, so insert failed
}
TEST_F(CacheTest, Prune) {
LRUCache cache(LRUCacheType::NUMBER);
cache.set_capacity(5);
- // The lru usage is handle_size + charge = 96 - 1 = 95
- // 95 + 3 means handle_size + key size
+ // For a NUMBER-type cache, each inserted entry adds 1 to the usage.
CacheKey key1("100");
insert_LRUCache(cache, key1, 100, CachePriority::NORMAL);
EXPECT_EQ(1, cache.get_usage());
diff --git a/build.sh b/build.sh
index e3b3243eb8..907f5510ff 100755
--- a/build.sh
+++ b/build.sh
@@ -214,6 +214,9 @@ fi
if [[ -z ${STRIP_DEBUG_INFO} ]]; then
STRIP_DEBUG_INFO=OFF
fi
+if [[ -z ${USE_MEM_TRACKER} ]]; then
+ USE_MEM_TRACKER=ON
+fi
if [[ -z ${USE_DWARF} ]]; then
USE_DWARF=OFF
@@ -238,6 +241,7 @@ echo "Get params:
USE_LLD -- $USE_LLD
USE_DWARF -- $USE_DWARF
STRIP_DEBUG_INFO -- $STRIP_DEBUG_INFO
+ USE_MEM_TRACKER -- $USE_MEM_TRACKER
"
# Clean and build generated code
@@ -300,6 +304,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
-DBUILD_JAVA_UDF=${BUILD_JAVA_UDF} \
-DSTRIP_DEBUG_INFO=${STRIP_DEBUG_INFO} \
-DUSE_DWARF=${USE_DWARF} \
+ -DUSE_MEM_TRACKER=${USE_MEM_TRACKER} \
-DUSE_AVX2=${USE_AVX2} \
-DGLIBC_COMPATIBILITY=${GLIBC_COMPATIBILITY} ../
${BUILD_SYSTEM} -j ${PARALLEL}
diff --git a/run-be-ut.sh b/run-be-ut.sh
index a8b0950cce..d075693454 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -138,6 +138,7 @@ ${CMAKE_CMD} -G "${GENERATOR}" \
-DWITH_MYSQL=OFF \
-DWITH_KERBEROS=OFF \
-DUSE_DWARF=${USE_DWARF} \
+ -DUSE_MEM_TRACKER=ON \
${CMAKE_USE_CCACHE} ../
${BUILD_SYSTEM} -j ${PARALLEL}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org