You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2022/08/12 06:39:42 UTC
[incubator-pegasus] branch backup_restore-dev updated: feat: add some functions for block service manager (#1109)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch backup_restore-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/backup_restore-dev by this push:
new 22d48c3dc feat: add some functions for block service manager (#1109)
22d48c3dc is described below
commit 22d48c3dcbf5a051d248b17cf6910d4ea868879b
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Fri Aug 12 14:39:37 2022 +0800
feat: add some functions for block service manager (#1109)
---
.../src/block_service/block_service_manager.cpp | 157 +++++++++++++++++++--
src/rdsn/src/block_service/block_service_manager.h | 38 +++++
.../test/block_service_manager_test.cpp | 77 ++++++++++
.../src/block_service/test/block_service_mock.h | 25 +++-
4 files changed, 288 insertions(+), 9 deletions(-)
diff --git a/src/rdsn/src/block_service/block_service_manager.cpp b/src/rdsn/src/block_service/block_service_manager.cpp
index fb63e6b97..fccacc495 100644
--- a/src/rdsn/src/block_service/block_service_manager.cpp
+++ b/src/rdsn/src/block_service/block_service_manager.cpp
@@ -112,6 +112,20 @@ static create_file_response create_block_file_sync(const std::string &remote_fil
return ret;
}
+error_code block_service_manager::create_block_file(const std::string &remote_file_name,
+ bool ignore_meta,
+ block_filesystem *fs,
+ task_tracker *tracker,
+ /*out*/ create_file_response &create_resp)
+{
+ create_resp = create_block_file_sync(remote_file_name, ignore_meta, fs, tracker);
+ const auto &err = create_resp.err;
+ if (err != ERR_OK) {
+ derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
+ }
+ return err;
+}
+
static download_response
download_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
{
@@ -150,19 +164,19 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
}
task_tracker tracker;
-
// Create a block_file object.
- const std::string remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
- auto create_resp =
- create_block_file_sync(remote_file_name, false /*ignore file meta*/, fs, &tracker);
- error_code err = create_resp.err;
+ create_file_response create_resp;
+ auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
+ false /*ignore file meta*/,
+ fs,
+ &tracker,
+ create_resp);
if (err != ERR_OK) {
- derror_f("create file({}) failed with error({})", remote_file_name, err.to_string());
return err;
}
- block_file_ptr bf = create_resp.file_handle;
- download_response resp = download_block_file_sync(local_file_name, bf.get(), &tracker);
+ download_response resp =
+ download_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
if (resp.err != ERR_OK) {
// during bulk load process, ERR_OBJECT_NOT_FOUND will be considered as a recoverable
// error, however, if file damaged on remote file provider, bulk load should stop,
@@ -184,6 +198,133 @@ error_code block_service_manager::download_file(const std::string &remote_dir,
return ERR_OK;
}
+static upload_response
+upload_block_file_sync(const std::string &local_file_path, block_file *bf, task_tracker *tracker)
+{
+ upload_response ret;
+ bf->upload(upload_request{local_file_path},
+ TASK_CODE_EXEC_INLINED,
+ [&ret](const upload_response &resp) { ret = resp; },
+ tracker);
+ tracker->wait_outstanding_tasks();
+ return ret;
+}
+
+error_code block_service_manager::upload_file(const std::string &remote_dir,
+ const std::string &local_dir,
+ const std::string &file_name,
+ block_filesystem *fs)
+{
+ task_tracker tracker;
+ // Create a block_file object.
+ create_file_response create_resp;
+ auto err = create_block_file(utils::filesystem::path_combine(remote_dir, file_name),
+ false /*ignore file meta*/,
+ fs,
+ &tracker,
+ create_resp);
+ if (err != ERR_OK) {
+ return err;
+ }
+ // Upload file
+ const auto &local_file_name = utils::filesystem::path_combine(local_dir, file_name);
+ const upload_response &resp =
+ upload_block_file_sync(local_file_name, create_resp.file_handle.get(), &tracker);
+ if (resp.err != ERR_OK) {
+ return resp.err;
+ }
+ ddebug_f("upload file({}) succeed", local_file_name);
+ return ERR_OK;
+}
+
+static write_response
+write_block_file_sync(const blob &value, block_file *bf, task_tracker *tracker)
+{
+ write_response ret;
+ bf->write(write_request{value},
+ TASK_CODE_EXEC_INLINED,
+ [&ret](const write_response &resp) { ret = resp; },
+ tracker);
+ tracker->wait_outstanding_tasks();
+ return ret;
+}
+
+error_code block_service_manager::write_file(const std::string &remote_dir,
+ const std::string &file_name,
+ const blob &value,
+ block_filesystem *fs)
+{
+ task_tracker tracker;
+ // Create a block_file object.
+ create_file_response create_resp;
+ const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
+ auto err =
+ create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
+ if (err != ERR_OK) {
+ return err;
+ }
+ // Write blob
+ const write_response &resp =
+ write_block_file_sync(value, create_resp.file_handle.get(), &tracker);
+ if (resp.err != ERR_OK) {
+ return resp.err;
+ }
+ ddebug_f("write remote file({}) succeed", remote_file_name);
+ return ERR_OK;
+}
+
+error_code
+block_service_manager::remove_path(const std::string &path, bool recursive, block_filesystem *fs)
+{
+ task_tracker tracker;
+ remove_path_response ret;
+ fs->remove_path(remove_path_request{path, recursive},
+ TASK_CODE_EXEC_INLINED,
+ [&ret](const remove_path_response &resp) { ret = resp; },
+ &tracker);
+ tracker.wait_outstanding_tasks();
+ return ret.err;
+}
+
+static read_response read_block_file_sync(block_file *bf,
+ const uint64_t remote_pos,
+ const int64_t remote_length,
+ task_tracker *tracker)
+{
+ read_response ret;
+ bf->read(read_request{remote_pos, remote_length},
+ TASK_CODE_EXEC_INLINED,
+ [&ret](const read_response &resp) { ret = resp; },
+ tracker);
+ tracker->wait_outstanding_tasks();
+ return ret;
+}
+
+error_code block_service_manager::read_file(const std::string &remote_dir,
+ const std::string &file_name,
+ block_filesystem *fs,
+ blob &value)
+{
+ task_tracker tracker;
+ // Create a block_file object.
+ create_file_response create_resp;
+ const auto &remote_file_name = utils::filesystem::path_combine(remote_dir, file_name);
+ auto err =
+ create_block_file(remote_file_name, false /*ignore file meta*/, fs, &tracker, create_resp);
+ if (err != ERR_OK) {
+ return err;
+ }
+ // Read blob
+ const read_response &resp = read_block_file_sync(
+ create_resp.file_handle.get(), 0 /*remote_pos*/, -1 /*remote_length*/, &tracker);
+ if (resp.err != ERR_OK) {
+ return resp.err;
+ }
+ ddebug_f("read remote file({}) succeed", remote_file_name);
+ value = resp.buffer;
+ return ERR_OK;
+}
+
} // namespace block_service
} // namespace dist
} // namespace dsn
diff --git a/src/rdsn/src/block_service/block_service_manager.h b/src/rdsn/src/block_service/block_service_manager.h
index 65b0b7294..ec67db1bd 100644
--- a/src/rdsn/src/block_service/block_service_manager.h
+++ b/src/rdsn/src/block_service/block_service_manager.h
@@ -45,6 +45,14 @@ public:
~block_service_manager();
block_filesystem *get_or_create_block_filesystem(const std::string &provider);
+ // create block file
+ // \return ERR_FS_INTERNAL: remote file system error
+ error_code create_block_file(const std::string &remote_file_name,
+ bool ignore_meta,
+ block_filesystem *fs,
+ task_tracker *tracker,
+ /*out*/ create_file_response &resp);
+
// download files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
@@ -69,6 +77,36 @@ public:
block_filesystem *fs,
/*out*/ uint64_t &download_file_size);
+ // upload files from remote file system
+ // \return ERR_FILE_OPERATION_FAILED: local file system error
+ // \return ERR_FS_INTERNAL: remote file system error
+ error_code upload_file(const std::string &remote_dir,
+ const std::string &local_dir,
+ const std::string &file_name,
+ block_filesystem *fs);
+
+ // write blob value onto remote file system
+ // \return ERR_FILE_OPERATION_FAILED: local file system error
+ // \return ERR_FS_INTERNAL: remote file system error
+ error_code write_file(const std::string &remote_dir,
+ const std::string &file_name,
+ const blob &value,
+ block_filesystem *fs);
+
+ // remove path on remote file system
+ // \return ERR_OBJECT_NOT_FOUND: remove path not exist
+ // \return ERR_FS_INTERNAL: remote file system error
+ // \return ERR_DIR_NOT_EMPTY: path not empty
+ error_code remove_path(const std::string &path, bool recursive, block_filesystem *fs);
+
+ // read blob value from remote file system
+ // \return ERR_FILE_OPERATION_FAILED: local file system error
+ // \return ERR_FS_INTERNAL: remote file system error
+ error_code read_file(const std::string &remote_dir,
+ const std::string &file_name,
+ block_filesystem *fs,
+ /*out*/ blob &value);
+
private:
block_service_registry &_registry_holder;
diff --git a/src/rdsn/src/block_service/test/block_service_manager_test.cpp b/src/rdsn/src/block_service/test/block_service_manager_test.cpp
index e2269de6b..7c5f863c4 100644
--- a/src/rdsn/src/block_service/test/block_service_manager_test.cpp
+++ b/src/rdsn/src/block_service/test/block_service_manager_test.cpp
@@ -46,6 +46,27 @@ public:
PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get(), download_size);
}
+ error_code test_upload_file()
+ {
+ return _block_service_manager.upload_file(PROVIDER, LOCAL_DIR, FILE_NAME, _fs.get());
+ }
+
+ error_code test_write_file()
+ {
+ return _block_service_manager.write_file(
+ PROVIDER, FILE_NAME, blob::create_from_bytes("test_value"), _fs.get());
+ }
+
+ error_code test_read_file(blob &value)
+ {
+ return _block_service_manager.read_file(PROVIDER, FILE_NAME, _fs.get(), value);
+ }
+
+ error_code test_remove_path(bool recursive)
+ {
+ return _block_service_manager.remove_path(REMOTE_DIR, recursive, _fs.get());
+ }
+
void create_local_file(const std::string &file_name)
{
std::string whole_name = utils::filesystem::path_combine(LOCAL_DIR, file_name);
@@ -63,9 +84,28 @@ public:
void create_remote_file(const std::string &file_name, int64_t size, const std::string &md5)
{
std::string whole_file_name = utils::filesystem::path_combine(PROVIDER, file_name);
+
_fs->files[whole_file_name] = std::make_pair(size, md5);
}
+ void create_remote_dir(bool empty)
+ {
+ std::vector<ls_entry> entries;
+ if (!empty) {
+ ls_entry dir_entry;
+ dir_entry.entry_name = REMOTE_DIR;
+ dir_entry.is_directory = true;
+ entries.emplace_back(dir_entry);
+ ls_entry file_entry;
+ file_entry.entry_name = FILE_NAME;
+ file_entry.is_directory = false;
+ entries.emplace_back(file_entry);
+ }
+ _fs->dir_files[REMOTE_DIR] = entries;
+ }
+
+ void clear_remote_dir() { _fs->dir_files.clear(); }
+
public:
block_service_manager _block_service_manager;
std::unique_ptr<block_service_mock> _fs;
@@ -74,6 +114,7 @@ public:
std::string PROVIDER = "local_service";
std::string LOCAL_DIR = "test_dir";
std::string FILE_NAME = "test_file";
+ std::string REMOTE_DIR = "remote_test_dir";
};
// download_file unit tests
@@ -119,6 +160,42 @@ TEST_F(block_service_manager_test, do_download_succeed)
ASSERT_EQ(download_size, _file_meta.size);
}
+TEST_F(block_service_manager_test, upload_file_test)
+{
+ create_local_file(FILE_NAME);
+ ASSERT_EQ(test_upload_file(), ERR_OK);
+}
+
+TEST_F(block_service_manager_test, write_file_test) { ASSERT_EQ(test_write_file(), ERR_OK); }
+
+TEST_F(block_service_manager_test, read_file_test)
+{
+ blob value;
+ ASSERT_EQ(test_read_file(value), ERR_OK);
+}
+
+TEST_F(block_service_manager_test, remove_path_test)
+{
+ struct test_struct
+ {
+ bool mock_dir;
+ bool dir_empty;
+ bool recursive;
+ error_code expected_err;
+ } tests[]{{false, false, false, ERR_OBJECT_NOT_FOUND},
+ {true, true, false, ERR_OK},
+ {true, true, true, ERR_OK},
+ {true, false, false, ERR_DIR_NOT_EMPTY},
+ {true, false, true, ERR_OK}};
+ for (const auto &test : tests) {
+ if (test.mock_dir) {
+ create_remote_dir(test.dir_empty);
+ }
+ ASSERT_EQ(test_remove_path(test.recursive), test.expected_err);
+ clear_remote_dir();
+ }
+}
+
} // namespace block_service
} // namespace dist
} // namespace dsn
diff --git a/src/rdsn/src/block_service/test/block_service_mock.h b/src/rdsn/src/block_service/test/block_service_mock.h
index 5c899a39d..dc23063b1 100644
--- a/src/rdsn/src/block_service/test/block_service_mock.h
+++ b/src/rdsn/src/block_service/test/block_service_mock.h
@@ -145,7 +145,10 @@ class block_service_mock : public block_filesystem
{
public:
block_service_mock()
- : block_filesystem(), enable_create_file_fail(false), enable_list_dir_fail(false)
+ : block_filesystem(),
+ enable_create_file_fail(false),
+ enable_list_dir_fail(false),
+ enable_remote_path_fail(false)
{
}
virtual error_code initialize(const std::vector<std::string> &args) { return ERR_OK; }
@@ -201,6 +204,25 @@ public:
const remove_path_callback &cb,
dsn::task_tracker *tracker)
{
+ remove_path_response resp;
+ if (enable_remote_path_fail) {
+ resp.err = ERR_MOCK_INTERNAL;
+ } else {
+ resp.err = ERR_OK;
+ std::string path_name = req.path;
+ if (dir_files.find(path_name) == dir_files.end()) {
+ resp.err = ERR_OBJECT_NOT_FOUND;
+ } else {
+ std::vector<ls_entry> files = dir_files[path_name];
+ if (!files.empty() && !req.recursive) {
+ resp.err = ERR_DIR_NOT_EMPTY;
+ } else {
+ dir_files.erase(path_name);
+ resp.err = ERR_OK;
+ }
+ }
+ }
+ cb(resp);
return task_ptr();
}
@@ -209,6 +231,7 @@ public:
std::map<std::string, std::pair<int64_t, std::string>> files;
bool enable_create_file_fail;
bool enable_list_dir_fail;
+ bool enable_remote_path_fail;
};
} // namespace block_service
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org