You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/25 06:52:20 UTC
[incubator-pegasus] branch master updated: refactor: improve the single-responsibility of class fs_manager (#1477)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d5f225967 refactor: improve the single-responsibility of class fs_manager (#1477)
d5f225967 is described below
commit d5f225967aa4575a4670f48dc1bd7dcd5d07ea9e
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Thu May 25 14:52:13 2023 +0800
refactor: improve the single-responsibility of class fs_manager (#1477)
https://github.com/apache/incubator-pegasus/issues/1383
This patch moves some functions from class replica_stub to class fs_manager,
because they are more reasonably responsibilities of fs_manager than of replica_stub.
---
src/common/fs_manager.cpp | 194 ++++++++++++++++-----
src/common/fs_manager.h | 34 +++-
src/common/test/fs_manager_test.cpp | 146 +++++++++++++++-
src/meta/test/misc/misc.cpp | 20 ++-
src/replica/disk_cleaner.cpp | 9 +-
src/replica/disk_cleaner.h | 5 +-
.../test/load_from_private_log_test.cpp | 19 +-
src/replica/replica_stub.cpp | 183 +++++++------------
src/replica/replica_stub.h | 11 +-
src/replica/test/mock_utils.h | 23 ++-
src/replica/test/mutation_log_test.cpp | 1 -
src/replica/test/open_replica_test.cpp | 100 +++++------
src/replica/test/replica_disk_migrate_test.cpp | 35 ++--
src/replica/test/replica_disk_test.cpp | 23 +--
src/replica/test/replica_disk_test_base.h | 25 +--
src/replica/test/replica_test_base.h | 9 +-
src/server/test/pegasus_server_test_base.h | 1 +
17 files changed, 507 insertions(+), 331 deletions(-)
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 35a801053..6acdd38ee 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -34,13 +34,15 @@
#include "fs_manager.h"
-#include <stdio.h>
#include <algorithm>
#include <cmath>
+#include <iosfwd>
#include <utility>
#include "common/gpid.h"
#include "common/replication_enums.h"
+#include "fmt/core.h"
+#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
@@ -62,6 +64,11 @@ DSN_DEFINE_int32(replication,
"space insufficient");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
+DSN_DEFINE_bool(replication,
+ ignore_broken_disk,
+ true,
+ "true means ignore broken data disk when initialize");
+
uint64_t dir_node::replicas_count() const
{
uint64_t sum = 0;
@@ -80,6 +87,11 @@ uint64_t dir_node::replicas_count(app_id id) const
return iter->second.size();
}
+std::string dir_node::replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const
+{
+ return utils::filesystem::path_combine(full_dir, fmt::format("{}.{}", pid, app_type));
+}
+
bool dir_node::has(const gpid &pid) const
{
auto iter = holding_replicas.find(pid.get_app_id());
@@ -187,15 +199,44 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags)
{
CHECK_EQ(data_dirs.size(), data_dir_tags.size());
- for (unsigned i = 0; i < data_dirs.size(); ++i) {
+
+ // Skip the data directories which are broken.
+ std::vector<std::shared_ptr<dir_node>> dir_nodes;
+ for (auto i = 0; i < data_dir_tags.size(); ++i) {
+ const auto &dir_tag = data_dir_tags[i];
+ const auto &dir = data_dirs[i];
+
+ // Check the status of this directory.
+ std::string cdir;
+ std::string err_msg;
+ if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
+ !utils::filesystem::check_dir_rw(dir, err_msg))) {
+ if (FLAGS_ignore_broken_disk) {
+ LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg);
+ } else {
+ CHECK(false, err_msg);
+ }
+ // TODO(yingchun): Remove the 'continue' and mark its io error status; regardless of
+ // the status of the disks, add all disks.
+ continue;
+ }
+
+ // Normalize the data directories.
std::string norm_path;
- utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
- dir_node *n = new dir_node(data_dir_tags[i], norm_path);
- _dir_nodes.emplace_back(n);
- LOG_INFO(
- "{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
+ utils::filesystem::get_normalized_path(cdir, norm_path);
+
+ // Create and add this dir_node.
+ auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
+ dir_nodes.emplace_back(dn);
+ LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
+ }
+ CHECK_FALSE(dir_nodes.empty());
+
+ // Update the memory state.
+ {
+ zauto_read_lock l(_lock);
+ _dir_nodes.swap(dir_nodes);
}
- _available_data_dirs = data_dirs;
// Update the disk statistics.
update_disk_stat();
@@ -236,44 +277,40 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}
-void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
+dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
{
- char buffer[256];
- sprintf(buffer, "%d.%d.%s", pid.get_app_id(), pid.get_partition_index(), type.c_str());
-
- zauto_write_lock l(_lock);
-
dir_node *selected = nullptr;
-
- unsigned least_app_replicas_count = 0;
- unsigned least_total_replicas_count = 0;
-
- for (auto &n : _dir_nodes) {
- CHECK(!n->has(pid), "gpid({}) already in dir_node({})", pid, n->tag);
- unsigned app_replicas = n->replicas_count(pid.get_app_id());
- unsigned total_replicas = n->replicas_count();
-
- if (selected == nullptr || least_app_replicas_count > app_replicas) {
- least_app_replicas_count = app_replicas;
- least_total_replicas_count = total_replicas;
- selected = n.get();
- } else if (least_app_replicas_count == app_replicas &&
- least_total_replicas_count > total_replicas) {
- least_total_replicas_count = total_replicas;
- selected = n.get();
+ uint64_t least_app_replicas_count = 0;
+ uint64_t least_total_replicas_count = 0;
+ {
+ zauto_write_lock l(_lock);
+ // Try to find the dir_node with the least replica count.
+ for (const auto &dn : _dir_nodes) {
+ CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
+ uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
+ uint64_t total_replicas_count = dn->replicas_count();
+
+ if (selected == nullptr || least_app_replicas_count > app_replicas_count) {
+ least_app_replicas_count = app_replicas_count;
+ least_total_replicas_count = total_replicas_count;
+ selected = dn.get();
+ } else if (least_app_replicas_count == app_replicas_count &&
+ least_total_replicas_count > total_replicas_count) {
+ least_total_replicas_count = total_replicas_count;
+ selected = dn.get();
+ }
}
}
-
- LOG_INFO(
- "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
- dsn_primary_address(),
- pid,
- selected->tag,
- least_app_replicas_count,
- least_total_replicas_count);
-
- selected->holding_replicas[pid.get_app_id()].emplace(pid);
- dir = utils::filesystem::path_combine(selected->full_dir, buffer);
+ if (selected != nullptr) {
+ LOG_INFO(
+ "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
+ dsn_primary_address(),
+ pid,
+ selected->tag,
+ least_app_replicas_count,
+ least_total_replicas_count);
+ }
+ return selected;
}
void fs_manager::remove_replica(const gpid &pid)
@@ -342,7 +379,6 @@ void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string
utils::filesystem::get_normalized_path(data_dir, norm_path);
dir_node *n = new dir_node(tag, norm_path);
_dir_nodes.emplace_back(n);
- _available_data_dirs.emplace_back(data_dir);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
}
@@ -359,5 +395,77 @@ bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::s
return false;
}
+dir_node *fs_manager::find_replica_dir(dsn::string_view app_type, gpid pid)
+{
+ std::string replica_dir;
+ dir_node *replica_dn = nullptr;
+ {
+ zauto_read_lock l(_lock);
+ for (const auto &dn : _dir_nodes) {
+ const auto dir = dn->replica_dir(app_type, pid);
+ if (utils::filesystem::directory_exists(dir)) {
+ // Check if there are duplicate replica instance directories.
+ CHECK(replica_dir.empty(), "replica dir conflict: {} <--> {}", dir, replica_dir);
+ replica_dir = dir;
+ replica_dn = dn.get();
+ }
+ }
+ }
+
+ return replica_dn;
+}
+
+dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid)
+{
+ // Try to find the replica directory.
+ auto replica_dn = find_replica_dir(app_type, pid);
+ if (replica_dn != nullptr) {
+ return replica_dn;
+ }
+
+ // Find a dir_node for the new replica.
+ replica_dn = find_best_dir_for_new_replica(pid);
+ if (replica_dn == nullptr) {
+ return nullptr;
+ }
+
+ const auto dir = replica_dn->replica_dir(app_type, pid);
+ if (!dsn::utils::filesystem::create_directory(dir)) {
+ LOG_ERROR("create replica directory({}) failed", dir);
+ return nullptr;
+ }
+
+ replica_dn->holding_replicas[pid.get_app_id()].emplace(pid);
+ return replica_dn;
+}
+
+dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
+ gpid child_pid,
+ const std::string &parent_dir)
+{
+ dir_node *child_dn = nullptr;
+ std::string child_dir;
+ {
+ zauto_read_lock l(_lock);
+ for (const auto &dn : _dir_nodes) {
+ child_dir = dn->replica_dir(app_type, child_pid);
+ // <parent_dir> = <prefix>/<gpid>.<app_type>
+ // check if <parent_dir>'s <prefix> is equal to <data_dir>
+ // TODO(yingchun): use a function instead.
+ if (parent_dir.substr(0, dn->full_dir.size() + 1) == dn->full_dir + "/") {
+ child_dn = dn.get();
+ break;
+ }
+ }
+ }
+ CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir);
+ if (!dsn::utils::filesystem::create_directory(child_dir)) {
+ LOG_ERROR("create child replica directory({}) failed", child_dir);
+ return nullptr;
+ }
+ add_replica(child_pid, child_dir);
+ return child_dn;
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index ebdb3640a..0518897a7 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -31,6 +31,7 @@
#include "perf_counter/perf_counter_wrapper.h"
#include "utils/error_code.h"
#include "utils/flags.h"
+#include "utils/string_view.h"
#include "utils/zlocks.h"
namespace dsn {
@@ -72,6 +73,9 @@ public:
// and protected by the lock in fs_manager.
uint64_t replicas_count(app_id id) const;
uint64_t replicas_count() const;
+ // Construct the replica dir for the given 'app_type' and 'pid'.
+ // NOTE: Just construct the string, the directory will not be created.
+ std::string replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const;
bool has(const dsn::gpid &pid) const;
uint64_t remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
@@ -86,23 +90,36 @@ public:
// NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order.
void initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags);
-
+ // Try to find the best dir_node to place the new replica. The fewer replicas a
+ // dir_node holds (which generally means its load is lower), the more likely it is
+ // to be selected.
+ // TODO(yingchun): consider the disk capacity and available space.
+ // NOTE: the 'pid' must not exist in any dir_nodes.
+ dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
- void allocate_dir(const dsn::gpid &pid,
- const std::string &type,
- /*out*/ std::string &dir);
void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
+ // Find the replica instance directory.
+ dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
+ // Similar to the above, but it will create a new directory if not found.
+ dir_node *create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid);
+ // Similar to the above, and will create a directory for the child on the same dir_node
+ // of parent.
+ // During partition split, we should guarantee child replica and parent replica share the
+ // same data dir.
+ dir_node *create_child_replica_dir(dsn::string_view app_type,
+ gpid child_pid,
+ const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat(bool check_status_changed = true);
void add_new_dir_node(const std::string &data_dir, const std::string &tag);
- bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
- const std::vector<std::string> &get_available_data_dirs() const
+ const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
- return _available_data_dirs;
+ return _dir_nodes;
}
+ bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
private:
void reset_disk_stat()
@@ -128,7 +145,6 @@ private:
int _max_available_ratio = 0;
std::vector<std::shared_ptr<dir_node>> _dir_nodes;
- std::vector<std::string> _available_data_dirs;
// Used for disk available space check
// disk status will be updated periodically, this vector record nodes whose disk_status changed
@@ -147,7 +163,9 @@ private:
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
+ FRIEND_TEST(fs_manager, find_best_dir_for_new_replica);
FRIEND_TEST(fs_manager, get_dir_node);
+ FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
diff --git a/src/common/test/fs_manager_test.cpp b/src/common/test/fs_manager_test.cpp
index 2d1f982f8..a1733741d 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -17,19 +17,57 @@
* under the License.
*/
+// IWYU pragma: no_include <ext/alloc_traits.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <stdint.h>
+#include <map>
#include <memory>
+#include <ostream>
+#include <set>
#include <string>
+#include <vector>
#include "common/fs_manager.h"
+#include "common/gpid.h"
+#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "utils/fail_point.h"
+#include "utils/filesystem.h"
+
+using namespace dsn::utils::filesystem;
namespace dsn {
namespace replication {
+TEST(dir_node, replica_dir)
+{
+ dir_node dn("tag", "path");
+ ASSERT_EQ("path/1.0.test", dn.replica_dir("test", gpid(1, 0)));
+}
+
+TEST(fs_manager, initialize)
+{
+ fail::setup();
+ struct broken_disk_test
+ {
+ std::string create_dir_ok;
+ std::string check_dir_rw_ok;
+ int32_t data_dir_size;
+ } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false", 2}};
+ int i = 0;
+ for (const auto &test : tests) {
+ fail::cfg("filesystem_create_directory", "return(" + test.create_dir_ok + ")");
+ fail::cfg("filesystem_check_dir_rw", "return(" + test.check_dir_rw_ok + ")");
+ fs_manager fm;
+ fm.initialize({"disk1", "disk2", "disk3"}, {"tag1", "tag2", "tag3"});
+ ASSERT_EQ(test.data_dir_size, fm.get_dir_nodes().size()) << i;
+ i++;
+ }
+ fail::teardown();
+}
+
TEST(fs_manager, dir_update_disk_status)
{
std::shared_ptr<dir_node> node = std::make_shared<dir_node>("tag", "path");
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
TEST(fs_manager, get_dir_node)
{
fs_manager fm;
- fm.initialize({"/data1"}, {"data1"});
+ fm.initialize({"./data1"}, {"data1"});
+ const auto &dns = fm.get_dir_nodes();
+ ASSERT_EQ(1, dns.size());
+ const auto &base_dir =
+ dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - std::string("/data1").size());
ASSERT_EQ(nullptr, fm.get_dir_node(""));
ASSERT_EQ(nullptr, fm.get_dir_node("/"));
- ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
- ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
- ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+ ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+ ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+ ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+ ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+ ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+ ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+ fs_manager fm;
+ fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
+
+ const char *app_type = "find_replica_dir";
+ gpid test_pid(1, 0);
+
+ // Clear up the remaining directories if exist.
+ for (const auto &dn : fm.get_dir_nodes()) {
+ remove_path(dn->replica_dir(app_type, test_pid));
+ }
+
+ ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+ auto dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+ ASSERT_NE(nullptr, dn);
+ const auto dir = dn->replica_dir(app_type, test_pid);
+ ASSERT_TRUE(directory_exists(dir));
+ auto dn1 = fm.find_replica_dir(app_type, test_pid);
+ ASSERT_EQ(dn, dn1);
+}
+
+TEST(fs_manager, create_replica_dir_if_necessary)
+{
+ fs_manager fm;
+
+ const char *app_type = "create_replica_dir_if_necessary";
+ gpid test_pid(1, 0);
- ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
- ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
- ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+ // Could not find a valid dir_node.
+ ASSERT_EQ(nullptr, fm.create_replica_dir_if_necessary(app_type, test_pid));
+
+ // It's able to create a dir for the replica after a valid dir_node has been added.
+ fm.add_new_dir_node("./data1", "data1");
+ dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+ ASSERT_NE(nullptr, dn);
+ ASSERT_EQ("data1", dn->tag);
}
+TEST(fs_manager, create_child_replica_dir)
+{
+ fs_manager fm;
+ fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
+
+ const char *app_type = "create_child_replica_dir";
+ gpid test_pid(1, 0);
+ gpid test_child_pid(1, 0);
+
+ dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+ ASSERT_NE(nullptr, dn);
+ const auto dir = dn->replica_dir(app_type, test_pid);
+
+ auto child_dn = fm.create_child_replica_dir(app_type, test_child_pid, dir);
+ ASSERT_EQ(dn, child_dn);
+ const auto child_dir = child_dn->replica_dir(app_type, test_child_pid);
+ ASSERT_TRUE(directory_exists(child_dir));
+ ASSERT_EQ(dir, child_dir);
+}
+
+TEST(fs_manager, find_best_dir_for_new_replica)
+{
+ // dn1 | 1.0, 1.1 +1.6
+ // dn2 | 1.2, 1.3 +1.7 2.0
+ // dn3 | 1.4 +1.5 +1.7 2.1
+ auto dn1 = std::make_shared<dir_node>("./data1", "data1");
+ dn1->holding_replicas[1] = {gpid(1, 0), gpid(1, 1)};
+ auto dn2 = std::make_shared<dir_node>("./data2", "data2");
+ dn2->holding_replicas[1] = {gpid(1, 2), gpid(1, 3)};
+ dn2->holding_replicas[2] = {gpid(2, 0)};
+ auto dn3 = std::make_shared<dir_node>("./data3", "data3");
+ dn3->holding_replicas[1] = {gpid(1, 4)};
+ dn3->holding_replicas[2] = {gpid(2, 1)};
+ fs_manager fm;
+ fm._dir_nodes = {dn1, dn2, dn3};
+
+ gpid pid_1_5(1, 5);
+ auto dn = fm.find_best_dir_for_new_replica(pid_1_5);
+ ASSERT_EQ(dn3.get(), dn);
+ dn->holding_replicas[pid_1_5.get_app_id()].emplace(pid_1_5);
+
+ gpid pid_1_6(1, 6);
+ dn = fm.find_best_dir_for_new_replica(pid_1_6);
+ ASSERT_EQ(dn1.get(), dn);
+ dn->holding_replicas[pid_1_6.get_app_id()].emplace(pid_1_6);
+
+ gpid pid_1_7(1, 7);
+ dn = fm.find_best_dir_for_new_replica(pid_1_7);
+ ASSERT_TRUE(dn == dn2.get() || dn == dn3.get());
+ dn->holding_replicas[pid_1_7.get_app_id()].emplace(pid_1_7);
+}
} // namespace replication
} // namespace dsn
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index aacf8c043..25c16d675 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -34,6 +34,7 @@
#include <cstdint>
#include <cstdlib>
#include <iostream>
+#include <set>
#include <string>
#include <thread>
#include <unordered_map>
@@ -46,7 +47,6 @@
#include "duplication_types.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
-#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/rand.h"
@@ -240,19 +240,21 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
std::string dir;
replica_info ri;
switch (act.type) {
- case config_type::CT_ASSIGN_PRIMARY:
- target_manager->allocate_dir(pid, "test", dir);
- CHECK_EQ(dsn::ERR_OK, target_manager->get_disk_tag(dir, ri.disk_tag));
+ case config_type::CT_ASSIGN_PRIMARY: {
+ auto selected = target_manager->find_best_dir_for_new_replica(pid);
+ CHECK_NOTNULL(selected, "");
+ selected->holding_replicas[pid.get_app_id()].emplace(pid);
cc->collect_serving_replica(act.target, ri);
break;
-
+ }
case config_type::CT_ADD_SECONDARY:
- case config_type::CT_ADD_SECONDARY_FOR_LB:
- node_manager->allocate_dir(pid, "test", dir);
- CHECK_EQ(dsn::ERR_OK, node_manager->get_disk_tag(dir, ri.disk_tag));
+ case config_type::CT_ADD_SECONDARY_FOR_LB: {
+ auto selected = node_manager->find_best_dir_for_new_replica(pid);
+ CHECK_NOTNULL(selected, "");
+ selected->holding_replicas[pid.get_app_id()].emplace(pid);
cc->collect_serving_replica(act.node, ri);
break;
-
+ }
case config_type::CT_DOWNGRADE_TO_SECONDARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
break;
diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index a1128f300..04dd174bb 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -24,6 +24,7 @@
#include <sys/types.h>
#include <algorithm>
+#include "common/fs_manager.h"
#include "runtime/api_layer1.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
@@ -67,14 +68,14 @@ const std::string kFolderSuffixBak = ".bak";
const std::string kFolderSuffixOri = ".ori";
const std::string kFolderSuffixTmp = ".tmp";
-error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
+error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
/*output*/ disk_cleaning_report &report)
{
std::vector<std::string> sub_list;
- for (auto &dir : data_dirs) {
+ for (const auto &dn : dir_nodes) {
std::vector<std::string> tmp_list;
- if (!dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false)) {
- LOG_WARNING("gc_disk: failed to get subdirectories in {}", dir);
+ if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false)) {
+ LOG_WARNING("gc_disk: failed to get subdirectories in {}", dn->full_dir);
return error_s::make(ERR_OBJECT_NOT_FOUND, "failed to get subdirectories");
}
sub_list.insert(sub_list.end(), tmp_list.begin(), tmp_list.end());
diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h
index 91fdb6805..7961d084c 100644
--- a/src/replica/disk_cleaner.h
+++ b/src/replica/disk_cleaner.h
@@ -18,6 +18,7 @@
*/
#pragma once
+#include <memory>
#include <string>
#include <vector>
@@ -26,6 +27,8 @@
namespace dsn {
namespace replication {
+struct dir_node;
+
DSN_DECLARE_uint64(gc_disk_error_replica_interval_seconds);
DSN_DECLARE_uint64(gc_disk_garbage_replica_interval_seconds);
DSN_DECLARE_uint64(gc_disk_migration_tmp_replica_interval_seconds);
@@ -49,7 +52,7 @@ struct disk_cleaning_report
};
// Removes the useless data from data directories.
-extern error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
+extern error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
/*output*/ disk_cleaning_report &report);
inline bool is_data_dir_removable(const std::string &dir)
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp
index c3edde95f..a9e7edaed 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -16,6 +16,7 @@
// under the License.
#include <boost/filesystem/path.hpp>
+#include <boost/system/error_code.hpp>
#include <fcntl.h>
#include <fmt/core.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
@@ -319,15 +320,21 @@ TEST_F(load_from_private_log_test, handle_real_private_log)
};
for (auto tt : tests) {
- boost::filesystem::path file(tt.fname);
- boost::filesystem::copy_file(
- file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists);
-
// reset replica to specified gpid
duplicator.reset(nullptr);
- _replica = create_mock_replica(
- stub.get(), tt.id.get_app_id(), tt.id.get_partition_index(), _log_dir.c_str());
+ _replica = create_mock_replica(stub.get(), tt.id.get_app_id(), tt.id.get_partition_index());
+
+ // Update '_log_dir' to the corresponding replica created above.
+ _log_dir = _replica->dir();
+
+ // Copy the log file to '_log_dir'
+ boost::filesystem::path file(tt.fname);
+ boost::system::error_code ec;
+ boost::filesystem::copy_file(
+ file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
+ ASSERT_TRUE(!ec);
+ // Start to verify.
load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
}
}
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 77a0d58f6..fa814b66e 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -37,7 +37,6 @@
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <fmt/core.h>
#include <fmt/ostream.h>
-#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -100,11 +99,6 @@
namespace dsn {
namespace replication {
-
-DSN_DEFINE_bool(replication,
- ignore_broken_disk,
- true,
- "true means ignore broken data disk when initialize");
DSN_DEFINE_bool(replication,
deny_client_on_start,
false,
@@ -603,29 +597,32 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
}
}
- // init dirs
+ // Initialize the file system manager.
+ _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);
+
+ // TODO(yingchun): remove the slog related code.
+ // Create slog directory if it does not exist.
std::string cdir;
std::string err_msg;
- CHECK(
- dsn::utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), "{}", err_msg);
+ CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg);
_options.slog_dir = cdir;
- initialize_fs_manager(_options.data_dirs, _options.data_dir_tags);
+ // Initialize slog.
_log = new mutation_log_shared(_options.slog_dir,
FLAGS_log_shared_file_size_mb,
FLAGS_log_shared_force_flush,
&_counter_shared_log_recent_write_size);
LOG_INFO("slog_dir = {}", _options.slog_dir);
- // init rps
+ // Start to load replicas in available data directories.
LOG_INFO("start to load replicas");
std::vector<std::string> dir_list;
- for (auto &dir : _fs_manager.get_available_data_dirs()) {
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
std::vector<std::string> tmp_list;
- CHECK(dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false),
+ CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false),
"Fail to get subdirectories in {}.",
- dir);
+ dn->full_dir);
dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end());
}
@@ -830,36 +827,6 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
}
}
-void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_dirs,
- const std::vector<std::string> &data_dir_tags)
-{
- std::string cdir;
- std::string err_msg;
- int count = 0;
- std::vector<std::string> available_dirs;
- std::vector<std::string> available_dir_tags;
- for (auto i = 0; i < data_dir_tags.size(); ++i) {
- const auto &dir = data_dirs[i];
- if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
- !utils::filesystem::check_dir_rw(dir, err_msg))) {
- if (FLAGS_ignore_broken_disk) {
- LOG_WARNING("data dir[{}] is broken, ignore it, error:{}", dir, err_msg);
- } else {
- CHECK(false, "{}", err_msg);
- }
- continue;
- }
- LOG_INFO("data_dirs[{}] = {}", count, cdir);
- available_dirs.emplace_back(cdir);
- available_dir_tags.emplace_back(data_dir_tags[i]);
- count++;
- }
-
- CHECK_GT_MSG(
- available_dirs.size(), 0, "initialize fs manager failed, no available data directory");
- _fs_manager.initialize(available_dirs, available_dir_tags);
-}
-
void replica_stub::initialize_start()
{
if (_is_running) {
@@ -1124,7 +1091,8 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
int app_id = 0;
if (!req.app_name.empty()) {
zauto_read_lock l(_replicas_lock);
- if (!(app_id = get_app_id_from_replicas(req.app_name))) {
+ app_id = get_app_id_from_replicas(req.app_name);
+ if (app_id == 0) {
resp.err = ERR_OBJECT_NOT_FOUND;
return;
}
@@ -1218,7 +1186,7 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
std::vector<std::string> data_dirs;
std::vector<std::string> data_dir_tags;
- std::string err_msg = "";
+ std::string err_msg;
if (disk_str.empty() ||
!replication_options::get_data_dir_and_tag(
disk_str, "", "replica", data_dirs, data_dir_tags, err_msg)) {
@@ -1252,6 +1220,8 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
}
LOG_INFO("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, data_dir_tags[i]);
+ // TODO(yingchun): there is a gap between _fs_manager.is_dir_node_exist() and
+ // _fs_manager.add_new_dir_node() which is not atomic.
_fs_manager.add_new_dir_node(cdir, data_dir_tags[i]);
}
}
@@ -1768,9 +1738,7 @@ void replica_stub::init_gc_for_test()
void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
{
- std::string replica_path;
std::pair<app_info, replica_info> closed_info;
-
{
zauto_write_lock l(_replicas_lock);
auto iter = _closed_replicas.find(id);
@@ -1778,26 +1746,30 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
return;
closed_info = iter->second;
_closed_replicas.erase(iter);
- _fs_manager.remove_replica(id);
}
+ _fs_manager.remove_replica(id);
- replica_path = get_replica_dir(closed_info.first.app_type.c_str(), id, false);
- if (replica_path.empty()) {
+ const auto *const dn = _fs_manager.find_replica_dir(closed_info.first.app_type, id);
+ if (dn == nullptr) {
LOG_WARNING(
"gc closed replica({}.{}) failed, no exist data", id, closed_info.first.app_type);
return;
}
+ const auto replica_path = dn->replica_dir(closed_info.first.app_type, id);
+ CHECK(
+ dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
LOG_INFO("start to move replica({}) as garbage, path: {}", id, replica_path);
- char rename_path[1024];
- sprintf(rename_path, "%s.%" PRIu64 ".gar", replica_path.c_str(), dsn_now_us());
+ const auto rename_path = fmt::format("{}.{}.gar", replica_path, dsn_now_us());
if (!dsn::utils::filesystem::rename_path(replica_path, rename_path)) {
LOG_WARNING("gc_replica: failed to move directory '{}' to '{}'", replica_path, rename_path);
// if gc the replica failed, add it back
- zauto_write_lock l(_replicas_lock);
+ {
+ zauto_write_lock l(_replicas_lock);
+ _closed_replicas.emplace(id, closed_info);
+ }
_fs_manager.add_replica(id, replica_path);
- _closed_replicas.emplace(id, closed_info);
} else {
LOG_WARNING("gc_replica: replica_dir_op succeed to move directory '{}' to '{}'",
replica_path,
@@ -2014,7 +1986,7 @@ void replica_stub::on_disk_stat()
uint64_t start = dsn_now_ns();
disk_cleaning_report report{};
- dsn::replication::disk_remove_useless_dirs(_fs_manager.get_available_data_dirs(), report);
+ dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report);
_fs_manager.update_disk_stat();
update_disk_holding_replicas();
update_disks_status();
@@ -2097,9 +2069,12 @@ void replica_stub::open_replica(
const std::shared_ptr<group_check_request> &group_check,
const std::shared_ptr<configuration_update_request> &configuration_update)
{
- std::string dir = get_replica_dir(app.app_type.c_str(), id, false);
- replica_ptr rep = nullptr;
- if (!dir.empty()) {
+ replica_ptr rep;
+ std::string dir;
+ auto dn = _fs_manager.find_replica_dir(app.app_type, id);
+ if (dn != nullptr) {
+ dir = dn->replica_dir(app.app_type, id);
+ CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
// NOTICE: if partition is DDD, and meta select one replica as primary, it will execute the
// load-process because of a.b.pegasus is exist, so it will never execute the restore
// process below
@@ -2113,13 +2088,15 @@ void replica_stub::open_replica(
// if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
// migration
if (rep == nullptr) {
- std::string origin_tmp_dir = get_replica_dir(
- fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix)
- .c_str(),
- id,
- false);
- if (!origin_tmp_dir.empty()) {
- LOG_INFO("mark the dir {} is garbage, start revert and load disk migration origin "
+ const auto origin_dir_type =
+ fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix);
+ const auto origin_dn = _fs_manager.find_replica_dir(origin_dir_type, id);
+ if (origin_dn != nullptr) {
+ const auto origin_tmp_dir = origin_dn->replica_dir(origin_dir_type, id);
+ CHECK(dsn::utils::filesystem::directory_exists(origin_tmp_dir),
+ "dir({}) not exist",
+ origin_tmp_dir);
+ LOG_INFO("mark the dir {} as garbage, start revert and load disk migration origin "
"replica data({})",
dir,
origin_tmp_dir);
@@ -2232,12 +2209,18 @@ replica *replica_stub::new_replica(gpid gpid,
bool is_duplication_follower,
const std::string &parent_dir)
{
- std::string dir;
+ dir_node *dn = nullptr;
if (parent_dir.empty()) {
- dir = get_replica_dir(app.app_type.c_str(), gpid);
+ dn = _fs_manager.create_replica_dir_if_necessary(app.app_type, gpid);
} else {
- dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir);
+ dn = _fs_manager.create_child_replica_dir(app.app_type, gpid, parent_dir);
}
+ if (dn == nullptr) {
+ LOG_ERROR("could not allocate a new directory for replica {}", gpid);
+ return nullptr;
+ }
+ const auto &dir = dn->replica_dir(app.app_type, gpid);
+ CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
auto *rep =
new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
error_code err;
@@ -2883,44 +2866,6 @@ void replica_stub::close()
_is_running = false;
}
-std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new)
-{
- std::string gpid_str = fmt::format("{}.{}", id, app_type);
- std::string replica_dir;
- bool is_dir_exist = false;
- for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) {
- std::string dir = utils::filesystem::path_combine(data_dir, gpid_str);
- if (utils::filesystem::directory_exists(dir)) {
- CHECK(!is_dir_exist, "replica dir conflict: {} <--> {}", dir, replica_dir);
- replica_dir = dir;
- is_dir_exist = true;
- }
- }
- if (replica_dir.empty() && create_new) {
- _fs_manager.allocate_dir(id, app_type, replica_dir);
- }
- return replica_dir;
-}
-
-std::string
-replica_stub::get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir)
-{
- std::string gpid_str = fmt::format("{}.{}", child_pid.to_string(), app_type);
- std::string child_dir;
- for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) {
- std::string dir = utils::filesystem::path_combine(data_dir, gpid_str);
- // <parent_dir> = <prefix>/<gpid>.<app_type>
- // check if <parent_dir>'s <prefix> is equal to <data_dir>
- if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") {
- child_dir = dir;
- _fs_manager.add_replica(child_pid, child_dir);
- break;
- }
- }
- CHECK(!child_dir.empty(), "can not find parent_dir {} in data_dirs", parent_dir);
- return child_dir;
-}
-
#ifdef DSN_ENABLE_GPERF
// Get tcmalloc numeric property (name is "prop") value.
// Return -1 if get property failed (property we used will be greater than zero)
@@ -3094,22 +3039,20 @@ void replica_stub::on_update_child_group_partition_count(update_child_group_part
void replica_stub::update_disk_holding_replicas()
{
- for (const auto &dir_node : _fs_manager._dir_nodes) {
- // clear the holding_primary_replicas/holding_secondary_replicas and re-calculate it from
- // holding_replicas
- dir_node->holding_primary_replicas.clear();
- dir_node->holding_secondary_replicas.clear();
- for (const auto &holding_replicas : dir_node->holding_replicas) {
- const std::set<dsn::gpid> &pids = holding_replicas.second;
+ for (const auto &dn : _fs_manager.get_dir_nodes()) {
+ dn->holding_primary_replicas.clear();
+ dn->holding_secondary_replicas.clear();
+ for (const auto &holding_replicas : dn->holding_replicas) {
+ const auto &pids = holding_replicas.second;
for (const auto &pid : pids) {
- replica_ptr replica = get_replica(pid);
- if (replica == nullptr) {
+ const auto rep = get_replica(pid);
+ if (rep == nullptr) {
continue;
}
- if (replica->status() == partition_status::PS_PRIMARY) {
- dir_node->holding_primary_replicas[holding_replicas.first].emplace(pid);
- } else if (replica->status() == partition_status::PS_SECONDARY) {
- dir_node->holding_secondary_replicas[holding_replicas.first].emplace(pid);
+ if (rep->status() == partition_status::PS_PRIMARY) {
+ dn->holding_primary_replicas[holding_replicas.first].emplace(pid);
+ } else if (rep->status() == partition_status::PS_SECONDARY) {
+ dn->holding_secondary_replicas[holding_replicas.first].emplace(pid);
}
}
}
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 12f17a1cf..00d96f447 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -139,8 +139,6 @@ public:
//
void initialize(const replication_options &opts, bool clear = false);
void initialize(bool clear = false);
- void initialize_fs_manager(const std::vector<std::string> &data_dirs,
- const std::vector<std::string> &data_dir_tags);
void set_options(const replication_options &opts) { _options = opts; }
void open_service();
void close();
@@ -202,12 +200,6 @@ public:
virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); }
rpc_address primary_address() const { return _primary_address; }
- std::string get_replica_dir(const char *app_type, gpid id, bool create_new = true);
-
- // during partition split, we should gurantee child replica and parent replica share the
- // same data dir
- std::string get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir);
-
//
// helper methods
//
@@ -341,7 +333,7 @@ private:
const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<configuration_update_request> &req2);
// Create a new replica according to the parameters.
- // 'parent_dir' is used in partition split for get_child_dir().
+ // 'parent_dir' is used in partition split for create_child_replica_dir().
replica *new_replica(gpid gpid,
const app_info &app,
bool restore_if_necessary,
@@ -427,6 +419,7 @@ private:
friend class replica_follower;
friend class replica_follower_test;
friend class replica_http_service_test;
+ FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_test, test_clear_on_failure);
FRIEND_TEST(replica_test, test_auto_trash);
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index cece7c82e..965b81ec6 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -231,23 +231,27 @@ private:
};
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
-inline std::unique_ptr<mock_replica> create_mock_replica(replica_stub *stub,
- int appid = 1,
- int partition_index = 1,
- const char *dir = "./")
+inline std::unique_ptr<mock_replica>
+create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
{
gpid gpid(appid, partition_index);
app_info app_info;
app_info.app_type = "replica";
app_info.app_name = "temp";
- return std::make_unique<mock_replica>(stub, gpid, app_info, dir);
+ const auto *const dn =
+ stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, gpid);
+ CHECK_NOTNULL(dn, "");
+ const auto replica_path = dn->replica_dir(app_info.app_type, gpid);
+ CHECK(
+ dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
+ return std::make_unique<mock_replica>(stub, gpid, app_info, replica_path.c_str());
}
class mock_replica_stub : public replica_stub
{
public:
- mock_replica_stub() = default;
+ mock_replica_stub() { _fs_manager.initialize({"./"}, {"tag"}); }
~mock_replica_stub() override = default;
@@ -313,9 +317,10 @@ public:
config.pid = pid;
config.status = status;
- // TODO(yingchun): should refactor to move to cstor or initializer.
- initialize_fs_manager({"./"}, {"tag"});
- std::string dir = get_replica_dir("test", pid);
+ auto dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
+ CHECK_NOTNULL(dn, "");
+ const auto &dir = dn->replica_dir(info.app_type, pid);
+ CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
auto *rep =
new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
rep->set_replica_config(config);
diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp
index 2b7ce0bab..6bca13336 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -297,7 +297,6 @@ public:
{
utils::filesystem::remove_path(_log_dir);
utils::filesystem::create_directory(_log_dir);
-
utils::filesystem::remove_path(_log_dir + ".test");
}
diff --git a/src/replica/test/open_replica_test.cpp b/src/replica/test/open_replica_test.cpp
index 89e080286..b4cb088f6 100644
--- a/src/replica/test/open_replica_test.cpp
+++ b/src/replica/test/open_replica_test.cpp
@@ -20,13 +20,10 @@
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <stdint.h>
-#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
-#include <vector>
-#include "common/fs_manager.h"
#include "common/gpid.h"
#include "common/replication_other_types.h"
#include "dsn.layer2_types.h"
@@ -47,68 +44,55 @@ class open_replica_test : public replica_test_base
public:
open_replica_test() = default;
~open_replica_test() { dsn::utils::filesystem::remove_path("./tmp_dir"); }
+};
- void test_open_replica()
- {
- app_info app_info;
- app_info.app_type = "replica";
- app_info.is_stateful = true;
- app_info.max_replica_count = 3;
- app_info.partition_count = 8;
- app_info.app_id = 1;
-
- struct test_data
- {
- ballot b;
- decree last_committed_decree;
- bool is_in_dir_nodes;
- bool exec_failed;
- } tests[] = {
- {0, 0, true, true}, {0, 0, false, false}, {5, 5, true, true}, {5, 5, false, true},
- };
- int i = 0;
- for (auto tt : tests) {
- gpid gpid(app_info.app_id, i);
- stub->_opening_replicas[gpid] = task_ptr(nullptr);
+TEST_F(open_replica_test, open_replica_add_decree_and_ballot_check)
+{
+ app_info ai;
+ ai.app_type = "replica";
+ ai.is_stateful = true;
+ ai.max_replica_count = 3;
+ ai.partition_count = 8;
+ ai.app_id = 11;
- dsn::rpc_address node;
- node.assign_ipv4("127.0.0.11", static_cast<uint16_t>(12321 + i + 1));
+ struct test_data
+ {
+ ballot b;
+ decree last_committed_decree;
+ bool expect_crash;
+ } tests[] = {{0, 0, false}, {5, 5, true}};
+ int i = 0;
+ for (auto test : tests) {
+ gpid pid(ai.app_id, i);
+ stub->_opening_replicas[pid] = task_ptr(nullptr);
- if (!tt.is_in_dir_nodes) {
- dir_node *node_disk = new dir_node("tag_" + std::to_string(i), "tmp_dir");
- stub->_fs_manager._dir_nodes.emplace_back(node_disk);
- stub->_fs_manager._available_data_dirs.emplace_back("tmp_dir");
- }
+ dsn::rpc_address node;
+ node.assign_ipv4("127.0.0.11", static_cast<uint16_t>(12321 + i + 1));
- _replica->register_service();
- mock_mutation_log_shared_ptr shared_log_mock =
- new mock_mutation_log_shared("./tmp_dir");
- stub->set_log(shared_log_mock);
- partition_configuration config;
- config.pid = gpid;
- config.ballot = tt.b;
- config.last_committed_decree = tt.last_committed_decree;
- std::shared_ptr<app_state> _the_app = app_state::create(app_info);
+ _replica->register_service();
+ mock_mutation_log_shared_ptr shared_log_mock = new mock_mutation_log_shared("./tmp_dir");
+ stub->set_log(shared_log_mock);
- configuration_update_request fake_request;
- fake_request.info = *_the_app;
- fake_request.config = config;
- fake_request.type = config_type::CT_ASSIGN_PRIMARY;
- fake_request.node = node;
+ partition_configuration config;
+ config.pid = pid;
+ config.ballot = test.b;
+ config.last_committed_decree = test.last_committed_decree;
+ auto as = app_state::create(ai);
- std::shared_ptr<configuration_update_request> req2(new configuration_update_request);
- *req2 = fake_request;
- if (tt.exec_failed) {
- ASSERT_DEATH(stub->open_replica(app_info, gpid, nullptr, req2), "");
- } else {
- stub->open_replica(app_info, gpid, nullptr, req2);
- }
- ++i;
+ auto req = std::make_shared<configuration_update_request>();
+ req->info = *as;
+ req->config = config;
+ req->type = config_type::CT_ASSIGN_PRIMARY;
+ req->node = node;
+ if (test.expect_crash) {
+ ASSERT_DEATH(stub->open_replica(ai, pid, nullptr, req), "");
+ } else {
+ stub->open_replica(ai, pid, nullptr, req);
}
+ // Both of the tests will fail; the replica does not exist in the stub.
+ ASSERT_EQ(nullptr, stub->get_replica(pid));
+ ++i;
}
-};
-
-TEST_F(open_replica_test, open_replica_add_decree_and_ballot_check) { test_open_replica(); }
-
+}
} // namespace replication
} // namespace dsn
diff --git a/src/replica/test/replica_disk_migrate_test.cpp b/src/replica/test/replica_disk_migrate_test.cpp
index 64ac2da7f..7c47abad2 100644
--- a/src/replica/test/replica_disk_migrate_test.cpp
+++ b/src/replica/test/replica_disk_migrate_test.cpp
@@ -18,9 +18,11 @@
*/
#include <fmt/core.h>
+#include <fmt/ostream.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <iosfwd>
#include <map>
#include <memory>
#include <set>
@@ -347,7 +349,7 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update)
ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaNewDir));
utils::filesystem::remove_path(fmt::format("./{}/", request.origin_disk));
utils::filesystem::remove_path(fmt::format("./{}/", request.target_disk));
- for (const auto &node_disk : get_dir_nodes()) {
+ for (const auto &node_disk : stub->get_fs_manager()->get_dir_nodes()) {
if (node_disk->tag == request.origin_disk) {
auto gpids = node_disk->holding_replicas[app_info_1.app_id];
ASSERT_TRUE(gpids.find(request.pid) == gpids.end());
@@ -362,31 +364,42 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update)
}
}
+// Test that when loading from the new replica dir fails, falling back to loading from the
+// origin dir succeeds, and the "new" replica dir is then marked as ".gar".
TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
{
+ gpid test_pid(app_info_1.app_id, 4);
+
+ // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
auto &request = *fake_migrate_rpc.mutable_request();
- request.pid = dsn::gpid(app_info_1.app_id, 4);
+ request.pid = test_pid;
request.origin_disk = "tag_2";
request.target_disk = "tag_empty_1";
- remove_mock_dir_node(request.origin_disk);
- const std::string kReplicaOriginSuffixDir = fmt::format(
- "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, request.pid.to_string());
- const std::string kReplicaNewDir =
- fmt::format("./{}/{}.replica/", request.target_disk, request.pid.to_string());
+ // Remove the gpid 1.4 dir which is created in constructor.
+ const auto kReplicaOriginDir = fmt::format("./{}/{}.replica", request.origin_disk, request.pid);
+ utils::filesystem::remove_path(kReplicaOriginDir);
+ stub->get_fs_manager()->remove_replica(test_pid);
+
+ // Create the related dirs.
+ const auto kReplicaOriginSuffixDir =
+ fmt::format("./{}/{}.replica.disk.migrate.ori/", request.origin_disk, request.pid);
+ const auto kReplicaNewDir = fmt::format("./{}/{}.replica/", request.target_disk, request.pid);
utils::filesystem::create_directory(kReplicaOriginSuffixDir);
utils::filesystem::create_directory(kReplicaNewDir);
+ // The replica can be opened normally. In fact, the original dir is opened, and the new dir will
+ // be marked as garbage.
fail::cfg("mock_replica_load", "return()");
- const std::string kReplicaOriginDir =
- fmt::format("./{}/{}.replica", request.origin_disk, request.pid.to_string());
- const std::string kReplicaGarDir =
- fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid.to_string());
open_replica(app_info_1, request.pid);
+ // Check it works as expected.
+ const auto kReplicaGarDir =
+ fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid);
ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaOriginDir));
ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaGarDir));
+ // Clean up.
utils::filesystem::remove_path(kReplicaOriginDir);
utils::filesystem::remove_path(kReplicaGarDir);
}
diff --git a/src/replica/test/replica_disk_test.cpp b/src/replica/test/replica_disk_test.cpp
index 934a5c749..77880952c 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -215,9 +215,8 @@ TEST_F(replica_disk_test, gc_disk_useless_dir)
sleep(5);
- std::vector<std::string> data_dirs{"./"};
disk_cleaning_report report{};
- dsn::replication::disk_remove_useless_dirs(data_dirs, report);
+ dsn::replication::disk_remove_useless_dirs({std::make_shared<dir_node>("test", "./")}, report);
for (const auto &test : tests) {
if (!dsn::replication::is_data_dir_removable(test)) {
@@ -246,7 +245,7 @@ TEST_F(replica_disk_test, disk_status_test)
{disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT},
{disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
for (const auto &test : tests) {
- auto node = get_dir_nodes()[node_index];
+ auto node = stub->get_fs_manager()->get_dir_nodes()[node_index];
mock_node_status(node_index, test.old_status, test.new_status);
update_disks_status();
for (auto &kv : node->holding_replicas) {
@@ -260,24 +259,6 @@ TEST_F(replica_disk_test, disk_status_test)
mock_node_status(node_index, disk_status::NORMAL, disk_status::NORMAL);
}
-TEST_F(replica_disk_test, broken_disk_test)
-{
- // Test cases:
- // create: true, check_rw: true
- // create: true, check_rw: false
- // create: false
- struct broken_disk_test
- {
- std::string mock_create_dir;
- std::string mock_rw_flag;
- int32_t data_dir_size;
- } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false", 2}};
- for (const auto &test : tests) {
- ASSERT_EQ(test.data_dir_size,
- ignore_broken_disk_test(test.mock_create_dir, test.mock_rw_flag));
- }
-}
-
TEST_F(replica_disk_test, add_new_disk_test)
{
// Test case:
diff --git a/src/replica/test/replica_disk_test_base.h b/src/replica/test/replica_disk_test_base.h
index db41cd509..2da9d9276 100644
--- a/src/replica/test/replica_disk_test_base.h
+++ b/src/replica/test/replica_disk_test_base.h
@@ -56,6 +56,8 @@ public:
fail::cfg("update_disk_stat", "return()");
generate_mock_app_info();
+ stub->_fs_manager._dir_nodes.clear();
+ stub->_fs_manager.reset_disk_stat();
generate_mock_dir_nodes(dir_nodes_count);
generate_mock_empty_dir_node(empty_dir_nodes_count);
@@ -73,8 +75,6 @@ public:
void update_disks_status() { stub->update_disks_status(); }
- std::vector<std::shared_ptr<dir_node>> get_dir_nodes() { return stub->_fs_manager._dir_nodes; }
-
void generate_mock_dir_node(const app_info &app,
const gpid pid,
const std::string &tag,
@@ -83,7 +83,6 @@ public:
dir_node *node_disk = new dir_node(tag, full_dir);
node_disk->holding_replicas[app.app_id].emplace(pid);
stub->_fs_manager._dir_nodes.emplace_back(node_disk);
- stub->_fs_manager._available_data_dirs.emplace_back(full_dir);
}
void remove_mock_dir_node(const std::string &tag)
@@ -101,7 +100,7 @@ public:
void
mock_node_status(int32_t node_index, disk_status::type old_status, disk_status::type new_status)
{
- auto node = get_dir_nodes()[node_index];
+ auto node = stub->_fs_manager.get_dir_nodes()[node_index];
for (const auto &kv : node->holding_replicas) {
for (const auto &pid : kv.second) {
update_replica_disk_status(pid, old_status);
@@ -124,21 +123,6 @@ public:
return ERR_OK;
}
- int32_t ignore_broken_disk_test(const std::string &mock_create_directory,
- const std::string &mock_check_rw)
- {
- std::vector<std::string> data_dirs = {"disk1", "disk2", "disk3"};
- std::vector<std::string> data_dir_tags = {"tag1", "tag2", "tag3"};
- auto test_stub = std::make_unique<mock_replica_stub>();
- fail::cfg("filesystem_create_directory", "return(" + mock_create_directory + ")");
- fail::cfg("filesystem_check_dir_rw", "return(" + mock_check_rw + ")");
- fail::cfg("update_disk_stat", "return()");
- test_stub->initialize_fs_manager(data_dirs, data_dir_tags);
- int32_t dir_size = test_stub->_fs_manager.get_available_data_dirs().size();
- test_stub.reset();
- return dir_size;
- }
-
void prepare_before_add_new_disk_test(const std::string &create_dir,
const std::string &check_rw)
{
@@ -153,7 +137,6 @@ public:
void reset_after_add_new_disk_test()
{
stub->_fs_manager._dir_nodes.clear();
- stub->_fs_manager._available_data_dirs.clear();
dsn::utils::filesystem::remove_path("add_new_not_empty_disk");
}
@@ -193,7 +176,6 @@ private:
dir_node *node_disk =
new dir_node(fmt::format("tag_empty_{}", num), fmt::format("./tag_empty_{}", num));
stub->_fs_manager._dir_nodes.emplace_back(node_disk);
- stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir);
utils::filesystem::create_directory(node_disk->full_dir);
num--;
}
@@ -239,7 +221,6 @@ private:
}
stub->_fs_manager._dir_nodes.emplace_back(node_disk);
- stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir);
}
}
diff --git a/src/replica/test/replica_test_base.h b/src/replica/test/replica_test_base.h
index 94260dbce..3aed53c80 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -53,9 +53,14 @@ class replica_test_base : public replica_stub_test_base
{
public:
std::unique_ptr<mock_replica> _replica;
- const std::string _log_dir{"./test-log"};
+ // TODO(yingchun): rename to _replica_dir, and consider removing it entirely.
+ std::string _log_dir;
- replica_test_base() { _replica = create_mock_replica(stub.get(), 1, 1, _log_dir.c_str()); }
+ replica_test_base()
+ {
+ _replica = create_mock_replica(stub.get(), 1, 1);
+ _log_dir = _replica->dir();
+ }
virtual mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
{
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index 5ebe2496e..a5a530e41 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -46,6 +46,7 @@ public:
// Remove rdb to prevent rocksdb recovery from last test.
dsn::utils::filesystem::remove_path("./data/rdb");
_replica_stub = new dsn::replication::replica_stub();
+ _replica_stub->get_fs_manager()->initialize({"./"}, {"test_tag"});
_gpid = dsn::gpid(100, 1);
dsn::app_info app_info;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org