You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/22 11:18:02 UTC
[doris] 14/15: [bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 814f83e4534c887d048fcaca1ea39819a573c382
Author: yiguolei <67...@qq.com>
AuthorDate: Sat Apr 22 10:15:51 2023 +0800
[bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)
* [bugfix](memleak) UserFunctionCache may have memory leak during close
* [bugfix](memleak) UserFunctionCache may have memory leak during close
---------
Co-authored-by: yiguolei <yi...@gmail.com>
---
be/src/runtime/user_function_cache.cpp | 124 ++++++---------------------------
be/src/runtime/user_function_cache.h | 25 ++-----
2 files changed, 28 insertions(+), 121 deletions(-)
diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index 312e9ca1f2..b912fbae24 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -22,6 +22,7 @@
#include <vector>
#include "common/config.h"
+#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@@ -38,16 +39,12 @@ static const int kLibShardNum = 128;
// function cache entry, store information for
struct UserFunctionCacheEntry {
+ ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry); // enables create_shared(); lifetime is now owned by std::shared_ptr instead of manual ref()/unref()
UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
LibType type)
: function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
~UserFunctionCacheEntry();
- void ref() { _refs.fetch_add(1); }
-
- // If unref() returns true, this object should be delete
- bool unref() { return _refs.fetch_sub(1) == 1; }
-
int64_t function_id = 0;
// used to check if this library is valid.
std::string checksum;
@@ -78,9 +75,6 @@ struct UserFunctionCacheEntry {
std::unordered_map<std::string, void*> fptr_map;
LibType type;
-
-private:
- std::atomic<int> _refs {0};
};
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
@@ -104,9 +98,6 @@ UserFunctionCache::~UserFunctionCache() {
while (it != _entry_map.end()) {
- auto entry = it->second;
+ // Erasing a shared_ptr value drops the map's reference by itself; no extra copy is needed.
it = _entry_map.erase(it);
- if (entry->unref()) {
- delete entry;
- }
}
}
@@ -152,11 +143,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
return Status::InternalError("duplicate function id");
}
// create a cache entry and put it into entry map
- UserFunctionCacheEntry* entry =
- new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type);
+ std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
+ function_id, checksum, dir + "/" + file, lib_type); // replaces new + ref(); the map copy below keeps it alive
entry->is_downloaded = true;
-
- entry->ref();
_entry_map[function_id] = entry;
return Status::OK();
@@ -194,64 +183,11 @@ std::string get_real_symbol(const std::string& symbol) {
return str2;
}
-Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_symbol,
- const std::string& url, const std::string& checksum,
- void** fn_ptr, UserFunctionCacheEntry** output_entry) {
- auto symbol = get_real_symbol(orig_symbol);
- if (fid == 0) {
- // Just loading a function ptr in the current process. No need to take any locks.
- RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
- return Status::OK();
- }
-
- // if we need to unref entry
- bool need_unref_entry = false;
- UserFunctionCacheEntry* entry = nullptr;
- // find the library entry for this function. If *output_entry is not null
- // find symbol in it without to get other entry
- if (output_entry != nullptr && *output_entry != nullptr) {
- entry = *output_entry;
- } else {
- RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::SO));
- need_unref_entry = true;
- }
-
- Status status;
- {
- std::lock_guard<SpinLock> l(entry->map_lock);
- // now, we have the library entry, we need to lock it to find symbol
- auto it = entry->fptr_map.find(symbol);
- if (it != entry->fptr_map.end()) {
- *fn_ptr = it->second;
- } else {
- status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
- if (status.ok()) {
- entry->fptr_map.emplace(symbol, *fn_ptr);
- } else {
- LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
- << ", file=" << entry->lib_file;
- }
- }
- }
-
- if (status.ok() && output_entry != nullptr && *output_entry == nullptr) {
- *output_entry = entry;
- need_unref_entry = false;
- }
-
- if (need_unref_entry) {
- if (entry->unref()) {
- delete entry;
- }
- }
-
- return status;
-}
-
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
const std::string& checksum,
- UserFunctionCacheEntry** output_entry, LibType type) {
- UserFunctionCacheEntry* entry = nullptr;
+ std::shared_ptr<UserFunctionCacheEntry>& output_entry, // out-param: written only after the entry loads successfully
+ LibType type) {
+ std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
std::string file_name = _get_file_name_from_url(url);
{
std::lock_guard<std::mutex> l(_cache_lock);
@@ -259,12 +195,10 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
if (it != _entry_map.end()) {
entry = it->second;
} else {
- entry = new UserFunctionCacheEntry(
+ entry = UserFunctionCacheEntry::create_shared( // the map and the local `entry` each hold a shared reference
fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
- entry->ref();
_entry_map.emplace(fid, entry);
}
- entry->ref();
}
auto st = _load_cache_entry(url, entry);
if (!st.ok()) {
@@ -275,28 +209,26 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
return st;
}
- *output_entry = entry;
+ output_entry = entry;
return Status::OK();
}
-void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
+void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
// 1. we remove cache entry from entry map
- size_t num_removed = 0;
- {
- std::lock_guard<std::mutex> l(_cache_lock);
- num_removed = _entry_map.erase(entry->function_id);
- }
- if (num_removed > 0) {
- entry->unref();
- }
+ std::lock_guard<std::mutex> l(_cache_lock);
+ // Mark the library for deletion. The file is expected to be removed by ~UserFunctionCacheEntry,
+ // which runs only when the last shared_ptr owner (map, callers, this parameter) is released.
entry->should_delete_library.store(true);
- // now we need to drop
- if (entry->unref()) {
- delete entry;
- }
+ // Erase only if the map still holds this exact entry: a concurrent caller may already
+ // have destroyed it and cached a fresh entry under the same function id.
+ auto it = _entry_map.find(entry->function_id);
+ if (it != _entry_map.end() && it->second == entry) {
+ _entry_map.erase(it);
+ }
}
-Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_load_cache_entry(const std::string& url,
+ std::shared_ptr<UserFunctionCacheEntry> entry) {
if (entry->is_loaded.load()) {
return Status::OK();
}
@@ -316,7 +243,8 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction
}
// entry's lock must be held
-Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_download_lib(const std::string& url,
+ std::shared_ptr<UserFunctionCacheEntry> entry) { // NOTE(review): by-value shared_ptr costs an atomic inc/dec per call; const& appears sufficient
DCHECK(!entry->is_downloaded);
// get local path to save library
@@ -389,7 +317,8 @@ std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) c
}
// entry's lock must be held
-Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_load_cache_entry_internal(
+ std::shared_ptr<UserFunctionCacheEntry> entry) { // NOTE(review): entry is only dereferenced here; const& would avoid a refcount copy
RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
entry->is_loaded.store(true);
return Status::OK();
@@ -408,19 +337,10 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st
return ss.str();
}
-void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
- if (entry == nullptr) {
- return;
- }
- if (entry->unref()) {
- delete entry;
- }
-}
-
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
const std::string& checksum, std::string* libpath) {
- UserFunctionCacheEntry* entry = nullptr;
- RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::JAR));
+ std::shared_ptr<UserFunctionCacheEntry> entry = nullptr; // filled by _get_cache_entry on success; released at scope exit (no leaked ref)
+ RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
*libpath = entry->lib_file;
return Status::OK();
}
diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h
index b3c4aa7e80..f49b6d216d 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -54,19 +54,6 @@ public:
static UserFunctionCache* instance();
- // Return function pointer for given fid and symbol.
- // If fid is 0, lookup symbol from this doris-be process.
- // Otherwise find symbol in UserFunction's library.
- // Found function pointer is returned in fn_ptr, and cache entry
- // is returned by entry. Client must call release_entry to release
- // cache entry if didn't need it.
- // If *entry is not true means that we should find symbol in this
- // entry.
- Status get_function_ptr(int64_t fid, const std::string& symbol, const std::string& url,
- const std::string& checksum, void** fn_ptr,
- UserFunctionCacheEntry** entry);
- void release_entry(UserFunctionCacheEntry* entry);
-
Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum,
std::string* libpath);
@@ -74,14 +61,14 @@ private:
Status _load_cached_lib();
Status _load_entry_from_lib(const std::string& dir, const std::string& file);
Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum,
- UserFunctionCacheEntry** output_entry, LibType type);
- Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry);
- Status _download_lib(const std::string& url, UserFunctionCacheEntry* entry);
- Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
+ std::shared_ptr<UserFunctionCacheEntry>& output_entry, LibType type);
+ Status _load_cache_entry(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry); // NOTE(review): by-value shared_ptr params here and below could be const& to skip refcount churn
+ Status _download_lib(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
+ Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry> entry);
std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type,
const std::string& file_name);
- void _destroy_cache_entry(UserFunctionCacheEntry* entry);
+ void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);
std::string _get_real_url(const std::string& url);
std::string _get_file_name_from_url(const std::string& url) const;
@@ -91,7 +78,7 @@ private:
void* _current_process_handle = nullptr;
std::mutex _cache_lock;
- std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map;
+ std::unordered_map<int64_t, std::shared_ptr<UserFunctionCacheEntry>> _entry_map; // locked by _cache_lock in _get_cache_entry/_destroy_cache_entry; NOTE(review): confirm <memory> is included
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org