You are viewing a plain-text version of this content. The canonical link for it (a hyperlink labeled "here" in the original page) is not preserved in this plain-text version.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2022/03/04 05:18:33 UTC

[incubator-pegasus] branch duplication_dev updated: feat(dup_enhancement#17): replica follower load duplication data when open replica (#917)

This is an automated email from the ASF dual-hosted git repository.

zhaoliwei pushed a commit to branch duplication_dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/duplication_dev by this push:
     new ebbc61c  feat(dup_enhancement#17):  replica follower load duplication data when open replica (#917)
ebbc61c is described below

commit ebbc61c3e16b54826f2e023aea4dbb346555344f
Author: Jiashuo <js...@live.com>
AuthorDate: Fri Mar 4 13:16:14 2022 +0800

    feat(dup_enhancement#17):  replica follower load duplication data when open replica (#917)
---
 .github/workflows/ci-pull-request.yaml       |   1 +
 rdsn                                         |   2 +-
 src/server/pegasus_server_impl.cpp           | 156 ++++++++++++++-------------
 src/server/test/CMakeLists.txt               |   1 +
 src/server/test/pegasus_server_impl_test.cpp |  24 +++++
 src/server/test/pegasus_server_test_base.h   |  18 +++-
 src/server/test/rocksdb_wrapper_test.cpp     |   6 +-
 src/shell/commands/duplication.cpp           |   6 +-
 8 files changed, 130 insertions(+), 84 deletions(-)

diff --git a/.github/workflows/ci-pull-request.yaml b/.github/workflows/ci-pull-request.yaml
index 544640a..2d43c68 100644
--- a/.github/workflows/ci-pull-request.yaml
+++ b/.github/workflows/ci-pull-request.yaml
@@ -14,6 +14,7 @@ on:
       - master
       - 'v[0-9]+.*' # release branch
       - ci-test # testing branch for github action
+      - '*dev'
   # for manually triggering workflow
   workflow_dispatch:
 
diff --git a/rdsn b/rdsn
index 8c310b9..03f7b61 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 8c310b95f8cf0a2123da5eed7e6621b6f94f30e0
+Subproject commit 03f7b613d29c611844af831b9fcb2016620b9977
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 4221aea..019854b 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -32,6 +32,7 @@
 #include <dsn/dist/replication/replication.codes.h>
 #include <dsn/utility/flags.h>
 #include <dsn/utils/token_bucket_throttling_controller.h>
+#include <dsn/dist/replication/duplication_common.h>
 
 #include "base/pegasus_key_schema.h"
 #include "base/pegasus_value_schema.h"
@@ -1454,7 +1455,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
 
 void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
 
-::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
+dsn::error_code pegasus_server_impl::start(int argc, char **argv)
 {
     dassert_replica(!_is_open, "replica is already opened.");
     ddebug_replica("start to open app {}", data_dir());
@@ -1465,11 +1466,11 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     if (argc > 0) {
         if ((argc - 1) % 2 != 0) {
             derror_replica("parse envs failed, invalid argc = {}", argc);
-            return ::dsn::ERR_INVALID_PARAMETERS;
+            return dsn::ERR_INVALID_PARAMETERS;
         }
         if (argv == nullptr) {
             derror_replica("parse envs failed, invalid argv = nullptr");
-            return ::dsn::ERR_INVALID_PARAMETERS;
+            return dsn::ERR_INVALID_PARAMETERS;
         }
         int idx = 1;
         while (idx < argc) {
@@ -1485,8 +1486,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     //
     // here, we must distinguish three cases, such as:
     //  case 1: we open the db that already exist
-    //  case 2: we open a new db
-    //  case 3: we restore the db base on old data
+    //  case 2: we load duplication data base checkpoint from master
+    //  case 3: we open a new db
+    //  case 4: we restore the db base on old data
     //
     // if we want to restore the db base on old data, only all of the restore preconditions are
     // satisfied
@@ -1496,62 +1498,67 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
     //          3, restore_dir is exist
     //
     bool db_exist = true;
-    auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
-    if (::dsn::utils::filesystem::path_exists(path)) {
+    auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), "rdb");
+    auto duplication_path = duplication_dir();
+    if (dsn::utils::filesystem::path_exists(rdb_path)) {
         // only case 1
-        ddebug("%s: rdb is already exist, path = %s", replica_name(), path.c_str());
+        ddebug_replica("rdb is already exist, path = {}", rdb_path);
     } else {
-        std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
-        const std::string &restore_dir = restore_info.first;
-        bool force_restore = restore_info.second;
-        if (restore_dir.empty()) {
-            // case 2
-            if (force_restore) {
-                derror("%s: try to restore, but we can't combine restore_dir from envs",
-                       replica_name());
-                return ::dsn::ERR_FILE_OPERATION_FAILED;
-            } else {
-                db_exist = false;
-                dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
+        // case 2
+        if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) {
+            if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) {
+                derror_replica(
+                    "load duplication data from {} to {} failed", duplication_path, rdb_path);
+                return dsn::ERR_FILE_OPERATION_FAILED;
             }
         } else {
-            // case 3
-            ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str());
-            if (::dsn::utils::filesystem::directory_exists(restore_dir)) {
-                // here, we just rename restore_dir to rdb, then continue the normal process
-                if (::dsn::utils::filesystem::rename_path(restore_dir.c_str(), path.c_str())) {
-                    ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed",
-                           replica_name(),
-                           restore_dir.c_str(),
-                           path.c_str());
+            std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
+            const std::string &restore_dir = restore_info.first;
+            bool force_restore = restore_info.second;
+            if (restore_dir.empty()) {
+                // case 3
+                if (force_restore) {
+                    derror_replica("try to restore, but we can't combine restore_dir from envs");
+                    return dsn::ERR_FILE_OPERATION_FAILED;
                 } else {
-                    derror("%s: rename restore_dir(%s) to rdb(%s) failed",
-                           replica_name(),
-                           restore_dir.c_str(),
-                           path.c_str());
-                    return ::dsn::ERR_FILE_OPERATION_FAILED;
+                    db_exist = false;
+                    dinfo_replica("open a new db, path = {}", rdb_path);
                 }
             } else {
-                if (force_restore) {
-                    derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s",
-                           replica_name(),
-                           restore_dir.c_str());
-                    return ::dsn::ERR_FILE_OPERATION_FAILED;
+                // case 4
+                ddebug_replica("try to restore from restore_dir = {}", restore_dir);
+                if (dsn::utils::filesystem::directory_exists(restore_dir)) {
+                    // here, we just rename restore_dir to rdb, then continue the normal process
+                    if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) {
+                        ddebug_replica(
+                            "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path);
+                    } else {
+                        derror_replica(
+                            "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path);
+                        return dsn::ERR_FILE_OPERATION_FAILED;
+                    }
                 } else {
-                    db_exist = false;
-                    dwarn(
-                        "%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
-                        "it, the role of this replica must not primary, so we open a new db on the "
-                        "path(%s)",
-                        replica_name(),
-                        restore_dir.c_str(),
-                        path.c_str());
+                    if (force_restore) {
+                        derror_replica(
+                            "try to restore, but restore_dir isn't exist, restore_dir = {}",
+                            restore_dir);
+                        return dsn::ERR_FILE_OPERATION_FAILED;
+                    } else {
+                        db_exist = false;
+                        dwarn_replica(
+                            "try to restore and restore_dir({}) isn't exist, but we don't force "
+                            "it, the role of this replica must not primary, so we open a new db on "
+                            "the "
+                            "path({})",
+                            restore_dir,
+                            rdb_path);
+                    }
                 }
             }
         }
     }
 
-    ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
+    ddebug_replica("start to open rocksDB's rdb({})", rdb_path);
 
     // Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
     // will be used elsewhere.
@@ -1561,9 +1568,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         // When DB exists, meta CF and data CF must be present.
         bool missing_meta_cf = true;
         bool missing_data_cf = true;
-        if (check_column_families(path, &missing_meta_cf, &missing_data_cf) != ::dsn::ERR_OK) {
+        if (check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf) != dsn::ERR_OK) {
             derror_replica("check column families failed");
-            return ::dsn::ERR_LOCAL_APP_FAILURE;
+            return dsn::ERR_LOCAL_APP_FAILURE;
         }
         dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
         dassert_replica(!missing_data_cf, "Missing data column family");
@@ -1573,7 +1580,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
         rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
         // Set `ignore_unknown_options` true for forward compatibility.
-        auto status = rocksdb::LoadLatestOptions(path,
+        auto status = rocksdb::LoadLatestOptions(rdb_path,
                                                  rocksdb::Env::Default(),
                                                  &loaded_db_opt,
                                                  &loaded_cf_descs,
@@ -1584,7 +1591,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
             if (status.code() != rocksdb::Status::kInvalidArgument ||
                 status.ToString().find("pegasus_data") == std::string::npos) {
                 derror_replica("load latest option file failed: {}.", status.ToString());
-                return ::dsn::ERR_LOCAL_APP_FAILURE;
+                return dsn::ERR_LOCAL_APP_FAILURE;
             }
             has_incompatible_db_options = true;
             dwarn_replica("The latest option file has incompatible db options: {}, use default "
@@ -1614,17 +1621,20 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
 
     std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
         {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
-    auto s = rocksdb::CheckOptionsCompatibility(
-        path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true);
+    auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
+                                                rocksdb::Env::Default(),
+                                                _db_opts,
+                                                column_families,
+                                                /*ignore_unknown_options=*/true);
     if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
         derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
-        return ::dsn::ERR_LOCAL_APP_FAILURE;
+        return dsn::ERR_LOCAL_APP_FAILURE;
     }
     std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
-    auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
+    auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db);
     if (!status.ok()) {
         derror_replica("rocksdb::DB::Open failed, error = {}", status.ToString());
-        return ::dsn::ERR_LOCAL_APP_FAILURE;
+        return dsn::ERR_LOCAL_APP_FAILURE;
     }
     dcheck_eq_replica(2, handles_opened.size());
     dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
@@ -1644,7 +1654,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
             derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
             release_db();
-            return ::dsn::ERR_LOCAL_APP_FAILURE;
+            return dsn::ERR_LOCAL_APP_FAILURE;
         }
 
         // update last manual compact finish timestamp
@@ -1674,7 +1684,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
             last_durable_decree(),
             last_flushed);
         auto err = async_checkpoint(false);
-        if (err != ::dsn::ERR_OK) {
+        if (err != dsn::ERR_OK) {
             derror_replica("create checkpoint failed, error = {}", err.to_string());
             release_db();
             return err;
@@ -1695,10 +1705,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
 
     dinfo_replica("start the update replica-level rocksdb statistics timer task");
     _update_replica_rdb_stat =
-        ::dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
-                                      &_tracker,
-                                      [this]() { this->update_replica_rocksdb_statistics(); },
-                                      _update_rdb_stat_interval);
+        dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
+                                    &_tracker,
+                                    [this]() { this->update_replica_rocksdb_statistics(); },
+                                    _update_rdb_stat_interval);
 
     // These counters are singletons on this server shared by all replicas, their metrics update
     // task should be scheduled once an interval on the server view.
@@ -1707,7 +1717,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         // The timer task will always running even though there is no replicas
         dassert_f(kServerStatUpdateTimeSec.count() != 0,
                   "kServerStatUpdateTimeSec shouldn't be zero");
-        _update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
+        _update_server_rdb_stat = dsn::tasking::enqueue_timer(
             LPC_REPLICATION_LONG_COMMON,
             nullptr, // TODO: the tracker is nullptr, we will fix it later
             []() { update_server_rocksdb_statistics(); },
@@ -1719,17 +1729,17 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
         this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller);
     _server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
 
-    ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
-                                  &_tracker,
-                                  [this]() { _read_hotkey_collector->analyse_data(); },
-                                  std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+                                &_tracker,
+                                [this]() { _read_hotkey_collector->analyse_data(); },
+                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
 
-    ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
-                                  &_tracker,
-                                  [this]() { _write_hotkey_collector->analyse_data(); },
-                                  std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+    dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+                                &_tracker,
+                                [this]() { _write_hotkey_collector->analyse_data(); },
+                                std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
 
-    return ::dsn::ERR_OK;
+    return dsn::ERR_OK;
 }
 
 void pegasus_server_impl::cancel_background_work(bool wait)
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index ec68275..4bd8558 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -58,6 +58,7 @@ set(MY_PROJ_LIBS
         PocoJSON
         pegasus_base
         gtest
+        gmock
         )
 add_definitions(-DPEGASUS_UNIT_TEST)
 add_definitions(-DENABLE_FAIL)
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index 0f3fab7..16671e4 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -143,5 +143,29 @@ TEST_F(pegasus_server_impl_test, test_update_user_specified_compaction)
     _server->update_user_specified_compaction(envs);
     ASSERT_EQ(user_specified_compaction, _server->_user_specified_compaction);
 }
+
+TEST_F(pegasus_server_impl_test, test_load_from_duplication_data)
+{
+    auto origin_file = fmt::format("{}/{}", _server->duplication_dir(), "checkpoint");
+    dsn::utils::filesystem::create_directory(_server->duplication_dir());
+    dsn::utils::filesystem::create_file(origin_file);
+    ASSERT_TRUE(dsn::utils::filesystem::file_exists(origin_file));
+
+    EXPECT_CALL(*_server, is_duplication_follower()).WillRepeatedly(testing::Return(true));
+
+    auto tempFolder = "invalid";
+    dsn::utils::filesystem::rename_path(_server->data_dir(), tempFolder);
+    ASSERT_EQ(start(), dsn::ERR_FILE_OPERATION_FAILED);
+
+    dsn::utils::filesystem::rename_path(tempFolder, _server->data_dir());
+    auto rdb_path = fmt::format("{}/rdb/", _server->data_dir());
+    auto new_file = fmt::format("{}/{}", rdb_path, "checkpoint");
+    ASSERT_EQ(start(), dsn::ERR_LOCAL_APP_FAILURE);
+    ASSERT_TRUE(dsn::utils::filesystem::directory_exists(rdb_path));
+    ASSERT_FALSE(dsn::utils::filesystem::file_exists(origin_file));
+    ASSERT_TRUE(dsn::utils::filesystem::file_exists(new_file));
+    dsn::utils::filesystem::remove_file_name(new_file);
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index 54af092..d9ed1e3 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -22,12 +22,22 @@
 #include "server/pegasus_server_impl.h"
 
 #include <gtest/gtest.h>
+#include <gmock/gmock.h>
 #include <dsn/dist/replication/replica_test_utils.h>
 #include <dsn/utility/filesystem.h>
 
 namespace pegasus {
 namespace server {
 
+class mock_pegasus_server_impl : public pegasus_server_impl
+{
+public:
+    mock_pegasus_server_impl(dsn::replication::replica *r) : pegasus_server_impl(r) {}
+
+public:
+    MOCK_CONST_METHOD0(is_duplication_follower, bool());
+};
+
 class pegasus_server_test_base : public ::testing::Test
 {
 public:
@@ -41,10 +51,10 @@ public:
         dsn::app_info app_info;
         app_info.app_type = "pegasus";
 
-        _replica =
-            dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
+        _replica = dsn::replication::create_test_replica(
+            _replica_stub, _gpid, app_info, "./", false, false);
 
-        _server = dsn::make_unique<pegasus_server_impl>(_replica);
+        _server = dsn::make_unique<mock_pegasus_server_impl>(_replica);
     }
 
     dsn::error_code start(const std::map<std::string, std::string> &envs = {})
@@ -72,7 +82,7 @@ public:
     }
 
 protected:
-    std::unique_ptr<pegasus_server_impl> _server;
+    std::unique_ptr<mock_pegasus_server_impl> _server;
     dsn::replication::replica *_replica;
     dsn::replication::replica_stub *_replica_stub;
     dsn::gpid _gpid;
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 313f522..7c21d0f 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -62,9 +62,9 @@ public:
         dsn::app_info app_info;
         app_info.app_type = "pegasus";
         app_info.duplicating = true;
-        _replica =
-            dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
-        _server = dsn::make_unique<pegasus_server_impl>(_replica);
+        _replica = dsn::replication::create_test_replica(
+            _replica_stub, _gpid, app_info, "./", false, false);
+        _server = dsn::make_unique<mock_pegasus_server_impl>(_replica);
 
         SetUp();
     }
diff --git a/src/shell/commands/duplication.cpp b/src/shell/commands/duplication.cpp
index 90b1781..3119be4 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -205,7 +205,7 @@ bool change_dup_status(command_executor *e,
 
     std::string operation;
     switch (status) {
-    case duplication_status::DS_START:
+    case duplication_status::DS_LOG:
         operation = "starting duplication";
         break;
     case duplication_status::DS_PAUSE:
@@ -215,7 +215,7 @@ bool change_dup_status(command_executor *e,
         operation = "removing duplication";
         break;
     default:
-        dfatal("unexpected duplication status %d", status);
+        dfatal("can't change duplication under status %d", status);
     }
 
     auto err_resp = sc->ddl_client->change_dup_status(app_name, dup_id, status);
@@ -231,7 +231,7 @@ bool remove_dup(command_executor *e, shell_context *sc, arguments args)
 
 bool start_dup(command_executor *e, shell_context *sc, arguments args)
 {
-    return change_dup_status(e, sc, args, duplication_status::DS_START);
+    return change_dup_status(e, sc, args, duplication_status::DS_LOG);
 }
 
 bool pause_dup(command_executor *e, shell_context *sc, arguments args)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org