You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/06/08 10:14:03 UTC

[incubator-pegasus] branch master updated: refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new d216696e5 refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)
d216696e5 is described below

commit d216696e53be46745dbdcc8c6ac80f1f24a83d96
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Thu Jun 8 18:13:56 2023 +0800

    refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This patch moves some functions to fs_manager which are more reasonable to be
    responsibilities of class fs_manager rather than those of other classes, including:
    - remove `fs_manager::for_each_dir_node`
    - minimize some locks
    - rename `fs_manager::is_dir_node_available` to `fs_manager::is_dir_node_exist`
    - move `get_disk_infos` code to class `fs_manager` and encapsulate it as a function
    - move `validate_migrate_op` code to class `fs_manager` and encapsulate it as a function
    - move `disk_status_to_error_code` from replica_2pc.cpp to class `fs_manager`
---
 src/common/fs_manager.cpp             | 146 +++++++++++++++++++++++++++++-----
 src/common/fs_manager.h               |  13 ++-
 src/replica/replica_2pc.cpp           |  15 ----
 src/replica/replica_disk_migrator.cpp |  60 ++------------
 src/replica/replica_stub.cpp          |  37 ++-------
 src/utils/filesystem.cpp              |   1 +
 6 files changed, 150 insertions(+), 122 deletions(-)

diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 3d3eda94f..6fed98132 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -36,6 +36,7 @@
 
 #include <algorithm>
 #include <cmath>
+#include <cstdint>
 #include <iosfwd>
 #include <utility>
 
@@ -44,8 +45,8 @@
 #include "fmt/core.h"
 #include "fmt/ostream.h"
 #include "perf_counter/perf_counter.h"
+#include "replica_admin_types.h"
 #include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_address.h"
 #include "utils/fail_point.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
@@ -69,6 +70,19 @@ DSN_DEFINE_bool(replication,
                 true,
                 "true means ignore broken data disk when initialize");
 
+error_code disk_status_to_error_code(disk_status::type ds) // maps a disk status to an error_code
+{
+    switch (ds) {
+    case disk_status::SPACE_INSUFFICIENT:
+        return dsn::ERR_DISK_INSUFFICIENT;
+    case disk_status::IO_ERROR:
+        return dsn::ERR_DISK_IO_ERROR;
+    default: // every status not handled above is expected to be NORMAL
+        CHECK_EQ(disk_status::NORMAL, ds); // presumably fatal on mismatch - confirm CHECK_EQ semantics
+        return dsn::ERR_OK;
+    }
+}
+
 uint64_t dir_node::replicas_count() const
 {
     uint64_t sum = 0;
@@ -339,16 +353,6 @@ void fs_manager::remove_replica(const gpid &pid)
     }
 }
 
-bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &func) const
-{
-    zauto_read_lock l(_lock);
-    for (auto &n : _dir_nodes) {
-        if (!func(*n))
-            return false;
-    }
-    return true;
-}
-
 void fs_manager::update_disk_stat()
 {
     zauto_write_lock l(_lock);
@@ -388,21 +392,25 @@ void fs_manager::update_disk_stat()
 
 void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
 {
-    zauto_write_lock l(_lock);
     std::string norm_path;
     utils::filesystem::get_normalized_path(data_dir, norm_path);
-    dir_node *n = new dir_node(tag, norm_path);
-    _dir_nodes.emplace_back(n);
-    LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
+    auto dn = std::make_shared<dir_node>(tag, norm_path);
+
+    {
+        zauto_write_lock l(_lock);
+        _dir_nodes.emplace_back(dn);
+    }
+    LOG_INFO("add new data dir({}) and mark as tag({})", norm_path, tag);
 }
 
-bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
+bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const
 {
+    std::string norm_path;
+    utils::filesystem::get_normalized_path(data_dir, norm_path);
+
     zauto_read_lock l(_lock);
-    for (const auto &dir_node : _dir_nodes) {
-        std::string norm_path;
-        utils::filesystem::get_normalized_path(data_dir, norm_path);
-        if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
+    for (const auto &dn : _dir_nodes) {
+        if (dn->full_dir == norm_path || dn->tag == tag) {
             return true;
         }
     }
@@ -497,5 +505,103 @@ dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
     return child_dn;
 }
 
+std::vector<disk_info> fs_manager::get_disk_infos(int app_id) const // one disk_info per dir_node
+{
+    std::vector<disk_info> disk_infos;
+    zauto_read_lock l(_lock); // held while _dir_nodes is iterated, until the function returns
+    for (const auto &dn : _dir_nodes) {
+        disk_info di;
+        // 'app_id' == 0 is not a valid app id and is used to mean "report the replicas of all apps".
+        if (app_id == 0) {
+            di.holding_primary_replicas = dn->holding_primary_replicas; // copy the whole map
+            di.holding_secondary_replicas = dn->holding_secondary_replicas; // copy the whole map
+        } else {
+            const auto &primary_replicas = dn->holding_primary_replicas.find(app_id);
+            if (primary_replicas != dn->holding_primary_replicas.end()) {
+                di.holding_primary_replicas[app_id] = primary_replicas->second; // only this app
+            }
+
+            const auto &secondary_replicas = dn->holding_secondary_replicas.find(app_id);
+            if (secondary_replicas != dn->holding_secondary_replicas.end()) {
+                di.holding_secondary_replicas[app_id] = secondary_replicas->second; // only this app
+            }
+        }
+        di.tag = dn->tag;
+        di.full_dir = dn->full_dir;
+        di.disk_capacity_mb = dn->disk_capacity_mb;
+        di.disk_available_mb = dn->disk_available_mb;
+
+        disk_infos.emplace_back(std::move(di));
+    }
+
+    return disk_infos;
+}
+
+error_code fs_manager::validate_migrate_op(gpid pid,
+                                           const std::string &origin_disk, // tag of the source disk
+                                           const std::string &target_disk, // tag of the destination
+                                           std::string &err_msg) const // written only on failure
+{
+    bool origin_disk_exist = false; // set once a dir_node tagged 'origin_disk' passes its checks
+    bool target_disk_exist = false; // set once a dir_node tagged 'target_disk' passes its checks
+    zauto_read_lock l(_lock); // held while _dir_nodes is scanned
+    for (const auto &dn : _dir_nodes) {
+        // Origin checks: the dir_node tagged 'origin_disk' must hold 'pid' and must be readable.
+        if (dn->tag == origin_disk) {
+            CHECK_FALSE(origin_disk_exist); // a tag is expected to match at most one dir_node
+            if (!dn->has(pid)) {
+                err_msg = fmt::format(
+                    "replica({}) doesn't exist on the origin disk({})", pid, origin_disk);
+                return ERR_OBJECT_NOT_FOUND;
+            }
+
+            // It's OK to migrate a replica from a dir_node which is NORMAL or even
+            // SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR.
+            if (dn->status == disk_status::IO_ERROR) {
+                err_msg = fmt::format(
+                    "replica({}) exists on an IO-Error origin disk({})", pid, origin_disk);
+                return ERR_DISK_IO_ERROR;
+            }
+
+            origin_disk_exist = true;
+        }
+
+        // Target checks: the dir_node tagged 'target_disk' must not hold 'pid' and must be NORMAL.
+        if (dn->tag == target_disk) {
+            CHECK_FALSE(target_disk_exist); // a tag is expected to match at most one dir_node
+            if (dn->has(pid)) {
+                err_msg =
+                    fmt::format("replica({}) already exists on target disk({})", pid, target_disk);
+                return ERR_PATH_ALREADY_EXIST;
+            }
+
+            // It's not allowed to migrate a replica to a dir_node which is either
+            // SPACE_INSUFFICIENT or IO_ERROR; report the *target* disk tag in the hint.
+            if (dn->status == disk_status::SPACE_INSUFFICIENT ||
+                dn->status == disk_status::IO_ERROR) {
+                err_msg = fmt::format("replica({}) target disk({}) is {}",
+                                      pid,
+                                      target_disk,
+                                      enum_to_string(dn->status));
+                return disk_status_to_error_code(dn->status);
+            }
+
+            target_disk_exist = true;
+        }
+    }
+
+    if (!origin_disk_exist) { // no dir_node carries the origin tag
+        err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk);
+        return ERR_OBJECT_NOT_FOUND;
+    }
+
+    if (!target_disk_exist) { // no dir_node carries the target tag
+        err_msg = fmt::format("target disk({}) doesn't exist", target_disk);
+        return ERR_OBJECT_NOT_FOUND;
+    }
+
+    return ERR_OK; // both disks exist and passed every check; err_msg is left untouched
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index e4c51667d..107366987 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -20,7 +20,6 @@
 #include <gtest/gtest_prod.h>
 #include <stdint.h>
 #include <atomic>
-#include <functional>
 #include <map>
 #include <memory>
 #include <set>
@@ -30,6 +29,7 @@
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "perf_counter/perf_counter_wrapper.h"
+#include "utils/error_code.h"
 #include "utils/flags.h"
 #include "utils/string_view.h"
 #include "utils/zlocks.h"
@@ -38,9 +38,12 @@ namespace dsn {
 class gpid;
 
 namespace replication {
+class disk_info;
 
 DSN_DECLARE_int32(disk_min_available_space_ratio);
 
+error_code disk_status_to_error_code(disk_status::type ds);
+
 struct dir_node
 {
 public:
@@ -116,16 +119,20 @@ public:
                                        gpid child_pid,
                                        const std::string &parent_dir);
     void remove_replica(const dsn::gpid &pid);
-    bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
     void update_disk_stat();
 
     void add_new_dir_node(const std::string &data_dir, const std::string &tag);
+    bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const;
     const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
     {
         zauto_read_lock l(_lock);
         return _dir_nodes;
     }
-    bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
+    error_code validate_migrate_op(gpid pid,
+                                   const std::string &origin_disk,
+                                   const std::string &target_disk,
+                                   std::string &err_msg) const;
+    std::vector<disk_info> get_disk_infos(int app_id) const;
 
 private:
     void reset_disk_stat()
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 2a8e47787..e0cff3868 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -117,21 +117,6 @@ DSN_DEFINE_uint64(
 DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
 DSN_DECLARE_int32(staleness_for_commit);
 
-namespace {
-error_code disk_status_to_error_code(disk_status::type ds)
-{
-    switch (ds) {
-    case disk_status::SPACE_INSUFFICIENT:
-        return dsn::ERR_DISK_INSUFFICIENT;
-    case disk_status::IO_ERROR:
-        return dsn::ERR_DISK_IO_ERROR;
-    default:
-        CHECK_EQ(disk_status::NORMAL, ds);
-        return dsn::ERR_OK;
-    }
-}
-} // anonymous namespace
-
 void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
 {
     _checker.only_one_thread_access();
diff --git a/src/replica/replica_disk_migrator.cpp b/src/replica/replica_disk_migrator.cpp
index 1afc5c62e..e1fecba80 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -19,10 +19,6 @@
 
 #include <boost/algorithm/string/replace.hpp>
 #include <fmt/core.h>
-#include <fmt/ostream.h>
-#include <iosfwd>
-#include <memory>
-#include <vector>
 
 #include "common/fs_manager.h"
 #include "common/gpid.h"
@@ -126,56 +122,12 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
         return false;
     }
 
-    bool valid_origin_disk = false;
-    bool valid_target_disk = false;
-    // _dir_nodes: std::vector<std::shared_ptr<dir_node>>
-    // TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
-    for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) {
-        if (dir_node->tag == req.origin_disk) {
-            valid_origin_disk = true;
-            if (!dir_node->has(req.pid)) {
-                std::string err_msg =
-                    fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))",
-                                req.pid,
-                                req.origin_disk);
-                LOG_ERROR_PREFIX(
-                    "received replica disk migrate request(origin={}, target={}), err = {}",
-                    req.origin_disk,
-                    req.target_disk,
-                    err_msg);
-                resp.err = ERR_OBJECT_NOT_FOUND;
-                resp.__set_hint(err_msg);
-                return false;
-            }
-        }
-
-        if (dir_node->tag == req.target_disk) {
-            valid_target_disk = true;
-            if (dir_node->has(get_gpid())) {
-                std::string err_msg =
-                    fmt::format("Invalid replica(replica({}) has existed on target disk({}))",
-                                req.pid,
-                                req.target_disk);
-                LOG_ERROR_PREFIX(
-                    "received replica disk migrate request(origin={}, target={}), err = {}",
-                    req.origin_disk,
-                    req.target_disk,
-                    err_msg);
-                resp.err = ERR_PATH_ALREADY_EXIST;
-                resp.__set_hint(err_msg);
-                return false;
-            }
-        }
-    }
-
-    if (!valid_origin_disk || !valid_target_disk) {
-        std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk;
-        std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag);
-        LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}",
-                         req.origin_disk,
-                         req.target_disk,
-                         err_msg);
-        resp.err = ERR_OBJECT_NOT_FOUND;
+    std::string err_msg;
+    auto ec = _replica->get_replica_stub()->_fs_manager.validate_migrate_op(
+        req.pid, req.origin_disk, req.target_disk, err_msg);
+    if (ec != ERR_OK) {
+        LOG_ERROR_PREFIX(err_msg);
+        resp.err = ec;
         resp.__set_hint(err_msg);
         return false;
     }
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index dcacf4baf..a3e16500a 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1105,34 +1105,10 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
         }
     }
 
-    for (const auto &dir_node : _fs_manager._dir_nodes) {
-        disk_info info;
-        // app_name empty means query all app replica_count
-        if (req.app_name.empty()) {
-            info.holding_primary_replicas = dir_node->holding_primary_replicas;
-            info.holding_secondary_replicas = dir_node->holding_secondary_replicas;
-        } else {
-            const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id);
-            if (primary_iter != dir_node->holding_primary_replicas.end()) {
-                info.holding_primary_replicas[app_id] = primary_iter->second;
-            }
-
-            const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id);
-            if (secondary_iter != dir_node->holding_secondary_replicas.end()) {
-                info.holding_secondary_replicas[app_id] = secondary_iter->second;
-            }
-        }
-        info.tag = dir_node->tag;
-        info.full_dir = dir_node->full_dir;
-        info.disk_capacity_mb = dir_node->disk_capacity_mb;
-        info.disk_available_mb = dir_node->disk_available_mb;
-
-        resp.disk_infos.emplace_back(info);
-    }
-
-    resp.total_capacity_mb = _fs_manager._total_capacity_mb;
-    resp.total_available_mb = _fs_manager._total_available_mb;
-
+    resp.disk_infos = _fs_manager.get_disk_infos(app_id);
+    // Get the statistics from fs_manager's metrics, they are thread-safe.
+    resp.total_capacity_mb = _fs_manager._counter_total_capacity_mb->get_integer_value();
+    resp.total_available_mb = _fs_manager._counter_total_available_mb->get_integer_value();
     resp.err = ERR_OK;
 }
 
@@ -1203,11 +1179,12 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
     }
 
     for (auto i = 0; i < data_dir_tags.size(); ++i) {
+        // TODO(yingchun): move the following code to fs_manager.
         auto dir = data_dirs[i];
-        if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) {
+        if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) {
             resp.err = ERR_NODE_ALREADY_EXIST;
             resp.__set_err_hint(
-                fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i]));
+                fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i]));
             return;
         }
 
diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp
index ec4b28a73..1954f29b5 100644
--- a/src/utils/filesystem.cpp
+++ b/src/utils/filesystem.cpp
@@ -94,6 +94,7 @@ static inline int get_stat_internal(const std::string &npath, struct stat_ &st)
     return err;
 }
 
+// TODO(yingchun): remove the return value because it's always 0.
 int get_normalized_path(const std::string &path, std::string &npath)
 {
     char sep;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org