You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/05/05 05:08:02 UTC

[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9197: Support remote storage, step 5 only be, add remote file reader

HappenLee commented on code in PR #9197:
URL: https://github.com/apache/incubator-doris/pull/9197#discussion_r865546024


##########
be/src/env/env_util.cpp:
##########
@@ -75,15 +75,18 @@ Status write_string_to_file_sync(Env* env, const Slice& data, const std::string&
     return do_write_string_to_file(env, data, fname, true);
 }
 
+// Support binary bytes reading.
 Status read_file_to_string(Env* env, const std::string& fname, std::string* data) {
     data->clear();
     std::unique_ptr<RandomAccessFile> file;
-    Status s = env->new_random_access_file(fname, &file);
-    if (!s.ok()) {
-        return s;
-    }
-    s = file->read_all(data);
-    return s;
+    RETURN_IF_ERROR(env->new_random_access_file(fname, &file));
+    uint64_t file_size = 0;
+    RETURN_IF_ERROR(file->size(&file_size));

Review Comment:
   If file_size is too big, it is dangerous to allocate that much memory. Maybe we should add a config check (a maximum allowed size) here?



##########
be/src/olap/fs/cached_segment_loader.cpp:
##########
@@ -0,0 +1,75 @@
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/fs/cached_segment_loader.h"
+
+#include "util/filesystem_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+CachedSegmentLoader::CachedSegmentLoader(size_t capacity)
+        : _mem_tracker(MemTracker::create_tracker(
+        capacity, "CachedSegmentLoader", nullptr, MemTrackerLevel::OVERVIEW)) {
+    _cache = std::unique_ptr<Cache>(new_lru_cache("CachedSegmentCache", capacity));
+
+}
+
+bool CachedSegmentLoader::_lookup(const CachedSegmentLoader::CacheKey& key, CachedSegmentCacheHandle* handle) {
+    auto lru_handle = _cache->lookup(key.encode());

Review Comment:
   There seems to be no need to return bool; checking whether the returned CachedSegmentCacheHandle* is nullptr is enough, so returning CachedSegmentCacheHandle* is better.



##########
be/src/olap/fs/cached_segment_loader.cpp:
##########
@@ -0,0 +1,75 @@
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/fs/cached_segment_loader.h"
+
+#include "util/filesystem_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+CachedSegmentLoader::CachedSegmentLoader(size_t capacity)
+        : _mem_tracker(MemTracker::create_tracker(
+        capacity, "CachedSegmentLoader", nullptr, MemTrackerLevel::OVERVIEW)) {
+    _cache = std::unique_ptr<Cache>(new_lru_cache("CachedSegmentCache", capacity));
+
+}
+
+bool CachedSegmentLoader::_lookup(const CachedSegmentLoader::CacheKey& key, CachedSegmentCacheHandle* handle) {
+    auto lru_handle = _cache->lookup(key.encode());
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+    return true;
+}
+
+void CachedSegmentLoader::_insert(const CachedSegmentLoader::CacheKey& key, CachedSegmentLoader::CacheValue* value,
+                                  CachedSegmentCacheHandle* handle) {
+    // When evicting one entry from cache, delete the file on disk.
+    auto deleter = [](const doris::CacheKey& key, void* value) {
+        CachedSegmentLoader::CacheValue* cache_value = (CachedSegmentLoader::CacheValue*)value;
+        // remove file
+        std::vector<std::string> files = {key.to_string(), cache_value->file_path};
+        Status st = FileSystemUtil::remove_paths(files);
+        if (!st.ok()) {
+            LOG(WARNING) << "Fail to remove files [" << key.to_string() << ", "
+                         << cache_value->file_path << "], error_msg=" << st.get_error_msg();
+        }
+        LOG(INFO) << "Successfully remove cached segment files [" << key.to_string() << ", "
+                  << cache_value->file_path << "].";
+        delete cache_value;
+    };
+
+    auto lru_handle = _cache->insert(key.encode(), value, 1,
+                                     deleter, CachePriority::NORMAL);
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+}
+
+bool CachedSegmentLoader::load_cached_segment(const std::string done_file_path,
+                                              CachedSegmentCacheHandle* cache_handle, bool use_cache) {
+    CachedSegmentLoader::CacheKey cache_key(done_file_path);
+    if (_lookup(cache_key, cache_handle)) {
+        return true;
+    }
+
+    return false;
+}
+
+void CachedSegmentLoader::insert(std::string done_file_path, std::string file_path,
+                                 CachedSegmentCacheHandle* cache_handle) {
+    CachedSegmentLoader::CacheKey cache_key(done_file_path);
+    // memory of CachedSegmentLoader::CacheValue will be handled by CachedSegmentLoader
+    CachedSegmentLoader::CacheValue* cache_value = new CachedSegmentLoader::CacheValue(file_path);
+    _insert(cache_key, cache_value, cache_handle);
+}
+
+Status CachedSegmentLoader::prune() {
+    MonotonicStopWatch watch;

Review Comment:
   Does this function only ever return OK? If so, maybe it doesn't need to return Status.



##########
be/src/olap/fs/cached_segment_loader.h:
##########
@@ -0,0 +1,115 @@
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN
+#include "olap/lru_cache.h"
+#include "runtime/mem_tracker.h"
+#include "util/time.h"
+
+namespace doris {
+
+class CachedSegmentCacheHandle;
+
+// CachedSegmentLoader is used to load the cached segment of remote storage, like S3, HDFS, etc.
+// An LRUCache is encapsulated inside it, which is used to cache the cached segments.
+// The caller should use the following method to load and obtain the segment:
+//
+//  CachedSegmentCacheHandle cache_handle;
+//  bool cached = StorageEngine::instance()->remote_file_cache()->load_cached_segment(done_file_path, &cache_handle);
+//
+// Make sure that cache_handle is valid during the segment usage period.
+class CachedSegmentLoader {
+public:
+
+    // The cache key for cached segment lru cache
+    // Holding cached segment done file path
+    struct CacheKey {
+        CacheKey(std::string done_file_path_) : done_file_path(done_file_path_) {}
+        std::string done_file_path;
+
+        // Encode to a flat binary which can be used as LRUCache's key
+        std::string encode() const {

Review Comment:
   Return `const std::string&` here.



##########
be/src/olap/fs/cached_segment_loader.h:
##########
@@ -0,0 +1,115 @@
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "gutil/macros.h" // for DISALLOW_COPY_AND_ASSIGN
+#include "olap/lru_cache.h"
+#include "runtime/mem_tracker.h"
+#include "util/time.h"
+
+namespace doris {
+
+class CachedSegmentCacheHandle;
+
+// CachedSegmentLoader is used to load the cached segment of remote storage, like S3, HDFS, etc.
+// An LRUCache is encapsulated inside it, which is used to cache the cached segments.
+// The caller should use the following method to load and obtain the segment:
+//
+//  CachedSegmentCacheHandle cache_handle;
+//  bool cached = StorageEngine::instance()->remote_file_cache()->load_cached_segment(done_file_path, &cache_handle);
+//
+// Make sure that cache_handle is valid during the segment usage period.
+class CachedSegmentLoader {
+public:
+
+    // The cache key for cached segment lru cache
+    // Holding cached segment done file path
+    struct CacheKey {
+        CacheKey(std::string done_file_path_) : done_file_path(done_file_path_) {}

Review Comment:
   Take the parameter as `const std::string&`.



##########
be/src/olap/fs/cached_segment_loader.cpp:
##########
@@ -0,0 +1,75 @@
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/fs/cached_segment_loader.h"
+
+#include "util/filesystem_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+CachedSegmentLoader::CachedSegmentLoader(size_t capacity)
+        : _mem_tracker(MemTracker::create_tracker(
+        capacity, "CachedSegmentLoader", nullptr, MemTrackerLevel::OVERVIEW)) {
+    _cache = std::unique_ptr<Cache>(new_lru_cache("CachedSegmentCache", capacity));
+
+}
+
+bool CachedSegmentLoader::_lookup(const CachedSegmentLoader::CacheKey& key, CachedSegmentCacheHandle* handle) {
+    auto lru_handle = _cache->lookup(key.encode());
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+    return true;
+}
+
+void CachedSegmentLoader::_insert(const CachedSegmentLoader::CacheKey& key, CachedSegmentLoader::CacheValue* value,
+                                  CachedSegmentCacheHandle* handle) {
+    // When evicting one entry from cache, delete the file on disk.
+    auto deleter = [](const doris::CacheKey& key, void* value) {
+        CachedSegmentLoader::CacheValue* cache_value = (CachedSegmentLoader::CacheValue*)value;
+        // remove file
+        std::vector<std::string> files = {key.to_string(), cache_value->file_path};
+        Status st = FileSystemUtil::remove_paths(files);
+        if (!st.ok()) {
+            LOG(WARNING) << "Fail to remove files [" << key.to_string() << ", "
+                         << cache_value->file_path << "], error_msg=" << st.get_error_msg();
+        }
+        LOG(INFO) << "Successfully remove cached segment files [" << key.to_string() << ", "
+                  << cache_value->file_path << "].";
+        delete cache_value;
+    };
+
+    auto lru_handle = _cache->insert(key.encode(), value, 1,
+                                     deleter, CachePriority::NORMAL);
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+}
+
+bool CachedSegmentLoader::load_cached_segment(const std::string done_file_path,
+                                              CachedSegmentCacheHandle* cache_handle, bool use_cache) {
+    CachedSegmentLoader::CacheKey cache_key(done_file_path);
+    if (_lookup(cache_key, cache_handle)) {
+        return true;
+    }
+
+    return false;
+}
+
+void CachedSegmentLoader::insert(std::string done_file_path, std::string file_path,

Review Comment:
   `std::string done_file_path, std::string file_path`
   Please recheck these parameters: passing them by `const std::string&` is better.



##########
be/src/olap/fs/cached_segment_loader.cpp:
##########
@@ -0,0 +1,75 @@
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/fs/cached_segment_loader.h"
+
+#include "util/filesystem_util.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+CachedSegmentLoader::CachedSegmentLoader(size_t capacity)
+        : _mem_tracker(MemTracker::create_tracker(
+        capacity, "CachedSegmentLoader", nullptr, MemTrackerLevel::OVERVIEW)) {
+    _cache = std::unique_ptr<Cache>(new_lru_cache("CachedSegmentCache", capacity));
+
+}
+
+bool CachedSegmentLoader::_lookup(const CachedSegmentLoader::CacheKey& key, CachedSegmentCacheHandle* handle) {
+    auto lru_handle = _cache->lookup(key.encode());
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+    return true;
+}
+
+void CachedSegmentLoader::_insert(const CachedSegmentLoader::CacheKey& key, CachedSegmentLoader::CacheValue* value,
+                                  CachedSegmentCacheHandle* handle) {
+    // When evicting one entry from cache, delete the file on disk.
+    auto deleter = [](const doris::CacheKey& key, void* value) {
+        CachedSegmentLoader::CacheValue* cache_value = (CachedSegmentLoader::CacheValue*)value;
+        // remove file
+        std::vector<std::string> files = {key.to_string(), cache_value->file_path};
+        Status st = FileSystemUtil::remove_paths(files);
+        if (!st.ok()) {
+            LOG(WARNING) << "Fail to remove files [" << key.to_string() << ", "
+                         << cache_value->file_path << "], error_msg=" << st.get_error_msg();
+        }
+        LOG(INFO) << "Successfully remove cached segment files [" << key.to_string() << ", "
+                  << cache_value->file_path << "].";
+        delete cache_value;
+    };
+
+    auto lru_handle = _cache->insert(key.encode(), value, 1,
+                                     deleter, CachePriority::NORMAL);
+    *handle = CachedSegmentCacheHandle(_cache.get(), lru_handle);
+}
+
+bool CachedSegmentLoader::load_cached_segment(const std::string done_file_path,

Review Comment:
   Use `const std::string& done_file_path` instead of passing by value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org