You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/05/31 02:46:10 UTC

[incubator-pegasus] branch master updated: refactor: update replica's dir_node status (part2) (#1489)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e4350d4c7 refactor: update replica's dir_node status (part2) (#1489)
e4350d4c7 is described below

commit e4350d4c74c1f2c57cd58eb71ac2202c688809be
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed May 31 10:46:03 2023 +0800

    refactor: update replica's dir_node status (part2) (#1489)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This patch removes the _disk_tag and _disk_status members of replica, which duplicated
    the state of the dir_node the replica is placed on, and introduces a dir_node pointer
    for replica instead. So once the status of the dir_node is updated, we can judge the
    replica's status more conveniently.
    
    Some unit tests have been updated as well, including:
    - change the test directory from `./` to `test_dir`
    - simplify the logic of the replica_disk_test related tests
---
 src/aio/native_linux_aio_provider.cpp              |   2 +-
 src/common/fs_manager.cpp                          |  37 +++++---
 src/common/fs_manager.h                            |   9 +-
 src/replica/duplication/replica_follower.cpp       |   3 +-
 .../test/load_from_private_log_test.cpp            |   5 +-
 src/replica/replica.cpp                            |  15 +--
 src/replica/replica.h                              |  15 +--
 src/replica/replica_2pc.cpp                        |   4 +-
 src/replica/replica_check.cpp                      |   3 +-
 src/replica/replica_learn.cpp                      |   5 +-
 src/replica/replica_stub.cpp                       | 101 +++++++++++----------
 src/replica/replica_stub.h                         |   7 +-
 src/replica/test/mock_utils.h                      |  80 +++++++++-------
 src/replica/test/replica_disk_migrate_test.cpp     |   2 +-
 src/replica/test/replica_disk_test.cpp             |  40 ++++----
 src/replica/test/replica_disk_test_base.h          |  63 ++++---------
 src/replica/test/replica_learn_test.cpp            |  15 ++-
 src/replica/test/replica_test.cpp                  |  57 ++++++++----
 src/server/test/hotkey_collector_test.cpp          |   3 +-
 src/server/test/pegasus_server_test_base.h         |  15 ++-
 src/server/test/rocksdb_wrapper_test.cpp           |   8 +-
 src/utils/test_macros.h                            |   6 ++
 22 files changed, 268 insertions(+), 227 deletions(-)

diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp
index 55e8961d2..470f3d63a 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -52,7 +52,7 @@ linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, int
 {
     auto fd = ::open(file_name, flag, pmode);
     if (fd == DSN_INVALID_FILE_HANDLE) {
-        LOG_ERROR("create file failed, err = {}", utils::safe_strerror(errno));
+        LOG_ERROR("create file '{}' failed, err = {}", file_name, utils::safe_strerror(errno));
     }
     return linux_fd_t(fd);
 }
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 653b0c138..a33d20e54 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -235,17 +235,6 @@ void fs_manager::initialize(const std::vector<std::string> &data_dirs,
     update_disk_stat();
 }
 
-dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
-{
-    dir_node *n = get_dir_node(dir);
-    if (nullptr == n) {
-        return dsn::ERR_OBJECT_NOT_FOUND;
-    } else {
-        tag = n->tag;
-        return dsn::ERR_OK;
-    }
-}
-
 void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
 {
     const auto &dn = get_dir_node(pid_dir);
@@ -306,6 +295,24 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
     return selected;
 }
 
+void fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
+                                                      dsn::string_view app_type,
+                                                      const dsn::gpid &pid) const
+{
+    bool dn_found = false;
+    zauto_write_lock l(_lock);
+    for (const auto &dn : _dir_nodes) {
+        CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
+        if (dn.get() == specified_dn) {
+            dn_found = true;
+        }
+    }
+    CHECK(dn_found, "dir_node({}) is not exist", specified_dn->tag);
+    const auto dir = specified_dn->replica_dir(app_type, pid);
+    CHECK_TRUE(dsn::utils::filesystem::create_directory(dir));
+    specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
+}
+
 void fs_manager::remove_replica(const gpid &pid)
 {
     zauto_write_lock l(_lock);
@@ -412,9 +419,17 @@ dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type,
     // Try to find the replica directory.
     auto replica_dn = find_replica_dir(app_type, pid);
     if (replica_dn != nullptr) {
+        // TODO(yingchun): enable this check after unit tests are refactored and fixed.
+        // CHECK(replica_dn->has(pid),
+        //       "replica({})'s directory({}) exists but not in management",
+        //       pid,
+        //       replica_dn->tag);
         return replica_dn;
     }
 
+    // TODO(yingchun): enable this check after unit tests are refactored and fixed.
+    // CHECK(0 == replica_dn->holding_replicas.count(pid.get_app_id()) ||
+    //       0 == replica_dn->holding_replicas[pid.get_app_id()].count(pid), "");
     // Find a dir_node for the new replica.
     replica_dn = find_best_dir_for_new_replica(pid);
     if (replica_dn == nullptr) {
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 3c67d626a..be19d79b6 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -29,7 +29,6 @@
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "perf_counter/perf_counter_wrapper.h"
-#include "utils/error_code.h"
 #include "utils/flags.h"
 #include "utils/string_view.h"
 #include "utils/zlocks.h"
@@ -96,7 +95,13 @@ public:
     // TODO(yingchun): consider the disk capacity and available space.
     // NOTE: the 'pid' must not exist in any dir_nodes.
     dir_node *find_best_dir_for_new_replica(const dsn::gpid &pid) const;
-    dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
+    // Similar to the above, but will specify the dir_node for the new replica.
+    // NOTE: the 'pid' must not exist in any dir_nodes and the 'specified_dn' must be in the
+    // dir_nodes.
+    // NOTE: only used in test.
+    void specify_dir_for_new_replica_for_test(dir_node *specified_dn,
+                                              dsn::string_view app_type,
+                                              const dsn::gpid &pid) const;
     void add_replica(const dsn::gpid &pid, const std::string &pid_dir);
     // Find the replica instance directory.
     dir_node *find_replica_dir(dsn::string_view app_type, gpid pid);
diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp
index b38503d3b..f9c2a996d 100644
--- a/src/replica/duplication/replica_follower.cpp
+++ b/src/replica/duplication/replica_follower.cpp
@@ -27,6 +27,7 @@
 #include <utility>
 
 #include "common/duplication_common.h"
+#include "common/fs_manager.h"
 #include "common/replication.codes.h"
 #include "consensus_types.h"
 #include "nfs/nfs_node.h"
@@ -236,7 +237,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node,
     request->source_disk_tag = remote_disk;
     request->source_dir = remote_dir;
     request->files = file_list;
-    request->dest_disk_tag = _replica->get_replica_disk_tag();
+    request->dest_disk_tag = _replica->get_dir_node()->tag;
     request->dest_dir = dest_dir;
     request->overwrite = true;
     request->high_priority = false;
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp b/src/replica/duplication/test/load_from_private_log_test.cpp
index a9e7edaed..342669f63 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -58,6 +58,7 @@
 #include <iterator>
 #include <map>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -326,13 +327,15 @@ TEST_F(load_from_private_log_test, handle_real_private_log)
 
         // Update '_log_dir' to the corresponding replica created above.
         _log_dir = _replica->dir();
+        ASSERT_TRUE(utils::filesystem::path_exists(_log_dir)) << _log_dir;
 
         // Copy the log file to '_log_dir'
         boost::filesystem::path file(tt.fname);
+        ASSERT_TRUE(dsn::utils::filesystem::file_exists(tt.fname)) << tt.fname;
         boost::system::error_code ec;
         boost::filesystem::copy_file(
             file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists, ec);
-        ASSERT_TRUE(!ec);
+        ASSERT_TRUE(!ec) << ec.value() << ", " << ec.category().name() << ", " << ec.message();
 
         // Start to verify.
         load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1, 0);
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index bcaa762df..d2f9ccefb 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -100,7 +100,7 @@ const std::string replica::kAppInfo = ".app-info";
 replica::replica(replica_stub *stub,
                  gpid gpid,
                  const app_info &app,
-                 const char *dir,
+                 dir_node *dn,
                  bool need_restore,
                  bool is_duplication_follower)
     : serverlet<replica>("replica"),
@@ -124,7 +124,9 @@ replica::replica(replica_stub *stub,
     CHECK(!_app_info.app_type.empty(), "");
     CHECK_NOTNULL(stub, "");
     _stub = stub;
-    _dir = dir;
+    CHECK_NOTNULL(dn, "");
+    _dir_node = dn;
+    _dir = dn->replica_dir(_app_info.app_type, gpid);
     _options = &stub->options();
     init_state();
     _config.pid = gpid;
@@ -232,7 +234,6 @@ void replica::init_state()
     _last_config_change_time_ms = _create_time_ms;
     update_last_checkpoint_generate_time();
     _private_log = nullptr;
-    init_disk_tag();
     get_bool_envs(_app_info.envs, replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND, _allow_ingest_behind);
 }
 
@@ -590,14 +591,6 @@ uint32_t replica::query_data_version() const
     return _app->query_data_version();
 }
 
-void replica::init_disk_tag()
-{
-    dsn::error_code err = _stub->_fs_manager.get_disk_tag(dir(), _disk_tag);
-    if (dsn::ERR_OK != err) {
-        LOG_ERROR_PREFIX("get disk tag of {} failed: {}, init it to empty ", dir(), err);
-    }
-}
-
 error_code replica::store_app_info(app_info &info, const std::string &path)
 {
     replica_app_info new_info((app_info *)&info);
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 612830c70..3c1f1068b 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -120,6 +120,7 @@ class replica_split_manager;
 class replica_stub;
 class replication_app_base;
 class replication_options;
+struct dir_node;
 
 typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
 
@@ -291,11 +292,7 @@ public:
 
     // routine for get extra envs from replica
     const std::map<std::string, std::string> &get_replica_extra_envs() const { return _extra_envs; }
-
-    void set_disk_status(disk_status::type status) { _disk_status = status; }
-    bool disk_space_insufficient() { return _disk_status == disk_status::SPACE_INSUFFICIENT; }
-    disk_status::type get_disk_status() { return _disk_status; }
-    std::string get_replica_disk_tag() const { return _disk_tag; }
+    const dir_node *get_dir_node() const { return _dir_node; }
 
     static const std::string kAppInfo;
 
@@ -315,7 +312,7 @@ private:
     replica(replica_stub *stub,
             gpid gpid,
             const app_info &app,
-            const char *dir,
+            dir_node *dn,
             bool need_restore,
             bool is_duplication_follower = false);
     error_code initialize_on_new();
@@ -523,8 +520,6 @@ private:
     // update envs to deny client request
     void update_deny_client(const std::map<std::string, std::string> &envs);
 
-    void init_disk_tag();
-
     // store `info` into a file under `path` directory
     // path = "" means using the default directory (`_dir`/.app_info)
     error_code store_app_info(app_info &info, const std::string &path = "");
@@ -582,7 +577,6 @@ private:
     // constants
     replica_stub *_stub;
     std::string _dir;
-    std::string _disk_tag;
     replication_options *_options;
     app_info _app_info;
     std::map<std::string, std::string> _extra_envs;
@@ -678,7 +672,8 @@ private:
 
     std::unique_ptr<security::access_controller> _access_controller;
 
-    disk_status::type _disk_status{disk_status::NORMAL};
+    // The dir_node where the replica data is placed.
+    dir_node *_dir_node{nullptr};
 
     bool _allow_ingest_behind{false};
     // Indicate where the storage engine data is corrupted and unrecoverable.
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 0fd3b9851..be1328b73 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -37,6 +37,7 @@
 
 #include "bulk_load/replica_bulk_loader.h"
 #include "bulk_load_types.h"
+#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "common/replication.codes.h"
 #include "common/replication_common.h"
@@ -178,7 +179,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
     }
 
     if (FLAGS_reject_write_when_disk_insufficient &&
-        (disk_space_insufficient() || _primary_states.secondary_disk_space_insufficient())) {
+        (_dir_node->status == disk_status::SPACE_INSUFFICIENT ||
+         _primary_states.secondary_disk_space_insufficient())) {
         response_client_write(request, ERR_DISK_INSUFFICIENT);
         return;
     }
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index 8cd0325e2..e978a91db 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -38,6 +38,7 @@
 #include <unordered_map>
 #include <utility>
 
+#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "common/replication.codes.h"
 #include "common/replication_common.h"
@@ -210,7 +211,7 @@ void replica::on_group_check(const group_check_request &request,
         }
         // the group check may trigger start/finish/cancel/pause a split on the secondary.
         _split_mgr->trigger_secondary_parent_split(request, response);
-        response.__set_disk_status(_disk_status);
+        response.__set_disk_status(_dir_node->status);
         break;
     case partition_status::PS_POTENTIAL_SECONDARY:
         init_learn(request.config.learner_signature);
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 589e3e28d..f65f77519 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -47,6 +47,7 @@
 #include <vector>
 
 #include "aio/aio_task.h"
+#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "common/replication.codes.h"
 #include "common/replication_enums.h"
@@ -543,7 +544,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
                     err);
             } else {
                 response.base_local_dir = _app->data_dir();
-                response.__set_replica_disk_tag(get_replica_disk_tag());
+                response.__set_replica_disk_tag(_dir_node->tag);
                 LOG_INFO_PREFIX(
                     "on_learn[{:#018x}]: learner = {}, get app learn state succeed, "
                     "learned_meta_size = {}, learned_file_count = {}, learned_to_decree = {}",
@@ -910,7 +911,7 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
             resp.replica_disk_tag,
             resp.base_local_dir,
             resp.state.files,
-            get_replica_disk_tag(),
+            _dir_node->tag,
             learn_dir,
             get_gpid(),
             true, // overwrite
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 5f6016ff5..54b2879e9 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -616,34 +616,37 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
 
     // Start to load replicas in available data directories.
     LOG_INFO("start to load replicas");
-
-    std::vector<std::string> dir_list;
+    std::map<dir_node *, std::vector<std::string>> dirs_by_dn;
     for (const auto &dn : _fs_manager.get_dir_nodes()) {
-        std::vector<std::string> tmp_list;
-        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, tmp_list, false),
-              "Fail to get subdirectories in {}.",
+        std::vector<std::string> sub_directories;
+        CHECK(dsn::utils::filesystem::get_subdirectories(dn->full_dir, sub_directories, false),
+              "fail to get sub_directories in {}",
               dn->full_dir);
-        dir_list.insert(dir_list.end(), tmp_list.begin(), tmp_list.end());
+        dirs_by_dn.emplace(dn.get(), sub_directories);
     }
 
     replicas rps;
     utils::ex_lock rps_lock;
     std::deque<task_ptr> load_tasks;
     uint64_t start_time = dsn_now_ms();
-    for (auto &dir : dir_list) {
-        if (dsn::replication::is_data_dir_invalid(dir)) {
-            LOG_INFO("ignore dir {}", dir);
-            continue;
-        }
+    for (const auto &dn_dirs : dirs_by_dn) {
+        const auto dn = dn_dirs.first;
+        for (const auto &dir : dn_dirs.second) {
+            if (dsn::replication::is_data_dir_invalid(dir)) {
+                LOG_WARNING("ignore dir {}", dir);
+                continue;
+            }
 
-        load_tasks.push_back(
-            tasking::create_task(LPC_REPLICATION_INIT_LOAD,
-                                 &_tracker,
-                                 [this, dir, &rps, &rps_lock] {
-                                     LOG_INFO("process dir {}", dir);
+            load_tasks.push_back(
+                tasking::create_task(LPC_REPLICATION_INIT_LOAD,
+                                     &_tracker,
+                                     [this, dn, dir, &rps, &rps_lock] {
+                                         LOG_INFO("process dir {}", dir);
 
-                                     auto r = load_replica(dir.c_str());
-                                     if (r != nullptr) {
+                                         auto r = load_replica(dn, dir.c_str());
+                                         if (r == nullptr) {
+                                             return;
+                                         }
                                          LOG_INFO("{}@{}: load replica '{}' success, <durable, "
                                                   "commit> = <{}, {}>, last_prepared_decree = {}",
                                                   r->get_gpid(),
@@ -660,17 +663,17 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
                                                rps[r->get_gpid()]->dir());
 
                                          rps[r->get_gpid()] = r;
-                                     }
-                                 },
-                                 load_tasks.size()));
-        load_tasks.back()->enqueue();
+                                     },
+                                     load_tasks.size()));
+            load_tasks.back()->enqueue();
+        }
     }
     for (auto &tsk : load_tasks) {
         tsk->wait();
     }
     uint64_t finish_time = dsn_now_ms();
 
-    dir_list.clear();
+    dirs_by_dn.clear();
     load_tasks.clear();
     LOG_INFO("load replicas succeed, replica_count = {}, time_used = {} ms",
              rps.size(),
@@ -1375,12 +1378,7 @@ void replica_stub::get_replica_info(replica_info &info, replica_ptr r)
     info.last_committed_decree = r->last_committed_decree();
     info.last_prepared_decree = r->last_prepared_decree();
     info.last_durable_decree = r->last_durable_decree();
-
-    dsn::error_code err = _fs_manager.get_disk_tag(r->dir(), info.disk_tag);
-    if (dsn::ERR_OK != err) {
-        LOG_WARNING("get disk tag of {} failed: {}", r->dir(), err);
-    }
-
+    info.disk_tag = r->get_dir_node()->tag;
     info.__set_manual_compact_status(r->get_manual_compact_status());
 }
 
@@ -2082,7 +2080,7 @@ void replica_stub::open_replica(
                  _primary_address_str,
                  group_check ? "with" : "without",
                  dir);
-        rep = load_replica(dir.c_str());
+        rep = load_replica(dn, dir.c_str());
 
         // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
         // migration
@@ -2107,7 +2105,7 @@ void replica_stub::open_replica(
                 boost::replace_first(
                     origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
                 dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
-                rep = load_replica(origin_dir.c_str());
+                rep = load_replica(origin_dn, origin_dir.c_str());
 
                 FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {});
             }
@@ -2220,12 +2218,11 @@ replica *replica_stub::new_replica(gpid gpid,
     }
     const auto &dir = dn->replica_dir(app.app_type, gpid);
     CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
-    auto *rep =
-        new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
+    auto *rep = new replica(this, gpid, app, dn, restore_if_necessary, is_duplication_follower);
     error_code err;
     if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
         LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err);
-        clear_on_failure(rep, dir, gpid);
+        clear_on_failure(rep);
         return nullptr;
     }
 
@@ -2235,14 +2232,14 @@ replica *replica_stub::new_replica(gpid gpid,
                   "previous detail error log",
                   rep->name(),
                   err);
-        clear_on_failure(rep, dir, gpid);
+        clear_on_failure(rep);
         return nullptr;
     }
 
     err = rep->initialize_on_new();
     if (err != ERR_OK) {
         LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err);
-        clear_on_failure(rep, dir, gpid);
+        clear_on_failure(rep);
         return nullptr;
     }
 
@@ -2250,7 +2247,7 @@ replica *replica_stub::new_replica(gpid gpid,
     return rep;
 }
 
-replica *replica_stub::load_replica(const char *dir)
+replica *replica_stub::load_replica(dir_node *dn, const char *dir)
 {
     FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
 
@@ -2296,7 +2293,9 @@ replica *replica_stub::load_replica(const char *dir)
         return nullptr;
     }
 
-    auto *rep = new replica(this, pid, info, dir, false);
+    // The replica's directory must exist when creating a replica.
+    CHECK_EQ(dir, dn->replica_dir(app_type, pid));
+    auto *rep = new replica(this, pid, info, dn, false);
     err = rep->initialize_on_load();
     if (err != ERR_OK) {
         LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
@@ -2318,14 +2317,17 @@ replica *replica_stub::load_replica(const char *dir)
     return rep;
 }
 
-void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid)
+void replica_stub::clear_on_failure(replica *rep)
 {
+    const auto rep_dir = rep->dir();
+    const auto pid = rep->get_gpid();
+
     rep->close();
     delete rep;
     rep = nullptr;
 
     // clear work on failure
-    utils::filesystem::remove_path(path);
+    utils::filesystem::remove_path(rep_dir);
     _fs_manager.remove_replica(pid);
 }
 
@@ -2954,14 +2956,17 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
                                                             app_info *app,
                                                             const std::string &parent_dir)
 {
-    FAIL_POINT_INJECT_F("replica_stub_create_child_replica_if_not_found",
-                        [=](dsn::string_view) -> replica_ptr {
-                            replica *rep = new replica(this, child_pid, *app, "./", false);
-                            rep->_config.status = partition_status::PS_INACTIVE;
-                            _replicas.insert(replicas::value_type(child_pid, rep));
-                            LOG_INFO("mock create_child_replica_if_not_found succeed");
-                            return rep;
-                        });
+    FAIL_POINT_INJECT_F(
+        "replica_stub_create_child_replica_if_not_found", [=](dsn::string_view) -> replica_ptr {
+            const auto dn =
+                _fs_manager.create_child_replica_dir(app->app_type, child_pid, parent_dir);
+            CHECK_NOTNULL(dn, "");
+            auto *rep = new replica(this, child_pid, *app, dn, false);
+            rep->_config.status = partition_status::PS_INACTIVE;
+            _replicas.insert(replicas::value_type(child_pid, rep));
+            LOG_INFO("mock create_child_replica_if_not_found succeed");
+            return rep;
+        });
 
     zauto_write_lock l(_replicas_lock);
     auto it = _replicas.find(child_pid);
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index db517912c..c6aa984f9 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -339,10 +339,10 @@ private:
                          bool restore_if_necessary,
                          bool is_duplication_follower,
                          const std::string &parent_dir = "");
-    // Load an existing replica from 'dir'.
-    replica *load_replica(const char *dir);
+    // Load an existing replica which is located in 'dn' with 'dir' directory.
+    replica *load_replica(dir_node *dn, const char *dir);
     // Clean up the memory state and on disk data if creating replica failed.
-    void clear_on_failure(replica *rep, const std::string &path, const gpid &pid);
+    void clear_on_failure(replica *rep);
     task_ptr begin_close_replica(replica_ptr r);
     void close_replica(replica_ptr r);
     void notify_replica_state_update(const replica_configuration &config, bool is_closing);
@@ -403,7 +403,6 @@ private:
     friend class replica_bulk_loader;
     friend class replica_split_manager;
     friend class replica_disk_migrator;
-
     friend class mock_replica_stub;
     friend class duplication_sync_timer;
     friend class duplication_sync_timer_test;
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 965b81ec6..104bd2c07 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -115,10 +115,10 @@ public:
     mock_replica(replica_stub *stub,
                  gpid gpid,
                  const app_info &app,
-                 const char *dir,
+                 dir_node *dn,
                  bool need_restore = false,
                  bool is_duplication_follower = false)
-        : replica(stub, gpid, app, dir, need_restore, is_duplication_follower)
+        : replica(stub, gpid, app, dn, need_restore, is_duplication_follower)
     {
         _app = std::make_unique<replication::mock_replication_app_base>(this);
     }
@@ -232,26 +232,22 @@ private:
 typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;
 
 inline std::unique_ptr<mock_replica>
-create_mock_replica(replica_stub *stub, int appid = 1, int partition_index = 1)
+create_mock_replica(replica_stub *stub, int app_id = 1, int partition_index = 1)
 {
-    gpid gpid(appid, partition_index);
+    gpid pid(app_id, partition_index);
     app_info app_info;
     app_info.app_type = "replica";
     app_info.app_name = "temp";
 
-    const auto *const dn =
-        stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, gpid);
+    auto *dn = stub->get_fs_manager()->create_replica_dir_if_necessary(app_info.app_type, pid);
     CHECK_NOTNULL(dn, "");
-    const auto replica_path = dn->replica_dir(app_info.app_type, gpid);
-    CHECK(
-        dsn::utils::filesystem::directory_exists(replica_path), "dir({}) not exist", replica_path);
-    return std::make_unique<mock_replica>(stub, gpid, app_info, replica_path.c_str());
+    return std::make_unique<mock_replica>(stub, pid, app_info, dn);
 }
 
 class mock_replica_stub : public replica_stub
 {
 public:
-    mock_replica_stub() { _fs_manager.initialize({"./"}, {"tag"}); }
+    mock_replica_stub() { _fs_manager.initialize({"test_dir"}, {"test"}); }
 
     ~mock_replica_stub() override = default;
 
@@ -290,15 +286,20 @@ public:
                          partition_status::type status = partition_status::PS_INACTIVE,
                          ballot b = 5,
                          bool need_restore = false,
-                         bool is_duplication_follower = false)
+                         bool is_duplication_follower = false,
+                         dir_node *dn = nullptr)
     {
         replica_configuration config;
         config.ballot = b;
         config.pid = pid;
         config.status = status;
 
+        if (dn == nullptr) {
+            dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
+        }
+        CHECK_NOTNULL(dn, "");
         mock_replica_ptr rep =
-            new mock_replica(this, pid, info, "./", need_restore, is_duplication_follower);
+            new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower);
         rep->set_replica_config(config);
         _replicas[pid] = rep;
 
@@ -319,35 +320,46 @@ public:
 
         auto dn = _fs_manager.create_replica_dir_if_necessary(info.app_type, pid);
         CHECK_NOTNULL(dn, "");
-        const auto &dir = dn->replica_dir(info.app_type, pid);
-        CHECK(dsn::utils::filesystem::directory_exists(dir), "dir({}) not exist", dir);
-        auto *rep =
-            new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
+        auto *rep = new mock_replica(this, pid, info, dn, need_restore, is_duplication_follower);
         rep->set_replica_config(config);
         return rep;
     }
 
-    void generate_replicas_base_dir_nodes_for_app(app_info mock_app,
-                                                  int primary_count_for_disk = 1,
-                                                  int secondary_count_for_disk = 2)
+    void generate_replicas_base_dir_nodes_for_app(const app_info &ai,
+                                                  int primary_count_per_disk,
+                                                  int secondary_count_per_disk)
     {
-        const auto &dir_nodes = _fs_manager._dir_nodes;
-        for (auto &dir_node : dir_nodes) {
-            const auto &replica_iter = dir_node->holding_replicas.find(mock_app.app_id);
-            if (replica_iter == dir_node->holding_replicas.end()) {
-                continue;
+        int partition_index = 0;
+        for (const auto &dn : _fs_manager.get_dir_nodes()) {
+            // Stop moving on to further dir_nodes once 'partition_count' replicas exist.
+            if (partition_index >= ai.partition_count) {
+                break;
             }
-            const std::set<gpid> &pids = replica_iter->second;
-            int primary_count = primary_count_for_disk;
-            int secondary_count = secondary_count_for_disk;
-            for (const gpid &pid : pids) {
-                // generate primary replica and secondary replica.
+            int replica_count_per_disk = primary_count_per_disk + secondary_count_per_disk;
+            int primary_count = primary_count_per_disk;
+            int secondary_count = secondary_count_per_disk;
+            // Create 'replica_count_per_disk' count of replicas on 'dn'.
+            while (replica_count_per_disk-- > 0) {
+                gpid new_gpid(ai.app_id, partition_index++);
+                _fs_manager.specify_dir_for_new_replica_for_test(dn.get(), ai.app_type, new_gpid);
                 if (primary_count-- > 0) {
-                    add_replica(generate_replica_ptr(
-                        mock_app, pid, partition_status::PS_PRIMARY, mock_app.app_id));
+                    // Create 'primary_count' count of primary replicas on 'dn'.
+                    add_replica(generate_replica_ptr(ai,
+                                                     new_gpid,
+                                                     partition_status::PS_PRIMARY,
+                                                     ai.app_id,
+                                                     false,
+                                                     false,
+                                                     dn.get()));
                 } else if (secondary_count-- > 0) {
-                    add_replica(generate_replica_ptr(
-                        mock_app, pid, partition_status::PS_SECONDARY, mock_app.app_id));
+                    // Create 'secondary_count' count of secondary replicas on 'dn'.
+                    add_replica(generate_replica_ptr(ai,
+                                                     new_gpid,
+                                                     partition_status::PS_SECONDARY,
+                                                     ai.app_id,
+                                                     false,
+                                                     false,
+                                                     dn.get()));
                 }
             }
         }
diff --git a/src/replica/test/replica_disk_migrate_test.cpp b/src/replica/test/replica_disk_migrate_test.cpp
index 7c47abad2..024b039ce 100644
--- a/src/replica/test/replica_disk_migrate_test.cpp
+++ b/src/replica/test/replica_disk_migrate_test.cpp
@@ -176,7 +176,7 @@ TEST_F(replica_disk_migrate_test, migrate_disk_replica_check)
     auto &request = *fake_migrate_rpc.mutable_request();
     auto &response = fake_migrate_rpc.response();
 
-    request.pid = dsn::gpid(app_info_1.app_id, 1);
+    request.pid = dsn::gpid(app_info_1.app_id, 0);
     request.origin_disk = "tag_1";
     request.target_disk = "tag_2";
 
diff --git a/src/replica/test/replica_disk_test.cpp b/src/replica/test/replica_disk_test.cpp
index 78f2dfd72..ce7441199 100644
--- a/src/replica/test/replica_disk_test.cpp
+++ b/src/replica/test/replica_disk_test.cpp
@@ -94,18 +94,17 @@ TEST_F(replica_disk_test, on_query_disk_info_all_app)
     ASSERT_EQ(disk_infos.size(), 6);
 
     int info_size = disk_infos.size();
-    int app_id_1_partition_index = 1;
-    int app_id_2_partition_index = 1;
+    int app_id_1_partition_index = 0;
+    int app_id_2_partition_index = 0;
     for (int i = 0; i < info_size; i++) {
-        if (disk_infos[i].tag == "tag_empty_1") {
+        if (disk_infos[i].holding_primary_replicas.empty() &&
+            disk_infos[i].holding_secondary_replicas.empty()) {
             continue;
         }
         ASSERT_EQ(disk_infos[i].tag, "tag_" + std::to_string(i + 1));
         ASSERT_EQ(disk_infos[i].full_dir, "./tag_" + std::to_string(i + 1));
         ASSERT_EQ(disk_infos[i].disk_capacity_mb, 500);
         ASSERT_EQ(disk_infos[i].disk_available_mb, (i + 1) * 50);
-        // `holding_primary_replicas` and `holding_secondary_replicas` is std::map<app_id,
-        // std::set<::dsn::gpid>>
         ASSERT_EQ(disk_infos[i].holding_primary_replicas.size(), 2);
         ASSERT_EQ(disk_infos[i].holding_secondary_replicas.size(), 2);
 
@@ -172,24 +171,21 @@ TEST_F(replica_disk_test, on_query_disk_info_one_app)
     request.app_name = app_info_1.app_name;
     stub->on_query_disk_info(fake_query_disk_rpc);
 
-    auto &disk_infos_with_app_1 = fake_query_disk_rpc.response().disk_infos;
-    int info_size = disk_infos_with_app_1.size();
-    for (int i = 0; i < info_size; i++) {
-        if (disk_infos_with_app_1[i].tag == "tag_empty_1") {
+    for (auto disk_info : fake_query_disk_rpc.response().disk_infos) {
+        if (disk_info.holding_primary_replicas.empty() &&
+            disk_info.holding_secondary_replicas.empty()) {
             continue;
         }
-        // `holding_primary_replicas` and `holding_secondary_replicas` is std::map<app_id,
-        // std::set<::dsn::gpid>>
-        ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas.size(), 1);
-        ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas.size(), 1);
-        ASSERT_EQ(disk_infos_with_app_1[i].holding_primary_replicas[app_info_1.app_id].size(),
+        ASSERT_EQ(disk_info.holding_primary_replicas.size(), 1);
+        ASSERT_EQ(disk_info.holding_secondary_replicas.size(), 1);
+        ASSERT_EQ(disk_info.holding_primary_replicas[app_info_1.app_id].size(),
                   app_id_1_primary_count_for_disk);
-        ASSERT_EQ(disk_infos_with_app_1[i].holding_secondary_replicas[app_info_1.app_id].size(),
+        ASSERT_EQ(disk_info.holding_secondary_replicas[app_info_1.app_id].size(),
                   app_id_1_secondary_count_for_disk);
-        ASSERT_TRUE(disk_infos_with_app_1[i].holding_primary_replicas.find(app_info_2.app_id) ==
-                    disk_infos_with_app_1[i].holding_primary_replicas.end());
-        ASSERT_TRUE(disk_infos_with_app_1[i].holding_secondary_replicas.find(app_info_2.app_id) ==
-                    disk_infos_with_app_1[i].holding_secondary_replicas.end());
+        ASSERT_TRUE(disk_info.holding_primary_replicas.find(app_info_2.app_id) ==
+                    disk_info.holding_primary_replicas.end());
+        ASSERT_TRUE(disk_info.holding_secondary_replicas.find(app_info_2.app_id) ==
+                    disk_info.holding_secondary_replicas.end());
     }
 }
 
@@ -247,16 +243,16 @@ TEST_F(replica_disk_test, disk_status_test)
               {disk_status::SPACE_INSUFFICIENT, disk_status::NORMAL}};
     auto dn = stub->get_fs_manager()->get_dir_nodes()[0];
     for (const auto &test : tests) {
-        update_node_status(dn, test.old_status, test.new_status);
+        dn->status = test.new_status;
         for (const auto &pids_of_app : dn->holding_replicas) {
             for (const auto &pid : pids_of_app.second) {
                 replica_ptr rep = stub->get_replica(pid);
                 ASSERT_NE(nullptr, rep);
-                ASSERT_EQ(test.new_status, rep->disk_space_insufficient());
+                ASSERT_EQ(test.new_status, rep->get_dir_node()->status);
             }
         }
     }
-    update_node_status(dn, disk_status::NORMAL, disk_status::NORMAL);
+    dn->status = disk_status::NORMAL;
 }
 
 TEST_F(replica_disk_test, add_new_disk_test)
diff --git a/src/replica/test/replica_disk_test_base.h b/src/replica/test/replica_disk_test_base.h
index 2bb37817f..08e7ffc39 100644
--- a/src/replica/test/replica_disk_test_base.h
+++ b/src/replica/test/replica_disk_test_base.h
@@ -39,16 +39,23 @@ public:
     //  tag_5            100*5             50*5              50%
     //  total            2500              750               30%
     // replica info, for example:
-    //   dir_node             primary/secondary
-    //
-    //   tag_empty_1
-    //   tag_1                1.1 | 1.2,1.3
-    //                        2.1,2.2 | 2.3,2.4,2.5,2.6
-    //
-    //   tag_2                1.4 | 1.5,1.6
-    //                        2.7,2.8 | 2.9,2.10,2.11,2.12,2.13
-    //            ...
-    //            ...
+    //   dir_node             primary    | secondary
+    //   --------------------------------+---------------------------
+    //   tag_1                1.0        | 1.1 1.2
+    //                        2.0 2.1    | 2.2 2.3 2.4 2.5
+    //   --------------------------------+---------------------------
+    //   tag_2                1.3        | 1.4 1.5
+    //                        2.6 2.7    | 2.8 2.9 2.10 2.11
+    //   --------------------------------+---------------------------
+    //   tag_3                1.6        | 1.7 1.8
+    //                        2.12 2.13  | 2.14 2.15 2.16 2.17
+    //   --------------------------------+---------------------------
+    //   tag_4                           |
+    //   --------------------------------+---------------------------
+    //   tag_5                           |
+    //   --------------------------------+---------------------------
+    //   tag_empty_1                     |
+    //   --------------------------------+---------------------------
     replica_disk_test_base()
     {
         fail::setup();
@@ -93,22 +100,6 @@ public:
         }
     }
 
-    void update_node_status(const std::shared_ptr<dir_node> &dn,
-                            disk_status::type old_status,
-                            disk_status::type new_status)
-    {
-        for (const auto &pids_of_app : dn->holding_replicas) {
-            for (const auto &pid : pids_of_app.second) {
-                replica_ptr rep = stub->get_replica(pid);
-                ASSERT_NE(nullptr, rep);
-                rep->set_disk_status(new_status);
-            }
-        }
-        if (old_status != new_status) {
-            dn->status = new_status;
-        }
-    }
-
     void prepare_before_add_new_disk_test(const std::string &create_dir,
                                           const std::string &check_rw)
     {
@@ -169,14 +160,6 @@ private:
 
     void generate_mock_dir_nodes(int num)
     {
-        int app_id_1_disk_holding_replica_count =
-            app_id_1_primary_count_for_disk + app_id_1_secondary_count_for_disk;
-        int app_id_2_disk_holding_replica_count =
-            app_id_2_primary_count_for_disk + app_id_2_secondary_count_for_disk;
-
-        int app_id_1_partition_index = 1;
-        int app_id_2_partition_index = 1;
-
         int64_t disk_capacity_mb = num * 100;
         int count = 0;
         while (count++ < num) {
@@ -194,18 +177,6 @@ private:
                 node_disk->full_dir); // open replica need the options
             utils::filesystem::create_directory(node_disk->full_dir);
 
-            int app_1_replica_count_per_disk = app_id_1_disk_holding_replica_count;
-            while (app_1_replica_count_per_disk-- > 0) {
-                node_disk->holding_replicas[app_info_1.app_id].emplace(
-                    gpid(app_info_1.app_id, app_id_1_partition_index++));
-            }
-
-            int app_2_replica_count_per_disk = app_id_2_disk_holding_replica_count;
-            while (app_2_replica_count_per_disk-- > 0) {
-                node_disk->holding_replicas[app_info_2.app_id].emplace(
-                    gpid(app_info_2.app_id, app_id_2_partition_index++));
-            }
-
             stub->_fs_manager._dir_nodes.emplace_back(node_disk);
         }
     }
diff --git a/src/replica/test/replica_learn_test.cpp b/src/replica/test/replica_learn_test.cpp
index 5f937e24e..8ff02993a 100644
--- a/src/replica/test/replica_learn_test.cpp
+++ b/src/replica/test/replica_learn_test.cpp
@@ -23,6 +23,7 @@
 #include <string>
 #include <utility>
 
+#include "common/fs_manager.h"
 #include "common/gpid.h"
 #include "common/replication_other_types.h"
 #include "consensus_types.h"
@@ -31,6 +32,7 @@
 #include "replica/duplication/test/duplication_test_base.h"
 #include "replica/prepare_list.h"
 #include "replica/replica_context.h"
+#include "utils/fmt_logging.h"
 
 namespace dsn {
 namespace replication {
@@ -44,11 +46,14 @@ public:
 
     std::unique_ptr<mock_replica> create_duplicating_replica()
     {
-        gpid gpid(1, 1);
-        app_info app_info;
-        app_info.app_type = "replica";
-        app_info.duplicating = true;
-        auto r = std::make_unique<mock_replica>(stub.get(), gpid, app_info, "./");
+        gpid pid(1, 0);
+        app_info ai;
+        ai.app_type = "replica";
+        ai.duplicating = true;
+
+        dir_node *dn = stub->get_fs_manager()->find_best_dir_for_new_replica(pid);
+        CHECK_NOTNULL(dn, "");
+        auto r = std::make_unique<mock_replica>(stub.get(), pid, ai, dn);
         r->as_primary();
         return r;
     }
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 8e09a8e03..45ca0eab3 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gmock/gmock-matchers.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
@@ -23,6 +24,7 @@
 #include <iostream>
 #include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -34,6 +36,7 @@
 #include "common/gpid.h"
 #include "common/replica_envs.h"
 #include "common/replication.codes.h"
+#include "common/replication_other_types.h"
 #include "consensus_types.h"
 #include "dsn.layer2_types.h"
 #include "http/http_server.h"
@@ -61,6 +64,7 @@
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/string_conv.h"
+#include "utils/test_macros.h"
 
 namespace dsn {
 namespace replication {
@@ -71,7 +75,7 @@ class replica_test : public replica_test_base
 {
 public:
     replica_test()
-        : pid(gpid(2, 1)),
+        : _pid(gpid(2, 1)),
           _backup_id(dsn_now_ms()),
           _provider_name("local_service"),
           _policy_name("mock_policy")
@@ -83,7 +87,8 @@ public:
         FLAGS_enable_http_server = false;
         stub->install_perf_counters();
         mock_app_info();
-        _mock_replica = stub->generate_replica_ptr(_app_info, pid, partition_status::PS_PRIMARY, 1);
+        _mock_replica =
+            stub->generate_replica_ptr(_app_info, _pid, partition_status::PS_PRIMARY, 1);
 
         // set FLAGS_cold_backup_root manually.
         // FLAGS_cold_backup_root is set by configuration "replication.cold_backup_root",
@@ -153,7 +158,7 @@ public:
     void test_on_cold_backup(const std::string user_specified_path = "")
     {
         backup_request req;
-        req.pid = pid;
+        req.pid = _pid;
         policy_info backup_policy_info;
         backup_policy_info.__set_backup_provider_type(_provider_name);
         backup_policy_info.__set_policy_name(_policy_name);
@@ -205,10 +210,10 @@ public:
 
     bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; }
 
-    bool has_gpid(gpid &gpid) const
+    bool has_gpid(gpid &pid) const
     {
-        for (const auto &node : stub->_fs_manager._dir_nodes) {
-            if (node->has(gpid)) {
+        for (const auto &node : stub->_fs_manager.get_dir_nodes()) {
+            if (node->has(pid)) {
                 return true;
             }
         }
@@ -251,7 +256,7 @@ public:
 
 public:
     dsn::app_info _app_info;
-    dsn::gpid pid;
+    dsn::gpid _pid;
     mock_replica_ptr _mock_replica;
 
 private:
@@ -274,7 +279,7 @@ TEST_F(replica_test, write_size_limited)
     write_request->io_session = sim_net->create_client_session(rpc_address());
 
     for (int i = 0; i < count; i++) {
-        stub->on_client_write(pid, write_request);
+        stub->on_client_write(_pid, write_request);
     }
 
     ASSERT_EQ(get_write_size_exceed_threshold_count(), count);
@@ -402,6 +407,7 @@ TEST_F(replica_test, update_allow_ingest_behind_test)
 
 TEST_F(replica_test, test_replica_backup_and_restore)
 {
+    // TODO(yingchun): this test takes too long to run, optimize it!
     test_on_cold_backup();
     auto err = test_find_valid_checkpoint();
     ASSERT_EQ(ERR_OK, err);
@@ -409,6 +415,7 @@ TEST_F(replica_test, test_replica_backup_and_restore)
 
 TEST_F(replica_test, test_replica_backup_and_restore_with_specific_path)
 {
+    // TODO(yingchun): this test takes too long to run, optimize it!
     std::string user_specified_path = "test/backup";
     test_on_cold_backup(user_specified_path);
     auto err = test_find_valid_checkpoint(user_specified_path);
@@ -462,42 +469,54 @@ TEST_F(replica_test, test_query_last_checkpoint_info)
     _mock_replica->set_last_committed_decree(200);
     _mock_replica->on_query_last_checkpoint(resp);
     ASSERT_EQ(resp.last_committed_decree, 200);
-    ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100");
+    ASSERT_STR_CONTAINS(resp.base_local_dir, "/data/checkpoint.100");
 }
 
 TEST_F(replica_test, test_clear_on_failure)
 {
+    // Clear up the remaining state. 'dn' may be null, so only touch it inside the check.
+    auto *dn = stub->get_fs_manager()->find_replica_dir(_app_info.app_type, _pid);
+    if (dn != nullptr) {
+        dsn::utils::filesystem::remove_path(dn->replica_dir(_app_info.app_type, _pid));
+        dn->holding_replicas.clear();
+    }
+
     // Disable failure detector to avoid connecting with meta server which is not started.
     FLAGS_fd_disabled = true;
 
     replica *rep =
-        stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
+        stub->generate_replica(_app_info, _pid, partition_status::PS_PRIMARY, 1, false, true);
     auto path = rep->dir();
-    dsn::utils::filesystem::create_directory(path);
-    ASSERT_TRUE(has_gpid(pid));
+    ASSERT_TRUE(has_gpid(_pid));
 
-    stub->clear_on_failure(rep, path, pid);
+    stub->clear_on_failure(rep);
 
     ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
-    ASSERT_FALSE(has_gpid(pid));
+    ASSERT_FALSE(has_gpid(_pid));
 }
 
 TEST_F(replica_test, test_auto_trash)
 {
+    // Clear up the remaining state. 'dn' may be null, so only touch it inside the check.
+    auto *dn = stub->get_fs_manager()->find_replica_dir(_app_info.app_type, _pid);
+    if (dn != nullptr) {
+        dsn::utils::filesystem::remove_path(dn->replica_dir(_app_info.app_type, _pid));
+        dn->holding_replicas.clear();
+    }
+
     // Disable failure detector to avoid connecting with meta server which is not started.
     FLAGS_fd_disabled = true;
 
     replica *rep =
-        stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
+        stub->generate_replica(_app_info, _pid, partition_status::PS_PRIMARY, 1, false, true);
     auto path = rep->dir();
-    dsn::utils::filesystem::create_directory(path);
-    ASSERT_TRUE(has_gpid(pid));
+    ASSERT_TRUE(has_gpid(_pid));
 
     rep->handle_local_failure(ERR_RDB_CORRUPTION);
     stub->wait_closing_replicas_finished();
 
     ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
-    dir_node *dn = stub->get_fs_manager()->get_dir_node(path);
+    dn = stub->get_fs_manager()->get_dir_node(path);
     ASSERT_NE(dn, nullptr);
     std::vector<std::string> subs;
     ASSERT_TRUE(dsn::utils::filesystem::get_subdirectories(dn->full_dir, subs, false));
@@ -516,7 +535,7 @@ TEST_F(replica_test, test_auto_trash)
         }
     }
     ASSERT_TRUE(found);
-    ASSERT_FALSE(has_gpid(pid));
+    ASSERT_FALSE(has_gpid(_pid));
 }
 
 TEST_F(replica_test, update_deny_client_test)
diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp
index c331b3cac..7b784a6d2 100644
--- a/src/server/test/hotkey_collector_test.cpp
+++ b/src/server/test/hotkey_collector_test.cpp
@@ -37,6 +37,7 @@
 #include "server/test/message_utils.h"
 #include "utils/error_code.h"
 #include "utils/flags.h"
+#include "utils/fmt_logging.h"
 #include "utils/rand.h"
 
 namespace dsn {
@@ -221,7 +222,7 @@ TEST_F(fine_collector_test, fine_collector)
 class hotkey_collector_test : public pegasus_server_test_base
 {
 public:
-    hotkey_collector_test() { start(); }
+    hotkey_collector_test() { CHECK_EQ(::dsn::ERR_OK, start()); }
 
     std::shared_ptr<pegasus::server::hotkey_collector> get_read_collector()
     {
diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h
index a5a530e41..7b1fe225d 100644
--- a/src/server/test/pegasus_server_test_base.h
+++ b/src/server/test/pegasus_server_test_base.h
@@ -23,6 +23,7 @@
 
 #include <gtest/gtest.h>
 #include <gmock/gmock.h>
+#include "common/fs_manager.h"
 #include "replica/replica_stub.h"
 #include "utils/filesystem.h"
 
@@ -44,16 +45,22 @@ public:
     pegasus_server_test_base()
     {
         // Remove rdb to prevent rocksdb recovery from last test.
-        dsn::utils::filesystem::remove_path("./data/rdb");
+        dsn::utils::filesystem::remove_path("./test_dir");
         _replica_stub = new dsn::replication::replica_stub();
-        _replica_stub->get_fs_manager()->initialize({"./"}, {"test_tag"});
+        _replica_stub->get_fs_manager()->initialize({"test_dir"}, {"test_tag"});
 
         _gpid = dsn::gpid(100, 1);
         dsn::app_info app_info;
         app_info.app_type = "pegasus";
 
-        _replica =
-            new dsn::replication::replica(_replica_stub, _gpid, app_info, "./", false, false);
+        auto *dn = _replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
+        CHECK_NOTNULL(dn, "");
+        _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false);
+        const auto dir_data = dsn::utils::filesystem::path_combine(_replica->dir(), "data");
+        CHECK(dsn::utils::filesystem::create_directory(dir_data),
+              "create data dir {} failed",
+              dir_data);
+
         _server = std::make_unique<mock_pegasus_server_impl>(_replica);
     }
 
diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp
index f0ee6a8e4..24d88e656 100644
--- a/src/server/test/rocksdb_wrapper_test.cpp
+++ b/src/server/test/rocksdb_wrapper_test.cpp
@@ -25,18 +25,21 @@
 #include <string>
 #include <utility>
 
+#include "common/fs_manager.h"
 #include "dsn.layer2_types.h"
 #include "pegasus_key_schema.h"
 #include "pegasus_server_test_base.h"
 #include "pegasus_utils.h"
 #include "pegasus_value_schema.h"
 #include "replica/replica.h"
+#include "replica/replica_stub.h"
 #include "server/pegasus_server_write.h"
 #include "server/pegasus_write_service.h"
 #include "server/pegasus_write_service_impl.h"
 #include "server/rocksdb_wrapper.h"
 #include "utils/blob.h"
 #include "utils/error_code.h"
+#include "utils/fmt_logging.h"
 #include "utils/string_view.h"
 
 namespace pegasus {
@@ -83,8 +86,9 @@ public:
         app_info.app_type = "pegasus";
         app_info.duplicating = true;
 
-        _replica =
-            new dsn::replication::replica(_replica_stub, _gpid, app_info, "./", false, false);
+        auto *dn = _replica_stub->get_fs_manager()->find_best_dir_for_new_replica(_gpid);
+        CHECK_NOTNULL(dn, "");
+        _replica = new dsn::replication::replica(_replica_stub, _gpid, app_info, dn, false, false);
         _server = std::make_unique<mock_pegasus_server_impl>(_replica);
 
         SetUp();
diff --git a/src/utils/test_macros.h b/src/utils/test_macros.h
index 6f9c88973..c1c9c3fd4 100644
--- a/src/utils/test_macros.h
+++ b/src/utils/test_macros.h
@@ -33,3 +33,9 @@
             return;                                                                                \
         }                                                                                          \
     } while (0)
+
+// Substring matches.
+#define ASSERT_STR_CONTAINS(str, substr) ASSERT_THAT(str, testing::HasSubstr(substr))
+
+#define ASSERT_STR_NOT_CONTAINS(str, substr)                                                       \
+    ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org