You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by zh...@apache.org on 2022/03/04 05:18:33 UTC
[incubator-pegasus] branch duplication_dev updated: feat(dup_enhancement#17): replica follower load duplication data when open replica (#917)
This is an automated email from the ASF dual-hosted git repository.
zhaoliwei pushed a commit to branch duplication_dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/duplication_dev by this push:
new ebbc61c feat(dup_enhancement#17): replica follower load duplication data when open replica (#917)
ebbc61c is described below
commit ebbc61c3e16b54826f2e023aea4dbb346555344f
Author: Jiashuo <js...@live.com>
AuthorDate: Fri Mar 4 13:16:14 2022 +0800
feat(dup_enhancement#17): replica follower load duplication data when open replica (#917)
---
.github/workflows/ci-pull-request.yaml | 1 +
rdsn | 2 +-
src/server/pegasus_server_impl.cpp | 156 ++++++++++++++-------------
src/server/test/CMakeLists.txt | 1 +
src/server/test/pegasus_server_impl_test.cpp | 24 +++++
src/server/test/pegasus_server_test_base.h | 18 +++-
src/server/test/rocksdb_wrapper_test.cpp | 6 +-
src/shell/commands/duplication.cpp | 6 +-
8 files changed, 130 insertions(+), 84 deletions(-)
diff --git a/.github/workflows/ci-pull-request.yaml b/.github/workflows/ci-pull-request.yaml
index 544640a..2d43c68 100644
--- a/.github/workflows/ci-pull-request.yaml
+++ b/.github/workflows/ci-pull-request.yaml
@@ -14,6 +14,7 @@ on:
- master
- 'v[0-9]+.*' # release branch
- ci-test # testing branch for github action
+ - '*dev'
# for manually triggering workflow
workflow_dispatch:
diff --git a/rdsn b/rdsn
index 8c310b9..03f7b61 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 8c310b95f8cf0a2123da5eed7e6621b6f94f30e0
+Subproject commit 03f7b613d29c611844af831b9fcb2016620b9977
diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp
index 4221aea..019854b 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -32,6 +32,7 @@
#include <dsn/dist/replication/replication.codes.h>
#include <dsn/utility/flags.h>
#include <dsn/utils/token_bucket_throttling_controller.h>
+#include <dsn/dist/replication/duplication_common.h>
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
@@ -1454,7 +1455,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); }
-::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
+dsn::error_code pegasus_server_impl::start(int argc, char **argv)
{
dassert_replica(!_is_open, "replica is already opened.");
ddebug_replica("start to open app {}", data_dir());
@@ -1465,11 +1466,11 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
if (argc > 0) {
if ((argc - 1) % 2 != 0) {
derror_replica("parse envs failed, invalid argc = {}", argc);
- return ::dsn::ERR_INVALID_PARAMETERS;
+ return dsn::ERR_INVALID_PARAMETERS;
}
if (argv == nullptr) {
derror_replica("parse envs failed, invalid argv = nullptr");
- return ::dsn::ERR_INVALID_PARAMETERS;
+ return dsn::ERR_INVALID_PARAMETERS;
}
int idx = 1;
while (idx < argc) {
@@ -1485,8 +1486,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
//
// here, we must distinguish three cases, such as:
// case 1: we open the db that already exist
- // case 2: we open a new db
- // case 3: we restore the db base on old data
+ // case 2: we load duplication data based on the checkpoint from the master cluster
+ // case 3: we open a new db
+ // case 4: we restore the db based on old data
//
// if we want to restore the db base on old data, only all of the restore preconditions are
// satisfied
@@ -1496,62 +1498,67 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// 3, restore_dir is exist
//
bool db_exist = true;
- auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb");
- if (::dsn::utils::filesystem::path_exists(path)) {
+ auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), "rdb");
+ auto duplication_path = duplication_dir();
+ if (dsn::utils::filesystem::path_exists(rdb_path)) {
// only case 1
- ddebug("%s: rdb is already exist, path = %s", replica_name(), path.c_str());
+ ddebug_replica("rdb is already exist, path = {}", rdb_path);
} else {
- std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
- const std::string &restore_dir = restore_info.first;
- bool force_restore = restore_info.second;
- if (restore_dir.empty()) {
- // case 2
- if (force_restore) {
- derror("%s: try to restore, but we can't combine restore_dir from envs",
- replica_name());
- return ::dsn::ERR_FILE_OPERATION_FAILED;
- } else {
- db_exist = false;
- dinfo("%s: open a new db, path = %s", replica_name(), path.c_str());
+ // case 2
+ if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) {
+ if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) {
+ derror_replica(
+ "load duplication data from {} to {} failed", duplication_path, rdb_path);
+ return dsn::ERR_FILE_OPERATION_FAILED;
}
} else {
- // case 3
- ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str());
- if (::dsn::utils::filesystem::directory_exists(restore_dir)) {
- // here, we just rename restore_dir to rdb, then continue the normal process
- if (::dsn::utils::filesystem::rename_path(restore_dir.c_str(), path.c_str())) {
- ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed",
- replica_name(),
- restore_dir.c_str(),
- path.c_str());
+ std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs);
+ const std::string &restore_dir = restore_info.first;
+ bool force_restore = restore_info.second;
+ if (restore_dir.empty()) {
+ // case 3
+ if (force_restore) {
+ derror_replica("try to restore, but we can't combine restore_dir from envs");
+ return dsn::ERR_FILE_OPERATION_FAILED;
} else {
- derror("%s: rename restore_dir(%s) to rdb(%s) failed",
- replica_name(),
- restore_dir.c_str(),
- path.c_str());
- return ::dsn::ERR_FILE_OPERATION_FAILED;
+ db_exist = false;
+ dinfo_replica("open a new db, path = {}", rdb_path);
}
} else {
- if (force_restore) {
- derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s",
- replica_name(),
- restore_dir.c_str());
- return ::dsn::ERR_FILE_OPERATION_FAILED;
+ // case 4
+ ddebug_replica("try to restore from restore_dir = {}", restore_dir);
+ if (dsn::utils::filesystem::directory_exists(restore_dir)) {
+ // here, we just rename restore_dir to rdb, then continue the normal process
+ if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) {
+ ddebug_replica(
+ "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path);
+ } else {
+ derror_replica(
+ "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path);
+ return dsn::ERR_FILE_OPERATION_FAILED;
+ }
} else {
- db_exist = false;
- dwarn(
- "%s: try to restore and restore_dir(%s) isn't exist, but we don't force "
- "it, the role of this replica must not primary, so we open a new db on the "
- "path(%s)",
- replica_name(),
- restore_dir.c_str(),
- path.c_str());
+ if (force_restore) {
+ derror_replica(
+ "try to restore, but restore_dir isn't exist, restore_dir = {}",
+ restore_dir);
+ return dsn::ERR_FILE_OPERATION_FAILED;
+ } else {
+ db_exist = false;
+ dwarn_replica(
+ "try to restore and restore_dir({}) isn't exist, but we don't force "
+ "it, the role of this replica must not primary, so we open a new db on "
+ "the "
+ "path({})",
+ restore_dir,
+ rdb_path);
+ }
}
}
}
}
- ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str());
+ ddebug_replica("start to open rocksDB's rdb({})", rdb_path);
// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
@@ -1561,9 +1568,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// When DB exists, meta CF and data CF must be present.
bool missing_meta_cf = true;
bool missing_data_cf = true;
- if (check_column_families(path, &missing_meta_cf, &missing_data_cf) != ::dsn::ERR_OK) {
+ if (check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf) != dsn::ERR_OK) {
derror_replica("check column families failed");
- return ::dsn::ERR_LOCAL_APP_FAILURE;
+ return dsn::ERR_LOCAL_APP_FAILURE;
}
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
dassert_replica(!missing_data_cf, "Missing data column family");
@@ -1573,7 +1580,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
// Set `ignore_unknown_options` true for forward compatibility.
- auto status = rocksdb::LoadLatestOptions(path,
+ auto status = rocksdb::LoadLatestOptions(rdb_path,
rocksdb::Env::Default(),
&loaded_db_opt,
&loaded_cf_descs,
@@ -1584,7 +1591,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
if (status.code() != rocksdb::Status::kInvalidArgument ||
status.ToString().find("pegasus_data") == std::string::npos) {
derror_replica("load latest option file failed: {}.", status.ToString());
- return ::dsn::ERR_LOCAL_APP_FAILURE;
+ return dsn::ERR_LOCAL_APP_FAILURE;
}
has_incompatible_db_options = true;
dwarn_replica("The latest option file has incompatible db options: {}, use default "
@@ -1614,17 +1621,20 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
- auto s = rocksdb::CheckOptionsCompatibility(
- path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true);
+ auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
+ rocksdb::Env::Default(),
+ _db_opts,
+ column_families,
+ /*ignore_unknown_options=*/true);
if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) {
derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString());
- return ::dsn::ERR_LOCAL_APP_FAILURE;
+ return dsn::ERR_LOCAL_APP_FAILURE;
}
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
- auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db);
+ auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db);
if (!status.ok()) {
derror_replica("rocksdb::DB::Open failed, error = {}", status.ToString());
- return ::dsn::ERR_LOCAL_APP_FAILURE;
+ return dsn::ERR_LOCAL_APP_FAILURE;
}
dcheck_eq_replica(2, handles_opened.size());
dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
@@ -1644,7 +1654,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
- return ::dsn::ERR_LOCAL_APP_FAILURE;
+ return dsn::ERR_LOCAL_APP_FAILURE;
}
// update last manual compact finish timestamp
@@ -1674,7 +1684,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
last_durable_decree(),
last_flushed);
auto err = async_checkpoint(false);
- if (err != ::dsn::ERR_OK) {
+ if (err != dsn::ERR_OK) {
derror_replica("create checkpoint failed, error = {}", err.to_string());
release_db();
return err;
@@ -1695,10 +1705,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
dinfo_replica("start the update replica-level rocksdb statistics timer task");
_update_replica_rdb_stat =
- ::dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
- &_tracker,
- [this]() { this->update_replica_rocksdb_statistics(); },
- _update_rdb_stat_interval);
+ dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON,
+ &_tracker,
+ [this]() { this->update_replica_rocksdb_statistics(); },
+ _update_rdb_stat_interval);
// These counters are singletons on this server shared by all replicas, their metrics update
// task should be scheduled once an interval on the server view.
@@ -1707,7 +1717,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
// The timer task will always running even though there is no replicas
dassert_f(kServerStatUpdateTimeSec.count() != 0,
"kServerStatUpdateTimeSec shouldn't be zero");
- _update_server_rdb_stat = ::dsn::tasking::enqueue_timer(
+ _update_server_rdb_stat = dsn::tasking::enqueue_timer(
LPC_REPLICATION_LONG_COMMON,
nullptr, // TODO: the tracker is nullptr, we will fix it later
[]() { update_server_rocksdb_statistics(); },
@@ -1719,17 +1729,17 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache
this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);
- ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
- &_tracker,
- [this]() { _read_hotkey_collector->analyse_data(); },
- std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+ dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+ &_tracker,
+ [this]() { _read_hotkey_collector->analyse_data(); },
+ std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
- ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
- &_tracker,
- [this]() { _write_hotkey_collector->analyse_data(); },
- std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
+ dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY,
+ &_tracker,
+ [this]() { _write_hotkey_collector->analyse_data(); },
+ std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s));
- return ::dsn::ERR_OK;
+ return dsn::ERR_OK;
}
void pegasus_server_impl::cancel_background_work(bool wait)
diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt
index ec68275..4bd8558 100644
--- a/src/server/test/CMakeLists.txt
+++ b/src/server/test/CMakeLists.txt
@@ -58,6 +58,7 @@ set(MY_PROJ_LIBS
PocoJSON
pegasus_base
gtest
+ gmock
)
add_definitions(-DPEGASUS_UNIT_TEST)
add_definitions(-DENABLE_FAIL)
diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp
index 0f3fab7..16671e4 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -143,5 +143,29 @@ TEST_F(pegasus_server_impl_test, test_update_user_specified_compaction)
_server->update_user_specified_compaction(envs);
ASSERT_EQ(user_specified_compaction, _server->_user_specified_compaction);
}
+
+TEST_F(pegasus_server_impl_test, test_load_from_duplication_data)
+{
+ auto origin_file = fmt::format("{}/{}", _server->duplication_dir(), "checkpoint");
+ dsn::utils::filesystem::create_directory(_server->duplication_dir());
+ dsn::utils::filesystem::create_file(origin_file);
+ ASSERT_TRUE(dsn::utils::filesystem::file_exists(origin_file));
+
+ EXPECT_CALL(*_server, is_duplication_follower()).WillRepeatedly(testing::Return(true));
+
+ auto tempFolder = "invalid";
+ dsn::utils::filesystem::rename_path(_server->data_dir(), tempFolder);
+ ASSERT_EQ(start(), dsn::ERR_FILE_OPERATION_FAILED);
+
+ dsn::utils::filesystem::rename_path(tempFolder, _server->data_dir());
+ auto rdb_path = fmt::format("{}/rdb/", _server->data_dir());
+ auto new_file = fmt::format("{}/{}", rdb_path, "checkpoint");
+ ASSERT_EQ(start(), dsn::ERR_LOCAL_APP_FAILURE);
+ ASSERT_TRUE(dsn::utils::filesystem::directory_exists(rdb_path));
+ ASSERT_FALSE(dsn::utils::filesystem::file_exists(origin_file));
+ ASSERT_TRUE(dsn::utils::filesystem::file_exists(new_file));
+ dsn::utils::filesystem::remove_file_name(new_file);
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index 54af092..d9ed1e3 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -22,12 +22,22 @@
#include "server/pegasus_server_impl.h"
#include <gtest/gtest.h>
+#include <gmock/gmock.h>
#include <dsn/dist/replication/replica_test_utils.h>
#include <dsn/utility/filesystem.h>
namespace pegasus {
namespace server {
+class mock_pegasus_server_impl : public pegasus_server_impl
+{
+public:
+ mock_pegasus_server_impl(dsn::replication::replica *r) : pegasus_server_impl(r) {}
+
+public:
+ MOCK_CONST_METHOD0(is_duplication_follower, bool());
+};
+
class pegasus_server_test_base : public ::testing::Test
{
public:
@@ -41,10 +51,10 @@ public:
dsn::app_info app_info;
app_info.app_type = "pegasus";
- _replica =
- dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
+ _replica = dsn::replication::create_test_replica(
+ _replica_stub, _gpid, app_info, "./", false, false);
- _server = dsn::make_unique<pegasus_server_impl>(_replica);
+ _server = dsn::make_unique<mock_pegasus_server_impl>(_replica);
}
dsn::error_code start(const std::map<std::string, std::string> &envs = {})
@@ -72,7 +82,7 @@ public:
}
protected:
- std::unique_ptr<pegasus_server_impl> _server;
+ std::unique_ptr<mock_pegasus_server_impl> _server;
dsn::replication::replica *_replica;
dsn::replication::replica_stub *_replica_stub;
dsn::gpid _gpid;
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index 313f522..7c21d0f 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -62,9 +62,9 @@ public:
dsn::app_info app_info;
app_info.app_type = "pegasus";
app_info.duplicating = true;
- _replica =
- dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false);
- _server = dsn::make_unique<pegasus_server_impl>(_replica);
+ _replica = dsn::replication::create_test_replica(
+ _replica_stub, _gpid, app_info, "./", false, false);
+ _server = dsn::make_unique<mock_pegasus_server_impl>(_replica);
SetUp();
}
diff --git a/src/shell/commands/duplication.cpp b/src/shell/commands/duplication.cpp
index 90b1781..3119be4 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -205,7 +205,7 @@ bool change_dup_status(command_executor *e,
std::string operation;
switch (status) {
- case duplication_status::DS_START:
+ case duplication_status::DS_LOG:
operation = "starting duplication";
break;
case duplication_status::DS_PAUSE:
@@ -215,7 +215,7 @@ bool change_dup_status(command_executor *e,
operation = "removing duplication";
break;
default:
- dfatal("unexpected duplication status %d", status);
+ dfatal("can't change duplication under status %d", status);
}
auto err_resp = sc->ddl_client->change_dup_status(app_name, dup_id, status);
@@ -231,7 +231,7 @@ bool remove_dup(command_executor *e, shell_context *sc, arguments args)
bool start_dup(command_executor *e, shell_context *sc, arguments args)
{
- return change_dup_status(e, sc, args, duplication_status::DS_START);
+ return change_dup_status(e, sc, args, duplication_status::DS_LOG);
}
bool pause_dup(command_executor *e, shell_context *sc, arguments args)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org