You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/22 11:18:02 UTC

[doris] 14/15: [bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 814f83e4534c887d048fcaca1ea39819a573c382
Author: yiguolei <67...@qq.com>
AuthorDate: Sat Apr 22 10:15:51 2023 +0800

    [bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)
    
    * [bugfix](memleak) UserFunctionCache may have memory leak during close
    
    * [bugfix](memleak) UserFunctionCache may have memory leak during close
    
    ---------
    
    Co-authored-by: yiguolei <yi...@gmail.com>
---
 be/src/runtime/user_function_cache.cpp | 124 ++++++---------------------------
 be/src/runtime/user_function_cache.h   |  25 ++-----
 2 files changed, 28 insertions(+), 121 deletions(-)

diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index 312e9ca1f2..b912fbae24 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "common/config.h"
+#include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/strings/split.h"
 #include "http/http_client.h"
@@ -38,16 +39,12 @@ static const int kLibShardNum = 128;
 
 // function cache entry, store information for
 struct UserFunctionCacheEntry {
+    ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
     UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const std::string& lib_file_,
                            LibType type)
             : function_id(fid_), checksum(checksum_), lib_file(lib_file_), type(type) {}
     ~UserFunctionCacheEntry();
 
-    void ref() { _refs.fetch_add(1); }
-
-    // If unref() returns true, this object should be delete
-    bool unref() { return _refs.fetch_sub(1) == 1; }
-
     int64_t function_id = 0;
     // used to check if this library is valid.
     std::string checksum;
@@ -78,9 +75,6 @@ struct UserFunctionCacheEntry {
     std::unordered_map<std::string, void*> fptr_map;
 
     LibType type;
-
-private:
-    std::atomic<int> _refs {0};
 };
 
 UserFunctionCacheEntry::~UserFunctionCacheEntry() {
@@ -104,9 +98,6 @@ UserFunctionCache::~UserFunctionCache() {
     while (it != _entry_map.end()) {
         auto entry = it->second;
         it = _entry_map.erase(it);
-        if (entry->unref()) {
-            delete entry;
-        }
     }
 }
 
@@ -152,11 +143,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
         return Status::InternalError("duplicate function id");
     }
     // create a cache entry and put it into entry map
-    UserFunctionCacheEntry* entry =
-            new UserFunctionCacheEntry(function_id, checksum, dir + "/" + file, lib_type);
+    std::shared_ptr<UserFunctionCacheEntry> entry = UserFunctionCacheEntry::create_shared(
+            function_id, checksum, dir + "/" + file, lib_type);
     entry->is_downloaded = true;
-
-    entry->ref();
     _entry_map[function_id] = entry;
 
     return Status::OK();
@@ -194,64 +183,11 @@ std::string get_real_symbol(const std::string& symbol) {
     return str2;
 }
 
-Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string& orig_symbol,
-                                           const std::string& url, const std::string& checksum,
-                                           void** fn_ptr, UserFunctionCacheEntry** output_entry) {
-    auto symbol = get_real_symbol(orig_symbol);
-    if (fid == 0) {
-        // Just loading a function ptr in the current process. No need to take any locks.
-        RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
-        return Status::OK();
-    }
-
-    // if we need to unref entry
-    bool need_unref_entry = false;
-    UserFunctionCacheEntry* entry = nullptr;
-    // find the library entry for this function. If *output_entry is not null
-    // find symbol in it without to get other entry
-    if (output_entry != nullptr && *output_entry != nullptr) {
-        entry = *output_entry;
-    } else {
-        RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::SO));
-        need_unref_entry = true;
-    }
-
-    Status status;
-    {
-        std::lock_guard<SpinLock> l(entry->map_lock);
-        // now, we have the library entry, we need to lock it to find symbol
-        auto it = entry->fptr_map.find(symbol);
-        if (it != entry->fptr_map.end()) {
-            *fn_ptr = it->second;
-        } else {
-            status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
-            if (status.ok()) {
-                entry->fptr_map.emplace(symbol, *fn_ptr);
-            } else {
-                LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
-                             << ", file=" << entry->lib_file;
-            }
-        }
-    }
-
-    if (status.ok() && output_entry != nullptr && *output_entry == nullptr) {
-        *output_entry = entry;
-        need_unref_entry = false;
-    }
-
-    if (need_unref_entry) {
-        if (entry->unref()) {
-            delete entry;
-        }
-    }
-
-    return status;
-}
-
 Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
                                            const std::string& checksum,
-                                           UserFunctionCacheEntry** output_entry, LibType type) {
-    UserFunctionCacheEntry* entry = nullptr;
+                                           std::shared_ptr<UserFunctionCacheEntry>& output_entry,
+                                           LibType type) {
+    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
     std::string file_name = _get_file_name_from_url(url);
     {
         std::lock_guard<std::mutex> l(_cache_lock);
@@ -259,12 +195,10 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
         if (it != _entry_map.end()) {
             entry = it->second;
         } else {
-            entry = new UserFunctionCacheEntry(
+            entry = UserFunctionCacheEntry::create_shared(
                     fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
-            entry->ref();
             _entry_map.emplace(fid, entry);
         }
-        entry->ref();
     }
     auto st = _load_cache_entry(url, entry);
     if (!st.ok()) {
@@ -275,28 +209,21 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
         return st;
     }
 
-    *output_entry = entry;
+    output_entry = entry;
     return Status::OK();
 }
 
-void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
+void UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry) {
     // 1. we remove cache entry from entry map
-    size_t num_removed = 0;
-    {
-        std::lock_guard<std::mutex> l(_cache_lock);
-        num_removed = _entry_map.erase(entry->function_id);
-    }
-    if (num_removed > 0) {
-        entry->unref();
-    }
+    std::lock_guard<std::mutex> l(_cache_lock);
+    // Set should_delete_library to true so that the jar file is removed once the
+    // entry has been erased from the map and its destructor is called.
     entry->should_delete_library.store(true);
-    // now we need to drop
-    if (entry->unref()) {
-        delete entry;
-    }
+    _entry_map.erase(entry->function_id);
 }
 
-Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_load_cache_entry(const std::string& url,
+                                            std::shared_ptr<UserFunctionCacheEntry> entry) {
     if (entry->is_loaded.load()) {
         return Status::OK();
     }
@@ -316,7 +243,8 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url, UserFunction
 }
 
 // entry's lock must be held
-Status UserFunctionCache::_download_lib(const std::string& url, UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_download_lib(const std::string& url,
+                                        std::shared_ptr<UserFunctionCacheEntry> entry) {
     DCHECK(!entry->is_downloaded);
 
     // get local path to save library
@@ -389,7 +317,8 @@ std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) c
 }
 
 // entry's lock must be held
-Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_load_cache_entry_internal(
+        std::shared_ptr<UserFunctionCacheEntry> entry) {
     RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
     entry->is_loaded.store(true);
     return Status::OK();
@@ -408,19 +337,10 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st
     return ss.str();
 }
 
-void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
-    if (entry == nullptr) {
-        return;
-    }
-    if (entry->unref()) {
-        delete entry;
-    }
-}
-
 Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
                                       const std::string& checksum, std::string* libpath) {
-    UserFunctionCacheEntry* entry = nullptr;
-    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry, LibType::JAR));
+    std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
+    RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
     *libpath = entry->lib_file;
     return Status::OK();
 }
diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h
index b3c4aa7e80..f49b6d216d 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -54,19 +54,6 @@ public:
 
     static UserFunctionCache* instance();
 
-    // Return function pointer for given fid and symbol.
-    // If fid is 0, lookup symbol from this doris-be process.
-    // Otherwise find symbol in UserFunction's library.
-    // Found function pointer is returned in fn_ptr, and cache entry
-    // is returned by entry. Client must call release_entry to release
-    // cache entry if didn't need it.
-    // If *entry is not true means that we should find symbol in this
-    // entry.
-    Status get_function_ptr(int64_t fid, const std::string& symbol, const std::string& url,
-                            const std::string& checksum, void** fn_ptr,
-                            UserFunctionCacheEntry** entry);
-    void release_entry(UserFunctionCacheEntry* entry);
-
     Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum,
                        std::string* libpath);
 
@@ -74,14 +61,14 @@ private:
     Status _load_cached_lib();
     Status _load_entry_from_lib(const std::string& dir, const std::string& file);
     Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum,
-                            UserFunctionCacheEntry** output_entry, LibType type);
-    Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry);
-    Status _download_lib(const std::string& url, UserFunctionCacheEntry* entry);
-    Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
+                            std::shared_ptr<UserFunctionCacheEntry>& output_entry, LibType type);
+    Status _load_cache_entry(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
+    Status _download_lib(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
+    Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry> entry);
 
     std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type,
                                const std::string& file_name);
-    void _destroy_cache_entry(UserFunctionCacheEntry* entry);
+    void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);
 
     std::string _get_real_url(const std::string& url);
     std::string _get_file_name_from_url(const std::string& url) const;
@@ -91,7 +78,7 @@ private:
     void* _current_process_handle = nullptr;
 
     std::mutex _cache_lock;
-    std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map;
+    std::unordered_map<int64_t, std::shared_ptr<UserFunctionCacheEntry>> _entry_map;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org