You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2022/08/12 06:39:42 UTC

[incubator-pegasus] branch backup_restore-dev updated: feat: add some functions for block service manager (#1109)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch backup_restore-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/backup_restore-dev by this push:
     new 22d48c3dc feat: add some functions for block service manager (#1109)
22d48c3dc is described below

commit 22d48c3dcbf5a051d248b17cf6910d4ea868879b
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Fri Aug 12 14:39:37 2022 +0800

    feat: add some functions for block service manager (#1109)
---
 .../src/block_service/block_service_manager.cpp    | 157 +++++++++++++++++++--
 src/rdsn/src/block_service/block_service_manager.h |  38 +++++
 .../test/block_service_manager_test.cpp            |  77 ++++++++++
 .../src/block_service/test/block_service_mock.h    |  25 +++-
 4 files changed, 288 insertions(+), 9 deletions(-)

diff --git a/src/rdsn/src/block_service/block_service_manager.cpp b/src/rdsn/src/block_service/block_service_manager.cpp
index fb63e6b97..fccacc495 100644
--- a/src/rdsn/src/block_service/block_service_manager.cpp
+++ b/src/rdsn/src/block_service/block_service_manager.cpp
@@ -112,6 +112,20 @@ static create_file_response create_block_file_sync(const std::string &remote_fil
     return ret;
 }
 
+error_code block_service_manager::create_block_file(const std::string &remote_file_name,
+                                                    bool ignore_meta,
+                                                    block_filesystem *fs,
+                                                    task_tracker *tracker,
+                                                    /*out*/ create_file_response &create_resp)
+{
+    create_resp = create_block_file_sync(remote_file_name, ignore_meta, fs, tracker);
+    const auto &err = create_resp.err;
+    if (err != ERR_OK) {
+        derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
+    }
+    return err;
+}
+
 static download_response
 download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
 {
@@ -150,19 +164,19 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
     }
 
     task_tracker tracker;
-
     // Create a block_file object.
-    const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
-    auto create_resp =
-        create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
-    error_code err = create_resp.err;
+    create_file_response create_resp;
+    auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
+                                 false /*ignore file meta*/,
+                                 fs,
+                                 &tracker,
+                                 create_resp);
     if (err != ERR_OK) {
-        derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
         return err;
     }
-    block_file_ptr bf = create_resp.file_handle;
 
-    download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
+    download_response resp =
+        download_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
     if (resp.err != ERR_OK) {
         // during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
         // error, however, if file damaged on remote file provider, bulk load should stop,
@@ -184,6 +198,133 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
     return ERR_OK;
 }
 
+static upload_response
+upload_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
+{
+    upload_response ret;
+    bf->upload(upload_request{local_file_path},
+               TASK_CODE_EXEC_INLINED,
+               [&ret](const upload_response &resp) { ret = resp; },
+               tracker);
+    tracker->wait_outstanding_tasks();
+    return ret;
+}
+
+error_code block_service_manager::upload_file(const std::string &remote_dir,
+                                              const std::string &local_dir,
+                                              const std::string &file_name,
+                                              block_filesystem *fs)
+{
+    task_tracker tracker;
+    // Create a block_file object.
+    create_file_response create_resp;
+    auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
+                                 false /*ignore file meta*/,
+                                 fs,
+                                 &tracker,
+                                 create_resp);
+    if (err != ERR_OK) {
+        return err;
+    }
+    // Upload file
+    const auto &local_file_name = utils::filesystem::path_combine(local_dir, file_name);
+    const upload_response &resp =
+        upload_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
+    if (resp.err != ERR_OK) {
+        return resp.err;
+    }
+    ddebug_f("upload file({}) succeed", local_file_name);
+    return ERR_OK;
+}
+
+static write_response
+write_block_file_sync(const blob &value, block_file *bf, task_tracker *tracker)
+{
+    write_response ret;
+    bf->write(write_request{value},
+              TASK_CODE_EXEC_INLINED,
+              [&ret](const write_response &resp) { ret = resp; },
+              tracker);
+    tracker->wait_outstanding_tasks();
+    return ret;
+}
+
+error_code block_service_manager::write_file(const std::string &remote_dir,
+                                             const std::string &file_name,
+                                             const blob &value,
+                                             block_filesystem *fs)
+{
+    task_tracker tracker;
+    // Create a block_file object.
+    create_file_response create_resp;
+    const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
+    auto err =
+        create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
+    if (err != ERR_OK) {
+        return err;
+    }
+    // Write blob
+    const write_response &resp =
+        write_block_file_sync(value, create_resp.file_handle.get(), &tracker);
+    if (resp.err != ERR_OK) {
+        return resp.err;
+    }
+    ddebug_f("write remote file({}) succeed", remote_file_name);
+    return ERR_OK;
+}
+
+error_code
+block_service_manager::remove_path(const std::string &path, bool recursive, block_filesystem *fs)
+{
+    task_tracker tracker;
+    remove_path_response ret;
+    fs->remove_path(remove_path_request{path, recursive},
+                    TASK_CODE_EXEC_INLINED,
+                    [&ret](const remove_path_response &resp) { ret = resp; },
+                    &tracker);
+    tracker.wait_outstanding_tasks();
+    return ret.err;
+}
+
+static read_response read_block_file_sync(block_file *bf,
+                                          const uint64_t remote_pos,
+                                          const int64_t remote_length,
+                                          task_tracker *tracker)
+{
+    read_response ret;
+    bf->read(read_request{remote_pos, remote_length},
+             TASK_CODE_EXEC_INLINED,
+             [&ret](const read_response &resp) { ret = resp; },
+             tracker);
+    tracker->wait_outstanding_tasks();
+    return ret;
+}
+
+error_code block_service_manager::read_file(const std::string &remote_dir,
+                                            const std::string &file_name,
+                                            block_filesystem *fs,
+                                            blob &value)
+{
+    task_tracker tracker;
+    // Create a block_file object.
+    create_file_response create_resp;
+    const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
+    auto err =
+        create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
+    if (err != ERR_OK) {
+        return err;
+    }
+    // Read blob
+    const read_response &resp = read_block_file_sync(
+        create_resp.file_handle.get(), 0 /*remote_pos*/, -1 /*remote_length*/, &tracker);
+    if (resp.err != ERR_OK) {
+        return resp.err;
+    }
+    ddebug_f("read remote file({}) succeed", remote_file_name);
+    value = resp.buffer;
+    return ERR_OK;
+}
+
 } // namespace block_service
 } // namespace dist
 } // namespace dsn
diff --git a/src/rdsn/src/block_service/block_service_manager.h b/src/rdsn/src/block_service/block_service_manager.h
index 65b0b7294..ec67db1bd 100644
--- a/src/rdsn/src/block_service/block_service_manager.h
+++ b/src/rdsn/src/block_service/block_service_manager.h
@@ -45,6 +45,14 @@ public:
     ~block_service_manager();
     block_filesystem *get_or_create_block_filesystem(const std::string &provider);
 
+    // create block file
+    // \return  ERR_FS_INTERNAL: remote file system error
+    error_code create_block_file(const std::string &remote_file_name,
+                                 bool ignore_meta,
+                                 block_filesystem *fs,
+                                 task_tracker *tracker,
+                                 /*out*/ create_file_response &resp);
+
     // download files from remote file system
     // \return  ERR_FILE_OPERATION_FAILED: local file system error
     // \return  ERR_FS_INTERNAL: remote file system error
@@ -69,6 +77,36 @@ public:
                              block_filesystem *fs,
                              /*out*/ uint64_t &download_file_size);
 
+    // upload files from remote file system
+    // \return  ERR_FILE_OPERATION_FAILED: local file system error
+    // \return  ERR_FS_INTERNAL: remote file system error
+    error_code upload_file(const std::string &remote_dir,
+                           const std::string &local_dir,
+                           const std::string &file_name,
+                           block_filesystem *fs);
+
+    // write blob value onto remote file system
+    // \return  ERR_FILE_OPERATION_FAILED: local file system error
+    // \return  ERR_FS_INTERNAL: remote file system error
+    error_code write_file(const std::string &remote_dir,
+                          const std::string &file_name,
+                          const blob &value,
+                          block_filesystem *fs);
+
+    // remove path on remote file system
+    // \return ERR_OBJECT_NOT_FOUND: remove path not exist
+    // \return ERR_FS_INTERNAL: remote file system error
+    // \return ERR_DIR_NOT_EMPTY: path not empty
+    error_code remove_path(const std::string &path, bool recursive, block_filesystem *fs);
+
+    // read blob value from remote file system
+    // \return  ERR_FILE_OPERATION_FAILED: local file system error
+    // \return  ERR_FS_INTERNAL: remote file system error
+    error_code read_file(const std::string &remote_dir,
+                         const std::string &file_name,
+                         block_filesystem *fs,
+                         /*out*/ blob &value);
+
 private:
     block_service_registry &_registry_holder;
 
diff --git a/src/rdsn/src/block_service/test/block_service_manager_test.cpp b/src/rdsn/src/block_service/test/block_service_manager_test.cpp
index e2269de6b..7c5f863c4 100644
--- a/src/rdsn/src/block_service/test/block_service_manager_test.cpp
+++ b/src/rdsn/src/block_service/test/block_service_manager_test.cpp
@@ -46,6 +46,27 @@ public:
             PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size);
     }
 
+    error_code test_upload_file()
+    {
+        return _block_service_manager.upload_file(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get());
+    }
+
+    error_code test_write_file()
+    {
+        return _block_service_manager.write_file(
+            PROVIDER, FILE_NAME, blob::create_from_bytes("test_value"), _fs.get());
+    }
+
+    error_code test_read_file(blob &value)
+    {
+        return _block_service_manager.read_file(PROVIDER, FILE_NAME, _fs.get(), value);
+    }
+
+    error_code test_remove_path(bool recursive)
+    {
+        return _block_service_manager.remove_path(REMOTE_DIR, recursive, _fs.get());
+    }
+
     void create_local_file(const std::string &file_name)
     {
         std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
@@ -63,9 +84,28 @@ public:
     void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
     {
         std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name);
+
         _fs->files[whole_file_name] = std::make_pair(size, md5);
     }
 
+    void create_remote_dir(bool empty)
+    {
+        std::vector<ls_entry> entries;
+        if (!empty) {
+            ls_entry dir_entry;
+            dir_entry.entry_name = REMOTE_DIR;
+            dir_entry.is_directory = true;
+            entries.emplace_back(dir_entry);
+            ls_entry file_entry;
+            file_entry.entry_name = FILE_NAME;
+            file_entry.is_directory = false;
+            entries.emplace_back(file_entry);
+        }
+        _fs->dir_files[REMOTE_DIR] = entries;
+    }
+
+    void clear_remote_dir() { _fs->dir_files.clear(); }
+
 public:
     block_service_manager _block_service_manager;
     std::unique_ptr<block_service_mock> _fs;
@@ -74,6 +114,7 @@ public:
     std::string PROVIDER = "local_service";
     std::string LOCAL_DIR = "test_dir";
     std::string FILE_NAME = "test_file";
+    std::string REMOTE_DIR = "remote_test_dir";
 };
 
 // download_file unit tests
@@ -119,6 +160,42 @@ TEST_F(block_service_manager_test, do_download_succeed)
     ASSERT_EQ(download_size, _file_meta.size);
 }
 
+TEST_F(block_service_manager_test, upload_file_test)
+{
+    create_local_file(FILE_NAME);
+    ASSERT_EQ(test_upload_file(), ERR_OK);
+}
+
+TEST_F(block_service_manager_test, write_file_test) { ASSERT_EQ(test_write_file(), ERR_OK); }
+
+TEST_F(block_service_manager_test, read_file_test)
+{
+    blob value;
+    ASSERT_EQ(test_read_file(value), ERR_OK);
+}
+
+TEST_F(block_service_manager_test, remove_path_test)
+{
+    struct test_struct
+    {
+        bool mock_dir;
+        bool dir_empty;
+        bool recursive;
+        error_code expected_err;
+    } tests[]{{false, false, false, ERR_OBJECT_NOT_FOUND},
+              {true, true, false, ERR_OK},
+              {true, true, true, ERR_OK},
+              {true, false, false, ERR_DIR_NOT_EMPTY},
+              {true, false, true, ERR_OK}};
+    for (const auto &test : tests) {
+        if (test.mock_dir) {
+            create_remote_dir(test.dir_empty);
+        }
+        ASSERT_EQ(test_remove_path(test.recursive), test.expected_err);
+        clear_remote_dir();
+    }
+}
+
 } // namespace block_service
 } // namespace dist
 } // namespace dsn
diff --git a/src/rdsn/src/block_service/test/block_service_mock.h b/src/rdsn/src/block_service/test/block_service_mock.h
index 5c899a39d..dc23063b1 100644
--- a/src/rdsn/src/block_service/test/block_service_mock.h
+++ b/src/rdsn/src/block_service/test/block_service_mock.h
@@ -145,7 +145,10 @@ class block_service_mock : public block_filesystem
 {
 public:
     block_service_mock()
-        : block_filesystem(), enable_create_file_fail(false), enable_list_dir_fail(false)
+        : block_filesystem(),
+          enable_create_file_fail(false),
+          enable_list_dir_fail(false),
+          enable_remote_path_fail(false)
     {
     }
     virtual error_code initialize(const std::vector<std::string> &args) { return ERR_OK; }
@@ -201,6 +204,25 @@ public:
                               const remove_path_callback &cb,
                               dsn::task_tracker *tracker)
     {
+        remove_path_response resp;
+        if (enable_remote_path_fail) {
+            resp.err = ERR_MOCK_INTERNAL;
+        } else {
+            resp.err = ERR_OK;
+            std::string path_name = req.path;
+            if (dir_files.find(path_name) == dir_files.end()) {
+                resp.err = ERR_OBJECT_NOT_FOUND;
+            } else {
+                std::vector<ls_entry> files = dir_files[path_name];
+                if (!files.empty() && !req.recursive) {
+                    resp.err = ERR_DIR_NOT_EMPTY;
+                } else {
+                    dir_files.erase(path_name);
+                    resp.err = ERR_OK;
+                }
+            }
+        }
+        cb(resp);
         return task_ptr();
     }
 
@@ -209,6 +231,7 @@ public:
     std::map<std::string, std::pair<int64_t, std::string>> files;
     bool enable_create_file_fail;
     bool enable_list_dir_fail;
+    bool enable_remote_path_fail;
 };
 
 } // namespace block_service


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org