You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/08/25 10:31:56 UTC

[incubator-pegasus] branch backup_restore-dev updated: feat(backup): 5. replica create backup checkpoint (#1128)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch backup_restore-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/backup_restore-dev by this push:
     new 48f48308a feat(backup): 5. replica create backup checkpoint (#1128)
48f48308a is described below

commit 48f48308acaac08b26ceb4677ba7d492c5e6f641
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Thu Aug 25 18:31:50 2022 +0800

    feat(backup): 5. replica create backup checkpoint (#1128)
---
 .../src/replica/backup/replica_backup_manager.cpp  | 173 ++++++++++++++++++++-
 .../src/replica/backup/replica_backup_manager.h    |  77 ++++++++-
 .../backup/test/replica_backup_manager_test.cpp    | 116 +++++++++++++-
 src/rdsn/src/replica/replica_stub.cpp              |  18 +++
 src/rdsn/src/replica/replica_stub.h                |   4 +
 5 files changed, 376 insertions(+), 12 deletions(-)

diff --git a/src/rdsn/src/replica/backup/replica_backup_manager.cpp b/src/rdsn/src/replica/backup/replica_backup_manager.cpp
index 233430abc..5908425c5 100644
--- a/src/rdsn/src/replica/backup/replica_backup_manager.cpp
+++ b/src/rdsn/src/replica/backup/replica_backup_manager.cpp
@@ -15,21 +15,224 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "replica_backup_manager.h"
-#include "replica/replica.h"
-
 #include <dsn/dist/fmt_logging.h>
-#include <dsn/utility/filesystem.h>
-#include <dsn/dist/replication/replication_app_base.h>
+#include <dsn/utility/fail_point.h>
+
+#include "replica_backup_manager.h"
 
 namespace dsn {
 namespace replication {
 
 // TODO(heyuchen): implement it
 
-replica_backup_manager::replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}
+replica_backup_manager::replica_backup_manager(replica *r)
+    : replica_base(r), _replica(r), _stub(r->get_replica_stub())
+{
+}
 
 replica_backup_manager::~replica_backup_manager() {}
 
+// Entry point for a backup request forwarded to this replica by
+// replica_stub::on_cold_backup().
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_backup_manager::on_backup(const backup_request &request,
+                                       /*out*/ backup_response &response)
+{
+    // TODO(heyuchen): add other status
+
+    if (request.status == backup_status::CHECKPOINTING) {
+        try_to_checkpoint(request.backup_id, response);
+        return;
+    }
+
+    // Other request statuses are not handled yet: reply with an explicit error
+    // instead of returning a response whose fields were never filled in.
+    response.err = ERR_INVALID_PARAMETERS;
+    derror_replica("unsupported backup request status({}), backup_id = {}",
+                   enum_to_string(request.status),
+                   request.backup_id);
+}
+
+// Handles a CHECKPOINTING request according to the local backup status:
+// - UNINITIALIZED: start generating a checkpoint in the background;
+// - CHECKPOINTING/CHECKPOINTED: report the current state (and error, if any);
+// - otherwise: reply ERR_INVALID_STATE.
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_backup_manager::try_to_checkpoint(const int64_t &backup_id,
+                                               /*out*/ backup_response &response)
+{
+    // Take one snapshot of the status under `_lock` and use it below; logging
+    // `_status` directly would read it without holding the lock.
+    const backup_status::type local_status = get_backup_status();
+    switch (local_status) {
+    case backup_status::UNINITIALIZED:
+        start_checkpointing(backup_id, response);
+        break;
+    case backup_status::CHECKPOINTING:
+    case backup_status::CHECKPOINTED:
+        report_checkpointing(response);
+        break;
+    default:
+        response.err = ERR_INVALID_STATE;
+        derror_replica("invalid local status({}) while request status = {}",
+                       enum_to_string(local_status),
+                       enum_to_string(backup_status::CHECKPOINTING));
+        break;
+    }
+}
+
+// Switches the local status to CHECKPOINTING, records `backup_id` and enqueues
+// generate_checkpoint() as a background task; `response` reflects the new state.
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_backup_manager::start_checkpointing(int64_t backup_id,
+                                                 /*out*/ backup_response &response)
+{
+    // When this fail point is enabled, only switch the status (no checkpoint task).
+    FAIL_POINT_INJECT_F("replica_backup_start_checkpointing", [&](dsn::string_view) {
+        zauto_write_lock l(_lock);
+        _status = backup_status::CHECKPOINTING;
+        response.err = ERR_OK;
+    });
+
+    ddebug_replica("start to checkpoint, backup_id = {}", backup_id);
+    zauto_write_lock l(_lock);
+    _status = backup_status::CHECKPOINTING;
+    _backup_id = backup_id;
+    _checkpointing_task =
+        tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP,
+                         tracker(),
+                         std::bind(&replica_backup_manager::generate_checkpoint, this));
+    fill_response_unlock(response);
+}
+
+// Fills `response` with the current backup state; a checkpoint error, if any,
+// is reported through `checkpoint_upload_err`.
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_backup_manager::report_checkpointing(/*out*/ backup_response &response)
+{
+    // `_backup_id` is guarded by `_lock`, so log it only after acquiring the lock.
+    zauto_read_lock l(_lock);
+    ddebug_replica("check checkpoint, backup_id = {}", _backup_id);
+    if (_checkpoint_err != ERR_OK) {
+        derror_replica("checkpoint failed, error = {}", _checkpoint_err);
+        response.__set_checkpoint_upload_err(_checkpoint_err);
+    }
+    fill_response_unlock(response);
+}
+
+// Fills the common fields of `response` from the local state. Caller holds `_lock`.
+// ThreadPool: THREAD_POOL_REPLICATION
+void replica_backup_manager::fill_response_unlock(/*out*/ backup_response &response)
+{
+    response.err = ERR_OK;
+    response.pid = get_gpid();
+    response.backup_id = _backup_id;
+    response.status = _status;
+}
+
+// Generates a checkpoint (flushing the memtable) into the local checkpoint dir and
+// records its metadata. On success the status becomes CHECKPOINTED; on failure only
+// `_checkpoint_err` is set (the status is left unchanged) and is later reported by
+// report_checkpointing().
+// ThreadPool: THREAD_POOL_REPLICATION_LONG
+void replica_backup_manager::generate_checkpoint()
+{
+    const auto &local_checkpoint_dir = get_local_checkpoint_dir();
+
+    if (!utils::filesystem::directory_exists(local_checkpoint_dir) &&
+        !utils::filesystem::create_directory(local_checkpoint_dir)) {
+        derror_replica("create local backup dir {} failed", local_checkpoint_dir);
+        set_checkpoint_err(ERR_FILE_OPERATION_FAILED);
+        return;
+    }
+
+    // generate checkpoint and flush memtable
+    int64_t checkpoint_decree = 0;
+    const auto &ec = _replica->_app->copy_checkpoint_to_dir(
+        local_checkpoint_dir.c_str(), &checkpoint_decree, true);
+    if (ec != ERR_OK) {
+        derror_replica("generate backup checkpoint failed, error = {}", ec);
+        set_checkpoint_err(ec);
+        return;
+    }
+    ddebug_replica(
+        "generate backup checkpoint succeed: checkpoint dir = {}, checkpoint decree = {}",
+        local_checkpoint_dir,
+        checkpoint_decree);
+
+    {
+        // NOTE(review): set_backup_metadata_unlock() computes the md5 of every
+        // checkpoint file while `_lock` is held for writing, which blocks
+        // report_checkpointing() on THREAD_POOL_REPLICATION meanwhile. Consider
+        // computing the metadata outside the lock and only publishing it here.
+        zauto_write_lock l(_lock);
+        if (!set_backup_metadata_unlock(
+                local_checkpoint_dir, checkpoint_decree, static_cast<int64_t>(dsn_now_ms()))) {
+            _checkpoint_err = ERR_FILE_OPERATION_FAILED;
+            return;
+        }
+        _checkpoint_err = ERR_OK;
+        _status = backup_status::CHECKPOINTED;
+    }
+}
+
+// Lists the files directly under `local_checkpoint_dir`, computes size and md5 of
+// each, and stores them with the checkpoint decree/timestamp and total size into
+// `_backup_metadata`. Returns false, leaving `_backup_metadata` unchanged, if any
+// file cannot be inspected or the total size is not positive.
+// generate_checkpoint() calls this with `_lock` held for writing.
+// ThreadPool: THREAD_POOL_REPLICATION_LONG
+bool replica_backup_manager::set_backup_metadata_unlock(const std::string &local_checkpoint_dir,
+                                                        int64_t checkpoint_decree,
+                                                        int64_t checkpoint_timestamp)
+{
+    FAIL_POINT_INJECT_F("replica_set_backup_metadata", [](dsn::string_view) { return true; });
+
+    std::vector<std::string> sub_files;
+    if (!utils::filesystem::get_subfiles(local_checkpoint_dir, sub_files, false)) {
+        derror_replica("list sub files of local checkpoint dir = {} failed", local_checkpoint_dir);
+        return false;
+    }
+
+    // Collect the file list locally and publish it only on success, so a failure
+    // halfway or a repeated call never leaves partial or duplicated entries behind.
+    std::vector<file_meta> file_metas;
+    file_metas.reserve(sub_files.size());
+    int64_t total_file_size = 0;
+    for (const auto &file : sub_files) {
+        file_meta meta;
+        meta.name = utils::filesystem::get_file_name(file);
+        if (!utils::filesystem::file_size(file, meta.size)) {
+            derror_replica("get file size of {} failed", file);
+            return false;
+        }
+        if (utils::filesystem::md5sum(file, meta.md5) != ERR_OK) {
+            derror_replica("get file md5 of {} failed", file);
+            return false;
+        }
+        total_file_size += meta.size;
+        file_metas.emplace_back(std::move(meta));
+    }
+
+    if (total_file_size <= 0) {
+        derror_replica(
+            "wrong metadata, total_size={}, file_count={}", total_file_size, sub_files.size());
+        return false;
+    }
+
+    _backup_metadata.files = std::move(file_metas);
+    _backup_metadata.checkpoint_decree = checkpoint_decree;
+    _backup_metadata.checkpoint_timestamp = checkpoint_timestamp;
+    _backup_metadata.checkpoint_total_size = total_file_size;
+    ddebug_replica("set backup metadata succeed, decree = {}, timestamp = {}, file_count = {}, "
+                   "total_size = {}",
+                   _backup_metadata.checkpoint_decree,
+                   _backup_metadata.checkpoint_timestamp,
+                   _backup_metadata.files.size(),
+                   _backup_metadata.checkpoint_total_size);
+
+    return true;
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/rdsn/src/replica/backup/replica_backup_manager.h b/src/rdsn/src/replica/backup/replica_backup_manager.h
index 537f1e4a8..820fd439c 100644
--- a/src/rdsn/src/replica/backup/replica_backup_manager.h
+++ b/src/rdsn/src/replica/backup/replica_backup_manager.h
@@ -17,25 +17,101 @@
 
 #pragma once
 
-#include <dsn/dist/replication/replica_base.h>
-#include <dsn/dist/replication/replication_types.h>
+#include <dsn/dist/replication/replication_app_base.h>
+#include <dsn/tool-api/zlocks.h>
+#include <dsn/utility/filesystem.h>
+
+#include "replica/replica.h"
+#include "replica/replica_stub.h"
 
 namespace dsn {
 namespace replication {
 
-// TODO(heyuchen): implement it
 class replica;
+
+///
+/// Replica backup process
+///
+///  -------->  Uninitialized  -------------|
+///  |                |                     |
+///  |                v       Error/Cancel  |
+///  |          Checkpointing ------------->|
+///  |                |                     |
+///  |                v       Error/Cancel  |
+///  |          Checkpointed -------------->|
+///  |                |                     |
+///  |                v       Error/Cancel  |
+///  |            Uploading  -------------->|
+///  |                |                     |
+///  |                v                     |
+///  |             Succeed                  |
+///  |                |                     |
+///  |                v                     |
+///  |<--  Async-clear backup files  <------|
+
 class replica_backup_manager : replica_base
 {
 public:
     explicit replica_backup_manager(replica *r);
     ~replica_backup_manager();
 
+    // Handles a backup request sent to this replica and fills `response`.
+    void on_backup(const backup_request &request, /*out*/ backup_response &response);
+
+private:
+    void try_to_checkpoint(const int64_t &backup_id, /*out*/ backup_response &response);
+    void start_checkpointing(int64_t backup_id, /*out*/ backup_response &response);
+    void report_checkpointing(/*out*/ backup_response &response);
+    // The caller must hold `_lock`.
+    void fill_response_unlock(/*out*/ backup_response &response);
+
+    // Body of the background task enqueued by start_checkpointing().
+    void generate_checkpoint();
+    // The caller is expected to hold `_lock` for writing.
+    bool set_backup_metadata_unlock(const std::string &local_checkpoint_dir,
+                                    int64_t checkpoint_decree,
+                                    int64_t checkpoint_timestamp);
+
+    task_tracker *tracker() { return _replica->tracker(); }
+
+    // local backup directory: <backup_dir>/<backup_id>
+    std::string get_local_checkpoint_dir()
+    {
+        zauto_read_lock l(_lock);
+        return utils::filesystem::path_combine(_replica->_app->backup_dir(),
+                                               std::to_string(_backup_id));
+    }
+
+    // Reads `_status` under `_lock`.
+    backup_status::type get_backup_status()
+    {
+        zauto_read_lock l(_lock);
+        return _status;
+    }
+
+    // Sets `_checkpoint_err` under `_lock`.
+    void set_checkpoint_err(const error_code &ec)
+    {
+        zauto_write_lock l(_lock);
+        _checkpoint_err = ec;
+    }
+
 private:
+    replica *_replica;
+    replica_stub *_stub;
+
     friend class replica;
+    friend class replica_stub;
     friend class replica_backup_manager_test;
 
-    replica *_replica;
+    // Members between `// {` and `// }` are protected by `_lock`.
+    zrwlock_nr _lock; // {
+    backup_status::type _status{backup_status::UNINITIALIZED};
+    int64_t _backup_id{0};
+    error_code _checkpoint_err{ERR_OK};    // reported as checkpoint_upload_err
+    cold_backup_metadata _backup_metadata; // written by set_backup_metadata_unlock()
+    task_ptr _checkpointing_task;
+    // }
 };
 
 } // namespace replication
diff --git a/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp b/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
index 678cbd622..e2d506eb6 100644
--- a/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
+++ b/src/rdsn/src/replica/backup/test/replica_backup_manager_test.cpp
@@ -15,17 +15,155 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "replica/test/replica_test_base.h"
+#include <dsn/utility/fail_point.h>
+#include <gtest/gtest.h>
+
 #include "replica/backup/replica_backup_manager.h"
+#include "replica/test/replica_test_base.h"
 
 namespace dsn {
 namespace replication {
 
-// TODO(heyuchen): implement it
 class replica_backup_manager_test : public replica_test_base
 {
 public:
+    replica_backup_manager_test()
+    {
+        _replica = create_mock_replica(stub.get());
+        _backup_mgr = make_unique<replica_backup_manager>(_replica.get());
+        utils::filesystem::create_directory(LOCAL_BACKUP_DIR);
+        fail::setup();
+    }
+
+    ~replica_backup_manager_test()
+    {
+        utils::filesystem::remove_path(LOCAL_BACKUP_DIR);
+        utils::filesystem::remove_path(PATH);
+        fail::teardown();
+    }
+
+    void generate_checkpoint() { _backup_mgr->generate_checkpoint(); }
+
+    bool set_backup_metadata()
+    {
+        auto dir_name = create_local_backup_checkpoint_dir();
+        create_local_backup_file(dir_name, FILE_NAME1);
+        return _backup_mgr->set_backup_metadata_unlock(dir_name, DECREE, _backup_mgr->_backup_id);
+    }
+
+    void report_checkpointing(backup_response &response)
+    {
+        _backup_mgr->report_checkpointing(response);
+    }
+
+    void mock_local_backup_states(backup_status::type status,
+                                  error_code checkpoint_err = ERR_OK,
+                                  error_code upload_err = ERR_OK,
+                                  int32_t upload_file_size = 0)
+    {
+        _backup_mgr->_status = status;
+        _backup_mgr->_backup_id = dsn_now_ms();
+        _backup_mgr->_checkpoint_err = checkpoint_err;
+        // TODO(heyuchen): add upload params
+        // _backup_mgr->_upload_err = upload_err;
+        // _backup_mgr->_upload_file_size = upload_file_size;
+        _backup_mgr->_backup_metadata.checkpoint_total_size = 100;
+    }
+
+    std::string create_local_backup_checkpoint_dir()
+    {
+        _backup_mgr->_backup_id = dsn_now_ms();
+        auto dir = utils::filesystem::path_combine(LOCAL_BACKUP_DIR,
+                                                   std::to_string(_backup_mgr->_backup_id));
+        utils::filesystem::create_directory(dir);
+        return dir;
+    }
+
+    void create_local_backup_file(const std::string &dir, const std::string &fname)
+    {
+        auto fpath = utils::filesystem::path_combine(dir, fname);
+        utils::filesystem::create_file(fpath);
+        std::string value = "test_value";
+        utils::filesystem::write_file(fpath, value);
+    }
+
+    backup_status::type get_status() { return _backup_mgr->_status; }
+
+    error_code get_checkpoint_err() { return _backup_mgr->_checkpoint_err; }
+
+protected:
+    const std::string LOCAL_BACKUP_DIR = "backup";
+    const std::string APP_NAME = "backup_test";
+    const std::string PROVIDER = "local_service";
+    const std::string PATH = "unit_test";
+    const int64_t DECREE = 5;
+    const std::string FILE_NAME1 = "test_file1";
+    const std::string FILE_NAME2 = "test_file2";
+    std::unique_ptr<replica_backup_manager> _backup_mgr;
 };
 
+// TODO(heyuchen): add unit tests for the other on_backup request statuses once implemented
+
+TEST_F(replica_backup_manager_test, generate_checkpoint_test)
+{
+    fail::cfg("replica_set_backup_metadata", "return()");
+    mock_local_backup_states(backup_status::CHECKPOINTING);
+    generate_checkpoint();
+    ASSERT_EQ(get_checkpoint_err(), ERR_OK);
+    ASSERT_EQ(get_status(), backup_status::CHECKPOINTED);
+}
+
+TEST_F(replica_backup_manager_test, set_backup_metadata_test)
+{
+    ASSERT_TRUE(set_backup_metadata());
+}
+
+TEST_F(replica_backup_manager_test, report_checkpointing_test)
+{
+    struct test_struct
+    {
+        backup_status::type status;
+        error_code checkpoint_err;
+    } tests[]{
+        {backup_status::CHECKPOINTING, ERR_FILE_OPERATION_FAILED},
+        {backup_status::CHECKPOINTING, ERR_WRONG_TIMING},
+        {backup_status::CHECKPOINTING, ERR_LOCAL_APP_FAILURE},
+        {backup_status::CHECKPOINTED, ERR_OK},
+    };
+    for (const auto &test : tests) {
+        mock_local_backup_states(test.status, test.checkpoint_err);
+        backup_response resp;
+        report_checkpointing(resp);
+        ASSERT_EQ(resp.status, test.status);
+        ASSERT_EQ(resp.checkpoint_upload_err, test.checkpoint_err);
+    }
+}
+
+TEST_F(replica_backup_manager_test, on_backup_start_checkpointing_test)
+{
+    // The fail point skips the real checkpoint task; only the status switch is checked.
+    fail::cfg("replica_backup_start_checkpointing", "return()");
+    backup_request request;
+    request.backup_id = static_cast<int64_t>(dsn_now_ms());
+    request.status = backup_status::CHECKPOINTING;
+    backup_response response;
+    _backup_mgr->on_backup(request, response);
+    ASSERT_EQ(response.err, ERR_OK);
+    ASSERT_EQ(get_status(), backup_status::CHECKPOINTING);
+}
+
+TEST_F(replica_backup_manager_test, on_backup_report_checkpointed_test)
+{
+    // A repeated CHECKPOINTING request after the checkpoint finished only reports the state.
+    mock_local_backup_states(backup_status::CHECKPOINTED);
+    backup_request request;
+    request.backup_id = static_cast<int64_t>(dsn_now_ms());
+    request.status = backup_status::CHECKPOINTING;
+    backup_response response;
+    _backup_mgr->on_backup(request, response);
+    ASSERT_EQ(response.err, ERR_OK);
+    ASSERT_EQ(response.status, backup_status::CHECKPOINTED);
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/rdsn/src/replica/replica_stub.cpp b/src/rdsn/src/replica/replica_stub.cpp
index 17cb964c0..1c43f41c7 100644
--- a/src/rdsn/src/replica/replica_stub.cpp
+++ b/src/rdsn/src/replica/replica_stub.cpp
@@ -37,6 +37,7 @@
 #include "replica_stub.h"
 #include "mutation_log.h"
 #include "mutation.h"
+#include "backup/replica_backup_manager.h"
 #include "bulk_load/replica_bulk_loader.h"
 #include "duplication/duplication_sync_timer.h"
 #include "split/replica_split_manager.h"
@@ -2255,6 +2256,8 @@ void replica_stub::open_service()
         RPC_DETECT_HOTKEY, "detect_hotkey", &replica_stub::on_detect_hotkey);
     register_rpc_handler_with_rpc_holder(
         RPC_ADD_NEW_DISK, "add_new_disk", &replica_stub::on_add_new_disk);
+    register_rpc_handler_with_rpc_holder(
+        RPC_COLD_BACKUP, "cold_backup", &replica_stub::on_cold_backup);
 
     register_ctrl_command();
 }
@@ -2997,5 +3000,20 @@ void replica_stub::update_disks_status()
     }
 }
 
+void replica_stub::on_cold_backup(backup_rpc rpc) // RPC_COLD_BACKUP handler (open_service)
+{
+    const backup_request &request = rpc.request();
+    backup_response &response = rpc.response(); // filled in place below
+
+    ddebug_f("[{}@{}]: receive backup request", request.pid, _primary_address_str);
+    replica_ptr rep = get_replica(request.pid);
+    if (rep != nullptr) { // delegate to the replica's backup manager
+        rep->get_backup_manager()->on_backup(request, response);
+    } else { // get_replica() found no replica for request.pid on this stub
+        derror_f("replica({}) is not existed", request.pid);
+        response.err = ERR_OBJECT_NOT_FOUND;
+    }
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/rdsn/src/replica/replica_stub.h b/src/rdsn/src/replica/replica_stub.h
index 06483a8c1..c0abf484a 100644
--- a/src/rdsn/src/replica/replica_stub.h
+++ b/src/rdsn/src/replica/replica_stub.h
@@ -39,6 +39,7 @@
 #include <dsn/dist/nfs_node.h>
 
 #include "common/replication_common.h"
+#include "common/backup_restore_common.h"
 #include "common/bulk_load_common.h"
 #include "common/fs_manager.h"
 #include "block_service/block_service_manager.h"
@@ -80,6 +81,7 @@ class replica_stub;
 typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;
 
 class duplication_sync_timer;
+class replica_backup_manager;
 class replica_bulk_loader;
 class replica_split_manager;
 
@@ -228,6 +230,8 @@ public:
     // query last checkpoint info for follower in duplication process
     void on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc);
 
+    void on_cold_backup(backup_rpc rpc);
+
 private:
     enum replica_node_state
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org