Posted to commits@doris.apache.org by yi...@apache.org on 2023/07/11 12:21:37 UTC
[doris] branch master updated: [improvement](memory) Refactor doris cache GC (#21522)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4b30485d62 [improvement](memory) Refactor doris cache GC (#21522)
4b30485d62 is described below
commit 4b30485d623890b54110ed582a9d5c5674a0cc48
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Jul 11 20:21:31 2023 +0800
[improvement](memory) Refactor doris cache GC (#21522)
Abstract a CachePolicy base class, which controls the GC of all caches.
Add a stale sweep to all LRU caches, including the page caches, schema cache, segment cache, and inverted index caches.
I0710 18:32:35.729460 2945318 mem_info.cpp:172] End Full GC Free, Memory 3866389992 Bytes. cost(us): 112165339, details: FullGC:
FreeTopMemoryQuery:
- CancelCostTime: 1m51s
- CancelTasksNum: 1
- FindCostTime: 0.000ns
- FreedMemory: 2.93 GB
WorkloadGroup:
Cache name=DataPageCache:
- CostTime: 15.283ms
- FreedEntrys: 9.56K
- FreedMemory: 691.97 MB
- PruneAllNumber: 1
- PruneStaleNumber: 1
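
For context, a minimal sketch (not part of the commit) of how a cache plugs into the CachePolicy / CacheManager framework: a cache subclasses LRUCachePolicy, its entries derive from LRUCacheValueBase, and constructing the policy registers it with CacheManager, so a single GC call can sweep every registered cache. ExampleCache, ExampleCacheValue, and the 300s sweep interval are hypothetical names for illustration; LRUCachePolicy, LRUCacheValueBase, LRUCacheType, and CacheManager are the types added in the diff below.

#include <string>

#include "runtime/memory/cache_manager.h"
#include "runtime/memory/lru_cache_policy.h"

namespace doris {

// Hypothetical cache. The CachePolicy base constructor registers "this" with
// CacheManager, so the cache is automatically visible to the periodic GC.
class ExampleCache : public LRUCachePolicy {
public:
    explicit ExampleCache(size_t capacity)
            : LRUCachePolicy("ExampleCache", capacity, LRUCacheType::SIZE,
                             /*stale_sweep_time_s=*/300) {}
};

// Entries derive from LRUCacheValueBase, which carries the last_visit_time and
// size fields that prune_stale() inspects when deciding what to evict.
struct ExampleCacheValue : public LRUCacheValueBase {
    std::string payload;
};

} // namespace doris

// A GC thread can then sweep every registered cache in one call, e.g.:
//   doris::CacheManager::instance()->for_each_cache_prune_stale();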
---
be/src/common/config.cpp | 11 +-
be/src/common/config.h | 10 +-
be/src/olap/page_cache.cpp | 25 +--
be/src/olap/page_cache.h | 57 ++++--
.../rowset/segment_v2/inverted_index_cache.cpp | 56 +-----
.../olap/rowset/segment_v2/inverted_index_cache.h | 36 +---
be/src/olap/schema_cache.cpp | 22 ---
be/src/olap/schema_cache.h | 23 ++-
be/src/olap/segment_loader.cpp | 50 ++---
be/src/olap/segment_loader.h | 65 +++----
be/src/olap/storage_engine.cpp | 4 +-
be/src/runtime/exec_env_init.cpp | 5 +-
be/src/runtime/memory/cache_manager.cpp | 49 +++++
be/src/runtime/memory/cache_manager.h | 63 +++++++
be/src/runtime/memory/cache_policy.h | 66 +++++++
be/src/runtime/memory/lru_cache_policy.h | 95 ++++++++++
be/src/runtime/memory/mem_tracker_limiter.cpp | 209 +++++++++++++--------
be/src/runtime/memory/mem_tracker_limiter.h | 25 ++-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 2 +-
be/src/util/mem_info.cpp | 122 +++++-------
be/src/util/mem_info.h | 6 +-
be/src/util/obj_lru_cache.h | 1 +
be/src/vec/common/allocator.cpp | 3 +-
be/test/testutil/run_all_tests.cpp | 1 +
24 files changed, 613 insertions(+), 393 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 95a583b9b0..7306437b20 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -315,6 +315,10 @@ DEFINE_String(pk_storage_page_cache_limit, "10%");
// data page size for primary key index
DEFINE_Int32(primary_key_data_page_size, "32768");
+DEFINE_mInt32(data_page_cache_stale_sweep_time_sec, "300");
+DEFINE_mInt32(index_page_cache_stale_sweep_time_sec, "600");
+DEFINE_mInt32(pk_index_page_cache_stale_sweep_time_sec, "600");
+
DEFINE_Bool(enable_low_cardinality_optimize, "true");
DEFINE_Bool(enable_low_cardinality_cache_code, "true");
@@ -949,11 +953,8 @@ DEFINE_Validator(file_cache_min_file_segment_size, [](const int64_t config) -> b
DEFINE_Bool(clear_file_cache, "false");
DEFINE_Bool(enable_file_cache_query_limit, "false");
-// inverted index searcher cache
-// cache entry stay time after lookup, default 1h
-DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "3600");
-// cache entry that have not been visited for a certain period of time can be cleaned up by GC thread
-DEFINE_mInt32(index_cache_entry_no_visit_gc_time_s, "3600");
+DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
+DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
// inverted index searcher cache size
DEFINE_String(inverted_index_searcher_cache_limit, "10%");
// set `true` to enable insert searcher into cache when write inverted index data
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 656d955c4b..ef976b1f8f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -358,6 +358,12 @@ DECLARE_String(pk_storage_page_cache_limit);
// data page size for primary key index
DECLARE_Int32(primary_key_data_page_size);
+// stale sweep time interval of the page caches
+DECLARE_mInt32(data_page_cache_stale_sweep_time_sec);
+DECLARE_mInt32(index_page_cache_stale_sweep_time_sec);
+// the pk index page cache has a great impact on the performance of MOW, so its sweep interval can be longer.
+DECLARE_mInt32(pk_index_page_cache_stale_sweep_time_sec);
+
DECLARE_Bool(enable_low_cardinality_optimize);
DECLARE_Bool(enable_low_cardinality_cache_code);
@@ -968,10 +974,10 @@ DECLARE_Bool(clear_file_cache);
DECLARE_Bool(enable_file_cache_query_limit);
// inverted index searcher cache
-// cache entry stay time after lookup, default 1h
+// cache entry stay time after lookup
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
// cache entry that have not been visited for a certain period of time can be cleaned up by GC thread
-DECLARE_mInt32(index_cache_entry_no_visit_gc_time_s);
+DECLARE_mInt32(inverted_index_cache_stale_sweep_time_sec);
// inverted index searcher cache size
DECLARE_String(inverted_index_searcher_cache_limit);
// set `true` to enable insert searcher into cache when write inverted index data
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index de9aeaa28f..47a2379576 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -37,24 +37,20 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
int64_t pk_index_cache_capacity, uint32_t num_shards)
: _index_cache_percentage(index_cache_percentage) {
if (index_cache_percentage == 0) {
- _data_page_cache = std::unique_ptr<Cache>(
- new_lru_cache("DataPageCache", capacity, LRUCacheType::SIZE, num_shards));
+ _data_page_cache = std::make_unique<DataPageCache>(capacity, num_shards);
} else if (index_cache_percentage == 100) {
- _index_page_cache = std::unique_ptr<Cache>(
- new_lru_cache("IndexPageCache", capacity, LRUCacheType::SIZE, num_shards));
+ _index_page_cache = std::make_unique<IndexPageCache>(capacity, num_shards);
} else if (index_cache_percentage > 0 && index_cache_percentage < 100) {
- _data_page_cache = std::unique_ptr<Cache>(
- new_lru_cache("DataPageCache", capacity * (100 - index_cache_percentage) / 100,
- LRUCacheType::SIZE, num_shards));
- _index_page_cache = std::unique_ptr<Cache>(
- new_lru_cache("IndexPageCache", capacity * index_cache_percentage / 100,
- LRUCacheType::SIZE, num_shards));
+ _data_page_cache = std::make_unique<DataPageCache>(
+ capacity * (100 - index_cache_percentage) / 100, num_shards);
+ _index_page_cache = std::make_unique<IndexPageCache>(
+ capacity * index_cache_percentage / 100, num_shards);
} else {
CHECK(false) << "invalid index page cache percentage";
}
if (pk_index_cache_capacity > 0) {
- _pk_index_page_cache = std::unique_ptr<Cache>(new_lru_cache(
- "PkIndexPageCache", pk_index_cache_capacity, LRUCacheType::SIZE, num_shards));
+ _pk_index_page_cache =
+ std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
}
}
@@ -86,9 +82,4 @@ void StoragePageCache::insert(const CacheKey& key, DataPage* data, PageCacheHand
*handle = PageCacheHandle(cache, lru_handle);
}
-void StoragePageCache::prune(segment_v2::PageTypePB page_type) {
- auto cache = _get_page_cache(page_type);
- cache->prune();
-}
-
} // namespace doris
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index 7e5ca4de6e..b065d52d0e 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -27,6 +27,7 @@
#include <utility>
#include "olap/lru_cache.h"
+#include "runtime/memory/lru_cache_policy.h"
#include "util/slice.h"
#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"
@@ -36,7 +37,7 @@ namespace doris {
class PageCacheHandle;
template <typename TAllocator>
-class PageBase : private TAllocator {
+class PageBase : private TAllocator, LRUCacheValueBase {
public:
PageBase() : _data(nullptr), _size(0), _capacity(0) {}
@@ -99,6 +100,27 @@ public:
}
};
+ class DataPageCache : public LRUCachePolicy {
+ public:
+ DataPageCache(size_t capacity, uint32_t num_shards)
+ : LRUCachePolicy("DataPageCache", capacity, LRUCacheType::SIZE,
+ config::data_page_cache_stale_sweep_time_sec, num_shards) {}
+ };
+
+ class IndexPageCache : public LRUCachePolicy {
+ public:
+ IndexPageCache(size_t capacity, uint32_t num_shards)
+ : LRUCachePolicy("IndexPageCache", capacity, LRUCacheType::SIZE,
+ config::index_page_cache_stale_sweep_time_sec, num_shards) {}
+ };
+
+ class PKIndexPageCache : public LRUCachePolicy {
+ public:
+ PKIndexPageCache(size_t capacity, uint32_t num_shards)
+ : LRUCachePolicy("PKIndexPageCache", capacity, LRUCacheType::SIZE,
+ config::pk_index_page_cache_stale_sweep_time_sec, num_shards) {}
+ };
+
static constexpr uint32_t kDefaultNumShards = 16;
// Create global instance of this class
@@ -138,33 +160,38 @@ public:
return _get_page_cache(page_type) != nullptr;
}
- void prune(segment_v2::PageTypePB page_type);
-
- int64_t get_page_cache_mem_consumption(segment_v2::PageTypePB page_type) {
- return _get_page_cache(page_type)->mem_consumption();
- }
-
private:
StoragePageCache();
static StoragePageCache* _s_instance;
int32_t _index_cache_percentage = 0;
- std::unique_ptr<Cache> _data_page_cache = nullptr;
- std::unique_ptr<Cache> _index_page_cache = nullptr;
+ std::unique_ptr<DataPageCache> _data_page_cache = nullptr;
+ std::unique_ptr<IndexPageCache> _index_page_cache = nullptr;
// Cache data for primary key index data page, seperated from data
// page cache to make it for flexible. we need this cache When construct
// delete bitmap in unique key with mow
- std::unique_ptr<Cache> _pk_index_page_cache = nullptr;
+ std::unique_ptr<PKIndexPageCache> _pk_index_page_cache = nullptr;
Cache* _get_page_cache(segment_v2::PageTypePB page_type) {
switch (page_type) {
case segment_v2::DATA_PAGE: {
- return _data_page_cache.get();
+ if (_data_page_cache) {
+ return _data_page_cache->get();
+ }
+ return nullptr;
+ }
+ case segment_v2::INDEX_PAGE: {
+ if (_index_page_cache) {
+ return _index_page_cache->get();
+ }
+ return nullptr;
+ }
+ case segment_v2::PRIMARY_KEY_INDEX_PAGE: {
+ if (_pk_index_page_cache) {
+ return _pk_index_page_cache->get();
+ }
+ return nullptr;
}
- case segment_v2::INDEX_PAGE:
- return _index_page_cache.get();
- case segment_v2::PRIMARY_KEY_INDEX_PAGE:
- return _pk_index_page_cache.get();
default:
return nullptr;
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index 243a738da9..f3c68984eb 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -62,7 +62,9 @@ void InvertedIndexSearcherCache::create_global_instance(size_t capacity, uint32_
}
InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards)
- : _mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
+ : LRUCachePolicy("InvertedIndexSearcherCache",
+ config::inverted_index_cache_stale_sweep_time_sec),
+ _mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
uint64_t fd_number = config::min_file_descriptor_number;
struct rlimit l;
@@ -179,32 +181,6 @@ Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) {
return Status::OK();
}
-int64_t InvertedIndexSearcherCache::prune() {
- if (_cache) {
- const int64_t curtime = UnixMillis();
- int64_t byte_size = 0L;
- auto pred = [curtime, &byte_size](const void* value) -> bool {
- InvertedIndexSearcherCache::CacheValue* cache_value =
- (InvertedIndexSearcherCache::CacheValue*)value;
- if ((cache_value->last_visit_time +
- config::index_cache_entry_no_visit_gc_time_s * 1000) < curtime) {
- byte_size += cache_value->size;
- return true;
- }
- return false;
- };
-
- MonotonicStopWatch watch;
- watch.start();
- // Prune cache in lazy mode to save cpu and minimize the time holding write lock
- int64_t prune_num = _cache->prune_if(pred, true);
- LOG(INFO) << "prune " << prune_num << " entries in inverted index cache. cost(ms): "
- << watch.elapsed_time() / 1000 / 1000;
- return byte_size;
- }
- return 0L;
-}
-
int64_t InvertedIndexSearcherCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
@@ -269,32 +245,6 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
}
-int64_t InvertedIndexQueryCache::prune() {
- if (_cache) {
- const int64_t curtime = UnixMillis();
- int64_t byte_size = 0L;
- auto pred = [curtime, &byte_size](const void* value) -> bool {
- InvertedIndexQueryCache::CacheValue* cache_value =
- (InvertedIndexQueryCache::CacheValue*)value;
- if ((cache_value->last_visit_time +
- config::index_cache_entry_no_visit_gc_time_s * 1000) < curtime) {
- byte_size += cache_value->size;
- return true;
- }
- return false;
- };
-
- MonotonicStopWatch watch;
- watch.start();
- // Prune cache in lazy mode to save cpu and minimize the time holding write lock
- int64_t prune_num = _cache->prune_if(pred, true);
- LOG(INFO) << "prune " << prune_num << " entries in inverted index cache. cost(ms): "
- << watch.elapsed_time() / 1000 / 1000;
- return byte_size;
- }
- return 0L;
-}
-
int64_t InvertedIndexQueryCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.h b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
index 941fb2dc63..9f368eca0c 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.h
@@ -37,6 +37,7 @@
#include "io/fs/path.h"
#include "olap/lru_cache.h"
#include "olap/rowset/segment_v2/inverted_index_query_type.h"
+#include "runtime/memory/lru_cache_policy.h"
#include "runtime/memory/mem_tracker.h"
#include "util/slice.h"
#include "util/time.h"
@@ -55,7 +56,7 @@ using IndexSearcherPtr = std::shared_ptr<lucene::search::IndexSearcher>;
class InvertedIndexCacheHandle;
-class InvertedIndexSearcherCache {
+class InvertedIndexSearcherCache : public LRUCachePolicy {
public:
// The cache key of index_searcher lru cache
struct CacheKey {
@@ -65,12 +66,8 @@ public:
// The cache value of index_searcher lru cache.
// Holding a opened index_searcher.
- struct CacheValue {
- // Save the last visit time of this cache entry.
- // Use atomic because it may be modified by multi threads.
- std::atomic<int64_t> last_visit_time = 0;
+ struct CacheValue : public LRUCacheValueBase {
IndexSearcherPtr index_searcher;
- size_t size = 0;
};
// Create global instance of this class.
@@ -110,8 +107,6 @@ public:
// function `erase` called after compaction remove segment
Status erase(const std::string& index_file_path);
- int64_t prune();
-
int64_t mem_consumption();
private:
@@ -129,8 +124,6 @@ private:
private:
static InvertedIndexSearcherCache* _s_instance;
- // A LRU cache to cache all opened index_searcher
- std::unique_ptr<Cache> _cache = nullptr;
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
};
@@ -199,7 +192,7 @@ private:
class InvertedIndexQueryCacheHandle;
-class InvertedIndexQueryCache {
+class InvertedIndexQueryCache : public LRUCachePolicy {
public:
// cache key
struct CacheKey {
@@ -226,19 +219,14 @@ public:
}
};
- struct CacheValue {
- // Save the last visit time of this cache entry.
- // Use atomic because it may be modified by multi threads.
- std::atomic<int64_t> last_visit_time = 0;
+ struct CacheValue : public LRUCacheValueBase {
std::shared_ptr<roaring::Roaring> bitmap;
- size_t size = 0;
};
// Create global instance of this class
- static void create_global_cache(size_t capacity, int32_t index_cache_percentage,
- uint32_t num_shards = 16) {
+ static void create_global_cache(size_t capacity, uint32_t num_shards = 16) {
DCHECK(_s_instance == nullptr);
- static InvertedIndexQueryCache instance(capacity, index_cache_percentage, num_shards);
+ static InvertedIndexQueryCache instance(capacity, num_shards);
_s_instance = &instance;
}
@@ -248,23 +236,19 @@ public:
InvertedIndexQueryCache() = delete;
- InvertedIndexQueryCache(size_t capacity, int32_t index_cache_percentage, uint32_t num_shards) {
- _cache = std::unique_ptr<Cache>(
- new_lru_cache("InvertedIndexQueryCache", capacity, LRUCacheType::SIZE, num_shards));
- }
+ InvertedIndexQueryCache(size_t capacity, uint32_t num_shards)
+ : LRUCachePolicy("InvertedIndexQueryCache", capacity, LRUCacheType::SIZE,
+ config::inverted_index_cache_stale_sweep_time_sec, num_shards) {}
bool lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle);
void insert(const CacheKey& key, std::shared_ptr<roaring::Roaring> bitmap,
InvertedIndexQueryCacheHandle* handle);
- int64_t prune();
-
int64_t mem_consumption();
private:
static InvertedIndexQueryCache* _s_instance;
- std::unique_ptr<Cache> _cache {nullptr};
};
class InvertedIndexQueryCacheHandle {
diff --git a/be/src/olap/schema_cache.cpp b/be/src/olap/schema_cache.cpp
index 02b17f18ff..39d1a60a4c 100644
--- a/be/src/olap/schema_cache.cpp
+++ b/be/src/olap/schema_cache.cpp
@@ -75,26 +75,4 @@ void SchemaCache::create_global_instance(size_t capacity) {
_s_instance = &instance;
}
-SchemaCache::SchemaCache(size_t capacity) {
- _schema_cache =
- std::unique_ptr<Cache>(new_lru_cache("SchemaCache", capacity, LRUCacheType::NUMBER));
-}
-
-Status SchemaCache::prune() {
- const int64_t curtime = UnixMillis();
- auto pred = [curtime](const void* value) -> bool {
- CacheValue* cache_value = (CacheValue*)value;
- return (cache_value->last_visit_time + config::schema_cache_sweep_time_sec * 1000) <
- curtime;
- };
-
- MonotonicStopWatch watch;
- watch.start();
- // Prune cache in lazy mode to save cpu and minimize the time holding write lock
- int64_t prune_num = _schema_cache->prune_if(pred, true);
- LOG(INFO) << "prune " << prune_num
- << " entries in SchemaCache cache. cost(ms): " << watch.elapsed_time() / 1000 / 1000;
- return Status::OK();
-}
-
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/schema_cache.h b/be/src/olap/schema_cache.h
index b45a252696..f34f7c296d 100644
--- a/be/src/olap/schema_cache.h
+++ b/be/src/olap/schema_cache.h
@@ -44,7 +44,7 @@ using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>;
// eliminating the need for frequent allocation and deallocation during usage.
// This caching mechanism proves immensely advantageous, particularly in scenarios
// with high concurrency, where queries are executed simultaneously.
-class SchemaCache {
+class SchemaCache : public LRUCachePolicy {
public:
enum class Type { TABLET_SCHEMA = 0, SCHEMA = 1 };
@@ -65,11 +65,10 @@ public:
if (!_s_instance || schema_key.empty()) {
return {};
}
- auto lru_handle = _schema_cache->lookup(schema_key);
+ auto lru_handle = _cache->lookup(schema_key);
if (lru_handle) {
- Defer release(
- [cache = _schema_cache.get(), lru_handle] { cache->release(lru_handle); });
- auto value = (CacheValue*)_schema_cache->value(lru_handle);
+ Defer release([cache = _cache.get(), lru_handle] { cache->release(lru_handle); });
+ auto value = (CacheValue*)_cache->value(lru_handle);
value->last_visit_time = UnixMillis();
VLOG_DEBUG << "use cache schema";
if constexpr (std::is_same_v<SchemaType, TabletSchemaSPtr>) {
@@ -101,26 +100,26 @@ public:
CacheValue* cache_value = (CacheValue*)value;
delete cache_value;
};
- auto lru_handle = _schema_cache->insert(key, value, sizeof(CacheValue), deleter,
- CachePriority::NORMAL, schema->mem_size());
- _schema_cache->release(lru_handle);
+ auto lru_handle = _cache->insert(key, value, sizeof(CacheValue), deleter,
+ CachePriority::NORMAL, schema->mem_size());
+ _cache->release(lru_handle);
}
// Try to prune the cache if expired.
Status prune();
- struct CacheValue {
+ struct CacheValue : public LRUCacheValueBase {
Type type;
- std::atomic<int64_t> last_visit_time = 0;
// either tablet_schema or schema
TabletSchemaSPtr tablet_schema = nullptr;
SchemaSPtr schema = nullptr;
};
private:
- SchemaCache(size_t capacity);
+ SchemaCache(size_t capacity)
+ : LRUCachePolicy("SchemaCache", capacity, LRUCacheType::NUMBER,
+ config::schema_cache_sweep_time_sec) {}
static constexpr char SCHEMA_DELIMITER = '-';
- std::unique_ptr<Cache> _schema_cache = nullptr;
static SchemaCache* _s_instance;
};
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 3d61ef05db..aad8ac517a 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -32,12 +32,7 @@ void SegmentLoader::create_global_instance(size_t capacity) {
_s_instance = &instance;
}
-SegmentLoader::SegmentLoader(size_t capacity) {
- _cache = std::unique_ptr<Cache>(
- new_lru_cache("SegmentMetaCache", capacity, LRUCacheType::NUMBER));
-}
-
-bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* handle) {
+bool SegmentCache::lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle) {
auto lru_handle = _cache->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
@@ -46,10 +41,10 @@ bool SegmentLoader::_lookup(const SegmentLoader::CacheKey& key, SegmentCacheHand
return true;
}
-void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::CacheValue& value,
- SegmentCacheHandle* handle) {
+void SegmentCache::insert(const SegmentCache::CacheKey& key, SegmentCache::CacheValue& value,
+ SegmentCacheHandle* handle) {
auto deleter = [](const doris::CacheKey& key, void* value) {
- SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)value;
+ SegmentCache::CacheValue* cache_value = (SegmentCache::CacheValue*)value;
cache_value->segments.clear();
delete cache_value;
};
@@ -59,15 +54,19 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::C
meta_mem_usage += segment->meta_mem_usage();
}
- auto lru_handle = _cache->insert(key.encode(), &value, sizeof(SegmentLoader::CacheValue),
+ auto lru_handle = _cache->insert(key.encode(), &value, sizeof(SegmentCache::CacheValue),
deleter, CachePriority::NORMAL, meta_mem_usage);
*handle = SegmentCacheHandle(_cache.get(), lru_handle);
}
+void SegmentCache::erase(const SegmentCache::CacheKey& key) {
+ _cache->erase(key.encode());
+}
+
Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache) {
- SegmentLoader::CacheKey cache_key(rowset->rowset_id());
- if (_lookup(cache_key, cache_handle)) {
+ SegmentCache::CacheKey cache_key(rowset->rowset_id());
+ if (_segment_cache->lookup(cache_key, cache_handle)) {
cache_handle->owned = false;
return Status::OK();
}
@@ -77,10 +76,10 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
RETURN_IF_ERROR(rowset->load_segments(&segments));
if (use_cache) {
- // memory of SegmentLoader::CacheValue will be handled by SegmentLoader
- SegmentLoader::CacheValue* cache_value = new SegmentLoader::CacheValue();
+ // memory of SegmentCache::CacheValue will be handled by SegmentCache
+ SegmentCache::CacheValue* cache_value = new SegmentCache::CacheValue();
cache_value->segments = std::move(segments);
- _insert(cache_key, *cache_value, cache_handle);
+ _segment_cache->insert(cache_key, *cache_value, cache_handle);
} else {
cache_handle->segments = std::move(segments);
}
@@ -88,25 +87,8 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
return Status::OK();
}
-void SegmentLoader::erase_segment(const SegmentLoader::CacheKey& key) {
- _cache->erase(key.encode());
-}
-
-Status SegmentLoader::prune() {
- const int64_t curtime = UnixMillis();
- auto pred = [curtime](const void* value) -> bool {
- SegmentLoader::CacheValue* cache_value = (SegmentLoader::CacheValue*)value;
- return (cache_value->last_visit_time + config::tablet_rowset_stale_sweep_time_sec * 1000) <
- curtime;
- };
-
- MonotonicStopWatch watch;
- watch.start();
- // Prune cache in lazy mode to save cpu and minimize the time holding write lock
- int64_t prune_num = _cache->prune_if(pred, true);
- LOG(INFO) << "prune " << prune_num
- << " entries in segment cache. cost(ms): " << watch.elapsed_time() / 1000 / 1000;
- return Status::OK();
+void SegmentLoader::erase_segment(const SegmentCache::CacheKey& key) {
+ _segment_cache->erase(key);
}
} // namespace doris
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 9e6a8af82a..600692750e 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -33,6 +33,7 @@
#include "olap/lru_cache.h"
#include "olap/olap_common.h" // for rowset id
#include "olap/rowset/segment_v2/segment.h"
+#include "runtime/memory/lru_cache_policy.h"
#include "util/time.h"
namespace doris {
@@ -53,7 +54,8 @@ class BetaRowset;
//
// Make sure that cache_handle is valid during the segment usage period.
using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
-class SegmentLoader {
+
+class SegmentCache : public LRUCachePolicy {
public:
// The cache key or segment lru cache
struct CacheKey {
@@ -61,18 +63,34 @@ public:
RowsetId rowset_id;
// Encode to a flat binary which can be used as LRUCache's key
- std::string encode() const { return rowset_id.to_string(); }
+ [[nodiscard]] std::string encode() const { return rowset_id.to_string(); }
};
// The cache value of segment lru cache.
// Holding all opened segments of a rowset.
- struct CacheValue {
- // Save the last visit time of this cache entry.
- // Use atomic because it may be modified by multi threads.
- std::atomic<int64_t> last_visit_time = 0;
+ struct CacheValue : public LRUCacheValueBase {
std::vector<segment_v2::SegmentSharedPtr> segments;
};
+ SegmentCache(size_t capacity)
+ : LRUCachePolicy("SegmentCache", capacity, LRUCacheType::NUMBER,
+ config::tablet_rowset_stale_sweep_time_sec) {}
+
+ // Lookup the given rowset in the cache.
+ // If the rowset is found, the cache entry will be written into handle.
+ // Return true if entry is found, otherwise return false.
+ bool lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle);
+
+ // Insert a cache entry by key.
+ // And the cache entry will be returned in handle.
+ // This function is thread-safe.
+ void insert(const SegmentCache::CacheKey& key, CacheValue& value, SegmentCacheHandle* handle);
+
+ void erase(const SegmentCache::CacheKey& key);
+};
+
+class SegmentLoader {
+public:
// Create global instance of this class.
// "capacity" is the capacity of lru cache.
// TODO: Currently we use the number of rowset as the cache capacity.
@@ -86,43 +104,20 @@ public:
// Client should call create_global_cache before.
static SegmentLoader* instance() { return _s_instance; }
- SegmentLoader(size_t capacity);
+ SegmentLoader(size_t capacity) { _segment_cache = std::make_unique<SegmentCache>(capacity); }
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
bool use_cache = false);
- void erase_segment(const SegmentLoader::CacheKey& key);
-
- // Try to prune the segment cache if expired.
- Status prune();
- int64_t prune_all() { return _cache->prune(); };
- int64_t segment_cache_mem_consumption() { return _cache->mem_consumption(); }
- int64_t segment_cache_get_usage() { return _cache->get_usage(); }
- double segment_cache_get_usage_ratio() {
- return _cache->get_total_capacity() == 0
- ? 0
- : ((double)_cache->get_usage() / _cache->get_total_capacity());
- }
+ void erase_segment(const SegmentCache::CacheKey& key);
private:
SegmentLoader();
- // Lookup the given rowset in the cache.
- // If the rowset is found, the cache entry will be written into handle.
- // Return true if entry is found, otherwise return false.
- bool _lookup(const SegmentLoader::CacheKey& key, SegmentCacheHandle* handle);
-
- // Insert a cache entry by key.
- // And the cache entry will be returned in handle.
- // This function is thread-safe.
- void _insert(const SegmentLoader::CacheKey& key, CacheValue& value, SegmentCacheHandle* handle);
-
-private:
static SegmentLoader* _s_instance;
- // A LRU cache to cache all opened segments
- std::unique_ptr<Cache> _cache = nullptr;
+ std::unique_ptr<SegmentCache> _segment_cache = nullptr;
};
// A handle for a single rowset from segment lru cache.
@@ -132,7 +127,7 @@ private:
// So the caller need to make sure the handle is valid in lifecycle.
class SegmentCacheHandle {
public:
- SegmentCacheHandle() {}
+ SegmentCacheHandle() = default;
SegmentCacheHandle(Cache* cache, Cache::Handle* handle) : _cache(cache), _handle(handle) {}
~SegmentCacheHandle() {
@@ -142,7 +137,7 @@ public:
CHECK(!owned);
// last_visit_time is set when release.
// because it only be needed when pruning.
- ((SegmentLoader::CacheValue*)_cache->value(_handle))->last_visit_time = UnixMillis();
+ ((SegmentCache::CacheValue*)_cache->value(_handle))->last_visit_time = UnixMillis();
_cache->release(_handle);
}
}
@@ -166,7 +161,7 @@ public:
if (owned) {
return segments;
} else {
- return ((SegmentLoader::CacheValue*)_cache->value(_handle))->segments;
+ return ((SegmentCache::CacheValue*)_cache->value(_handle))->segments;
}
}
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 6a2b869099..af10713ea6 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -66,6 +66,7 @@
#include "olap/tablet_meta.h"
#include "olap/task/engine_task.h"
#include "olap/txn_manager.h"
+#include "runtime/memory/cache_manager.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/doris_metrics.h"
@@ -614,8 +615,7 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
}
void StorageEngine::_start_clean_cache() {
- SegmentLoader::instance()->prune();
- SchemaCache::instance()->prune();
+ CacheManager::instance()->for_each_cache_prune_stale();
}
Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 29ac809246..6599f36d79 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -55,6 +55,7 @@
#include "runtime/heartbeat_flags.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/load_path_mgr.h"
+#include "runtime/memory/cache_manager.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
@@ -228,6 +229,8 @@ Status ExecEnv::_init_mem_env() {
}
// 3. init storage page cache
+ CacheManager::create_global_instance();
+
int64_t storage_cache_limit =
ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(),
MemInfo::physical_mem(), &is_percent);
@@ -308,7 +311,7 @@ Status ExecEnv::_init_mem_env() {
// Reason same as buffer_pool_limit
inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2;
}
- InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit, 10);
+ InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit);
LOG(INFO) << "Inverted index query match cache memory limit: "
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::inverted_index_query_cache_limit;
diff --git a/be/src/runtime/memory/cache_manager.cpp b/be/src/runtime/memory/cache_manager.cpp
new file mode 100644
index 0000000000..027ed81b16
--- /dev/null
+++ b/be/src/runtime/memory/cache_manager.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/cache_manager.h"
+
+#include "runtime/memory/cache_policy.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+int64_t CacheManager::for_each_cache_prune_stale_wrap(
+ std::function<void(CachePolicy* cache_policy)> func, RuntimeProfile* profile) {
+ int64_t freed_size = 0;
+ std::lock_guard<std::mutex> l(_caches_lock);
+ for (auto cache_policy : _caches) {
+ func(cache_policy);
+ freed_size += cache_policy->profile()->get_counter("FreedMemory")->value();
+ if (cache_policy->profile()->get_counter("FreedMemory")->value() != 0 && profile) {
+ profile->add_child(cache_policy->profile(), true, nullptr);
+ }
+ }
+ return freed_size;
+}
+
+int64_t CacheManager::for_each_cache_prune_stale(RuntimeProfile* profile) {
+ return for_each_cache_prune_stale_wrap(
+ [](CachePolicy* cache_policy) { cache_policy->prune_stale(); }, profile);
+}
+
+int64_t CacheManager::for_each_cache_prune_all(RuntimeProfile* profile) {
+ return for_each_cache_prune_stale_wrap(
+ [](CachePolicy* cache_policy) { cache_policy->prune_all(); }, profile);
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_manager.h b/be/src/runtime/memory/cache_manager.h
new file mode 100644
index 0000000000..6086c02b94
--- /dev/null
+++ b/be/src/runtime/memory/cache_manager.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+class CachePolicy;
+
+// Hold the list of all caches, for prune when memory not enough or timing.
+class CacheManager {
+public:
+ static void create_global_instance() {
+ DCHECK(_s_instance == nullptr);
+ static CacheManager instance;
+ _s_instance = &instance;
+ }
+ static CacheManager* instance() { return _s_instance; }
+
+ std::list<CachePolicy*>::iterator register_cache(CachePolicy* cache) {
+ std::lock_guard<std::mutex> l(_caches_lock);
+ return _caches.insert(_caches.end(), cache);
+ }
+
+ void unregister_cache(std::list<CachePolicy*>::iterator it) {
+ std::lock_guard<std::mutex> l(_caches_lock);
+ if (it != _caches.end()) {
+ _caches.erase(it);
+ it = _caches.end();
+ }
+ }
+
+ int64_t for_each_cache_prune_stale_wrap(std::function<void(CachePolicy* cache_policy)> func,
+ RuntimeProfile* profile = nullptr);
+
+ int64_t for_each_cache_prune_stale(RuntimeProfile* profile = nullptr);
+
+ int64_t for_each_cache_prune_all(RuntimeProfile* profile = nullptr);
+
+private:
+ static inline CacheManager* _s_instance = nullptr;
+
+ std::mutex _caches_lock;
+ std::list<CachePolicy*> _caches;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/cache_policy.h b/be/src/runtime/memory/cache_policy.h
new file mode 100644
index 0000000000..14308088e6
--- /dev/null
+++ b/be/src/runtime/memory/cache_policy.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/cache_manager.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+static constexpr int32_t CACHE_MIN_FREE_SIZE = 67108864; // 64M
+
+// Base of all caches. register to CacheManager when cache is constructed.
+class CachePolicy {
+public:
+ CachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
+ : _name(name), _stale_sweep_time_s(stale_sweep_time_s) {
+ _it = CacheManager::instance()->register_cache(this);
+ init_profile();
+ }
+
+ virtual ~CachePolicy() { CacheManager::instance()->unregister_cache(_it); };
+ virtual void prune_stale() = 0;
+ virtual void prune_all() = 0;
+
+ RuntimeProfile* profile() { return _profile.get(); }
+
+protected:
+ void init_profile() {
+ _profile = std::make_unique<RuntimeProfile>(fmt::format("Cache name={}", _name));
+ _prune_stale_number_counter = ADD_COUNTER(_profile, "PruneStaleNumber", TUnit::UNIT);
+ _prune_all_number_counter = ADD_COUNTER(_profile, "PruneAllNumber", TUnit::UNIT);
+ _freed_memory_counter = ADD_COUNTER(_profile, "FreedMemory", TUnit::BYTES);
+ _freed_entrys_counter = ADD_COUNTER(_profile, "FreedEntrys", TUnit::UNIT);
+ _cost_timer = ADD_TIMER(_profile, "CostTime");
+ }
+
+ std::string _name;
+ std::list<CachePolicy*>::iterator _it;
+
+ std::unique_ptr<RuntimeProfile> _profile;
+ RuntimeProfile::Counter* _prune_stale_number_counter = nullptr;
+ RuntimeProfile::Counter* _prune_all_number_counter = nullptr;
+ // Reset before each gc
+ RuntimeProfile::Counter* _freed_memory_counter = nullptr;
+ RuntimeProfile::Counter* _freed_entrys_counter = nullptr;
+ RuntimeProfile::Counter* _cost_timer = nullptr;
+
+ uint32_t _stale_sweep_time_s;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h
new file mode 100644
index 0000000000..fd900bea6c
--- /dev/null
+++ b/be/src/runtime/memory/lru_cache_policy.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/lru_cache.h"
+#include "runtime/memory/cache_policy.h"
+#include "util/time.h"
+
+namespace doris {
+
+// Base of the lru cache value.
+struct LRUCacheValueBase {
+ // Save the last visit time of this cache entry.
+ // Use atomic because it may be modified by multi threads.
+ std::atomic<int64_t> last_visit_time = 0;
+ size_t size = 0;
+};
+
+// Base of lru cache, allow prune stale entry and prune all entry.
+class LRUCachePolicy : public CachePolicy {
+public:
+ LRUCachePolicy(const std::string& name, uint32_t stale_sweep_time_s)
+ : CachePolicy(name, stale_sweep_time_s) {};
+ LRUCachePolicy(const std::string& name, size_t capacity, LRUCacheType type,
+ uint32_t stale_sweep_time_s, uint32_t num_shards = -1)
+ : CachePolicy(name, stale_sweep_time_s) {
+ _cache = num_shards == -1
+ ? std::unique_ptr<Cache>(new_lru_cache(name, capacity, type))
+ : std::unique_ptr<Cache>(new_lru_cache(name, capacity, type, num_shards));
+ }
+
+ ~LRUCachePolicy() override = default;
+
+ // Try to prune the cache if expired.
+ void prune_stale() override {
+ if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
+ COUNTER_SET(_cost_timer, (int64_t)0);
+ SCOPED_TIMER(_cost_timer);
+ const int64_t curtime = UnixMillis();
+ int64_t byte_size = 0L;
+ auto pred = [this, curtime, &byte_size](const void* value) -> bool {
+ LRUCacheValueBase* cache_value = (LRUCacheValueBase*)value;
+ if ((cache_value->last_visit_time + _stale_sweep_time_s * 1000) < curtime) {
+ byte_size += cache_value->size;
+ return true;
+ }
+ return false;
+ };
+
+ // Prune cache in lazy mode to save cpu and minimize the time holding write lock
+ COUNTER_SET(_freed_entrys_counter, _cache->prune_if(pred, true));
+ COUNTER_SET(_freed_memory_counter, byte_size);
+ COUNTER_UPDATE(_prune_stale_number_counter, 1);
+ LOG(INFO) << fmt::format("{} prune stale {} entries, {} bytes, {} times prune", _name,
+ _freed_entrys_counter->value(), _freed_memory_counter->value(),
+ _prune_stale_number_counter->value());
+ }
+ }
+
+ void prune_all() override {
+ if (_cache->mem_consumption() > CACHE_MIN_FREE_SIZE) {
+ COUNTER_SET(_cost_timer, (int64_t)0);
+ SCOPED_TIMER(_cost_timer);
+ auto size = _cache->mem_consumption();
+ COUNTER_SET(_freed_entrys_counter, _cache->prune());
+ COUNTER_SET(_freed_memory_counter, size);
+ COUNTER_UPDATE(_prune_all_number_counter, 1);
+ LOG(INFO) << fmt::format("{} prune all {} entries, {} bytes, {} times prune", _name,
+ _freed_entrys_counter->value(), _freed_memory_counter->value(),
+ _prune_stale_number_counter->value());
+ }
+ }
+
+ Cache* get() { return _cache.get(); }
+
+protected:
+ std::unique_ptr<Cache> _cache;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index e6c09d51c1..e44da881eb 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -35,6 +35,7 @@
#include "util/mem_info.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
+#include "util/runtime_profile.h"
namespace doris {
@@ -45,6 +46,18 @@ static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GRO
std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
+// Reset before each free
+static std::unique_ptr<RuntimeProfile> free_top_memory_task_profile {
+ std::make_unique<RuntimeProfile>("-")};
+static RuntimeProfile::Counter* find_cost_time =
+ ADD_TIMER(free_top_memory_task_profile, "FindCostTime");
+static RuntimeProfile::Counter* cancel_cost_time =
+ ADD_TIMER(free_top_memory_task_profile, "CancelCostTime");
+static RuntimeProfile::Counter* freed_memory_counter =
+ ADD_COUNTER(free_top_memory_task_profile, "FreedMemory", TUnit::BYTES);
+static RuntimeProfile::Counter* cancel_tasks_counter =
+ ADD_COUNTER(free_top_memory_task_profile, "CancelTasksNum", TUnit::UNIT);
+
MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit,
RuntimeProfile* profile,
const std::string& profile_counter_name) {
@@ -324,7 +337,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) {
int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
const std::string& vm_rss_str,
- const std::string& mem_available_str, Type type) {
+ const std::string& mem_available_str,
+ RuntimeProfile* profile, Type type) {
return free_top_memory_query(
min_free_mem, type, mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
@@ -339,13 +353,15 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(),
mem_available_str,
print_bytes(MemInfo::sys_mem_available_low_water_mark()));
- });
+ },
+ profile);
}
template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_memory_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg) {
+ const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
+ RuntimeProfile* profile) {
using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>;
@@ -353,54 +369,66 @@ int64_t MemTrackerLimiter::free_top_memory_query(
// After greater than min_free_mem, will not be modified.
int64_t prepare_free_mem = 0;
std::vector<std::string> canceling_task;
+ COUNTER_SET(cancel_cost_time, (int64_t)0);
+ COUNTER_SET(find_cost_time, (int64_t)0);
+ COUNTER_SET(freed_memory_counter, (int64_t)0);
+ COUNTER_SET(cancel_tasks_counter, (int64_t)0);
- auto cancel_top_query = [&cancel_msg, type](auto& min_pq, auto& canceling_task) -> int64_t {
+ auto cancel_top_query = [&cancel_msg, type, profile](auto& min_pq,
+ auto& canceling_task) -> int64_t {
std::vector<std::string> usage_strings;
- int64_t freed_mem = 0;
- while (!min_pq.empty()) {
- TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
- if (cancelled_queryid == TUniqueId()) {
+ {
+ SCOPED_TIMER(cancel_cost_time);
+ while (!min_pq.empty()) {
+ TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
+ if (cancelled_queryid == TUniqueId()) {
+ min_pq.pop();
+ continue;
+ }
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+ cancel_msg(min_pq.top().first, min_pq.top().second));
+
+ COUNTER_UPDATE(freed_memory_counter, min_pq.top().first);
+ COUNTER_UPDATE(cancel_tasks_counter, 1);
+ usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
+ min_pq.top().first));
min_pq.pop();
- continue;
}
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
- cancel_msg(min_pq.top().first, min_pq.top().second));
-
- freed_mem += min_pq.top().first;
- usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
- min_pq.top().first));
- min_pq.pop();
}
+ profile->merge(free_top_memory_task_profile.get());
LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
- return freed_mem;
+ return freed_memory_counter->value();
};
- for (unsigned i = 1; i < tracker_groups.size(); ++i) {
- std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
- for (auto tracker : tracker_groups[i].trackers) {
- if (tracker->type() == type) {
- if (tracker->is_query_cancelled()) {
- canceling_task.push_back(
- fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
- continue;
- }
- if (tracker->consumption() > min_free_mem) {
- MemTrackerMinQueue min_pq_single;
- min_pq_single.emplace(tracker->consumption(), tracker->label());
- return cancel_top_query(min_pq_single, canceling_task);
- } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
- min_pq.emplace(tracker->consumption(), tracker->label());
- prepare_free_mem += tracker->consumption();
- } else if (tracker->consumption() > min_pq.top().first) {
- min_pq.emplace(tracker->consumption(), tracker->label());
- prepare_free_mem += tracker->consumption();
- while (prepare_free_mem - min_pq.top().first > min_free_mem) {
- prepare_free_mem -= min_pq.top().first;
- min_pq.pop();
+ {
+ SCOPED_TIMER(find_cost_time);
+ for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+ for (auto tracker : tracker_groups[i].trackers) {
+ if (tracker->type() == type) {
+ if (tracker->is_query_cancelled()) {
+ canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(),
+ tracker->consumption()));
+ continue;
+ }
+ if (tracker->consumption() > min_free_mem) {
+ MemTrackerMinQueue min_pq_single;
+ min_pq_single.emplace(tracker->consumption(), tracker->label());
+ return cancel_top_query(min_pq_single, canceling_task);
+ } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
+ min_pq.emplace(tracker->consumption(), tracker->label());
+ prepare_free_mem += tracker->consumption();
+ } else if (tracker->consumption() > min_pq.top().first) {
+ min_pq.emplace(tracker->consumption(), tracker->label());
+ prepare_free_mem += tracker->consumption();
+ while (prepare_free_mem - min_pq.top().first > min_free_mem) {
+ prepare_free_mem -= min_pq.top().first;
+ min_pq.pop();
+ }
}
}
}
@@ -412,7 +440,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
const std::string& vm_rss_str,
const std::string& mem_available_str,
- Type type) {
+ RuntimeProfile* profile, Type type) {
return free_top_overcommit_query(
min_free_mem, type, mem_tracker_limiter_pool,
[&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
@@ -427,35 +455,45 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(),
mem_available_str,
print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
- });
+ },
+ profile);
}
template <typename TrackerGroups>
int64_t MemTrackerLimiter::free_top_overcommit_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg) {
+ const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
+ RuntimeProfile* profile) {
std::priority_queue<std::pair<int64_t, std::string>> max_pq;
std::unordered_map<std::string, int64_t> query_consumption;
std::vector<std::string> canceling_task;
+ COUNTER_SET(cancel_cost_time, (int64_t)0);
+ COUNTER_SET(find_cost_time, (int64_t)0);
+ COUNTER_SET(freed_memory_counter, (int64_t)0);
+ COUNTER_SET(cancel_tasks_counter, (int64_t)0);
- for (unsigned i = 1; i < tracker_groups.size(); ++i) {
- std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
- for (auto tracker : tracker_groups[i].trackers) {
- if (tracker->type() == type) {
- // 32M small query does not cancel
- if (tracker->consumption() <= 33554432 ||
- tracker->consumption() < tracker->limit()) {
- continue;
- }
- if (tracker->is_query_cancelled()) {
- canceling_task.push_back(
- fmt::format("{}:{} Bytes", tracker->label(), tracker->consumption()));
- continue;
+ {
+ SCOPED_TIMER(find_cost_time);
+ for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+ std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+ for (auto tracker : tracker_groups[i].trackers) {
+ if (tracker->type() == type) {
+ // 32M small query does not cancel
+ if (tracker->consumption() <= 33554432 ||
+ tracker->consumption() < tracker->limit()) {
+ continue;
+ }
+ if (tracker->is_query_cancelled()) {
+ canceling_task.push_back(fmt::format("{}:{} Bytes", tracker->label(),
+ tracker->consumption()));
+ continue;
+ }
+ int64_t overcommit_ratio =
+ (static_cast<double>(tracker->consumption()) / tracker->limit()) *
+ 10000;
+ max_pq.emplace(overcommit_ratio, tracker->label());
+ query_consumption[tracker->label()] = tracker->consumption();
}
- int64_t overcommit_ratio =
- (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
- max_pq.emplace(overcommit_ratio, tracker->label());
- query_consumption[tracker->label()] = tracker->consumption();
}
}
}
@@ -466,37 +504,42 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
}
std::vector<std::string> usage_strings;
- int64_t freed_mem = 0;
- while (!max_pq.empty()) {
- TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
- if (cancelled_queryid == TUniqueId()) {
+ {
+ SCOPED_TIMER(cancel_cost_time);
+ while (!max_pq.empty()) {
+ TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
+ if (cancelled_queryid == TUniqueId()) {
+ max_pq.pop();
+ continue;
+ }
+ int64_t query_mem = query_consumption[max_pq.top().second];
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+ cancel_msg(query_mem, max_pq.top().second));
+
+ usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
+ max_pq.top().second, query_mem,
+ max_pq.top().first));
+ COUNTER_UPDATE(freed_memory_counter, query_mem);
+ COUNTER_UPDATE(cancel_tasks_counter, 1);
+ if (freed_memory_counter->value() > min_free_mem) {
+ break;
+ }
max_pq.pop();
- continue;
}
- int64_t query_mem = query_consumption[max_pq.top().second];
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
- cancel_msg(query_mem, max_pq.top().second));
-
- usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
- max_pq.top().second, query_mem, max_pq.top().first));
- freed_mem += query_mem;
- if (freed_mem > min_free_mem) {
- break;
- }
- max_pq.pop();
}
+ profile->merge(free_top_memory_task_profile.get());
LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
<< join(usage_strings, ",")
<< ". previous canceling task: " << join(canceling_task, ",");
- return freed_mem;
+ return freed_memory_counter->value();
}
int64_t MemTrackerLimiter::tg_memory_limit_gc(
int64_t need_free_mem, int64_t used_memory, uint64_t id, const std::string& name,
- int64_t memory_limit,
- std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups) {
+ int64_t memory_limit, std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups,
+ RuntimeProfile* profile) {
if (need_free_mem <= 0) {
return 0;
}
@@ -515,11 +558,11 @@ int64_t MemTrackerLimiter::tg_memory_limit_gc(
};
if (config::enable_query_memory_overcommit) {
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
- need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str);
+ need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile);
}
if (freed_mem < need_free_mem) {
- freed_mem += MemTrackerLimiter::free_top_memory_query(need_free_mem - freed_mem, query_type,
- tracker_limiter_groups, cancel_str);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str, profile);
}
LOG(INFO) << fmt::format(
"task group {} finished gc, memory_limit: {}, used_memory: {}, freed_mem: {}.", name,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 6e3dd3d51e..c90845ae88 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -175,37 +175,44 @@ public:
// vm_rss_str and mem_available_str recorded when gc is triggered, for log printing.
static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
- Type type = Type::QUERY);
+ RuntimeProfile* profile, Type type = Type::QUERY);
template <typename TrackerGroups>
static int64_t free_top_memory_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg);
+ const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
+ RuntimeProfile* profile);
static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str) {
- return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD);
+ const std::string& mem_available_str,
+ RuntimeProfile* profile) {
+ return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, profile,
+ Type::LOAD);
}
// Start canceling from the query with the largest memory overcommit ratio until the memory
// of min_free_mem size is freed.
static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str,
const std::string& mem_available_str,
- Type type = Type::QUERY);
+ RuntimeProfile* profile, Type type = Type::QUERY);
template <typename TrackerGroups>
static int64_t free_top_overcommit_query(
int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
- const std::function<std::string(int64_t, const std::string&)>& cancel_msg);
+ const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
+ RuntimeProfile* profile);
static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str,
- const std::string& mem_available_str) {
- return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD);
+ const std::string& mem_available_str,
+ RuntimeProfile* profile) {
+ return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, profile,
+ Type::LOAD);
}
static int64_t tg_memory_limit_gc(
int64_t request_free_memory, int64_t used_memory, uint64_t id, const std::string& name,
int64_t memory_limit,
- std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups);
+ std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups,
+ RuntimeProfile* profile);
// only for Type::QUERY or Type::LOAD.
static TUniqueId label_to_queryid(const std::string& label) {
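
For context on the header change above, a minimal caller sketch of the new profile-aware signature. Only the calls visible in this diff are used (free_top_memory_query, RuntimeProfile::create_child, PerfCounters::get_vm_rss_str, MemInfo::sys_mem_available_str); the include paths not shown in this diff and the function/child names are illustrative assumptions, not part of the commit:

#include <cstdint>
#include <memory>

#include "runtime/memory/mem_tracker_limiter.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"   // assumed location of PerfCounters
#include "util/runtime_profile.h"

// Sketch only: a GC pass owns a RuntimeProfile and hands a child profile to
// free_top_memory_query, which records what it freed (its return value comes
// from a freed-memory counter) into that child.
int64_t gc_top_memory_queries(int64_t min_free_mem) {
    auto profile = std::make_unique<doris::RuntimeProfile>("GC");
    doris::RuntimeProfile* child = profile->create_child("FreeTopMemoryQuery", true, true);
    return doris::MemTrackerLimiter::free_top_memory_query(
            min_free_mem, doris::PerfCounters::get_vm_rss_str(),
            doris::MemInfo::sys_mem_available_str(), child);
}
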
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index deafcdc241..ca4334ad45 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -173,7 +173,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
}
// Large memory allocations should go through allocator.h.
// Direct malloc or new of large memory cannot catch std::bad_alloc, so BE may OOM.
- if (size > 1024l * 1024 * 1024) { // 1G
+ if (size > 1024l * 1024 * 1024 && !doris::config::disable_memory_gc) { // 1G
_stop_consume = true;
LOG(WARNING) << fmt::format("MemHook alloc large memory: {}, stacktrace:\n{}", size,
get_stack_trace());
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 97125d0942..29c58ca23e 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -38,9 +38,8 @@
#include "common/config.h"
#include "common/status.h"
#include "gutil/strings/split.h"
-#include "olap/page_cache.h"
-#include "olap/rowset/segment_v2/inverted_index_cache.h"
-#include "olap/segment_loader.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group.h"
#include "runtime/task_group/task_group_manager.h"
@@ -48,6 +47,7 @@
#include "util/defer_op.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
+#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "util/string_parser.hpp"
@@ -104,34 +104,6 @@ void MemInfo::refresh_allocator_mem() {
#endif
}
-void MemInfo::process_cache_gc(int64_t& freed_mem) {
- // TODO, free more cache, and should free a certain percentage of capacity, not all.
- int32_t min_free_size = 33554432; // 32M
-
- if (StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE) >
- min_free_size) {
- freed_mem +=
- StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
- StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
- }
-
- if (segment_v2::InvertedIndexSearcherCache::instance()->mem_consumption() > min_free_size) {
- freed_mem += segment_v2::InvertedIndexSearcherCache::instance()->prune();
- }
-
- if (segment_v2::InvertedIndexQueryCache::instance()->mem_consumption() > min_free_size) {
- freed_mem += segment_v2::InvertedIndexQueryCache::instance()->prune();
- }
-
- if (StoragePageCache::instance()->get_page_cache_mem_consumption(
- segment_v2::PRIMARY_KEY_INDEX_PAGE) > min_free_size) {
- freed_mem += StoragePageCache::instance()->get_page_cache_mem_consumption(
- segment_v2::PRIMARY_KEY_INDEX_PAGE);
- StoragePageCache::instance()->prune(segment_v2::PRIMARY_KEY_INDEX_PAGE);
- }
- je_purge_all_arena_dirty_pages();
-}
-
// step1: free all caches
// step2: free memory of resource groups that enable overcommit
// step3: free global top overcommit queries, if query memory overcommit is enabled
@@ -140,36 +112,42 @@ bool MemInfo::process_minor_gc() {
MonotonicStopWatch watch;
watch.start();
int64_t freed_mem = 0;
- std::string vm_rss_str = PerfCounters::get_vm_rss_str();
- std::string mem_available_str = MemInfo::sys_mem_available_str();
+ std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
+ std::string pre_vm_rss = PerfCounters::get_vm_rss_str();
+ std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
je_purge_all_arena_dirty_pages();
- LOG(INFO) << fmt::format("End Minor GC, Free Memory {} Bytes. cost(us): {}", freed_mem,
- watch.elapsed_time() / 1000);
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format("End Minor GC, Free Memory {} Bytes. cost(us): {}, details: {}",
+ freed_mem, watch.elapsed_time() / 1000, ss.str());
}};
- MemInfo::process_cache_gc(freed_mem);
+ freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
+ je_purge_all_arena_dirty_pages();
if (freed_mem > _s_process_minor_gc_size) {
return true;
}
- // TODO add freed_mem
- SegmentLoader::instance()->prune();
-
- freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem);
+ RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+ freed_mem += tg_soft_memory_limit_gc(_s_process_minor_gc_size - freed_mem, tg_profile);
if (freed_mem > _s_process_minor_gc_size) {
return true;
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "Before free top memory overcommit query in Minor GC", MemTrackerLimiter::Type::QUERY);
if (config::enable_query_memory_overcommit) {
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "Before free top memory overcommit query in Minor GC",
+ MemTrackerLimiter::Type::QUERY);
+ RuntimeProfile* toq_profile =
+ profile->create_child("FreeTopOvercommitMemoryQuery", true, true);
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
- _s_process_minor_gc_size - freed_mem, vm_rss_str, mem_available_str);
- }
- if (freed_mem > _s_process_minor_gc_size) {
- return true;
+ _s_process_minor_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available,
+ toq_profile);
+ if (freed_mem > _s_process_minor_gc_size) {
+ return true;
+ }
}
return false;
}
@@ -183,48 +161,47 @@ bool MemInfo::process_full_gc() {
MonotonicStopWatch watch;
watch.start();
int64_t freed_mem = 0;
- std::string vm_rss_str = PerfCounters::get_vm_rss_str();
- std::string mem_available_str = MemInfo::sys_mem_available_str();
+ std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
+ std::string pre_vm_rss = PerfCounters::get_vm_rss_str();
+ std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
je_purge_all_arena_dirty_pages();
- LOG(INFO) << fmt::format("End Full GC Free, Memory {} Bytes. cost(us): {}", freed_mem,
- watch.elapsed_time() / 1000);
+ std::stringstream ss;
+ profile->pretty_print(&ss);
+ LOG(INFO) << fmt::format("End Full GC Free, Memory {} Bytes. cost(us): {}, details: {}",
+ freed_mem, watch.elapsed_time() / 1000, ss.str());
}};
- MemInfo::process_cache_gc(freed_mem);
+ freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
+ je_purge_all_arena_dirty_pages();
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) {
- freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
- LOG(INFO) << "prune all " << SegmentLoader::instance()->segment_cache_get_usage()
- << " entries in segment cache.";
- SegmentLoader::instance()->prune_all();
- if (freed_mem > _s_process_full_gc_size) {
- return true;
- }
- }
-
- freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem);
+ RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", true, true);
+ freed_mem += tg_soft_memory_limit_gc(_s_process_full_gc_size - freed_mem, tg_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory query in Full GC",
MemTrackerLimiter::Type::QUERY);
- freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
- vm_rss_str, mem_available_str);
+ RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(
+ _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tmq_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD);
if (config::enable_query_memory_overcommit) {
+ VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
+ "Before free top memory overcommit load in Full GC", MemTrackerLimiter::Type::LOAD);
+ RuntimeProfile* tol_profile =
+ profile->create_child("FreeTopMemoryOvercommitLoad", true, true);
freed_mem += MemTrackerLimiter::free_top_overcommit_load(
- _s_process_full_gc_size - freed_mem, vm_rss_str, mem_available_str);
+ _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available,
+ tol_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
@@ -232,8 +209,9 @@ bool MemInfo::process_full_gc() {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage("Before free top memory load in Full GC",
MemTrackerLimiter::Type::LOAD);
- freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem,
- vm_rss_str, mem_available_str);
+ RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", true, true);
+ freed_mem += MemTrackerLimiter::free_top_memory_load(
+ _s_process_full_gc_size - freed_mem, pre_vm_rss, pre_sys_mem_available, tml_profile);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
@@ -256,12 +234,12 @@ int64_t MemInfo::tg_hard_memory_limit_gc() {
auto used = task_group->memory_used();
total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
used - tg_info.memory_limit, used, tg_info.id, tg_info.name, tg_info.memory_limit,
- task_group->mem_tracker_limiter_pool());
+ task_group->mem_tracker_limiter_pool(), nullptr);
}
return total_free_memory;
}
-int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) {
+int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile) {
std::vector<taskgroup::TaskGroupPtr> task_groups;
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
@@ -298,7 +276,7 @@ int64_t MemInfo::tg_soft_memory_limit_gc(int64_t request_free_memory) {
task_group->task_group_info(&tg_info);
total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
tg_need_free_memory, used_memorys[i], tg_info.id, tg_info.name,
- tg_info.memory_limit, task_group->mem_tracker_limiter_pool());
+ tg_info.memory_limit, task_group->mem_tracker_limiter_pool(), profile);
}
return total_free_memory;
}
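
The mem_info.cpp hunks above collapse the old per-cache pruning into two CacheManager entry points: minor GC sweeps only stale LRU entries, full GC prunes every cache the CacheManager knows about, and both report into the RuntimeProfile they are given. A condensed sketch of just that step; only the two for_each_cache_* calls and the cache_manager.h path are taken from this commit, the wrapper function is an illustrative assumption:

#include <cstdint>

#include "runtime/memory/cache_manager.h"
#include "util/runtime_profile.h"

// Sketch only: prune caches through the global CacheManager and let it record
// per-cache results in the given profile. The callers in this commit also call
// je_purge_all_arena_dirty_pages() right after this step.
int64_t prune_caches(bool full_gc, doris::RuntimeProfile* profile) {
    return full_gc
            ? doris::CacheManager::instance()->for_each_cache_prune_all(profile)
            : doris::CacheManager::instance()->for_each_cache_prune_stale(profile);
}
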
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 542e0fd430..c83068befd 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -42,6 +42,8 @@
namespace doris {
+class RuntimeProfile;
+
// Provides the amount of physical memory available.
// Populated from /proc/meminfo.
// TODO: Combine mem-info, cpu-info and disk-info into hardware-info/perf_counters ?
@@ -165,13 +167,11 @@ public:
static std::string debug_string();
- static void process_cache_gc(int64_t& freed_mem);
static bool process_minor_gc();
static bool process_full_gc();
static int64_t tg_hard_memory_limit_gc();
-
- static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory);
+ static int64_t tg_soft_memory_limit_gc(int64_t request_free_memory, RuntimeProfile* profile);
// It is only used after the memory limit is exceeded. When multiple threads are waiting for available process memory,
// this avoids them all starting at the same time and causing OOM.
diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h
index d23418625a..89f126ae6f 100644
--- a/be/src/util/obj_lru_cache.h
+++ b/be/src/util/obj_lru_cache.h
@@ -24,6 +24,7 @@ namespace doris {
// A common object cache that depends on a sharded LRU cache.
// It has a certain capacity, which determines how many objects it can cache.
// Caller must hold a CacheHandle instance when visiting the cached object.
+// TODO should add gc prune
class ObjLRUCache {
public:
struct ObjKey {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index d15552824e..9167c7df9f 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -49,7 +49,8 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
size, doris::thread_context()->thread_mem_tracker()->label(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
- if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc) { // 1G
+ if (size > 1024l * 1024 * 1024 && !doris::enable_thread_catch_bad_alloc &&
+ !doris::config::disable_memory_gc) { // 1G
err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
}
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index ae8f0a7fe1..ab3b0b1ea0 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -38,6 +38,7 @@
int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->init_mem_tracker();
doris::thread_context()->thread_mem_tracker_mgr->init();
+ doris::CacheManager::create_global_instance();
doris::TabletSchemaCache::create_global_schema_cache();
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0);
doris::SegmentLoader::create_global_instance(1000);