You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/25 06:52:20 UTC

[incubator-pegasus] branch master updated: refactor: improve the single-responsibility of class fs_manager (#1477)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d5f225967 refactor: improve the single-responsibility of class fs_manager (#1477)
d5f225967 is described below

commit d5f225967aa4575a4670f48dc1bd7dcd5d07ea9e
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Thu May 25 14:52:13 2023 +0800

    refactor: improve the single-responsibility of class fs_manager (#1477)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This patch moves into fs_manager some functions that are more reasonably the
    responsibility of class fs_manager than of class replica_stub.
---
 src/common/fs_manager.cpp                          | 194 ++++++++++++++++-----
 src/common/fs_manager.h                            |  34 +++-
 src/common/test/fs_manager_test.cpp                | 146 +++++++++++++++-
 src/meta/test/misc/misc.cpp                        |  20 ++-
 src/replica/disk_cleaner.cpp                       |   9 +-
 src/replica/disk_cleaner.h                         |   5 +-
 .../test/load_from_private_log_test.cpp            |  19 +-
 src/replica/replica_stub.cpp                       | 183 +++++++------------
 src/replica/replica_stub.h                         |  11 +-
 src/replica/test/mock_utils.h                      |  23 ++-
 src/replica/test/mutation_log_test.cpp             |   1 -
 src/replica/test/open_replica_test.cpp             | 100 +++++------
 src/replica/test/replica_disk_migrate_test.cpp     |  35 ++--
 src/replica/test/replica_disk_test.cpp             |  23 +--
 src/replica/test/replica_disk_test_base.h          |  25 +--
 src/replica/test/replica_test_base.h               |   9 +-
 src/server/test/pegasus_server_test_base.h         |   1 +
 17 files changed, 507 insertions(+), 331 deletions(-)

diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 35a801053..6acdd38ee 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -34,13 +34,15 @@
 
 #include "fs_manager.h"
 
-#include <stdio.h>
 #include <algorithm>
 #include <cmath>
+#include <iosfwd>
 #include <utility>
 
 #include "common/gpid.h"
 #include "common/replication_enums.h"
+#include "fmt/core.h"
+#include "fmt/ostream.h"
 #include "perf_counter/perf_counter.h"
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
@@ -62,6 +64,11 @@ DSN_DEFINE_int32(replication,
                  "space insufficient");
 DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
 
+DSN_DEFINE_bool(replication,
+                ignore_broken_disk,
+                true,
+                "true means ignore broken data disk when initialize");
+
 uint64_t dir_node::replicas_count() const
 {
     uint64_t sum = 0;
@@ -80,6 +87,11 @@ uint64_t dir_node::replicas_count(app_id id) const
     return iter->second.size();
 }
 
+std::string dir_node::replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const
+{
+    return utils::filesystem::path_combine(full_dir, fmt::format("{}.{}", pid, app_type));
+}
+
 bool dir_node::has(const gpid &pid) const
 {
     auto iter = holding_replicas.find(pid.get_app_id());
@@ -187,15 +199,44 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
                             const std::vector<std::string> &data_dir_tags)
 {
     CHECK_EQ(data_dirs.size(), data_dir_tags.size());
-    for (unsigned i = 0; i < data_dirs.size(); ++i) {
+
+    // Skip the data directories which are broken.
+    std::vector<std::shared_ptr<dir_node>> dir_nodes;
+    for (auto i = 0; i < data_dir_tags.size(); ++i) {
+        const auto &dir_tag = data_dir_tags[i];
+        const auto &dir = data_dirs[i];
+
+        // Check the status of this directory.
+        std::string cdir;
+        std::string err_msg;
+        if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
+                         !utils::filesystem::check_dir_rw(dir, err_msg))) {
+            if (FLAGS_ignore_broken_disk) {
+                LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg);
+            } else {
+                CHECK(false, err_msg);
+            }
+            // TODO(yingchun): Remove the 'continue' and mark the disk's IO error status
+            //  instead; all disks should be added regardless of their status.
+            continue;
+        }
+
+        // Normalize the data directories.
         std::string norm_path;
-        utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
-        dir_node *n = new dir_node(data_dir_tags[i], norm_path);
-        _dir_nodes.emplace_back(n);
-        LOG_INFO(
-            "{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
+        utils::filesystem::get_normalized_path(cdir, norm_path);
+
+        // Create and add this dir_node.
+        auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
+        dir_nodes.emplace_back(dn);
+        LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
+    }
+    CHECK_FALSE(dir_nodes.empty());
+
+    // Update the memory state.
+    {
+        zauto_read_lock l(_lock);
+        _dir_nodes.swap(dir_nodes);
     }
-    _available_data_dirs = data_dirs;
 
     // Update the disk statistics.
     update_disk_stat();
@@ -236,44 +277,40 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
     LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
 }
 
-void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
+dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
 {
-    char buffer[256];
-    sprintf(buffer, "%d.%d.%s", pid.get_app_id(), pid.get_partition_index(), type.c_str());
-
-    zauto_write_lock l(_lock);
-
     dir_node *selected = nullptr;
-
-    unsigned least_app_replicas_count = 0;
-    unsigned least_total_replicas_count = 0;
-
-    for (auto &n : _dir_nodes) {
-        CHECK(!n->has(pid), "gpid({}) already in dir_node({})", pid, n->tag);
-        unsigned app_replicas = n->replicas_count(pid.get_app_id());
-        unsigned total_replicas = n->replicas_count();
-
-        if (selected == nullptr || least_app_replicas_count > app_replicas) {
-            least_app_replicas_count = app_replicas;
-            least_total_replicas_count = total_replicas;
-            selected = n.get();
-        } else if (least_app_replicas_count == app_replicas &&
-                   least_total_replicas_count > total_replicas) {
-            least_total_replicas_count = total_replicas;
-            selected = n.get();
+    uint64_t least_app_replicas_count = 0;
+    uint64_t least_total_replicas_count = 0;
+    {
+        zauto_write_lock l(_lock);
+        // Try to find the dir_node with the least replica count.
+        for (const auto &dn : _dir_nodes) {
+            CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
+            uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
+            uint64_t total_replicas_count = dn->replicas_count();
+
+            if (selected == nullptr || least_app_replicas_count > app_replicas_count) {
+                least_app_replicas_count = app_replicas_count;
+                least_total_replicas_count = total_replicas_count;
+                selected = dn.get();
+            } else if (least_app_replicas_count == app_replicas_count &&
+                       least_total_replicas_count > total_replicas_count) {
+                least_total_replicas_count = total_replicas_count;
+                selected = dn.get();
+            }
         }
     }
-
-    LOG_INFO(
-        "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
-        dsn_primary_address(),
-        pid,
-        selected->tag,
-        least_app_replicas_count,
-        least_total_replicas_count);
-
-    selected->holding_replicas[pid.get_app_id()].emplace(pid);
-    dir = utils::filesystem::path_combine(selected->full_dir, buffer);
+    if (selected != nullptr) {
+        LOG_INFO(
+            "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
+            dsn_primary_address(),
+            pid,
+            selected->tag,
+            least_app_replicas_count,
+            least_total_replicas_count);
+    }
+    return selected;
 }
 
 void fs_manager::remove_replica(const gpid &pid)
@@ -342,7 +379,6 @@ void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string
     utils::filesystem::get_normalized_path(data_dir, norm_path);
     dir_node *n = new dir_node(tag, norm_path);
     _dir_nodes.emplace_back(n);
-    _available_data_dirs.emplace_back(data_dir);
     LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
 }
 
@@ -359,5 +395,77 @@ bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::s
     return false;
 }
 
+dir_node *fs_manager::find_replica_dir(dsn::string_view app_type, gpid pid)
+{
+    std::string replica_dir;
+    dir_node *replica_dn = nullptr;
+    {
+        zauto_read_lock l(_lock);
+        for (const auto &dn : _dir_nodes) {
+            const auto dir = dn->replica_dir(app_type, pid);
+            if (utils::filesystem::directory_exists(dir)) {
+                // Check if there are duplicate replica instance directories.
+                CHECK(replica_dir.empty(), "replica dir conflict: {} <--> {}", dir, replica_dir);
+                replica_dir = dir;
+                replica_dn = dn.get();
+            }
+        }
+    }
+
+    return replica_dn;
+}
+
+dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid)
+{
+    // Try to find the replica directory.
+    auto replica_dn = find_replica_dir(app_type, pid);
+    if (replica_dn != nullptr) {
+        return replica_dn;
+    }
+
+    // Find a dir_node for the new replica.
+    replica_dn = find_best_dir_for_new_replica(pid);
+    if (replica_dn == nullptr) {
+        return nullptr;
+    }
+
+    const auto dir = replica_dn->replica_dir(app_type, pid);
+    if (!dsn::utils::filesystem::create_directory(dir)) {
+        LOG_ERROR("create replica directory({}) failed", dir);
+        return nullptr;
+    }
+
+    replica_dn->holding_replicas[pid.get_app_id()].emplace(pid);
+    return replica_dn;
+}
+
+dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
+                                               gpid child_pid,
+                                               const std::string &parent_dir)
+{
+    dir_node *child_dn = nullptr;
+    std::string child_dir;
+    {
+        zauto_read_lock l(_lock);
+        for (const auto &dn : _dir_nodes) {
+            child_dir = dn->replica_dir(app_type, child_pid);
+            // <parent_dir> = <prefix>/<gpid>.<app_type>
+            // check if <parent_dir>'s <prefix> is equal to <data_dir>
+            // TODO(yingchun): use a function instead.
+            if (parent_dir.substr(0, dn->full_dir.size() + 1) == dn->full_dir + "/") {
+                child_dn = dn.get();
+                break;
+            }
+        }
+    }
+    CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir);
+    if (!dsn::utils::filesystem::create_directory(child_dir)) {
+        LOG_ERROR("create child replica directory({}) failed", child_dir);
+        return nullptr;
+    }
+    add_replica(child_pid, child_dir);
+    return child_dn;
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index ebdb3640a..0518897a7 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -31,6 +31,7 @@
 #include "perf_counter/perf_counter_wrapper.h"
 #include "utils/error_code.h"
 #include "utils/flags.h"
+#include "utils/string_view.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
@@ -72,6 +73,9 @@ public:
     // and protected by the lock in fs_manager.
     uint64_t replicas_count(app_id id) const;
     uint64_t replicas_count() const;
+    // Construct the replica dir for the given 'app_type' and 'pid'.
+    // NOTE: Just construct the string, the directory will not be created.
+    std::string replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const;
     bool has(const dsn::gpid &pid) const;
     uint64_t remove(const dsn::gpid &pid);
     bool update_disk_stat(const bool update_disk_status);
@@ -86,23 +90,36 @@ public:
     // NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order.
     void initialize(const std::vector<std::string> &data_dirs,
                     const std::vector<std::string> &data_dir_tags);
-
+    // Try to find the best dir_node to place the new replica. The fewer replicas a
+    // dir_node holds (which generally means its load is lower), the more likely it is
+    // to be selected.
+    // TODO(yingchun): consider the disk capacity and available space.
+    // NOTE: the 'pid' must not exist in any dir_nodes.
+    dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
     dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
-    void allocate_dir(const dsn::gpid &pid,
-                      const std::string &type,
-                      /*out*/ std::string &dir);
     void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
+    // Find the replica instance directory.
+    dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
+    // Similar to the above, but it will create a new directory if not found.
+    dir_node *create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid);
+    // Similar to the above, but creates the directory for the child replica on the same
+    // dir_node as its parent.
+    // During partition split, we should guarantee that the child replica and the parent
+    // replica share the same data dir.
+    dir_node *create_child_replica_dir(dsn::string_view app_type,
+                                       gpid child_pid,
+                                       const std::string &parent_dir);
     void remove_replica(const dsn::gpid &pid);
     bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
     void update_disk_stat(bool check_status_changed = true);
 
     void add_new_dir_node(const std::string &data_dir, const std::string &tag);
-    bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
-    const std::vector<std::string> &get_available_data_dirs() const
+    const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
     {
         zauto_read_lock l(_lock);
-        return _available_data_dirs;
+        return _dir_nodes;
     }
+    bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
 
 private:
     void reset_disk_stat()
@@ -128,7 +145,6 @@ private:
     int _max_available_ratio = 0;
 
     std::vector<std::shared_ptr<dir_node>> _dir_nodes;
-    std::vector<std::string> _available_data_dirs;
 
     // Used for disk available space check
     // disk status will be updated periodically, this vector record nodes whose disk_status changed
@@ -147,7 +163,9 @@ private:
     friend class replica_disk_migrator;
     friend class replica_disk_test_base;
     friend class open_replica_test;
+    FRIEND_TEST(fs_manager, find_best_dir_for_new_replica);
     FRIEND_TEST(fs_manager, get_dir_node);
+    FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
     FRIEND_TEST(replica_test, test_auto_trash);
 };
 } // replication
diff --git a/src/common/test/fs_manager_test.cpp b/src/common/test/fs_manager_test.cpp
index 2d1f982f8..a1733741d 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -17,19 +17,57 @@
  * under the License.
  */
 
+// IWYU pragma: no_include <ext/alloc_traits.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
+#include <stdint.h>
+#include <map>
 #include <memory>
+#include <ostream>
+#include <set>
 #include <string>
+#include <vector>
 
 #include "common/fs_manager.h"
+#include "common/gpid.h"
+#include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "utils/fail_point.h"
+#include "utils/filesystem.h"
+
+using namespace dsn::utils::filesystem;
 
 namespace dsn {
 namespace replication {
 
+TEST(dir_node, replica_dir)
+{
+    dir_node dn("tag", "path");
+    ASSERT_EQ("path/1.0.test", dn.replica_dir("test", gpid(1, 0)));
+}
+
+TEST(fs_manager, initialize)
+{
+    fail::setup();
+    struct broken_disk_test
+    {
+        std::string create_dir_ok;
+        std::string check_dir_rw_ok;
+        int32_t data_dir_size;
+    } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false", 2}};
+    int i = 0;
+    for (const auto &test : tests) {
+        fail::cfg("filesystem_create_directory", "return(" + test.create_dir_ok + ")");
+        fail::cfg("filesystem_check_dir_rw", "return(" + test.check_dir_rw_ok + ")");
+        fs_manager fm;
+        fm.initialize({"disk1", "disk2", "disk3"}, {"tag1", "tag2", "tag3"});
+        ASSERT_EQ(test.data_dir_size, fm.get_dir_nodes().size()) << i;
+        i++;
+    }
+    fail::teardown();
+}
+
 TEST(fs_manager, dir_update_disk_status)
 {
     std::shared_ptr<dir_node> node = std::make_shared<dir_node>("tag", "path");
@@ -64,19 +102,113 @@ TEST(fs_manager, dir_update_disk_status)
 TEST(fs_manager, get_dir_node)
 {
     fs_manager fm;
-    fm.initialize({"/data1"}, {"data1"});
+    fm.initialize({"./data1"}, {"data1"});
+    const auto &dns = fm.get_dir_nodes();
+    ASSERT_EQ(1, dns.size());
+    const auto &base_dir =
+        dns[0]->full_dir.substr(0, dns[0]->full_dir.size() - std::string("/data1").size());
 
     ASSERT_EQ(nullptr, fm.get_dir_node(""));
     ASSERT_EQ(nullptr, fm.get_dir_node("/"));
 
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
-    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node(base_dir + "/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
+}
+
+TEST(fs_manager, find_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
+
+    const char *app_type = "find_replica_dir";
+    gpid test_pid(1, 0);
+
+    // Clean up any leftover directories if they exist.
+    for (const auto &dn : fm.get_dir_nodes()) {
+        remove_path(dn->replica_dir(app_type, test_pid));
+    }
+
+    ASSERT_EQ(nullptr, fm.find_replica_dir(app_type, test_pid));
+    auto dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+    ASSERT_TRUE(directory_exists(dir));
+    auto dn1 = fm.find_replica_dir(app_type, test_pid);
+    ASSERT_EQ(dn, dn1);
+}
+
+TEST(fs_manager, create_replica_dir_if_necessary)
+{
+    fs_manager fm;
+
+    const char *app_type = "create_replica_dir_if_necessary";
+    gpid test_pid(1, 0);
 
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
-    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+    // Could not find a valid dir_node.
+    ASSERT_EQ(nullptr, fm.create_replica_dir_if_necessary(app_type, test_pid));
+
+    // It's able to create a dir for the replica after a valid dir_node has been added.
+    fm.add_new_dir_node("./data1", "data1");
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    ASSERT_EQ("data1", dn->tag);
 }
 
+TEST(fs_manager, create_child_replica_dir)
+{
+    fs_manager fm;
+    fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
+
+    const char *app_type = "create_child_replica_dir";
+    gpid test_pid(1, 0);
+    gpid test_child_pid(1, 0);
+
+    dir_node *dn = fm.create_replica_dir_if_necessary(app_type, test_pid);
+    ASSERT_NE(nullptr, dn);
+    const auto dir = dn->replica_dir(app_type, test_pid);
+
+    auto child_dn = fm.create_child_replica_dir(app_type, test_child_pid, dir);
+    ASSERT_EQ(dn, child_dn);
+    const auto child_dir = child_dn->replica_dir(app_type, test_child_pid);
+    ASSERT_TRUE(directory_exists(child_dir));
+    ASSERT_EQ(dir, child_dir);
+}
+
+TEST(fs_manager, find_best_dir_for_new_replica)
+{
+    // dn1 | 1.0,  1.1 +1.6
+    // dn2 | 1.2,  1.3 +1.7   2.0
+    // dn3 | 1.4  +1.5 +1.7   2.1
+    auto dn1 = std::make_shared<dir_node>("./data1", "data1");
+    dn1->holding_replicas[1] = {gpid(1, 0), gpid(1, 1)};
+    auto dn2 = std::make_shared<dir_node>("./data2", "data2");
+    dn2->holding_replicas[1] = {gpid(1, 2), gpid(1, 3)};
+    dn2->holding_replicas[2] = {gpid(2, 0)};
+    auto dn3 = std::make_shared<dir_node>("./data3", "data3");
+    dn3->holding_replicas[1] = {gpid(1, 4)};
+    dn3->holding_replicas[2] = {gpid(2, 1)};
+    fs_manager fm;
+    fm._dir_nodes = {dn1, dn2, dn3};
+
+    gpid pid_1_5(1, 5);
+    auto dn = fm.find_best_dir_for_new_replica(pid_1_5);
+    ASSERT_EQ(dn3.get(), dn);
+    dn->holding_replicas[pid_1_5.get_app_id()].emplace(pid_1_5);
+
+    gpid pid_1_6(1, 6);
+    dn = fm.find_best_dir_for_new_replica(pid_1_6);
+    ASSERT_EQ(dn1.get(), dn);
+    dn->holding_replicas[pid_1_6.get_app_id()].emplace(pid_1_6);
+
+    gpid pid_1_7(1, 7);
+    dn = fm.find_best_dir_for_new_replica(pid_1_7);
+    ASSERT_TRUE(dn == dn2.get() || dn == dn3.get());
+    dn->holding_replicas[pid_1_7.get_app_id()].emplace(pid_1_7);
+}
 } // namespace replication
 } // namespace dsn
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index aacf8c043..25c16d675 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -34,6 +34,7 @@
 #include <cstdint>
 #include <cstdlib>
 #include <iostream>
+#include <set>
 #include <string>
 #include <thread>
 #include <unordered_map>
@@ -46,7 +47,6 @@
 #include "duplication_types.h"
 #include "meta_admin_types.h"
 #include "metadata_types.h"
-#include "utils/error_code.h"
 #include "utils/fmt_logging.h"
 #include "utils/rand.h"
 
@@ -240,19 +240,21 @@ void track_disk_info_check_and_apply(const dsn::replication::configuration_propo
     std::string dir;
     replica_info ri;
     switch (act.type) {
-    case config_type::CT_ASSIGN_PRIMARY:
-        target_manager->allocate_dir(pid, "test", dir);
-        CHECK_EQ(dsn::ERR_OK, target_manager->get_disk_tag(dir, ri.disk_tag));
+    case config_type::CT_ASSIGN_PRIMARY: {
+        auto selected = target_manager->find_best_dir_for_new_replica(pid);
+        CHECK_NOTNULL(selected, "");
+        selected->holding_replicas[pid.get_app_id()].emplace(pid);
         cc->collect_serving_replica(act.target, ri);
         break;
-
+    }
     case config_type::CT_ADD_SECONDARY:
-    case config_type::CT_ADD_SECONDARY_FOR_LB:
-        node_manager->allocate_dir(pid, "test", dir);
-        CHECK_EQ(dsn::ERR_OK, node_manager->get_disk_tag(dir, ri.disk_tag));
+    case config_type::CT_ADD_SECONDARY_FOR_LB: {
+        auto selected = node_manager->find_best_dir_for_new_replica(pid);
+        CHECK_NOTNULL(selected, "");
+        selected->holding_replicas[pid.get_app_id()].emplace(pid);
         cc->collect_serving_replica(act.node, ri);
         break;
-
+    }
     case config_type::CT_DOWNGRADE_TO_SECONDARY:
     case config_type::CT_UPGRADE_TO_PRIMARY:
         break;
diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index a1128f300..04dd174bb 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -24,6 +24,7 @@
 #include <sys/types.h>
 #include <algorithm>
 
+#include "common/fs_manager.h"
 #include "runtime/api_layer1.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
@@ -67,14 +68,14 @@ const std::string kFolderSuffixBak = ".bak";
 const std::string kFolderSuffixOri = ".ori";
 const std::string kFolderSuffixTmp = ".tmp";
 
-error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
+error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
                                  /*output*/ disk_cleaning_report &report)
 {
     std::vector<std::string> sub_list;
-    for (auto &dir : data_dirs) {
+    for (const auto &dn : dir_nodes) {
         std::vector<std::string> tmp_list;
-        if (!dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false)) {
-            LOG_WARNING("gc_disk: failed to get subdirectories in {}", dir);
+        if (!dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false)) {
+            LOG_WARNING("gc_disk: failed to get subdirectories in {}", dn->full_dir);
             return error_s::make(ERR_OBJECT_NOT_FOUND, "failed to get subdirectories");
         }
         sub_list.insert(sub_list.end(), tmp_list.begin(), tmp_list.end());
diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h
index 91fdb6805..7961d084c 100644
--- a/src/replica/disk_cleaner.h
+++ b/src/replica/disk_cleaner.h
@@ -18,6 +18,7 @@
  */
 #pragma once
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -26,6 +27,8 @@
 
 namespace dsn {
 namespace replication {
+struct dir_node;
+
 DSN_DECLARE_uint64(gc_disk_error_replica_interval_seconds);
 DSN_DECLARE_uint64(gc_disk_garbage_replica_interval_seconds);
 DSN_DECLARE_uint64(gc_disk_migration_tmp_replica_interval_seconds);
@@ -49,7 +52,7 @@ struct disk_cleaning_report
 };
 
 // Removes the useless data from data directories.
-extern error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
+extern error_s disk_remove_useless_dirs(const std::vector<std::shared_ptr<dir_node>> &dir_nodes,
                                         /*output*/ disk_cleaning_report &report);
 
 inline bool is_data_dir_removable(const std::string &dir)
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp
index c3edde95f..a9e7edaed 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <boost/filesystem/path.hpp>
+#include <boost/system/error_code.hpp>
 #include <fcntl.h>
 #include <fmt/core.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
@@ -319,15 +320,21 @@ TEST_F(load_from_private_log_test, handle_real_private_log)
     };
 
     for (auto tt : tests) {
-        boost::filesystem::path file(tt.fname);
-        boost::filesystem::copy_file(
-            file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists);
-
         // reset replica to specified gpid
         duplicator.reset(nullptr);
-        _replica = create_mock_replica(
-            stub.get(), tt.id.get_app_id(), tt.id.get_partition_index(), _log_dir.c_str());
+        _replica = create_mock_replica(stub.get(), tt.id.get_app_id(), tt.id.get_partition_index());
+
+        // Update '_log_dir' to the corresponding replica created above.
+        _log_dir = _replica->dir();
+
+        // Copy the log file to '_log_dir'
+        boost::filesystem::path file(tt.fname);
+        boost::system::error_code ec;
+        boost::filesystem::copy_file(
+            file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
+        ASSERT_TRUE(!ec);
 
+        // Start to verify.
         load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
     }
 }
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 77a0d58f6..fa814b66e 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -37,7 +37,6 @@
 // IWYU pragma: no_include <ext/alloc_traits.h>
 #include <fmt/core.h>
 #include <fmt/ostream.h>
-#include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -100,11 +99,6 @@
 
 namespace dsn {
 namespace replication {
-
-DSN_DEFINE_bool(replication,
-                ignore_broken_disk,
-                true,
-                "true means ignore broken data disk when initialize");
 DSN_DEFINE_bool(replication,
                 deny_client_on_start,
                 false,
@@ -603,29 +597,32 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
         }
     }
 
-    // init dirs
+    // Initialize the file system manager.
+    _fs_manager.initialize(_options.data_dirs, _options.data_dir_tags);
+
+    // TODO(yingchun): remove the slog related code.
+    // Create slog directory if it does not exist.
     std::string cdir;
     std::string err_msg;
-    CHECK(
-        dsn::utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), "{}", err_msg);
+    CHECK(utils::filesystem::create_directory(_options.slog_dir, cdir, err_msg), err_msg);
     _options.slog_dir = cdir;
-    initialize_fs_manager(_options.data_dirs, _options.data_dir_tags);
 
+    // Initialize slog.
     _log = new mutation_log_shared(_options.slog_dir,
                                    FLAGS_log_shared_file_size_mb,
                                    FLAGS_log_shared_force_flush,
                                    &_counter_shared_log_recent_write_size);
     LOG_INFO("slog_dir = {}", _options.slog_dir);
 
-    // init rps
+    // Start to load replicas in available data directories.
     LOG_INFO("start to load replicas");
 
     std::vector<std::string> dir_list;
-    for (auto &dir : _fs_manager.get_available_data_dirs()) {
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
         std::vector<std::string> tmp_list;
-        CHECK(dsn::utils::filesystem::get_subdirectories(dir, tmp_list, false),
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false),
               "Fail to get subdirectories in {}.",
-              dir);
+              dn->full_dir);
         dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end());
     }
 
@@ -830,36 +827,6 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
     }
 }
 
-void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_dirs,
-                                         const std::vector<std::string> &data_dir_tags)
-{
-    std::string cdir;
-    std::string err_msg;
-    int count = 0;
-    std::vector<std::string> available_dirs;
-    std::vector<std::string> available_dir_tags;
-    for (auto i = 0; i < data_dir_tags.size(); ++i) {
-        const auto &dir = data_dirs[i];
-        if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
-                         !utils::filesystem::check_dir_rw(dir, err_msg))) {
-            if (FLAGS_ignore_broken_disk) {
-                LOG_WARNING("data dir[{}] is broken, ignore it, error:{}", dir, err_msg);
-            } else {
-                CHECK(false, "{}", err_msg);
-            }
-            continue;
-        }
-        LOG_INFO("data_dirs[{}] = {}", count, cdir);
-        available_dirs.emplace_back(cdir);
-        available_dir_tags.emplace_back(data_dir_tags[i]);
-        count++;
-    }
-
-    CHECK_GT_MSG(
-        available_dirs.size(), 0, "initialize fs manager failed, no available data directory");
-    _fs_manager.initialize(available_dirs, available_dir_tags);
-}
-
 void replica_stub::initialize_start()
 {
     if (_is_running) {
@@ -1124,7 +1091,8 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
     int app_id = 0;
     if (!req.app_name.empty()) {
         zauto_read_lock l(_replicas_lock);
-        if (!(app_id = get_app_id_from_replicas(req.app_name))) {
+        app_id = get_app_id_from_replicas(req.app_name);
+        if (app_id == 0) {
             resp.err = ERR_OBJECT_NOT_FOUND;
             return;
         }
@@ -1218,7 +1186,7 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
 
     std::vector<std::string> data_dirs;
     std::vector<std::string> data_dir_tags;
-    std::string err_msg = "";
+    std::string err_msg;
     if (disk_str.empty() ||
         !replication_options::get_data_dir_and_tag(
             disk_str, "", "replica", data_dirs, data_dir_tags, err_msg)) {
@@ -1252,6 +1220,8 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
         }
 
         LOG_INFO("Add a new disk in fs_manager, data_dir={}, tag={}", cdir, data_dir_tags[i]);
+        // TODO(yingchun): there is a gap between _fs_manager.is_dir_node_exist() and
+        // _fs_manager.add_new_dir_node() which is not atomic.
         _fs_manager.add_new_dir_node(cdir, data_dir_tags[i]);
     }
 }
@@ -1768,9 +1738,7 @@ void replica_stub::init_gc_for_test()
 
 void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
 {
-    std::string replica_path;
     std::pair<app_info, replica_info> closed_info;
-
     {
         zauto_write_lock l(_replicas_lock);
         auto iter = _closed_replicas.find(id);
@@ -1778,26 +1746,30 @@ void replica_stub::on_gc_replica(replica_stub_ptr this_, gpid id)
             return;
         closed_info = iter->second;
         _closed_replicas.erase(iter);
-        _fs_manager.remove_replica(id);
     }
+    _fs_manager.remove_replica(id);
 
-    replica_path = get_replica_dir(closed_info.first.app_type.c_str(), id, false);
-    if (replica_path.empty()) {
+    const auto *const dn = _fs_manager.find_replica_dir(closed_info.first.app_type, id);
+    if (dn == nullptr) {
         LOG_WARNING(
             "gc closed replica({}.{}) failed, no exist data", id, closed_info.first.app_type);
         return;
     }
 
+    const auto replica_path = dn->replica_dir(closed_info.first.app_type, id);
+    CHECK(
+        dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
     LOG_INFO("start to move replica({}) as garbage, path: {}", id, replica_path);
-    char rename_path[1024];
-    sprintf(rename_path, "%s.%" PRIu64 ".gar", replica_path.c_str(), dsn_now_us());
+    const auto rename_path = fmt::format("{}.{}.gar", replica_path, dsn_now_us());
     if (!dsn::utils::filesystem::rename_path(replica_path, rename_path)) {
         LOG_WARNING("gc_replica: failed to move directory '{}' to '{}'", replica_path, rename_path);
 
         // if gc the replica failed, add it back
-        zauto_write_lock l(_replicas_lock);
+        {
+            zauto_write_lock l(_replicas_lock);
+            _closed_replicas.emplace(id, closed_info);
+        }
         _fs_manager.add_replica(id, replica_path);
-        _closed_replicas.emplace(id, closed_info);
     } else {
         LOG_WARNING("gc_replica: replica_dir_op succeed to move directory '{}' to '{}'",
                     replica_path,
@@ -2014,7 +1986,7 @@ void replica_stub::on_disk_stat()
     uint64_t start = dsn_now_ns();
     disk_cleaning_report report{};
 
-    dsn::replication::disk_remove_useless_dirs(_fs_manager.get_available_data_dirs(), report);
+    dsn::replication::disk_remove_useless_dirs(_fs_manager.get_dir_nodes(), report);
     _fs_manager.update_disk_stat();
     update_disk_holding_replicas();
     update_disks_status();
@@ -2097,9 +2069,12 @@ void replica_stub::open_replica(
     const std::shared_ptr<group_check_request> &group_check,
     const std::shared_ptr<configuration_update_request> &configuration_update)
 {
-    std::string dir = get_replica_dir(app.app_type.c_str(), id, false);
-    replica_ptr rep = nullptr;
-    if (!dir.empty()) {
+    replica_ptr rep;
+    std::string dir;
+    auto dn = _fs_manager.find_replica_dir(app.app_type, id);
+    if (dn != nullptr) {
+        dir = dn->replica_dir(app.app_type, id);
+        CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
         // NOTICE: if partition is DDD, and meta select one replica as primary, it will execute the
         // load-process because of a.b.pegasus is exist, so it will never execute the restore
         // process below
@@ -2113,13 +2088,15 @@ void replica_stub::open_replica(
         // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
         // migration
         if (rep == nullptr) {
-            std::string origin_tmp_dir = get_replica_dir(
-                fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix)
-                    .c_str(),
-                id,
-                false);
-            if (!origin_tmp_dir.empty()) {
-                LOG_INFO("mark the dir {} is garbage, start revert and load disk migration origin "
+            const auto origin_dir_type =
+                fmt::format("{}{}", app.app_type, replica_disk_migrator::kReplicaDirOriginSuffix);
+            const auto origin_dn = _fs_manager.find_replica_dir(origin_dir_type, id);
+            if (origin_dn != nullptr) {
+                const auto origin_tmp_dir = origin_dn->replica_dir(origin_dir_type, id);
+                CHECK(dsn::utils::filesystem::directory_exists(origin_tmp_dir),
+                      "dir({}) not exist",
+                      origin_tmp_dir);
+                LOG_INFO("mark the dir {} as garbage, start revert and load disk migration origin "
                          "replica data({})",
                          dir,
                          origin_tmp_dir);
@@ -2232,12 +2209,18 @@ replica *replica_stub::new_replica(gpid gpid,
                                    bool is_duplication_follower,
                                    const std::string &parent_dir)
 {
-    std::string dir;
+    dir_node *dn = nullptr;
     if (parent_dir.empty()) {
-        dir = get_replica_dir(app.app_type.c_str(), gpid);
+        dn = _fs_manager.create_replica_dir_if_necessary(app.app_type, gpid);
     } else {
-        dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir);
+        dn = _fs_manager.create_child_replica_dir(app.app_type, gpid, parent_dir);
     }
+    if (dn == nullptr) {
+        LOG_ERROR("could not allocate a new directory for replica {}", gpid);
+        return nullptr;
+    }
+    const auto &dir = dn->replica_dir(app.app_type, gpid);
+    CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
     auto *rep =
         new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
     error_code err;
@@ -2883,44 +2866,6 @@ void replica_stub::close()
     _is_running = false;
 }
 
-std::string replica_stub::get_replica_dir(const char *app_type, gpid id, bool create_new)
-{
-    std::string gpid_str = fmt::format("{}.{}", id, app_type);
-    std::string replica_dir;
-    bool is_dir_exist = false;
-    for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) {
-        std::string dir = utils::filesystem::path_combine(data_dir, gpid_str);
-        if (utils::filesystem::directory_exists(dir)) {
-            CHECK(!is_dir_exist, "replica dir conflict: {} <--> {}", dir, replica_dir);
-            replica_dir = dir;
-            is_dir_exist = true;
-        }
-    }
-    if (replica_dir.empty() && create_new) {
-        _fs_manager.allocate_dir(id, app_type, replica_dir);
-    }
-    return replica_dir;
-}
-
-std::string
-replica_stub::get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir)
-{
-    std::string gpid_str = fmt::format("{}.{}", child_pid.to_string(), app_type);
-    std::string child_dir;
-    for (const std::string &data_dir : _fs_manager.get_available_data_dirs()) {
-        std::string dir = utils::filesystem::path_combine(data_dir, gpid_str);
-        // <parent_dir> = <prefix>/<gpid>.<app_type>
-        // check if <parent_dir>'s <prefix> is equal to <data_dir>
-        if (parent_dir.substr(0, data_dir.size() + 1) == data_dir + "/") {
-            child_dir = dir;
-            _fs_manager.add_replica(child_pid, child_dir);
-            break;
-        }
-    }
-    CHECK(!child_dir.empty(), "can not find parent_dir {} in data_dirs", parent_dir);
-    return child_dir;
-}
-
 #ifdef DSN_ENABLE_GPERF
 // Get tcmalloc numeric property (name is "prop") value.
 // Return -1 if get property failed (property we used will be greater than zero)
@@ -3094,22 +3039,20 @@ void replica_stub::on_update_child_group_partition_count(update_child_group_part
 
 void replica_stub::update_disk_holding_replicas()
 {
-    for (const auto &dir_node : _fs_manager._dir_nodes) {
-        // clear the holding_primary_replicas/holding_secondary_replicas and re-calculate it from
-        // holding_replicas
-        dir_node->holding_primary_replicas.clear();
-        dir_node->holding_secondary_replicas.clear();
-        for (const auto &holding_replicas : dir_node->holding_replicas) {
-            const std::set<dsn::gpid> &pids = holding_replicas.second;
+    for (const auto &dn : _fs_manager.get_dir_nodes()) {
+        dn->holding_primary_replicas.clear();
+        dn->holding_secondary_replicas.clear();
+        for (const auto &holding_replicas : dn->holding_replicas) {
+            const auto &pids = holding_replicas.second;
             for (const auto &pid : pids) {
-                replica_ptr replica = get_replica(pid);
-                if (replica == nullptr) {
+                const auto rep = get_replica(pid);
+                if (rep == nullptr) {
                     continue;
                 }
-                if (replica->status() == partition_status::PS_PRIMARY) {
-                    dir_node->holding_primary_replicas[holding_replicas.first].emplace(pid);
-                } else if (replica->status() == partition_status::PS_SECONDARY) {
-                    dir_node->holding_secondary_replicas[holding_replicas.first].emplace(pid);
+                if (rep->status() == partition_status::PS_PRIMARY) {
+                    dn->holding_primary_replicas[holding_replicas.first].emplace(pid);
+                } else if (rep->status() == partition_status::PS_SECONDARY) {
+                    dn->holding_secondary_replicas[holding_replicas.first].emplace(pid);
                 }
             }
         }
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 12f17a1cf..00d96f447 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -139,8 +139,6 @@ public:
     //
     void initialize(const replication_options &opts, bool clear = false);
     void initialize(bool clear = false);
-    void initialize_fs_manager(const std::vector<std::string> &data_dirs,
-                               const std::vector<std::string> &data_dir_tags);
     void set_options(const replication_options &opts) { _options = opts; }
     void open_service();
     void close();
@@ -202,12 +200,6 @@ public:
     virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); }
     rpc_address primary_address() const { return _primary_address; }
 
-    std::string get_replica_dir(const char *app_type, gpid id, bool create_new = true);
-
-    // during partition split, we should gurantee child replica and parent replica share the
-    // same data dir
-    std::string get_child_dir(const char *app_type, gpid child_pid, const std::string &parent_dir);
-
     //
     // helper methods
     //
@@ -341,7 +333,7 @@ private:
                       const std::shared_ptr<group_check_request> &req,
                       const std::shared_ptr<configuration_update_request> &req2);
     // Create a new replica according to the parameters.
-    // 'parent_dir' is used in partition split for get_child_dir().
+    // 'parent_dir' is used in partition split for create_child_replica_dir().
     replica *new_replica(gpid gpid,
                          const app_info &app,
                          bool restore_if_necessary,
@@ -427,6 +419,7 @@ private:
     friend class replica_follower;
     friend class replica_follower_test;
     friend class replica_http_service_test;
+    FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
     FRIEND_TEST(replica_test, test_clear_on_failure);
     FRIEND_TEST(replica_test, test_auto_trash);
 
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index cece7c82e..965b81ec6 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -231,23 +231,27 @@ private:
 };
 typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
 
-inline std::unique_ptr<mock_replica> create_mock_replica(replica_stub *stub,
-                                                         int appid = 1,
-                                                         int partition_index = 1,
-                                                         const char *dir = "./")
+inline std::unique_ptr<mock_replica>
+create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
 {
     gpid gpid(appid, partition_index);
     app_info app_info;
     app_info.app_type = "replica";
     app_info.app_name = "temp";
 
-    return std::make_unique<mock_replica>(stub, gpid, app_info, dir);
+    const auto *const dn =
+        stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, gpid);
+    CHECK_NOTNULL(dn, "");
+    const auto replica_path = dn->replica_dir(app_info.app_type, gpid);
+    CHECK(
+        dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
+    return std::make_unique<mock_replica>(stub, gpid, app_info, replica_path.c_str());
 }
 
 class mock_replica_stub : public replica_stub
 {
 public:
-    mock_replica_stub() = default;
+    mock_replica_stub() { _fs_manager.initialize({"./"}, {"tag"}); }
 
     ~mock_replica_stub() override = default;
 
@@ -313,9 +317,10 @@ public:
         config.pid = pid;
         config.status = status;
 
-        // TODO(yingchun): should refactor to move to cstor or initializer.
-        initialize_fs_manager({"./"}, {"tag"});
-        std::string dir = get_replica_dir("test", pid);
+        auto dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
+        CHECK_NOTNULL(dn, "");
+        const auto &dir = dn->replica_dir(info.app_type, pid);
+        CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
         auto *rep =
             new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
         rep->set_replica_config(config);
diff --git a/src/replica/test/mutation_log_test.cpp b/src/replica/test/mutation_log_test.cpp
index 2b7ce0bab..6bca13336 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -297,7 +297,6 @@ public:
     {
         utils::filesystem::remove_path(_log_dir);
         utils::filesystem::create_directory(_log_dir);
-
         utils::filesystem::remove_path(_log_dir + ".test");
     }
 
diff --git a/src/replica/test/open_replica_test.cpp b/src/replica/test/open_replica_test.cpp
index 89e080286..b4cb088f6 100644
--- a/src/replica/test/open_replica_test.cpp
+++ b/src/replica/test/open_replica_test.cpp
@@ -20,13 +20,10 @@
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
 #include <stdint.h>
-#include <algorithm>
 #include <memory>
 #include <string>
 #include <unordered_map>
-#include <vector>
 
-#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "common/replication_other_types.h"
 #include "dsn.layer2_types.h"
@@ -47,68 +44,55 @@ class open_replica_test : public replica_test_base
 public:
     open_replica_test() = default;
     ~open_replica_test() { dsn::utils::filesystem::remove_path("./tmp_dir"); }
+};
 
-    void test_open_replica()
-    {
-        app_info app_info;
-        app_info.app_type = "replica";
-        app_info.is_stateful = true;
-        app_info.max_replica_count = 3;
-        app_info.partition_count = 8;
-        app_info.app_id = 1;
-
-        struct test_data
-        {
-            ballot b;
-            decree last_committed_decree;
-            bool is_in_dir_nodes;
-            bool exec_failed;
-        } tests[] = {
-            {0, 0, true, true}, {0, 0, false, false}, {5, 5, true, true}, {5, 5, false, true},
-        };
-        int i = 0;
-        for (auto tt : tests) {
-            gpid gpid(app_info.app_id, i);
-            stub->_opening_replicas[gpid] = task_ptr(nullptr);
+TEST_F(open_replica_test, open_replica_add_decree_and_ballot_check)
+{
+    app_info ai;
+    ai.app_type = "replica";
+    ai.is_stateful = true;
+    ai.max_replica_count = 3;
+    ai.partition_count = 8;
+    ai.app_id = 11;
 
-            dsn::rpc_address node;
-            node.assign_ipv4("127.0.0.11", static_cast<uint16_t>(12321 + i + 1));
+    struct test_data
+    {
+        ballot b;
+        decree last_committed_decree;
+        bool expect_crash;
+    } tests[] = {{0, 0, false}, {5, 5, true}};
+    int i = 0;
+    for (auto test : tests) {
+        gpid pid(ai.app_id, i);
+        stub->_opening_replicas[pid] = task_ptr(nullptr);
 
-            if (!tt.is_in_dir_nodes) {
-                dir_node *node_disk = new dir_node("tag_" + std::to_string(i), "tmp_dir");
-                stub->_fs_manager._dir_nodes.emplace_back(node_disk);
-                stub->_fs_manager._available_data_dirs.emplace_back("tmp_dir");
-            }
+        dsn::rpc_address node;
+        node.assign_ipv4("127.0.0.11", static_cast<uint16_t>(12321 + i + 1));
 
-            _replica->register_service();
-            mock_mutation_log_shared_ptr shared_log_mock =
-                new mock_mutation_log_shared("./tmp_dir");
-            stub->set_log(shared_log_mock);
-            partition_configuration config;
-            config.pid = gpid;
-            config.ballot = tt.b;
-            config.last_committed_decree = tt.last_committed_decree;
-            std::shared_ptr<app_state> _the_app = app_state::create(app_info);
+        _replica->register_service();
+        mock_mutation_log_shared_ptr shared_log_mock = new mock_mutation_log_shared("./tmp_dir");
+        stub->set_log(shared_log_mock);
 
-            configuration_update_request fake_request;
-            fake_request.info = *_the_app;
-            fake_request.config = config;
-            fake_request.type = config_type::CT_ASSIGN_PRIMARY;
-            fake_request.node = node;
+        partition_configuration config;
+        config.pid = pid;
+        config.ballot = test.b;
+        config.last_committed_decree = test.last_committed_decree;
+        auto as = app_state::create(ai);
 
-            std::shared_ptr<configuration_update_request> req2(new configuration_update_request);
-            *req2 = fake_request;
-            if (tt.exec_failed) {
-                ASSERT_DEATH(stub->open_replica(app_info, gpid, nullptr, req2), "");
-            } else {
-                stub->open_replica(app_info, gpid, nullptr, req2);
-            }
-            ++i;
+        auto req = std::make_shared<configuration_update_request>();
+        req->info = *as;
+        req->config = config;
+        req->type = config_type::CT_ASSIGN_PRIMARY;
+        req->node = node;
+        if (test.expect_crash) {
+            ASSERT_DEATH(stub->open_replica(ai, pid, nullptr, req), "");
+        } else {
+            stub->open_replica(ai, pid, nullptr, req);
         }
+        // Both of the test cases will fail to open the replica, so it does not exist in the stub.
+        ASSERT_EQ(nullptr, stub->get_replica(pid));
+        ++i;
     }
-};
-
-TEST_F(open_replica_test, open_replica_add_decree_and_ballot_check) { test_open_replica(); }
-
+}
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/test/replica_disk_migrate_test.cpp b/src/replica/test/replica_disk_migrate_test.cpp
index 64ac2da7f..7c47abad2 100644
--- a/src/replica/test/replica_disk_migrate_test.cpp
+++ b/src/replica/test/replica_disk_migrate_test.cpp
@@ -18,9 +18,11 @@
  */
 
 #include <fmt/core.h>
+#include <fmt/ostream.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
+#include <iosfwd>
 #include <map>
 #include <memory>
 #include <set>
@@ -347,7 +349,7 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update)
     ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaNewDir));
     utils::filesystem::remove_path(fmt::format("./{}/", request.origin_disk));
     utils::filesystem::remove_path(fmt::format("./{}/", request.target_disk));
-    for (const auto &node_disk : get_dir_nodes()) {
+    for (const auto &node_disk : stub->get_fs_manager()->get_dir_nodes()) {
         if (node_disk->tag == request.origin_disk) {
             auto gpids = node_disk->holding_replicas[app_info_1.app_id];
             ASSERT_TRUE(gpids.find(request.pid) == gpids.end());
@@ -362,31 +364,42 @@ TEST_F(replica_disk_migrate_test, disk_migrate_replica_update)
     }
 }
 
+// Test that loading from the new replica dir fails, then falling back to loading from the origin
+// dir succeeds, and then the "new" replica dir is marked as ".gar".
 TEST_F(replica_disk_migrate_test, disk_migrate_replica_open)
 {
+    gpid test_pid(app_info_1.app_id, 4);
+
+    // Suppose gpid 1.4 is migrated from tag_2 to tag_empty_1.
     auto &request = *fake_migrate_rpc.mutable_request();
-    request.pid = dsn::gpid(app_info_1.app_id, 4);
+    request.pid = test_pid;
     request.origin_disk = "tag_2";
     request.target_disk = "tag_empty_1";
 
-    remove_mock_dir_node(request.origin_disk);
-    const std::string kReplicaOriginSuffixDir = fmt::format(
-        "./{}/{}.replica.disk.migrate.ori/", request.origin_disk, request.pid.to_string());
-    const std::string kReplicaNewDir =
-        fmt::format("./{}/{}.replica/", request.target_disk, request.pid.to_string());
+    // Remove the gpid 1.4 dir which is created in constructor.
+    const auto kReplicaOriginDir = fmt::format("./{}/{}.replica", request.origin_disk, request.pid);
+    utils::filesystem::remove_path(kReplicaOriginDir);
+    stub->get_fs_manager()->remove_replica(test_pid);
+
+    // Create the related dirs.
+    const auto kReplicaOriginSuffixDir =
+        fmt::format("./{}/{}.replica.disk.migrate.ori/", request.origin_disk, request.pid);
+    const auto kReplicaNewDir = fmt::format("./{}/{}.replica/", request.target_disk, request.pid);
     utils::filesystem::create_directory(kReplicaOriginSuffixDir);
     utils::filesystem::create_directory(kReplicaNewDir);
 
+    // The replica can be opened normally. In fact, the original dir is opened, and the new dir
+    // will be garbage.
     fail::cfg("mock_replica_load", "return()");
-    const std::string kReplicaOriginDir =
-        fmt::format("./{}/{}.replica", request.origin_disk, request.pid.to_string());
-    const std::string kReplicaGarDir =
-        fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid.to_string());
     open_replica(app_info_1, request.pid);
 
+    // Check it works as expected.
+    const auto kReplicaGarDir =
+        fmt::format("./{}/{}.replica.gar", request.target_disk, request.pid);
     ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaOriginDir));
     ASSERT_TRUE(utils::filesystem::directory_exists(kReplicaGarDir));
 
+    // Clean up.
     utils::filesystem::remove_path(kReplicaOriginDir);
     utils::filesystem::remove_path(kReplicaGarDir);
 }
diff --git a/src/replica/test/replica_disk_test.cpp b/src/replica/test/replica_disk_test.cpp
index 934a5c749..77880952c 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -215,9 +215,8 @@ TEST_F(replica_disk_test, gc_disk_useless_dir)
 
     sleep(5);
 
-    std::vector<std::string> data_dirs{"./"};
     disk_cleaning_report report{};
-    dsn::replication::disk_remove_useless_dirs(data_dirs, report);
+    dsn::replication::disk_remove_useless_dirs({std::make_shared<dir_node>("test", "./")}, report);
 
     for (const auto &test : tests) {
         if (!dsn::replication::is_data_dir_removable(test)) {
@@ -246,7 +245,7 @@ TEST_F(replica_disk_test, disk_status_test)
               {disk_status::SPACE_INSUFFICIENT, disk_status::SPACE_INSUFFICIENT},
               {disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
     for (const auto &test : tests) {
-        auto node = get_dir_nodes()[node_index];
+        auto node = stub->get_fs_manager()->get_dir_nodes()[node_index];
         mock_node_status(node_index, test.old_status, test.new_status);
         update_disks_status();
         for (auto &kv : node->holding_replicas) {
@@ -260,24 +259,6 @@ TEST_F(replica_disk_test, disk_status_test)
     mock_node_status(node_index, disk_status::NORMAL, disk_status::NORMAL);
 }
 
-TEST_F(replica_disk_test, broken_disk_test)
-{
-    // Test cases:
-    // create: true, check_rw: true
-    // create: true, check_rw: false
-    // create: false
-    struct broken_disk_test
-    {
-        std::string mock_create_dir;
-        std::string mock_rw_flag;
-        int32_t data_dir_size;
-    } tests[]{{"true", "true", 3}, {"true", "false", 2}, {"false", "false", 2}};
-    for (const auto &test : tests) {
-        ASSERT_EQ(test.data_dir_size,
-                  ignore_broken_disk_test(test.mock_create_dir, test.mock_rw_flag));
-    }
-}
-
 TEST_F(replica_disk_test, add_new_disk_test)
 {
     // Test case:
diff --git a/src/replica/test/replica_disk_test_base.h b/src/replica/test/replica_disk_test_base.h
index db41cd509..2da9d9276 100644
--- a/src/replica/test/replica_disk_test_base.h
+++ b/src/replica/test/replica_disk_test_base.h
@@ -56,6 +56,8 @@ public:
         fail::cfg("update_disk_stat", "return()");
         generate_mock_app_info();
 
+        stub->_fs_manager._dir_nodes.clear();
+        stub->_fs_manager.reset_disk_stat();
         generate_mock_dir_nodes(dir_nodes_count);
         generate_mock_empty_dir_node(empty_dir_nodes_count);
 
@@ -73,8 +75,6 @@ public:
 
     void update_disks_status() { stub->update_disks_status(); }
 
-    std::vector<std::shared_ptr<dir_node>> get_dir_nodes() { return stub->_fs_manager._dir_nodes; }
-
     void generate_mock_dir_node(const app_info &app,
                                 const gpid pid,
                                 const std::string &tag,
@@ -83,7 +83,6 @@ public:
         dir_node *node_disk = new dir_node(tag, full_dir);
         node_disk->holding_replicas[app.app_id].emplace(pid);
         stub->_fs_manager._dir_nodes.emplace_back(node_disk);
-        stub->_fs_manager._available_data_dirs.emplace_back(full_dir);
     }
 
     void remove_mock_dir_node(const std::string &tag)
@@ -101,7 +100,7 @@ public:
     void
     mock_node_status(int32_t node_index, disk_status::type old_status, disk_status::type new_status)
     {
-        auto node = get_dir_nodes()[node_index];
+        auto node = stub->_fs_manager.get_dir_nodes()[node_index];
         for (const auto &kv : node->holding_replicas) {
             for (const auto &pid : kv.second) {
                 update_replica_disk_status(pid, old_status);
@@ -124,21 +123,6 @@ public:
         return ERR_OK;
     }
 
-    int32_t ignore_broken_disk_test(const std::string &mock_create_directory,
-                                    const std::string &mock_check_rw)
-    {
-        std::vector<std::string> data_dirs = {"disk1", "disk2", "disk3"};
-        std::vector<std::string> data_dir_tags = {"tag1", "tag2", "tag3"};
-        auto test_stub = std::make_unique<mock_replica_stub>();
-        fail::cfg("filesystem_create_directory", "return(" + mock_create_directory + ")");
-        fail::cfg("filesystem_check_dir_rw", "return(" + mock_check_rw + ")");
-        fail::cfg("update_disk_stat", "return()");
-        test_stub->initialize_fs_manager(data_dirs, data_dir_tags);
-        int32_t dir_size = test_stub->_fs_manager.get_available_data_dirs().size();
-        test_stub.reset();
-        return dir_size;
-    }
-
     void prepare_before_add_new_disk_test(const std::string &create_dir,
                                           const std::string &check_rw)
     {
@@ -153,7 +137,6 @@ public:
     void reset_after_add_new_disk_test()
     {
         stub->_fs_manager._dir_nodes.clear();
-        stub->_fs_manager._available_data_dirs.clear();
         dsn::utils::filesystem::remove_path("add_new_not_empty_disk");
     }
 
@@ -193,7 +176,6 @@ private:
             dir_node *node_disk =
                 new dir_node(fmt::format("tag_empty_{}", num), fmt::format("./tag_empty_{}", num));
             stub->_fs_manager._dir_nodes.emplace_back(node_disk);
-            stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir);
             utils::filesystem::create_directory(node_disk->full_dir);
             num--;
         }
@@ -239,7 +221,6 @@ private:
             }
 
             stub->_fs_manager._dir_nodes.emplace_back(node_disk);
-            stub->_fs_manager._available_data_dirs.emplace_back(node_disk->full_dir);
         }
     }
 
diff --git a/src/replica/test/replica_test_base.h b/src/replica/test/replica_test_base.h
index 94260dbce..3aed53c80 100644
--- a/src/replica/test/replica_test_base.h
+++ b/src/replica/test/replica_test_base.h
@@ -53,9 +53,14 @@ class replica_test_base : public replica_stub_test_base
 {
 public:
     std::unique_ptr<mock_replica> _replica;
-    const std::string _log_dir{"./test-log"};
+    // TODO(yingchun): rename to _replica_dir, and consider removing it entirely.
+    std::string _log_dir;
 
-    replica_test_base() { _replica = create_mock_replica(stub.get(), 1, 1, _log_dir.c_str()); }
+    replica_test_base()
+    {
+        _replica = create_mock_replica(stub.get(), 1, 1);
+        _log_dir = _replica->dir();
+    }
 
     virtual mutation_ptr create_test_mutation(int64_t decree, const std::string &data)
     {
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index 5ebe2496e..a5a530e41 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -46,6 +46,7 @@ public:
         // Remove rdb to prevent rocksdb recovery from last test.
         dsn::utils::filesystem::remove_path("./data/rdb");
         _replica_stub = new dsn::replication::replica_stub();
+        _replica_stub->get_fs_manager()->initialize({"./"}, {"test_tag"});
 
         _gpid = dsn::gpid(100, 1);
         dsn::app_info app_info;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org