You are viewing a plain-text version of this content. The canonical link to the original message was not included in this copy.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/17 09:15:40 UTC

[incubator-pegasus] branch master updated: refactor: minor refactor on class fs_manager (#1476)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b9fba392 refactor: minor refactor on class fs_manager (#1476)
6b9fba392 is described below

commit 6b9fba3928071e5b9ecf8e19790843a0b0a77ba9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed May 17 17:15:34 2023 +0800

    refactor: minor refactor on class fs_manager (#1476)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This is a minor refactoring of class fs_manager, including:
    - use `uint64_t` instead of `unsigned` in the fs_manager module.
    - remove the unused `for_test` parameters.
---
 src/common/fs_manager.cpp           | 135 +++++++++++++++++++-----------------
 src/common/fs_manager.h             |  28 ++++----
 src/common/test/fs_manager_test.cpp |  17 +++++
 src/meta/test/misc/misc.cpp         |   4 +-
 src/replica/replica_stub.cpp        |   5 +-
 5 files changed, 105 insertions(+), 84 deletions(-)

diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 5a460a896..2ba683da4 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -61,36 +61,39 @@ DSN_DEFINE_int32(replication,
                  "space insufficient");
 DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
 
-unsigned dir_node::replicas_count() const
+uint64_t dir_node::replicas_count() const
 {
-    unsigned sum = 0;
+    uint64_t sum = 0;
     for (const auto &s : holding_replicas) {
         sum += s.second.size();
     }
     return sum;
 }
 
-unsigned dir_node::replicas_count(app_id id) const
+uint64_t dir_node::replicas_count(app_id id) const
 {
     const auto iter = holding_replicas.find(id);
-    if (iter == holding_replicas.end())
+    if (iter == holding_replicas.end()) {
         return 0;
+    }
     return iter->second.size();
 }
 
 bool dir_node::has(const gpid &pid) const
 {
     auto iter = holding_replicas.find(pid.get_app_id());
-    if (iter == holding_replicas.end())
+    if (iter == holding_replicas.end()) {
         return false;
+    }
     return iter->second.find(pid) != iter->second.end();
 }
 
-unsigned dir_node::remove(const gpid &pid)
+uint64_t dir_node::remove(const gpid &pid)
 {
     auto iter = holding_replicas.find(pid.get_app_id());
-    if (iter == holding_replicas.end())
+    if (iter == holding_replicas.end()) {
         return 0;
+    }
     return iter->second.erase(pid);
 }
 
@@ -134,68 +137,67 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
     return (old_status != new_status);
 }
 
-fs_manager::fs_manager(bool for_test)
+fs_manager::fs_manager()
 {
-    if (!for_test) {
-        _counter_total_capacity_mb.init_app_counter("eon.replica_stub",
-                                                    "disk.capacity.total(MB)",
+    _counter_total_capacity_mb.init_app_counter("eon.replica_stub",
+                                                "disk.capacity.total(MB)",
+                                                COUNTER_TYPE_NUMBER,
+                                                "total disk capacity in MB");
+    _counter_total_available_mb.init_app_counter("eon.replica_stub",
+                                                 "disk.available.total(MB)",
+                                                 COUNTER_TYPE_NUMBER,
+                                                 "total disk available in MB");
+    _counter_total_available_ratio.init_app_counter("eon.replica_stub",
+                                                    "disk.available.total.ratio",
                                                     COUNTER_TYPE_NUMBER,
-                                                    "total disk capacity in MB");
-        _counter_total_available_mb.init_app_counter("eon.replica_stub",
-                                                     "disk.available.total(MB)",
-                                                     COUNTER_TYPE_NUMBER,
-                                                     "total disk available in MB");
-        _counter_total_available_ratio.init_app_counter("eon.replica_stub",
-                                                        "disk.available.total.ratio",
-                                                        COUNTER_TYPE_NUMBER,
-                                                        "total disk available ratio");
-        _counter_min_available_ratio.init_app_counter("eon.replica_stub",
-                                                      "disk.available.min.ratio",
-                                                      COUNTER_TYPE_NUMBER,
-                                                      "minimal disk available ratio in all disks");
-        _counter_max_available_ratio.init_app_counter("eon.replica_stub",
-                                                      "disk.available.max.ratio",
-                                                      COUNTER_TYPE_NUMBER,
-                                                      "maximal disk available ratio in all disks");
-    }
+                                                    "total disk available ratio");
+    _counter_min_available_ratio.init_app_counter("eon.replica_stub",
+                                                  "disk.available.min.ratio",
+                                                  COUNTER_TYPE_NUMBER,
+                                                  "minimal disk available ratio in all disks");
+    _counter_max_available_ratio.init_app_counter("eon.replica_stub",
+                                                  "disk.available.max.ratio",
+                                                  COUNTER_TYPE_NUMBER,
+                                                  "maximal disk available ratio in all disks");
 }
 
 dir_node *fs_manager::get_dir_node(const std::string &subdir) const
 {
-    zauto_read_lock l(_lock);
     std::string norm_subdir;
     utils::filesystem::get_normalized_path(subdir, norm_subdir);
-    for (auto &n : _dir_nodes) {
-        // if input is a subdir of some dir_nodes
-        const std::string &d = n->full_dir;
-        if (norm_subdir.compare(0, d.size(), d) == 0 &&
-            (norm_subdir.size() == d.size() || norm_subdir[d.size()] == '/')) {
-            return n.get();
+
+    zauto_read_lock l(_lock);
+    for (const auto &dn : _dir_nodes) {
+        // Check if 'subdir' is a sub-directory of 'dn'.
+        const std::string &full_dir = dn->full_dir;
+        if (full_dir.size() > norm_subdir.size()) {
+            continue;
+        }
+
+        if ((norm_subdir.size() == full_dir.size() || norm_subdir[full_dir.size()] == '/') &&
+            norm_subdir.compare(0, full_dir.size(), full_dir) == 0) {
+            return dn.get();
         }
     }
     return nullptr;
 }
 
-// size of the two vectors should be equal
-dsn::error_code fs_manager::initialize(const std::vector<std::string> &data_dirs,
-                                       const std::vector<std::string> &tags,
-                                       bool for_test)
+void fs_manager::initialize(const std::vector<std::string> &data_dirs,
+                            const std::vector<std::string> &data_dir_tags)
 {
-    // create all dir_nodes
-    CHECK_EQ(data_dirs.size(), tags.size());
+    CHECK_EQ(data_dirs.size(), data_dir_tags.size());
     for (unsigned i = 0; i < data_dirs.size(); ++i) {
         std::string norm_path;
         utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
-        dir_node *n = new dir_node(tags[i], norm_path);
+        dir_node *n = new dir_node(data_dir_tags[i], norm_path);
         _dir_nodes.emplace_back(n);
-        LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, tags[i]);
+        LOG_INFO(
+            "{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
     }
     _available_data_dirs = data_dirs;
 
-    if (!for_test) {
-        update_disk_stat(false);
-    }
-    return dsn::ERR_OK;
+    // Update the disk statistics.
+    update_disk_stat();
 }
 
 dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
@@ -211,21 +213,26 @@ dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &ta
 
 void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
 {
-    dir_node *n = get_dir_node(pid_dir);
-    if (nullptr == n) {
+    const auto &dn = get_dir_node(pid_dir);
+    if (dsn_unlikely(nullptr == dn)) {
         LOG_ERROR(
             "{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
-    } else {
+        return;
+    }
+
+    bool emplace_success = false;
+    {
         zauto_write_lock l(_lock);
-        std::set<dsn::gpid> &replicas_for_app = n->holding_replicas[pid.get_app_id()];
-        auto result = replicas_for_app.emplace(pid);
-        if (!result.second) {
-            LOG_WARNING(
-                "{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, n->tag);
-        } else {
-            LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, n->tag);
-        }
+        auto &replicas_for_app = dn->holding_replicas[pid.get_app_id()];
+        emplace_success = replicas_for_app.emplace(pid).second;
+    }
+    if (!emplace_success) {
+        LOG_WARNING(
+            "{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
+        return;
     }
+
+    LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
 }
 
 void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
@@ -271,16 +278,16 @@ void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/
 void fs_manager::remove_replica(const gpid &pid)
 {
     zauto_write_lock l(_lock);
-    unsigned remove_count = 0;
-    for (auto &n : _dir_nodes) {
-        unsigned r = n->remove(pid);
+    uint64_t remove_count = 0;
+    for (auto &dn : _dir_nodes) {
+        uint64_t r = dn->remove(pid);
         CHECK_LE_MSG(remove_count + r,
                      1,
                      "gpid({}) found in dir({}), which was removed before",
                      pid,
-                     n->tag);
+                     dn->tag);
         if (r != 0) {
-            LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, n->tag);
+            LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, dn->tag);
         }
         remove_count += r;
     }
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 75427cc89..ebdb3640a 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -17,9 +17,9 @@
 
 #pragma once
 
+#include <gtest/gtest_prod.h>
 #include <stdint.h>
 #include <functional>
-#include <gtest/gtest_prod.h>
 #include <map>
 #include <memory>
 #include <set>
@@ -37,15 +37,14 @@ namespace dsn {
 class gpid;
 
 namespace replication {
-class replication_options;
 
 DSN_DECLARE_int32(disk_min_available_space_ratio);
 
 struct dir_node
 {
 public:
-    std::string tag;
-    std::string full_dir;
+    const std::string tag;
+    const std::string full_dir;
     int64_t disk_capacity_mb;
     int64_t disk_available_mb;
     int disk_available_ratio;
@@ -69,24 +68,24 @@ public:
           status(status_)
     {
     }
-    unsigned replicas_count(app_id id) const;
-    unsigned replicas_count() const;
+    // None of these functions are thread-safe. However, they are only used in fs_manager
+    // and are protected by the lock in fs_manager.
+    uint64_t replicas_count(app_id id) const;
+    uint64_t replicas_count() const;
     bool has(const dsn::gpid &pid) const;
-    unsigned remove(const dsn::gpid &pid);
+    uint64_t remove(const dsn::gpid &pid);
     bool update_disk_stat(const bool update_disk_status);
 };
 
 class fs_manager
 {
 public:
-    fs_manager(bool for_test);
-    ~fs_manager() {}
+    fs_manager();
 
-    // this should be called before open/load any replicas
-    dsn::error_code initialize(const replication_options &opts);
-    dsn::error_code initialize(const std::vector<std::string> &data_dirs,
-                               const std::vector<std::string> &tags,
-                               bool for_test);
+    // Should be called before opening/loading any replicas.
+    // NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and be in the same order.
+    void initialize(const std::vector<std::string> &data_dirs,
+                    const std::vector<std::string> &data_dir_tags);
 
     dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
     void allocate_dir(const dsn::gpid &pid,
@@ -148,6 +147,7 @@ private:
     friend class replica_disk_migrator;
     friend class replica_disk_test_base;
     friend class open_replica_test;
+    FRIEND_TEST(fs_manager, get_dir_node);
     FRIEND_TEST(replica_test, test_auto_trash);
 };
 } // replication
diff --git a/src/common/test/fs_manager_test.cpp b/src/common/test/fs_manager_test.cpp
index 2ccd579f1..d6bbcfa31 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -60,5 +60,22 @@ TEST(fs_manager, dir_update_disk_status)
     }
 }
 
+TEST(fs_manager, get_dir_node)
+{
+    fs_manager fm;
+    fm.initialize({"/data1"}, {"data1"});
+
+    ASSERT_EQ(nullptr, fm.get_dir_node(""));
+    ASSERT_EQ(nullptr, fm.get_dir_node("/"));
+
+    ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
+    ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
+    ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+
+    ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
+    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
+    ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index 5733dd651..aacf8c043 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -203,10 +203,10 @@ void generate_node_fs_manager(const app_mapper &apps,
     for (const auto &kv : nodes) {
         const node_state &ns = kv.second;
         if (nfm.find(ns.addr()) == nfm.end()) {
-            nfm.emplace(ns.addr(), std::make_shared<fs_manager>(true));
+            nfm.emplace(ns.addr(), std::make_shared<fs_manager>());
         }
         fs_manager &manager = *(nfm.find(ns.addr())->second);
-        manager.initialize(data_dirs, tags, true);
+        manager.initialize(data_dirs, tags);
         ns.for_each_partition([&](const dsn::gpid &pid) {
             const config_context &cc = *get_config_context(apps, pid);
             snprintf(pid_dir,
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index d5cf1f0bc..77a0d58f6 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -198,7 +198,6 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
       _mem_release_max_reserved_mem_percentage(10),
       _max_concurrent_bulk_load_downloading_count(5),
       _learn_app_concurrent_count(0),
-      _fs_manager(false),
       _bulk_load_downloading_count(0),
       _manual_emergency_checkpointing_count(0),
       _is_running(false)
@@ -858,9 +857,7 @@ void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_di
 
     CHECK_GT_MSG(
         available_dirs.size(), 0, "initialize fs manager failed, no available data directory");
-    CHECK_EQ_MSG(_fs_manager.initialize(available_dirs, available_dir_tags, false),
-                 dsn::ERR_OK,
-                 "initialize fs manager failed");
+    _fs_manager.initialize(available_dirs, available_dir_tags);
 }
 
 void replica_stub::initialize_start()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org