You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/31 02:46:10 UTC
[incubator-pegasus] branch master updated: refactor: update replica's dir_node status (part2) (#1489)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e4350d4c7 refactor: update replica's dir_node status (part2) (#1489)
e4350d4c7 is described below
commit e4350d4c74c1f2c57cd58eb71ac2202c688809be
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed May 31 10:46:03 2023 +0800
refactor: update replica's dir_node status (part2) (#1489)
https://github.com/apache/incubator-pegasus/issues/1383
This patch removes the duplicated _disk_tag and _disk_status of the dir_node where
it is placed on, instead, introduce a dir_node pointer for replica. So once the
status of the dir_node updated, we can judge the replica's status more conveniently.
Some unit tests have been updated as well, including:
- change the test directory from `./` to `test_dir`
- simplify the logic of replica_disk_test related test
---
src/aio/native_linux_aio_provider.cpp | 2 +-
src/common/fs_manager.cpp | 37 +++++---
src/common/fs_manager.h | 9 +-
src/replica/duplication/replica_follower.cpp | 3 +-
.../test/load_from_private_log_test.cpp | 5 +-
src/replica/replica.cpp | 15 +--
src/replica/replica.h | 15 +--
src/replica/replica_2pc.cpp | 4 +-
src/replica/replica_check.cpp | 3 +-
src/replica/replica_learn.cpp | 5 +-
src/replica/replica_stub.cpp | 101 +++++++++++----------
src/replica/replica_stub.h | 7 +-
src/replica/test/mock_utils.h | 80 +++++++++-------
src/replica/test/replica_disk_migrate_test.cpp | 2 +-
src/replica/test/replica_disk_test.cpp | 40 ++++----
src/replica/test/replica_disk_test_base.h | 63 ++++---------
src/replica/test/replica_learn_test.cpp | 15 ++-
src/replica/test/replica_test.cpp | 57 ++++++++----
src/server/test/hotkey_collector_test.cpp | 3 +-
src/server/test/pegasus_server_test_base.h | 15 ++-
src/server/test/rocksdb_wrapper_test.cpp | 8 +-
src/utils/test_macros.h | 6 ++
22 files changed, 268 insertions(+), 227 deletions(-)
diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp
index 55e8961d2..470f3d63a 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -52,7 +52,7 @@ linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int
{
auto fd = ::open(file_name, flag, pmode);
if (fd == DSN_INVALID_FILE_HANDLE) {
- LOG_ERROR("create file failed, err = {}", utils::safe_strerror(errno));
+ LOG_ERROR("create file '{}' failed, err = {}", file_name, utils::safe_strerror(errno));
}
return linux_fd_t(fd);
}
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 653b0c138..a33d20e54 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -235,17 +235,6 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
update_disk_stat();
}
-dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
-{
- dir_node *n = get_dir_node(dir);
- if (nullptr == n) {
- return dsn::ERR_OBJECT_NOT_FOUND;
- } else {
- tag = n->tag;
- return dsn::ERR_OK;
- }
-}
-
void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
const auto &dn = get_dir_node(pid_dir);
@@ -306,6 +295,24 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
return selected;
}
+void fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
+ dsn::string_view app_type,
+ const dsn::gpid &pid) const
+{
+ bool dn_found = false;
+ zauto_write_lock l(_lock);
+ for (const auto &dn : _dir_nodes) {
+ CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
+ if (dn.get() == specified_dn) {
+ dn_found = true;
+ }
+ }
+ CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag);
+ const auto dir = specified_dn->replica_dir(app_type, pid);
+ CHECK_TRUE(dsn::utils::filesystem::create_directory(dir));
+ specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
+}
+
void fs_manager::remove_replica(const gpid &pid)
{
zauto_write_lock l(_lock);
@@ -412,9 +419,17 @@ dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type,
// Try to find the replica directory.
auto replica_dn = find_replica_dir(app_type, pid);
if (replica_dn != nullptr) {
+ // TODO(yingchun): enable this check after unit tests are refactored and fixed.
+ // CHECK(replica_dn->has(pid),
+ // "replica({})'s directory({}) exists but not in management",
+ // pid,
+ // replica_dn->tag);
return replica_dn;
}
+ // TODO(yingchun): enable this check after unit tests are refactored and fixed.
+ // CHECK(0 == replica_dn->holding_replicas.count(pid.get_app_id()) ||
+ // 0 == replica_dn->holding_replicas[pid.get_app_id()].count(pid), "");
// Find a dir_node for the new replica.
replica_dn = find_best_dir_for_new_replica(pid);
if (replica_dn == nullptr) {
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 3c67d626a..be19d79b6 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -29,7 +29,6 @@
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "perf_counter/perf_counter_wrapper.h"
-#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/string_view.h"
#include "utils/zlocks.h"
@@ -96,7 +95,13 @@ public:
// TODO(yingchun): consider the disk capacity and available space.
// NOTE: the 'pid' must not exist in any dir_nodes.
dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
- dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
+ // Similar to the above, but will specify the dir_node for the new replica.
+ // NOTE: the 'pid' must not exist in any dir_nodes and the 'specified_dn' must be in the
+ // dir_nodes.
+ // NOTE: only used in test.
+ void specify_dir_for_new_replica_for_test(dir_node *specified_dn,
+ dsn::string_view app_type,
+ const dsn::gpid &pid) const;
void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
// Find the replica instance directory.
dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp
index b38503d3b..f9c2a996d 100644
--- a/src/replica/duplication/replica_follower.cpp
+++ b/src/replica/duplication/replica_follower.cpp
@@ -27,6 +27,7 @@
#include <utility>
#include "common/duplication_common.h"
+#include "common/fs_manager.h"
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "nfs/nfs_node.h"
@@ -236,7 +237,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node,
request->source_disk_tag = remote_disk;
request->source_dir = remote_dir;
request->files = file_list;
- request->dest_disk_tag = _replica->get_replica_disk_tag();
+ request->dest_disk_tag = _replica->get_dir_node()->tag;
request->dest_dir = dest_dir;
request->overwrite = true;
request->high_priority = false;
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp
index a9e7edaed..342669f63 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -58,6 +58,7 @@
#include <iterator>
#include <map>
#include <memory>
+#include <ostream>
#include <string>
#include <utility>
#include <vector>
@@ -326,13 +327,15 @@ TEST_F(load_from_private_log_test, handle_real_private_log)
// Update '_log_dir' to the corresponding replica created above.
_log_dir = _replica->dir();
+ ASSERT_TRUE(utils::filesystem::path_exists(_log_dir)) << _log_dir;
// Copy the log file to '_log_dir'
boost::filesystem::path file(tt.fname);
+ ASSERT_TRUE(dsn::utils::filesystem::file_exists(tt.fname)) << tt.fname;
boost::system::error_code ec;
boost::filesystem::copy_file(
file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
- ASSERT_TRUE(!ec);
+ ASSERT_TRUE(!ec) << ec.value() << ", " << ec.category().name() << ", " << ec.message();
// Start to verify.
load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index bcaa762df..d2f9ccefb 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -100,7 +100,7 @@ const std::string replica::kAppInfo = ".app-info";
replica::replica(replica_stub *stub,
gpid gpid,
const app_info &app,
- const char *dir,
+ dir_node *dn,
bool need_restore,
bool is_duplication_follower)
: serverlet<replica>("replica"),
@@ -124,7 +124,9 @@ replica::replica(replica_stub *stub,
CHECK(!_app_info.app_type.empty(), "");
CHECK_NOTNULL(stub, "");
_stub = stub;
- _dir = dir;
+ CHECK_NOTNULL(dn, "");
+ _dir_node = dn;
+ _dir = dn->replica_dir(_app_info.app_type, gpid);
_options = &stub->options();
init_state();
_config.pid = gpid;
@@ -232,7 +234,6 @@ void replica::init_state()
_last_config_change_time_ms = _create_time_ms;
update_last_checkpoint_generate_time();
_private_log = nullptr;
- init_disk_tag();
get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind);
}
@@ -590,14 +591,6 @@ uint32_t replica::query_data_version() const
return _app->query_data_version();
}
-void replica::init_disk_tag()
-{
- dsn::error_code err = _stub->_fs_manager.get_disk_tag(dir(), _disk_tag);
- if (dsn::ERR_OK != err) {
- LOG_ERROR_PREFIX("get disk tag of {} failed: {}, init it to empty ", dir(), err);
- }
-}
-
error_code replica::store_app_info(app_info &info, const std::string &path)
{
replica_app_info new_info((app_info *)&info);
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 612830c70..3c1f1068b 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -120,6 +120,7 @@ class replica_split_manager;
class replica_stub;
class replication_app_base;
class replication_options;
+struct dir_node;
typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
@@ -291,11 +292,7 @@ public:
// routine for get extra envs from replica
const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }
-
- void set_disk_status(disk_status::type status) { _disk_status = status; }
- bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; }
- disk_status::type get_disk_status() { return _disk_status; }
- std::string get_replica_disk_tag() const { return _disk_tag; }
+ const dir_node *get_dir_node() const { return _dir_node; }
static const std::string kAppInfo;
@@ -315,7 +312,7 @@ private:
replica(replica_stub *stub,
gpid gpid,
const app_info &app,
- const char *dir,
+ dir_node *dn,
bool need_restore,
bool is_duplication_follower = false);
error_code initialize_on_new();
@@ -523,8 +520,6 @@ private:
// update envs to deny client request
void update_deny_client(const std::map<std::string, std::string> &envs);
- void init_disk_tag();
-
// store `info` into a file under `path` directory
// path = "" means using the default directory (`_dir`/.app_info)
error_code store_app_info(app_info &info, const std::string &path = "");
@@ -582,7 +577,6 @@ private:
// constants
replica_stub *_stub;
std::string _dir;
- std::string _disk_tag;
replication_options *_options;
app_info _app_info;
std::map<std::string, std::string> _extra_envs;
@@ -678,7 +672,8 @@ private:
std::unique_ptr<security::access_controller> _access_controller;
- disk_status::type _disk_status{disk_status::NORMAL};
+ // The dir_node where the replica data is placed.
+ dir_node *_dir_node{nullptr};
bool _allow_ingest_behind{false};
// Indicate where the storage engine data is corrupted and unrecoverable.
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 0fd3b9851..be1328b73 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -37,6 +37,7 @@
#include "bulk_load/replica_bulk_loader.h"
#include "bulk_load_types.h"
+#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
@@ -178,7 +179,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
}
if (FLAGS_reject_write_when_disk_insufficient &&
- (disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient())) {
+ (_dir_node->status == disk_status::SPACE_INSUFFICIENT ||
+ _primary_states.secondary_disk_space_insufficient())) {
response_client_write(request, ERR_DISK_INSUFFICIENT);
return;
}
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index 8cd0325e2..e978a91db 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -38,6 +38,7 @@
#include <unordered_map>
#include <utility>
+#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
@@ -210,7 +211,7 @@ void replica::on_group_check(const group_check_request &request,
}
// the group check may trigger start/finish/cancel/pause a split on the secondary.
_split_mgr->trigger_secondary_parent_split(request, response);
- response.__set_disk_status(_disk_status);
+ response.__set_disk_status(_dir_node->status);
break;
case partition_status::PS_POTENTIAL_SECONDARY:
init_learn(request.config.learner_signature);
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 589e3e28d..f65f77519 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -47,6 +47,7 @@
#include <vector>
#include "aio/aio_task.h"
+#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
@@ -543,7 +544,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
err);
} else {
response.base_local_dir = _app->data_dir();
- response.__set_replica_disk_tag(get_replica_disk_tag());
+ response.__set_replica_disk_tag(_dir_node->tag);
LOG_INFO_PREFIX(
"on_learn[{:#018x}]: learner = {}, get app learn state succeed, "
"learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}",
@@ -910,7 +911,7 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
resp.replica_disk_tag,
resp.base_local_dir,
resp.state.files,
- get_replica_disk_tag(),
+ _dir_node->tag,
learn_dir,
get_gpid(),
true, // overwrite
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 5f6016ff5..54b2879e9 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -616,34 +616,37 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
// Start to load replicas in available data directories.
LOG_INFO("start to load replicas");
-
- std::vector<std::string> dir_list;
+ std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
for (const auto &dn : _fs_manager.get_dir_nodes()) {
- std::vector<std::string> tmp_list;
- CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false),
- "Fail to get subdirectories in {}.",
+ std::vector<std::string> sub_directories;
+ CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false),
+ "fail to get sub_directories in {}",
dn->full_dir);
- dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end());
+ dirs_by_dn.emplace(dn.get(), sub_directories);
}
replicas rps;
utils::ex_lock rps_lock;
std::deque<task_ptr> load_tasks;
uint64_t start_time = dsn_now_ms();
- for (auto &dir : dir_list) {
- if (dsn::replication::is_data_dir_invalid(dir)) {
- LOG_INFO("ignore dir {}", dir);
- continue;
- }
+ for (const auto &dn_dirs : dirs_by_dn) {
+ const auto dn = dn_dirs.first;
+ for (const auto &dir : dn_dirs.second) {
+ if (dsn::replication::is_data_dir_invalid(dir)) {
+ LOG_WARNING("ignore dir {}", dir);
+ continue;
+ }
- load_tasks.push_back(
- tasking::create_task(LPC_REPLICATION_INIT_LOAD,
- &_tracker,
- [this, dir, &rps, &rps_lock] {
- LOG_INFO("process dir {}", dir);
+ load_tasks.push_back(
+ tasking::create_task(LPC_REPLICATION_INIT_LOAD,
+ &_tracker,
+ [this, dn, dir, &rps, &rps_lock] {
+ LOG_INFO("process dir {}", dir);
- auto r = load_replica(dir.c_str());
- if (r != nullptr) {
+ auto r = load_replica(dn, dir.c_str());
+ if (r == nullptr) {
+ return;
+ }
LOG_INFO("{}@{}: load replica '{}' success, <durable, "
"commit> = <{}, {}>, last_prepared_decree = {}",
r->get_gpid(),
@@ -660,17 +663,17 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
rps[r->get_gpid()]->dir());
rps[r->get_gpid()] = r;
- }
- },
- load_tasks.size()));
- load_tasks.back()->enqueue();
+ },
+ load_tasks.size()));
+ load_tasks.back()->enqueue();
+ }
}
for (auto &tsk : load_tasks) {
tsk->wait();
}
uint64_t finish_time = dsn_now_ms();
- dir_list.clear();
+ dirs_by_dn.clear();
load_tasks.clear();
LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
rps.size(),
@@ -1375,12 +1378,7 @@ void replica_stub::get_replica_info(replica_info &info, replica_ptr r)
info.last_committed_decree = r->last_committed_decree();
info.last_prepared_decree = r->last_prepared_decree();
info.last_durable_decree = r->last_durable_decree();
-
- dsn::error_code err = _fs_manager.get_disk_tag(r->dir(), info.disk_tag);
- if (dsn::ERR_OK != err) {
- LOG_WARNING("get disk tag of {} failed: {}", r->dir(), err);
- }
-
+ info.disk_tag = r->get_dir_node()->tag;
info.__set_manual_compact_status(r->get_manual_compact_status());
}
@@ -2082,7 +2080,7 @@ void replica_stub::open_replica(
_primary_address_str,
group_check ? "with" : "without",
dir);
- rep = load_replica(dir.c_str());
+ rep = load_replica(dn, dir.c_str());
// if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
// migration
@@ -2107,7 +2105,7 @@ void replica_stub::open_replica(
boost::replace_first(
origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
- rep = load_replica(origin_dir.c_str());
+ rep = load_replica(origin_dn, origin_dir.c_str());
FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {});
}
@@ -2220,12 +2218,11 @@ replica *replica_stub::new_replica(gpid gpid,
}
const auto &dir = dn->replica_dir(app.app_type, gpid);
CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
- auto *rep =
- new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
+ auto *rep = new replica(this, gpid, app, dn, restore_if_necessary, is_duplication_follower);
error_code err;
if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err);
- clear_on_failure(rep, dir, gpid);
+ clear_on_failure(rep);
return nullptr;
}
@@ -2235,14 +2232,14 @@ replica *replica_stub::new_replica(gpid gpid,
"previous detail error log",
rep->name(),
err);
- clear_on_failure(rep, dir, gpid);
+ clear_on_failure(rep);
return nullptr;
}
err = rep->initialize_on_new();
if (err != ERR_OK) {
LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err);
- clear_on_failure(rep, dir, gpid);
+ clear_on_failure(rep);
return nullptr;
}
@@ -2250,7 +2247,7 @@ replica *replica_stub::new_replica(gpid gpid,
return rep;
}
-replica *replica_stub::load_replica(const char *dir)
+replica *replica_stub::load_replica(dir_node *dn, const char *dir)
{
FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
@@ -2296,7 +2293,9 @@ replica *replica_stub::load_replica(const char *dir)
return nullptr;
}
- auto *rep = new replica(this, pid, info, dir, false);
+ // The replica's directory must exists when creating a replica.
+ CHECK_EQ(dir, dn->replica_dir(app_type, pid));
+ auto *rep = new replica(this, pid, info, dn, false);
err = rep->initialize_on_load();
if (err != ERR_OK) {
LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
@@ -2318,14 +2317,17 @@ replica *replica_stub::load_replica(const char *dir)
return rep;
}
-void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid)
+void replica_stub::clear_on_failure(replica *rep)
{
+ const auto rep_dir = rep->dir();
+ const auto pid = rep->get_gpid();
+
rep->close();
delete rep;
rep = nullptr;
// clear work on failure
- utils::filesystem::remove_path(path);
+ utils::filesystem::remove_path(rep_dir);
_fs_manager.remove_replica(pid);
}
@@ -2954,14 +2956,17 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
app_info *app,
const std::string &parent_dir)
{
- FAIL_POINT_INJECT_F("replica_stub_create_child_replica_if_not_found",
- [=](dsn::string_view) -> replica_ptr {
- replica *rep = new replica(this, child_pid, *app, "./", false);
- rep->_config.status = partition_status::PS_INACTIVE;
- _replicas.insert(replicas::value_type(child_pid, rep));
- LOG_INFO("mock create_child_replica_if_not_found succeed");
- return rep;
- });
+ FAIL_POINT_INJECT_F(
+ "replica_stub_create_child_replica_if_not_found", [=](dsn::string_view) -> replica_ptr {
+ const auto dn =
+ _fs_manager.create_child_replica_dir(app->app_type, child_pid, parent_dir);
+ CHECK_NOTNULL(dn, "");
+ auto *rep = new replica(this, child_pid, *app, dn, false);
+ rep->_config.status = partition_status::PS_INACTIVE;
+ _replicas.insert(replicas::value_type(child_pid, rep));
+ LOG_INFO("mock create_child_replica_if_not_found succeed");
+ return rep;
+ });
zauto_write_lock l(_replicas_lock);
auto it = _replicas.find(child_pid);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index db517912c..c6aa984f9 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -339,10 +339,10 @@ private:
bool restore_if_necessary,
bool is_duplication_follower,
const std::string &parent_dir = "");
- // Load an existing replica from 'dir'.
- replica *load_replica(const char *dir);
+ // Load an existing replica which is located in 'dn' with 'dir' directory.
+ replica *load_replica(dir_node *dn, const char *dir);
// Clean up the memory state and on disk data if creating replica failed.
- void clear_on_failure(replica *rep, const std::string &path, const gpid &pid);
+ void clear_on_failure(replica *rep);
task_ptr begin_close_replica(replica_ptr r);
void close_replica(replica_ptr r);
void notify_replica_state_update(const replica_configuration &config, bool is_closing);
@@ -403,7 +403,6 @@ private:
friend class replica_bulk_loader;
friend class replica_split_manager;
friend class replica_disk_migrator;
-
friend class mock_replica_stub;
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 965b81ec6..104bd2c07 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -115,10 +115,10 @@ public:
mock_replica(replica_stub *stub,
gpid gpid,
const app_info &app,
- const char *dir,
+ dir_node *dn,
bool need_restore = false,
bool is_duplication_follower = false)
- : replica(stub, gpid, app, dir, need_restore, is_duplication_follower)
+ : replica(stub, gpid, app, dn, need_restore, is_duplication_follower)
{
_app = std::make_unique<replication::mock_replication_app_base>(this);
}
@@ -232,26 +232,22 @@ private:
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
inline std::unique_ptr<mock_replica>
-create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
+create_mock_replica(replica_stub *stub, int app_id = 1, int partition_index = 1)
{
- gpid gpid(appid, partition_index);
+ gpid pid(app_id, partition_index);
app_info app_info;
app_info.app_type = "replica";
app_info.app_name = "temp";
- const auto *const dn =
- stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, gpid);
+ auto *dn = stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, pid);
CHECK_NOTNULL(dn, "");
- const auto replica_path = dn->replica_dir(app_info.app_type, gpid);
- CHECK(
- dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
- return std::make_unique<mock_replica>(stub, gpid, app_info, replica_path.c_str());
+ return std::make_unique<mock_replica>(stub, pid, app_info, dn);
}
class mock_replica_stub : public replica_stub
{
public:
- mock_replica_stub() { _fs_manager.initialize({"./"}, {"tag"}); }
+ mock_replica_stub() { _fs_manager.initialize({"test_dir"}, {"test"}); }
~mock_replica_stub() override = default;
@@ -290,15 +286,20 @@ public:
partition_status::type status = partition_status::PS_INACTIVE,
ballot b = 5,
bool need_restore = false,
- bool is_duplication_follower = false)
+ bool is_duplication_follower = false,
+ dir_node *dn = nullptr)
{
replica_configuration config;
config.ballot = b;
config.pid = pid;
config.status = status;
+ if (dn == nullptr) {
+ dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
+ }
+ CHECK_NOTNULL(dn, "");
mock_replica_ptr rep =
- new mock_replica(this, pid, info, "./", need_restore, is_duplication_follower);
+ new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower);
rep->set_replica_config(config);
_replicas[pid] = rep;
@@ -319,35 +320,46 @@ public:
auto dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
CHECK_NOTNULL(dn, "");
- const auto &dir = dn->replica_dir(info.app_type, pid);
- CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
- auto *rep =
- new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
+ auto *rep = new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower);
rep->set_replica_config(config);
return rep;
}
- void generate_replicas_base_dir_nodes_for_app(app_info mock_app,
- int primary_count_for_disk = 1,
- int secondary_count_for_disk = 2)
+ void generate_replicas_base_dir_nodes_for_app(const app_info &ai,
+ int primary_count_per_disk,
+ int secondary_count_per_disk)
{
- const auto &dir_nodes = _fs_manager._dir_nodes;
- for (auto &dir_node : dir_nodes) {
- const auto &replica_iter = dir_node->holding_replicas.find(mock_app.app_id);
- if (replica_iter == dir_node->holding_replicas.end()) {
- continue;
+ int partition_index = 0;
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ // Create 'partition_count' count of replicas.
+ if (partition_index >= ai.partition_count) {
+ break;
}
- const std::set<gpid> &pids = replica_iter->second;
- int primary_count = primary_count_for_disk;
- int secondary_count = secondary_count_for_disk;
- for (const gpid &pid : pids) {
- // generate primary replica and secondary replica.
+ int replica_count_per_disk = primary_count_per_disk + secondary_count_per_disk;
+ int primary_count = primary_count_per_disk;
+ int secondary_count = secondary_count_per_disk;
+ // Create 'replica_count_per_disk' count of replicas on 'dn'.
+ while (replica_count_per_disk-- > 0) {
+ gpid new_gpid(ai.app_id, partition_index++);
+ _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), ai.app_type, new_gpid);
if (primary_count-- > 0) {
- add_replica(generate_replica_ptr(
- mock_app, pid, partition_status::PS_PRIMARY, mock_app.app_id));
+ // Create 'primary_count' count of primary replicas on 'dn'.
+ add_replica(generate_replica_ptr(ai,
+ new_gpid,
+ partition_status::PS_PRIMARY,
+ ai.app_id,
+ false,
+ false,
+ dn.get()));
} else if (secondary_count-- > 0) {
- add_replica(generate_replica_ptr(
- mock_app, pid, partition_status::PS_SECONDARY, mock_app.app_id));
+ // Create 'secondary_count' count of secondary replicas on 'dn'.
+ add_replica(generate_replica_ptr(ai,
+ new_gpid,
+ partition_status::PS_SECONDARY,
+ ai.app_id,
+ false,
+ false,
+ dn.get()));
}
}
}
diff --git a/src/replica/test/replica_disk_migrate_test.cpp b/src/replica/test/replica_disk_migrate_test.cpp
index 7c47abad2..024b039ce 100644
--- a/src/replica/test/replica_disk_migrate_test.cpp
+++ b/src/replica/test/replica_disk_migrate_test.cpp
@@ -176,7 +176,7 @@ TEST_F(replica_disk_migrate_test, migrate_disk_replica_check)
auto &request = *fake_migrate_rpc.mutable_request();
auto &response = fake_migrate_rpc.response();
- request.pid = dsn::gpid(app_info_1.app_id, 1);
+ request.pid = dsn::gpid(app_info_1.app_id, 0);
request.origin_disk = "tag_1";
request.target_disk = "tag_2";
diff --git a/src/replica/test/replica_disk_test.cpp b/src/replica/test/replica_disk_test.cpp
index 78f2dfd72..ce7441199 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -94,18 +94,17 @@ TEST_F(replica_disk_test, on_query_disk_info_all_app)
ASSERT_EQ(disk_infos.size(), 6);
int info_size = disk_infos.size();
- int app_id_1_partition_index = 1;
- int app_id_2_partition_index = 1;
+ int app_id_1_partition_index = 0;
+ int app_id_2_partition_index = 0;
for (int i = 0; i < info_size; i++) {
- if (disk_infos[i].tag == "tag_empty_1") {
+ if (disk_infos[i].holding_primary_replicas.empty() &&
+ disk_infos[i].holding_secondary_replicas.empty()) {
continue;
}
ASSERT_EQ(disk_infos[i].tag, "tag_" + std::to_string(i + 1));
ASSERT_EQ(disk_infos[i].full_dir, "./tag_" + std::to_string(i + 1));
ASSERT_EQ(disk_infos[i].disk_capacity_mb, 500);
ASSERT_EQ(disk_infos[i].disk_available_mb, (i + 1) * 50);
- // `holding_primary_replicas` and `holding_secondary_replicas` is std::map<app_id,
- // std::set<::dsn::gpid>>
ASSERT_EQ(disk_infos[i].holding_primary_replicas.size(), 2);
ASSERT_EQ(disk_infos[i].holding_secondary_replicas.size(), 2);
@@ -172,24 +171,21 @@ TEST_F(replica_disk_test, on_query_disk_info_one_app)
request.app_name = app_info_1.app_name;
stub->on_query_disk_info(fake_query_disk_rpc);
- auto &disk_infos_with_app_1 = fake_query_disk_rpc.response().disk_infos;
- int info_size = disk_infos_with_app_1.size();
- for (int i = 0; i < info_size; i++) {
- if (disk_infos_with_app_1[i].tag == "tag_empty_1") {
+ for (auto disk_info : fake_query_disk_rpc.response().disk_infos) {
+ if (disk_info.holding_primary_replicas.empty() &&
+ disk_info.holding_secondary_replicas.empty()) {
continue;
}
- // `holding_primary_replicas` and `holding_secondary_replicas` is std::map<app_id,
- // std::set<::dsn::gpid>>
- ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas.size(), 1);
- ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas.size(), 1);
- ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas[app_info_1.app_id].size(),
+ ASSERT_EQ(disk_info.holding_primary_replicas.size(), 1);
+ ASSERT_EQ(disk_info.holding_secondary_replicas.size(), 1);
+ ASSERT_EQ(disk_info.holding_primary_replicas[app_info_1.app_id].size(),
app_id_1_primary_count_for_disk);
- ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas[app_info_1.app_id].size(),
+ ASSERT_EQ(disk_info.holding_secondary_replicas[app_info_1.app_id].size(),
app_id_1_secondary_count_for_disk);
- ASSERT_TRUE(disk_infos_with_app_1[i].holding_primary_replicas.find(app_info_2.app_id) ==
- disk_infos_with_app_1[i].holding_primary_replicas.end());
- ASSERT_TRUE(disk_infos_with_app_1[i].holding_secondary_replicas.find(app_info_2.app_id) ==
- disk_infos_with_app_1[i].holding_secondary_replicas.end());
+ ASSERT_TRUE(disk_info.holding_primary_replicas.find(app_info_2.app_id) ==
+ disk_info.holding_primary_replicas.end());
+ ASSERT_TRUE(disk_info.holding_secondary_replicas.find(app_info_2.app_id) ==
+ disk_info.holding_secondary_replicas.end());
}
}
@@ -247,16 +243,16 @@ TEST_F(replica_disk_test, disk_status_test)
{disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
auto dn = stub->get_fs_manager()->get_dir_nodes()[0];
for (const auto &test : tests) {
- update_node_status(dn, test.old_status, test.new_status);
+ dn->status = test.new_status;
for (const auto &pids_of_app : dn->holding_replicas) {
for (const auto &pid : pids_of_app.second) {
replica_ptr rep = stub->get_replica(pid);
ASSERT_NE(nullptr, rep);
- ASSERT_EQ(test.new_status, rep->disk_space_insufficient());
+ ASSERT_EQ(test.new_status, rep->get_dir_node()->status);
}
}
}
- update_node_status(dn, disk_status::NORMAL, disk_status::NORMAL);
+ dn->status = disk_status::NORMAL;
}
TEST_F(replica_disk_test, add_new_disk_test)
diff --git a/src/replica/test/replica_disk_test_base.h b/src/replica/test/replica_disk_test_base.h
index 2bb37817f..08e7ffc39 100644
--- a/src/replica/test/replica_disk_test_base.h
+++ b/src/replica/test/replica_disk_test_base.h
@@ -39,16 +39,23 @@ public:
// tag_5 100*5 50*5 50%
// total 2500 750 30%
// replica info, for example:
- // dir_node primary/secondary
- //
- // tag_empty_1
- // tag_1 1.1 | 1.2,1.3
- // 2.1,2.2 | 2.3,2.4,2.5,2.6
- //
- // tag_2 1.4 | 1.5,1.6
- // 2.7,2.8 | 2.9,2.10,2.11,2.12,2.13
- // ...
- // ...
+ // dir_node primary | secondary
+ // --------------------------------+---------------------------
+ // tag_1 1.0 | 1.1 1.2
+ // 2.0 2.1 | 2.2 2.3 2.4 2.5
+ // --------------------------------+---------------------------
+ // tag_2 1.3 | 1.4 1.5
+ // 2.6 2.7 | 2.8 2.9 2.10 2.11
+ // --------------------------------+---------------------------
+ // tag_3 1.6 | 1.7 1.8
+ // 2.12 2.13 | 2.14 2.15 2.16 2.17
+ // --------------------------------+---------------------------
+ // tag_4 |
+ // --------------------------------+---------------------------
+ // tag_5 |
+ // --------------------------------+---------------------------
+ // tag_empty_1 |
+ // --------------------------------+---------------------------
replica_disk_test_base()
{
fail::setup();
@@ -93,22 +100,6 @@ public:
}
}
- void update_node_status(const std::shared_ptr<dir_node> &dn,
- disk_status::type old_status,
- disk_status::type new_status)
- {
- for (const auto &pids_of_app : dn->holding_replicas) {
- for (const auto &pid : pids_of_app.second) {
- replica_ptr rep = stub->get_replica(pid);
- ASSERT_NE(nullptr, rep);
- rep->set_disk_status(new_status);
- }
- }
- if (old_status != new_status) {
- dn->status = new_status;
- }
- }
-
void prepare_before_add_new_disk_test(const std::string &create_dir,
const std::string &check_rw)
{
@@ -169,14 +160,6 @@ private:
void generate_mock_dir_nodes(int num)
{
- int app_id_1_disk_holding_replica_count =
- app_id_1_primary_count_for_disk + app_id_1_secondary_count_for_disk;
- int app_id_2_disk_holding_replica_count =
- app_id_2_primary_count_for_disk + app_id_2_secondary_count_for_disk;
-
- int app_id_1_partition_index = 1;
- int app_id_2_partition_index = 1;
-
int64_t disk_capacity_mb = num * 100;
int count = 0;
while (count++ < num) {
@@ -194,18 +177,6 @@ private:
node_disk->full_dir); // open replica need the options
utils::filesystem::create_directory(node_disk->full_dir);
- int app_1_replica_count_per_disk = app_id_1_disk_holding_replica_count;
- while (app_1_replica_count_per_disk-- > 0) {
- node_disk->holding_replicas[app_info_1.app_id].emplace(
- gpid(app_info_1.app_id, app_id_1_partition_index++));
- }
-
- int app_2_replica_count_per_disk = app_id_2_disk_holding_replica_count;
- while (app_2_replica_count_per_disk-- > 0) {
- node_disk->holding_replicas[app_info_2.app_id].emplace(
- gpid(app_info_2.app_id, app_id_2_partition_index++));
- }
-
stub->_fs_manager._dir_nodes.emplace_back(node_disk);
}
}
diff --git a/src/replica/test/replica_learn_test.cpp b/src/replica/test/replica_learn_test.cpp
index 5f937e24e..8ff02993a 100644
--- a/src/replica/test/replica_learn_test.cpp
+++ b/src/replica/test/replica_learn_test.cpp
@@ -23,6 +23,7 @@
#include <string>
#include <utility>
+#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication_other_types.h"
#include "consensus_types.h"
@@ -31,6 +32,7 @@
#include "replica/duplication/test/duplication_test_base.h"
#include "replica/prepare_list.h"
#include "replica/replica_context.h"
+#include "utils/fmt_logging.h"
namespace dsn {
namespace replication {
@@ -44,11 +46,14 @@ public:
std::unique_ptr<mock_replica> create_duplicating_replica()
{
- gpid gpid(1, 1);
- app_info app_info;
- app_info.app_type = "replica";
- app_info.duplicating = true;
- auto r = std::make_unique<mock_replica>(stub.get(), gpid, app_info, "./");
+ gpid pid(1, 0);
+ app_info ai;
+ ai.app_type = "replica";
+ ai.duplicating = true;
+
+ dir_node *dn = stub->get_fs_manager()->find_best_dir_for_new_replica(pid);
+ CHECK_NOTNULL(dn, "");
+ auto r = std::make_unique<mock_replica>(stub.get(), pid, ai, dn);
r->as_primary();
return r;
}
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 8e09a8e03..45ca0eab3 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <gmock/gmock-matchers.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
@@ -23,6 +24,7 @@
#include <iostream>
#include <map>
#include <memory>
+#include <set>
#include <string>
#include <unordered_map>
#include <utility>
@@ -34,6 +36,7 @@
#include "common/gpid.h"
#include "common/replica_envs.h"
#include "common/replication.codes.h"
+#include "common/replication_other_types.h"
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "http/http_server.h"
@@ -61,6 +64,7 @@
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
+#include "utils/test_macros.h"
namespace dsn {
namespace replication {
@@ -71,7 +75,7 @@ class replica_test : public replica_test_base
{
public:
replica_test()
- : pid(gpid(2, 1)),
+ : _pid(gpid(2, 1)),
_backup_id(dsn_now_ms()),
_provider_name("local_service"),
_policy_name("mock_policy")
@@ -83,7 +87,8 @@ public:
FLAGS_enable_http_server = false;
stub->install_perf_counters();
mock_app_info();
- _mock_replica = stub->generate_replica_ptr(_app_info, pid, partition_status::PS_PRIMARY, 1);
+ _mock_replica =
+ stub->generate_replica_ptr(_app_info, _pid, partition_status::PS_PRIMARY, 1);
// set FLAGS_cold_backup_root manually.
// FLAGS_cold_backup_root is set by configuration "replication.cold_backup_root",
@@ -153,7 +158,7 @@ public:
void test_on_cold_backup(const std::string user_specified_path = "")
{
backup_request req;
- req.pid = pid;
+ req.pid = _pid;
policy_info backup_policy_info;
backup_policy_info.__set_backup_provider_type(_provider_name);
backup_policy_info.__set_policy_name(_policy_name);
@@ -205,10 +210,10 @@ public:
bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; }
- bool has_gpid(gpid &gpid) const
+ bool has_gpid(gpid &pid) const
{
- for (const auto &node : stub->_fs_manager._dir_nodes) {
- if (node->has(gpid)) {
+ for (const auto &node : stub->_fs_manager.get_dir_nodes()) {
+ if (node->has(pid)) {
return true;
}
}
@@ -251,7 +256,7 @@ public:
public:
dsn::app_info _app_info;
- dsn::gpid pid;
+ dsn::gpid _pid;
mock_replica_ptr _mock_replica;
private:
@@ -274,7 +279,7 @@ TEST_F(replica_test, write_size_limited)
write_request->io_session = sim_net->create_client_session(rpc_address());
for (int i = 0; i < count; i++) {
- stub->on_client_write(pid, write_request);
+ stub->on_client_write(_pid, write_request);
}
ASSERT_EQ(get_write_size_exceed_threshold_count(), count);
@@ -402,6 +407,7 @@ TEST_F(replica_test, update_allow_ingest_behind_test)
TEST_F(replica_test, test_replica_backup_and_restore)
{
+ // TODO(yingchun): this test last too long time, optimize it!
test_on_cold_backup();
auto err = test_find_valid_checkpoint();
ASSERT_EQ(ERR_OK, err);
@@ -409,6 +415,7 @@ TEST_F(replica_test, test_replica_backup_and_restore)
TEST_F(replica_test, test_replica_backup_and_restore_with_specific_path)
{
+ // TODO(yingchun): this test last too long time, optimize it!
std::string user_specified_path = "test/backup";
test_on_cold_backup(user_specified_path);
auto err = test_find_valid_checkpoint(user_specified_path);
@@ -462,42 +469,54 @@ TEST_F(replica_test, test_query_last_checkpoint_info)
_mock_replica->set_last_committed_decree(200);
_mock_replica->on_query_last_checkpoint(resp);
ASSERT_EQ(resp.last_committed_decree, 200);
- ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100");
+ ASSERT_STR_CONTAINS(resp.base_local_dir, "/data/checkpoint.100");
}
TEST_F(replica_test, test_clear_on_failure)
{
+ // Clear up the remaining state.
+ auto *dn = stub->get_fs_manager()->find_replica_dir(_app_info.app_type, _pid);
+ if (dn != nullptr) {
+ dsn::utils::filesystem::remove_path(dn->replica_dir(_app_info.app_type, _pid));
+ }
+ dn->holding_replicas.clear();
+
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;
replica *rep =
- stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
+ stub->generate_replica(_app_info, _pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
- dsn::utils::filesystem::create_directory(path);
- ASSERT_TRUE(has_gpid(pid));
+ ASSERT_TRUE(has_gpid(_pid));
- stub->clear_on_failure(rep, path, pid);
+ stub->clear_on_failure(rep);
ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
- ASSERT_FALSE(has_gpid(pid));
+ ASSERT_FALSE(has_gpid(_pid));
}
TEST_F(replica_test, test_auto_trash)
{
+ // Clear up the remaining state.
+ auto *dn = stub->get_fs_manager()->find_replica_dir(_app_info.app_type, _pid);
+ if (dn != nullptr) {
+ dsn::utils::filesystem::remove_path(dn->replica_dir(_app_info.app_type, _pid));
+ }
+ dn->holding_replicas.clear();
+
// Disable failure detector to avoid connecting with meta server which is not started.
FLAGS_fd_disabled = true;
replica *rep =
- stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
+ stub->generate_replica(_app_info, _pid, partition_status::PS_PRIMARY, 1, false, true);
auto path = rep->dir();
- dsn::utils::filesystem::create_directory(path);
- ASSERT_TRUE(has_gpid(pid));
+ ASSERT_TRUE(has_gpid(_pid));
rep->handle_local_failure(ERR_RDB_CORRUPTION);
stub->wait_closing_replicas_finished();
ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
- dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
+ dn = stub->get_fs_manager()->get_dir_node(path);
ASSERT_NE(dn, nullptr);
std::vector<std::string> subs;
ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
@@ -516,7 +535,7 @@ TEST_F(replica_test, test_auto_trash)
}
}
ASSERT_TRUE(found);
- ASSERT_FALSE(has_gpid(pid));
+ ASSERT_FALSE(has_gpid(_pid));
}
TEST_F(replica_test, update_deny_client_test)
diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp
index c331b3cac..7b784a6d2 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -37,6 +37,7 @@
#include "server/test/message_utils.h"
#include "utils/error_code.h"
#include "utils/flags.h"
+#include "utils/fmt_logging.h"
#include "utils/rand.h"
namespace dsn {
@@ -221,7 +222,7 @@ TEST_F(fine_collector_test, fine_collector)
class hotkey_collector_test : public pegasus_server_test_base
{
public:
- hotkey_collector_test() { start(); }
+ hotkey_collector_test() { CHECK_EQ(::dsn::ERR_OK, start()); }
std::shared_ptr<pegasus::server::hotkey_collector> get_read_collector()
{
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index a5a530e41..7b1fe225d 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -23,6 +23,7 @@
#include <gtest/gtest.h>
#include <gmock/gmock.h>
+#include "common/fs_manager.h"
#include "replica/replica_stub.h"
#include "utils/filesystem.h"
@@ -44,16 +45,22 @@ public:
pegasus_server_test_base()
{
// Remove rdb to prevent rocksdb recovery from last test.
- dsn::utils::filesystem::remove_path("./data/rdb");
+ dsn::utils::filesystem::remove_path("./test_dir");
_replica_stub = new dsn::replication::replica_stub();
- _replica_stub->get_fs_manager()->initialize({"./"}, {"test_tag"});
+ _replica_stub->get_fs_manager()->initialize({"test_dir"}, {"test_tag"});
_gpid = dsn::gpid(100, 1);
dsn::app_info app_info;
app_info.app_type = "pegasus";
- _replica =
- new dsn::replication::replica(_replica_stub, _gpid, app_info, "./", false, false);
+ auto *dn = _replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
+ CHECK_NOTNULL(dn, "");
+ _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false);
+ const auto dir_data = dsn::utils::filesystem::path_combine(_replica->dir(), "data");
+ CHECK(dsn::utils::filesystem::create_directory(dir_data),
+ "create data dir {} failed",
+ dir_data);
+
_server = std::make_unique<mock_pegasus_server_impl>(_replica);
}
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index f0ee6a8e4..24d88e656 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -25,18 +25,21 @@
#include <string>
#include <utility>
+#include "common/fs_manager.h"
#include "dsn.layer2_types.h"
#include "pegasus_key_schema.h"
#include "pegasus_server_test_base.h"
#include "pegasus_utils.h"
#include "pegasus_value_schema.h"
#include "replica/replica.h"
+#include "replica/replica_stub.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service.h"
#include "server/pegasus_write_service_impl.h"
#include "server/rocksdb_wrapper.h"
#include "utils/blob.h"
#include "utils/error_code.h"
+#include "utils/fmt_logging.h"
#include "utils/string_view.h"
namespace pegasus {
@@ -83,8 +86,9 @@ public:
app_info.app_type = "pegasus";
app_info.duplicating = true;
- _replica =
- new dsn::replication::replica(_replica_stub, _gpid, app_info, "./", false, false);
+ auto *dn = _replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
+ CHECK_NOTNULL(dn, "");
+ _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false);
_server = std::make_unique<mock_pegasus_server_impl>(_replica);
SetUp();
diff --git a/src/utils/test_macros.h b/src/utils/test_macros.h
index 6f9c88973..c1c9c3fd4 100644
--- a/src/utils/test_macros.h
+++ b/src/utils/test_macros.h
@@ -33,3 +33,9 @@
return; \
} \
} while (0)
+
+// Substring matches.
+#define ASSERT_STR_CONTAINS(str, substr) ASSERT_THAT(str, testing::HasSubstr(substr))
+
+#define ASSERT_STR_NOT_CONTAINS(str, substr) \
+ ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org