You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/08/25 10:31:56 UTC
[incubator-pegasus] branch backup_restore-dev updated: feat(backup): 5. replica create backup checkpoint (#1128)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch backup_restore-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/backup_restore-dev by this push:
new 48f48308a feat(backup): 5. replica create backup checkpoint (#1128)
48f48308a is described below
commit 48f48308acaac08b26ceb4677ba7d492c5e6f641
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Thu Aug 25 18:31:50 2022 +0800
feat(backup): 5. replica create backup checkpoint (#1128)
---
.../src/replica/backup/replica_backup_manager.cpp | 173 ++++++++++++++++++++-
.../src/replica/backup/replica_backup_manager.h | 77 ++++++++-
.../backup/test/replica_backup_manager_test.cpp | 116 +++++++++++++-
src/rdsn/src/replica/replica_stub.cpp | 18 +++
src/rdsn/src/replica/replica_stub.h | 4 +
5 files changed, 376 insertions(+), 12 deletions(-)
diff --git a/src/rdsn/src/replica/backup/replica_backup_manager.cpp b/src/rdsn/src/replica/backup/replica_backup_manager.cpp
index 233430abc..5908425c5 100644
--- a/src/rdsn/src/replica/backup/replica_backup_manager.cpp
+++ b/src/rdsn/src/replica/backup/replica_backup_manager.cpp
@@ -15,21 +15,182 @@
// specific language governing permissions and limitations
// under the License.
-#include "replica_backup_manager.h"
-#include "replica/replica.h"
-
#include <dsn/dist/fmt_logging.h>
-#include <dsn/utility/filesystem.h>
-#include <dsn/dist/replication/replication_app_base.h>
+#include <dsn/utility/fail_point.h>
+
+#include "replica_backup_manager.h"
namespace dsn {
namespace replication {
// TODO(heyuchen): implement it
-replica_backup_manager::replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}
+replica_backup_manager::replica_backup_manager(replica *r)
+ : replica_base(r), _replica(r), _stub(r->get_replica_stub()) // r is dereferenced here, so it must be non-null
+{
+}
replica_backup_manager::~replica_backup_manager() {} // empty body: no explicit teardown is performed here
+// ThreadPool: THREAD_POOL_REPLICATION
+//
+// Entry point for a backup request addressed to this replica. Only a request whose
+// status is backup_status::CHECKPOINTING is dispatched so far; for every other
+// requested status `response` is left untouched.
+void replica_backup_manager::on_backup(const backup_request &request,
+                                       /*out*/ backup_response &response)
+{
+    switch (request.status) {
+    case backup_status::CHECKPOINTING:
+        try_to_checkpoint(request.backup_id, response);
+        break;
+    default:
+        // TODO(heyuchen): add other status
+        break;
+    }
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+//
+// Reacts to a CHECKPOINTING request according to the local backup status:
+//   - UNINITIALIZED              -> start a new checkpoint for `backup_id`
+//   - CHECKPOINTING/CHECKPOINTED -> report the current progress/result
+//   - anything else              -> reply ERR_INVALID_STATE
+void replica_backup_manager::try_to_checkpoint(const int64_t &backup_id,
+                                               /*out*/ backup_response &response)
+{
+    // Take a single snapshot under the read lock and use it for both the dispatch and
+    // the error log. `_status` is also written by generate_checkpoint() on another
+    // thread pool, so reading the member directly here would bypass `_lock` and could
+    // log a value different from the one that selected this branch.
+    const auto local_status = get_backup_status();
+    switch (local_status) {
+    case backup_status::UNINITIALIZED:
+        start_checkpointing(backup_id, response);
+        break;
+    case backup_status::CHECKPOINTING:
+    case backup_status::CHECKPOINTED:
+        report_checkpointing(response);
+        break;
+    default:
+        response.err = ERR_INVALID_STATE;
+        derror_replica("invalid local status({}) while request status = {}",
+                       enum_to_string(local_status),
+                       enum_to_string(backup_status::CHECKPOINTING));
+        break;
+    }
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+//
+// Switches the local state to CHECKPOINTING for `backup_id`, enqueues
+// generate_checkpoint() as an LPC_BACKGROUND_COLD_BACKUP task tracked by tracker(),
+// and fills `response` with the new local state.
+void replica_backup_manager::start_checkpointing(int64_t backup_id,
+                                                 /*out*/ backup_response &response)
+{
+    // Test hook: presumably, when this fail point is configured, the lambda runs and the
+    // function returns without scheduling any task -- confirm against fail_point.h.
+    FAIL_POINT_INJECT_F("replica_backup_start_checkpointing", [&](dsn::string_view) {
+        response.err = ERR_OK;
+        _status = backup_status::CHECKPOINTING;
+    });
+
+    ddebug_replica("start to checkpoint, backup_id = {}", backup_id);
+
+    zauto_write_lock l(_lock);
+    _backup_id = backup_id;
+    _status = backup_status::CHECKPOINTING;
+    // The task is enqueued while the write lock is still held, so the state above is
+    // already in place before generate_checkpoint() can acquire `_lock`.
+    _checkpointing_task = tasking::enqueue(
+        LPC_BACKGROUND_COLD_BACKUP, tracker(), [this]() { generate_checkpoint(); });
+    fill_response_unlock(response);
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+//
+// Reports the local checkpoint state through `response`. When a checkpoint error has
+// been recorded it is additionally exposed via `checkpoint_upload_err`; `response.err`
+// itself is always set to ERR_OK by fill_response_unlock().
+void replica_backup_manager::report_checkpointing(/*out*/ backup_response &response)
+{
+    // Acquire the read lock before touching any guarded member, including the
+    // `_backup_id` used by the log line below.
+    zauto_read_lock l(_lock);
+    ddebug_replica("check checkpoint, backup_id = {}", _backup_id);
+    if (_checkpoint_err != ERR_OK) {
+        derror_replica("checkpoint failed, error = {}", _checkpoint_err);
+        response.__set_checkpoint_upload_err(_checkpoint_err);
+    }
+    fill_response_unlock(response);
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION
+//
+// Copies the replica's gpid and the local backup id/status into `response` and marks
+// the reply as ERR_OK. Takes no lock itself; the callers in this file already hold
+// `_lock` when they call it.
+void replica_backup_manager::fill_response_unlock(/*out*/ backup_response &response)
+{
+    response.pid = get_gpid();
+    response.backup_id = _backup_id;
+    response.status = _status;
+    response.err = ERR_OK;
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION_LONG
+//
+// Task body scheduled by start_checkpointing(): makes sure the local checkpoint
+// directory exists, asks the app to copy a checkpoint into it, then records the backup
+// metadata. On success the status becomes CHECKPOINTED; on any failure the error is
+// stored in `_checkpoint_err` and the status is left unchanged.
+void replica_backup_manager::generate_checkpoint()
+{
+    const std::string checkpoint_dir = get_local_checkpoint_dir();
+
+    // Creation is attempted only when the directory does not exist yet.
+    const bool dir_ready = utils::filesystem::directory_exists(checkpoint_dir) ||
+                           utils::filesystem::create_directory(checkpoint_dir);
+    if (!dir_ready) {
+        derror_replica("create local backup dir {} failed", checkpoint_dir);
+        set_checkpoint_err(ERR_FILE_OPERATION_FAILED);
+        return;
+    }
+
+    // generate checkpoint and flush memtable
+    int64_t decree;
+    const auto err =
+        _replica->_app->copy_checkpoint_to_dir(checkpoint_dir.c_str(), &decree, true);
+    if (err != ERR_OK) {
+        derror_replica("generate backup checkpoint failed, error = {}", err);
+        set_checkpoint_err(err);
+        return;
+    }
+    ddebug_replica(
+        "generate backup checkpoint succeed: checkpoint dir = {}, checkpoint decree = {}",
+        checkpoint_dir,
+        decree);
+
+    // Metadata, error code and status are all updated under one write lock.
+    zauto_write_lock l(_lock);
+    const bool metadata_ok = set_backup_metadata_unlock(
+        checkpoint_dir, decree, static_cast<int64_t>(dsn_now_ms()));
+    if (!metadata_ok) {
+        _checkpoint_err = ERR_FILE_OPERATION_FAILED;
+        return;
+    }
+    _status = backup_status::CHECKPOINTED;
+    _checkpoint_err = ERR_OK;
+}
+
+// ThreadPool: THREAD_POOL_REPLICATION_LONG
+//
+// Rebuilds `_backup_metadata` from the files listed under `local_checkpoint_dir`: each
+// file contributes its name, size and md5, and the sizes are summed into
+// `checkpoint_total_size`.
+//
+// \param local_checkpoint_dir directory holding the generated checkpoint files
+// \param checkpoint_decree    decree stored into the metadata
+// \param checkpoint_timestamp timestamp stored into the metadata
+// \return true on success; false when listing the directory, reading a file's size or
+//         md5 fails, or when the total size is not positive. On failure
+//         `_backup_metadata` is left exactly as it was.
+//
+// Takes no lock itself; generate_checkpoint() calls it while holding the write lock.
+bool replica_backup_manager::set_backup_metadata_unlock(const std::string &local_checkpoint_dir,
+                                                        int64_t checkpoint_decree,
+                                                        int64_t checkpoint_timestamp)
+{
+    FAIL_POINT_INJECT_F("replica_set_backup_metadata", [](dsn::string_view) { return true; });
+
+    std::vector<std::string> sub_files;
+    if (!utils::filesystem::get_subfiles(local_checkpoint_dir, sub_files, false)) {
+        derror_replica("list sub files of local checkpoint dir = {} failed", local_checkpoint_dir);
+        return false;
+    }
+
+    // Collect into a local list first. Appending straight into `_backup_metadata.files`
+    // would leave a partial list behind on failure and would accumulate duplicate
+    // entries if this function ran again, because that member is never cleared.
+    decltype(_backup_metadata.files) files;
+    int64_t total_file_size = 0;
+    for (const auto &file : sub_files) {
+        file_meta meta;
+        meta.name = utils::filesystem::get_file_name(file);
+        if (!utils::filesystem::file_size(file, meta.size)) {
+            derror_replica("get file size of {} failed", file);
+            return false;
+        }
+        if (utils::filesystem::md5sum(file, meta.md5) != ERR_OK) {
+            derror_replica("get file md5 of {} failed", file);
+            return false;
+        }
+        total_file_size += meta.size;
+        files.emplace_back(meta);
+    }
+
+    if (total_file_size <= 0) {
+        derror_replica(
+            "wrong metadata, total_size={}, file_count={}", total_file_size, sub_files.size());
+        return false;
+    }
+
+    // Every check passed: publish the new file list together with the other fields.
+    _backup_metadata.files.swap(files);
+    _backup_metadata.checkpoint_decree = checkpoint_decree;
+    _backup_metadata.checkpoint_timestamp = checkpoint_timestamp;
+    _backup_metadata.checkpoint_total_size = total_file_size;
+    ddebug_replica("set backup metadata succeed, decree = {}, timestamp = {}, file_count = {}, "
+                   "total_size = {}",
+                   _backup_metadata.checkpoint_decree,
+                   _backup_metadata.checkpoint_timestamp,
+                   _backup_metadata.files.size(),
+                   _backup_metadata.checkpoint_total_size);
+
+    return true;
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/rdsn/src/replica/backup/replica_backup_manager.h b/src/rdsn/src/replica/backup/replica_backup_manager.h
index 537f1e4a8..820fd439c 100644
--- a/src/rdsn/src/replica/backup/replica_backup_manager.h
+++ b/src/rdsn/src/replica/backup/replica_backup_manager.h
@@ -17,25 +17,94 @@
#pragma once
-#include <dsn/dist/replication/replica_base.h>
-#include <dsn/dist/replication/replication_types.h>
+#include <dsn/dist/replication/replication_app_base.h>
+#include <dsn/tool-api/zlocks.h>
+#include <dsn/utility/filesystem.h>
+
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
namespace dsn {
namespace replication {
-// TODO(heyuchen): implement it
class replica;
+
+///
+/// Replica backup process
+///
+/// -----------> Invalid ----------------|
+/// | | |
+/// | v Error/Cancel |
+/// | Checkpoint -------------->|
+/// | | |
+/// | v Error/Cancel |
+/// | Checkpointed ------------->|
+/// | | |
+/// | v Error/Cancel |
+/// | Uploading -------------->|
+/// | | |
+/// | v |
+/// | Succeed |
+/// | | |
+/// | v |
+/// |<-- Async-clear backup files <------|
+
class replica_backup_manager : replica_base // no access specifier on the base, so it is inherited privately (class default)
{
public:
explicit replica_backup_manager(replica *r);
~replica_backup_manager();
+ void on_backup(const backup_request &request, /*out*/ backup_response &response); // entry point for a backup request addressed to this replica
+
+private:
+ void try_to_checkpoint(const int64_t &backup_id, /*out*/ backup_response &response); // dispatches on the current local backup status
+ void start_checkpointing(int64_t backup_id, /*out*/ backup_response &response); // records backup_id and schedules generate_checkpoint()
+ void report_checkpointing(/*out*/ backup_response &response); // reports local state, plus the checkpoint error if one was recorded
+ void fill_response_unlock(/*out*/ backup_response &response); // takes no lock itself
+
+ void generate_checkpoint(); // task body enqueued by start_checkpointing()
+ bool set_backup_metadata_unlock(const std::string &local_checkpoint_dir,
+ int64_t checkpoint_decree,
+ int64_t checkpoint_timestamp); // takes no lock itself; returns false on any file operation failure
+
+ task_tracker *tracker() { return _replica->tracker(); } // delegates to the owning replica's tracker
+
+ // local backup directory: <backup_dir>/<backup_id> (computed while holding the read lock)
+ std::string get_local_checkpoint_dir()
+ {
+ zauto_read_lock l(_lock);
+ return utils::filesystem::path_combine(_replica->_app->backup_dir(),
+ std::to_string(_backup_id));
+ }
+
+ backup_status::type get_backup_status()
+ {
+ zauto_read_lock l(_lock); // returns a snapshot of _status taken under the read lock
+ return _status;
+ }
+
+ void set_checkpoint_err(const error_code &ec)
+ {
+ zauto_write_lock l(_lock); // the error code is stored under the write lock
+ _checkpoint_err = ec;
+ }
+
private:
+ replica *_replica;
+ replica_stub *_stub;
+
friend class replica;
+ friend class replica_stub;
friend class replica_backup_manager_test;
- replica *_replica;
+ zrwlock_nr _lock; // { presumably guards the members listed down to the closing marker below
+ backup_status::type _status{backup_status::UNINITIALIZED};
+ int64_t _backup_id{0};
+ error_code _checkpoint_err{ERR_OK};
+ cold_backup_metadata _backup_metadata;
+ task_ptr _checkpointing_task;
+ // }
};
} // namespace replication
diff --git a/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp b/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
index 678cbd622..e2d506eb6 100644
--- a/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
+++ b/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
@@ -15,17 +15,129 @@
// specific language governing permissions and limitations
// under the License.
-#include "replica/test/replica_test_base.h"
+#include <dsn/utility/fail_point.h>
+#include <gtest/gtest.h>
+
#include "replica/backup/replica_backup_manager.h"
+#include "replica/test/replica_test_base.h"
namespace dsn {
namespace replication {
-// TODO(heyuchen): implement it
class replica_backup_manager_test : public replica_test_base
{
public:
+ replica_backup_manager_test()
+ {
+ _replica = create_mock_replica(stub.get());
+ _backup_mgr = make_unique<replica_backup_manager>(_replica.get());
+ utils::filesystem::create_directory(LOCAL_BACKUP_DIR);
+ fail::setup(); // paired with fail::teardown() in the destructor
+ }
+
+ ~replica_backup_manager_test()
+ {
+ utils::filesystem::remove_path(LOCAL_BACKUP_DIR); // remove the directory created by the constructor
+ utils::filesystem::remove_path(PATH);
+ fail::teardown();
+ }
+
+ void generate_checkpoint() { _backup_mgr->generate_checkpoint(); } // thin forwarder to the private method under test
+
+ bool set_backup_metadata()
+ {
+ auto dir_name = create_local_backup_checkpoint_dir();
+ create_local_backup_file(dir_name, FILE_NAME1);
+ return _backup_mgr->set_backup_metadata_unlock(dir_name, DECREE, _backup_mgr->_backup_id); // NOTE(review): third argument is the checkpoint timestamp; _backup_id (set from dsn_now_ms() above) is passed for it
+ }
+
+ void report_checkpointing(backup_response &response)
+ {
+ _backup_mgr->report_checkpointing(response);
+ }
+
+ void mock_local_backup_states(backup_status::type status,
+ error_code checkpoint_err = ERR_OK,
+ error_code upload_err = ERR_OK, // currently unused, see the TODO in the body
+ int32_t upload_file_size = 0) // currently unused, see the TODO in the body
+ {
+ _backup_mgr->_status = status;
+ _backup_mgr->_backup_id = dsn_now_ms();
+ _backup_mgr->_checkpoint_err = checkpoint_err;
+ // TODO(heyuchen): add upload params
+ // _backup_mgr->_upload_err = upload_err;
+ // _backup_mgr->_upload_file_size = upload_file_size;
+ _backup_mgr->_backup_metadata.checkpoint_total_size = 100;
+ }
+
+ std::string create_local_backup_checkpoint_dir()
+ {
+ _backup_mgr->_backup_id = dsn_now_ms(); // side effect: overwrites the manager's backup id
+ auto dir = utils::filesystem::path_combine(LOCAL_BACKUP_DIR,
+ std::to_string(_backup_mgr->_backup_id));
+ utils::filesystem::create_directory(dir);
+ return dir;
+ }
+
+ void create_local_backup_file(const std::string &dir, const std::string &fname)
+ {
+ auto fpath = utils::filesystem::path_combine(dir, fname);
+ utils::filesystem::create_file(fpath);
+ std::string value = "test_value"; // written to the file below
+ utils::filesystem::write_file(fpath, value);
+ }
+
+ backup_status::type get_status() { return _backup_mgr->_status; } // direct member read, no lock taken
+
+ error_code get_checkpoint_err() { return _backup_mgr->_checkpoint_err; } // direct member read, no lock taken
+
+protected:
+ const std::string LOCAL_BACKUP_DIR = "backup";
+ const std::string APP_NAME = "backup_test"; // not referenced in the code shown here
+ const std::string PROVIDER = "local_service"; // not referenced in the code shown here
+ const std::string PATH = "unit_test";
+ const int64_t DECREE = 5;
+ const std::string FILE_NAME1 = "test_file1";
+ const std::string FILE_NAME2 = "test_file2"; // not referenced in the code shown here
+ std::unique_ptr<replica_backup_manager> _backup_mgr;
};
+// TODO(heyuchen): add unit test for on_backup after implement all status
+
+TEST_F(replica_backup_manager_test, generate_checkpoint_test)
+{
+ fail::cfg("replica_set_backup_metadata", "return()"); // fail point named in set_backup_metadata_unlock(), whose injected lambda returns true
+ mock_local_backup_states(backup_status::CHECKPOINTING);
+ generate_checkpoint();
+ ASSERT_EQ(get_checkpoint_err(), ERR_OK);
+ ASSERT_EQ(get_status(), backup_status::CHECKPOINTED); // a successful run moves the status to CHECKPOINTED
+}
+
+TEST_F(replica_backup_manager_test, set_backup_metadata_test)
+{
+ ASSERT_TRUE(set_backup_metadata()); // the fixture helper creates a directory with one non-empty file before calling the method
+}
+
+// Checks that report_checkpointing() echoes the mocked local status and surfaces the
+// mocked checkpoint error through `checkpoint_upload_err`.
+TEST_F(replica_backup_manager_test, report_checkpointing_test)
+{
+    // One case: the local status to mock and the checkpoint error recorded with it.
+    struct test_struct
+    {
+        backup_status::type status;
+        error_code checkpoint_err;
+    };
+    const test_struct tests[] = {
+        {backup_status::CHECKPOINTING, ERR_FILE_OPERATION_FAILED},
+        {backup_status::CHECKPOINTING, ERR_WRONG_TIMING},
+        {backup_status::CHECKPOINTING, ERR_LOCAL_APP_FAILURE},
+        {backup_status::CHECKPOINTED, ERR_OK},
+    };
+
+    for (const auto &test : tests) {
+        mock_local_backup_states(test.status, test.checkpoint_err);
+
+        backup_response resp;
+        report_checkpointing(resp);
+
+        ASSERT_EQ(resp.status, test.status);
+        ASSERT_EQ(resp.checkpoint_upload_err, test.checkpoint_err);
+    }
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/rdsn/src/replica/replica_stub.cpp b/src/rdsn/src/replica/replica_stub.cpp
index 17cb964c0..1c43f41c7 100644
--- a/src/rdsn/src/replica/replica_stub.cpp
+++ b/src/rdsn/src/replica/replica_stub.cpp
@@ -37,6 +37,7 @@
#include "replica_stub.h"
#include "mutation_log.h"
#include "mutation.h"
+#include "backup/replica_backup_manager.h"
#include "bulk_load/replica_bulk_loader.h"
#include "duplication/duplication_sync_timer.h"
#include "split/replica_split_manager.h"
@@ -2255,6 +2256,8 @@ void replica_stub::open_service()
RPC_DETECT_HOTKEY, "detect_hotkey", &replica_stub::on_detect_hotkey);
register_rpc_handler_with_rpc_holder(
RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);
+ register_rpc_handler_with_rpc_holder(
+ RPC_COLD_BACKUP, "cold_backup", &replica_stub::on_cold_backup);
register_ctrl_command();
}
@@ -2997,5 +3000,20 @@ void replica_stub::update_disks_status()
}
}
+// Handler registered for RPC_COLD_BACKUP: forwards the request to the backup manager of
+// the replica identified by `request.pid`, or replies ERR_OBJECT_NOT_FOUND when
+// get_replica() returns no replica for that pid.
+void replica_stub::on_cold_backup(backup_rpc rpc)
+{
+    const backup_request &req = rpc.request();
+    backup_response &resp = rpc.response();
+    ddebug_f("[{}@{}]: receive backup request", req.pid, _primary_address_str);
+
+    replica_ptr target = get_replica(req.pid);
+    if (target == nullptr) {
+        derror_f("replica({}) is not existed", req.pid);
+        resp.err = ERR_OBJECT_NOT_FOUND;
+        return;
+    }
+
+    target->get_backup_manager()->on_backup(req, resp);
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/rdsn/src/replica/replica_stub.h b/src/rdsn/src/replica/replica_stub.h
index 06483a8c1..c0abf484a 100644
--- a/src/rdsn/src/replica/replica_stub.h
+++ b/src/rdsn/src/replica/replica_stub.h
@@ -39,6 +39,7 @@
#include <dsn/dist/nfs_node.h>
#include "common/replication_common.h"
+#include "common/backup_restore_common.h"
#include "common/bulk_load_common.h"
#include "common/fs_manager.h"
#include "block_service/block_service_manager.h"
@@ -80,6 +81,7 @@ class replica_stub;
typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;
class duplication_sync_timer;
+class replica_backup_manager;
class replica_bulk_loader;
class replica_split_manager;
@@ -228,6 +230,8 @@ public:
// query last checkpoint info for follower in duplication process
void on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc);
+ void on_cold_backup(backup_rpc rpc);
+
private:
enum replica_node_state
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org