You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2023/01/04 12:10:10 UTC

[GitHub] [doris] github-actions[bot] commented on a diff in pull request #15622: [feature](file cache)Import `file cache` for remote file reader

github-actions[bot] commented on code in PR #15622:
URL: https://github.com/apache/doris/pull/15622#discussion_r1061417929


##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {
+public:
+    CachedRemoteFileWriter(FileWriterPtr remote_file_writer);
+    ~CachedRemoteFileWriter() override { close(); }

Review Comment:
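   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   (Reviewer note: do not apply this suggestion as-is. The destructor body calls close(), so it is not trivial, and '= default' would drop that call. The diagnostic is presumably a side effect of the unresolved close() overload reported further below.)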
   ```cpp
       ~CachedRemoteFileWriter() override { close(); }
       ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.cpp:
##########
@@ -0,0 +1,92 @@
+#include "io/cloud/cached_remote_file_writer.h"
+
+#include <memory>
+
+#include "common/status.h"
+#include "io/cloud/cloud_file_cache.h"
+#include "io/cloud/cloud_file_cache_factory.h"
+#include "io/cloud/cloud_file_segment.h"
+#include "io/fs/s3_file_writer.h"
+
+namespace doris {
+namespace io {
+
+CachedRemoteFileWriter::CachedRemoteFileWriter(FileWriterPtr remote_file_writer)
+        : _remote_file_writer(std::move(remote_file_writer)) {}
+
+Status CachedRemoteFileWriter::open() {
+    RETURN_IF_ERROR(_remote_file_writer->open());
+    _cache_key = IFileCache::hash(_remote_file_writer->path().filename().native());
+    _cache = FileCacheFactory::instance().get_by_path(_cache_key);
+    return Status::OK();
+}
+
+Status CachedRemoteFileWriter::append(const Slice& data) {
+    RETURN_IF_ERROR(_remote_file_writer->append(data));
+    if (_need_buffer) {
+        _buffer.append(data.data, data.size);
+    }
+    return Status::OK();
+}
+
+Status CachedRemoteFileWriter::appendv(const Slice* data, size_t data_cnt) {
+    RETURN_IF_ERROR(_remote_file_writer->appendv(data, data_cnt));
+    if (_need_buffer) {
+        for (size_t i = 0; i < data_cnt; i++) {
+            _buffer.append((data + i)->get_data(), (data + i)->size);
+        }
+    }
+    return Status::OK();
+}
+
+Status CachedRemoteFileWriter::write_at(size_t offset, const Slice& data) {
+    RETURN_IF_ERROR(_remote_file_writer->write_at(offset, data));
+    if (!_need_buffer) return Status::OK();

Review Comment:
   warning: statement should be inside braces [readability-braces-around-statements]
   
   ```suggestion
       if (!_need_buffer) {
           return Status::OK();
       }
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {
+public:
+    CachedRemoteFileWriter(FileWriterPtr remote_file_writer);
+    ~CachedRemoteFileWriter() override { close(); }
+
+    Status open() override;
+
+    Status close(bool sync = true) override { return _remote_file_writer->close(sync); }

Review Comment:
   warning: non-virtual member function marked 'override' hides virtual member function [clang-diagnostic-error]
   ```cpp
       Status close(bool sync = true) override { return _remote_file_writer->close(sync); }
                                      ^
   ```
   **be/src/io/fs/file_writer.h:38:** hidden overloaded virtual function 'doris::io::FileWriter::close' declared here: different number of parameters (0 vs 1)
   ```cpp
       virtual Status close() = 0;
                      ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.cpp:
##########
@@ -0,0 +1,92 @@
+#include "io/cloud/cached_remote_file_writer.h"
+
+#include <memory>
+
+#include "common/status.h"
+#include "io/cloud/cloud_file_cache.h"
+#include "io/cloud/cloud_file_cache_factory.h"
+#include "io/cloud/cloud_file_segment.h"
+#include "io/fs/s3_file_writer.h"
+
+namespace doris {
+namespace io {
+
+CachedRemoteFileWriter::CachedRemoteFileWriter(FileWriterPtr remote_file_writer)
+        : _remote_file_writer(std::move(remote_file_writer)) {}
+
+Status CachedRemoteFileWriter::open() {
+    RETURN_IF_ERROR(_remote_file_writer->open());

Review Comment:
   warning: no member named 'open' in 'doris::io::FileWriter' [clang-diagnostic-error]
   ```cpp
       RETURN_IF_ERROR(_remote_file_writer->open());
                                            ^
   ```
   **be/src/common/status.h:484:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           Status _status_ = (stmt);       \
                              ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {
+public:
+    CachedRemoteFileWriter(FileWriterPtr remote_file_writer);
+    ~CachedRemoteFileWriter() override { close(); }
+
+    Status open() override;

Review Comment:
   warning: only virtual member functions can be marked 'override' [clang-diagnostic-error]
   
   ```suggestion
       Status open();
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {
+public:
+    CachedRemoteFileWriter(FileWriterPtr remote_file_writer);
+    ~CachedRemoteFileWriter() override { close(); }
+
+    Status open() override;
+
+    Status close(bool sync = true) override { return _remote_file_writer->close(sync); }

Review Comment:
   warning: too many arguments to function call, expected 0, have 1 [clang-diagnostic-error]
   ```cpp
       Status close(bool sync = true) override { return _remote_file_writer->close(sync); }
                                                                                   ^
   ```
   **be/src/io/fs/file_writer.h:38:** 'close' declared here
   ```cpp
       virtual Status close() = 0;
                      ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {
+public:
+    CachedRemoteFileWriter(FileWriterPtr remote_file_writer);
+    ~CachedRemoteFileWriter() override { close(); }
+
+    Status open() override;
+
+    Status close(bool sync = true) override { return _remote_file_writer->close(sync); }
+
+    Status abort() override { return _remote_file_writer->abort(); }
+
+    Status append(const Slice& data) override;
+
+    Status appendv(const Slice* data, size_t data_cnt) override;
+
+    Status write_at(size_t offset, const Slice& data) override;
+
+    Status finalize() override;
+
+    size_t bytes_appended() const override { return _remote_file_writer->bytes_appended(); }
+
+    // From now, the datas are appended will be cached in file cache
+    void cache_data_from_current_offset() {
+        _buffer.clear();
+        _need_buffer = true;
+        _buffer_start_offset = bytes_appended();
+    }
+
+    // cache_data_from_current_offset and put_buffer_to_cache are a pair.
+    // call this method, the datas in buffer will be flush in file cache.
+    // it should call after cache_data_from_current_offset
+    Status put_buffer_to_cache();
+
+    const Path& path() const override { return _remote_file_writer->path(); }

Review Comment:
   warning: only virtual member functions can be marked 'override' [clang-diagnostic-error]
   
   ```suggestion
       const Path& path() const { return _remote_file_writer->path(); }
   ```
   



##########
be/src/util/lock.h:
##########
@@ -0,0 +1,86 @@
+#pragma once
+
+#include <bthread/condition_variable.h>
+#include <bthread/mutex.h>
+
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <cstdint>
+#include <mutex>
+#include <shared_mutex>
+
+#include "service/brpc_conflict.h"
+
+namespace doris {
+class BthreadSharedMutex;
+#if !defined(USE_BTHREAD_SCANNER)
+using Mutex = std::mutex;
+using ConditionVariable = std::condition_variable;
+using SharedMutex = std::shared_mutex;
+#else
+using Mutex = bthread::Mutex;
+using ConditionVariable = bthread::ConditionVariable;
+using SharedMutex = BthreadSharedMutex;
+#endif
+
+class BthreadSharedMutex {
+public:
+    BthreadSharedMutex() : _reader_nums(0), _is_writing(false) {}
+    ~BthreadSharedMutex() = default;
+
+    void lock_shared() {
+        std::unique_lock<doris::Mutex> lock(_mutex);
+        while (_is_writing) {
+            _cv.wait(lock);
+        }
+        ++_reader_nums;
+    }
+
+    void unlock_shared() {
+        std::unique_lock<doris::Mutex> lock(_mutex);
+        --_reader_nums;
+        _cv.notify_one();
+    }
+
+    void lock() {
+        std::unique_lock<doris::Mutex> lock(_mutex);
+        while (_reader_nums != 0 || _is_writing == true) {
+            _cv.wait(lock);
+        }
+        _is_writing = true;
+    }
+
+    void unlock() {
+        std::unique_lock<doris::Mutex> lock(_mutex);
+        _is_writing = false;
+        _cv.notify_all();
+    }
+
+    void try_lock_shared_until() {
+        // not support yet
+        assert(false);
+    }
+
+    void try_lock_shared() {
+        // not support yet
+        assert(false);
+    }
+
+    void try_lock_shared_for() {
+        // not support ye
+        assert(false);
+    }
+
+private:
+    int64_t _reader_nums;
+    bool _is_writing;
+
+    doris::Mutex _mutex;
+    doris::ConditionVariable _cv;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/util/lock.h:74:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.h:
##########
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "io/cloud/cloud_file_cache.h"
+#include "io/fs/file_writer.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileWriter final : public FileWriter {

Review Comment:
   warning: abstract class is marked 'final' [clang-diagnostic-abstract-final-class]
   ```cpp
   class CachedRemoteFileWriter final : public FileWriter {
         ^
   ```
   **be/src/io/fs/file_writer.h:38:** unimplemented pure virtual method 'close' in 'CachedRemoteFileWriter'
   ```cpp
       virtual Status close() = 0;
                      ^
   ```
   **be/src/io/fs/file_writer.h:55:** unimplemented pure virtual method 'fs' in 'CachedRemoteFileWriter'
   ```cpp
       virtual FileSystem* fs() const = 0;
                           ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_writer.cpp:
##########
@@ -0,0 +1,92 @@
+#include "io/cloud/cached_remote_file_writer.h"
+
+#include <memory>
+
+#include "common/status.h"
+#include "io/cloud/cloud_file_cache.h"
+#include "io/cloud/cloud_file_cache_factory.h"
+#include "io/cloud/cloud_file_segment.h"
+#include "io/fs/s3_file_writer.h"
+
+namespace doris {
+namespace io {
+
+CachedRemoteFileWriter::CachedRemoteFileWriter(FileWriterPtr remote_file_writer)

Review Comment:
   warning: constructor for 'doris::io::CachedRemoteFileWriter' must explicitly initialize the base class 'doris::io::FileWriter' which does not have a default constructor [clang-diagnostic-error]
   ```cpp
   CachedRemoteFileWriter::CachedRemoteFileWriter(FileWriterPtr remote_file_writer)
                           ^
   ```
   **be/src/io/fs/file_writer.h:30:** 'doris::io::FileWriter' declared here
   ```cpp
   class FileWriter {
         ^
   ```
   



##########
be/src/util/async_io.h:
##########
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <bthread/bthread.h>
+
+#include "io/fs/file_system.h"
+#include "olap/olap_define.h"
+#include "priority_thread_pool.hpp"
+#include "runtime/threadlocal.h"
+#include "service/brpc_conflict.h"
+#include "util/lock.h"
+
+namespace doris {
+
+struct AsyncIOCtx {
+    int nice;
+};
+
+/**
+ * Separate task from bthread to pthread, specific for IO task.
+ */
+class AsyncIO {
+public:
+    AsyncIO() {
+        _io_thread_pool =
+                new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                       config::doris_scanner_thread_pool_queue_size, "AsyncIo");
+        _remote_thread_pool = new PriorityThreadPool(
+                config::doris_remote_scanner_thread_pool_thread_num,
+                config::doris_remote_scanner_thread_pool_queue_size, "AsyncIo");
+    }
+
+    ~AsyncIO() {
+        SAFE_DELETE(_io_thread_pool);
+        SAFE_DELETE(_remote_thread_pool);
+    }
+
+    AsyncIO& operator=(const AsyncIO&) = delete;
+    AsyncIO(const AsyncIO&) = delete;
+
+    static AsyncIO& instance() {
+        static AsyncIO instance;
+        return instance;
+    }
+
+    // This function should run on the bthread, and it will put the task into
+    // thread_pool and release the bthread_worker at cv.wait. When the task is completed,
+    // the bthread will continue to execute.
+    static void run_task(std::function<void()> fn, io::FileSystemType file_type) {
+        DCHECK(bthread_self() != 0);
+        doris::Mutex mutex;
+        doris::ConditionVariable cv;
+        std::unique_lock<doris::Mutex> l(mutex);
+
+        AsyncIOCtx* ctx = static_cast<AsyncIOCtx*>(bthread_getspecific(btls_io_ctx_key));
+        int nice = -1;
+        if (ctx == nullptr) {
+            nice = 18;
+        } else {
+            nice = ctx->nice;
+        }
+
+        PriorityThreadPool::Task task;
+        task.priority = nice;
+        task.work_function = [&] {
+            fn();
+            std::unique_lock<doris::Mutex> l(mutex);
+            cv.notify_one();
+        };
+
+        if (file_type == io::FileSystemType::S3) {
+            AsyncIO::instance().remote_thread_pool()->offer(task);
+        } else {
+            AsyncIO::instance().io_thread_pool()->offer(task);
+        }
+        cv.wait(l);
+    }
+
+    inline static bthread_key_t btls_io_ctx_key;
+
+    static void io_ctx_key_deleter(void* d) { delete static_cast<AsyncIOCtx*>(d); }
+
+private:
+    PriorityThreadPool* _io_thread_pool = nullptr;
+    PriorityThreadPool* _remote_thread_pool = nullptr;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/util/async_io.h:81:** previously declared here
   ```cpp
   private:
   ^
   ```
   



##########
be/src/io/cloud/cached_remote_file_reader.h:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "gutil/macros.h"
+#include "io/cloud/cloud_file_cache.h"
+#include "io/cloud/cloud_file_cache_fwd.h"
+#include "io/cloud/cloud_file_cache_profile.h"
+#include "io/cloud/cloud_file_segment.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_system.h"
+
+namespace doris {
+namespace io {
+
+class CachedRemoteFileReader final : public FileReader {
+public:
+    using metrics_hook = std::function<void(FileCacheStatistics*)>;
+    CachedRemoteFileReader(FileReaderSPtr remote_file_reader, metrics_hook);
+
+    ~CachedRemoteFileReader() override;
+
+    Status close() override;
+
+    Status read_at(size_t offset, Slice result, const IOContext& io_ctx,
+                   size_t* bytes_read) override;
+
+    Status read_at_impl(size_t offset, Slice result, const IOContext& io_ctx, size_t* bytes_read);
+
+    const Path& path() const override { return _remote_file_reader->path(); }
+
+    size_t size() const override { return _remote_file_reader->size(); }
+
+    bool closed() const override { return _remote_file_reader->closed(); }
+
+    // FileSystem* fs() const override { return _remote_file_reader->fs(); }
+
+private:
+    std::pair<size_t, size_t> _align_size(size_t offset, size_t size) const;
+
+    FileReaderSPtr _remote_file_reader;
+    IFileCache::Key _cache_key;
+    CloudFileCachePtr _cache;
+    CloudFileCachePtr _disposable_cache;
+
+private:

Review Comment:
   warning: redundant access specifier has the same accessibility as the previous access specifier [readability-redundant-access-specifiers]
   
   ```suggestion
   
   ```
   **be/src/io/cloud/cached_remote_file_reader.h:53:** previously declared here
   ```cpp
   private:
   ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org