You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/06/08 10:14:03 UTC
[incubator-pegasus] branch master updated: refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d216696e5 refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)
d216696e5 is described below
commit d216696e53be46745dbdcc8c6ac80f1f24a83d96
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Thu Jun 8 18:13:56 2023 +0800
refactor: improve the single-responsibility of class fs_manager (2/n) (#1522)
https://github.com/apache/incubator-pegasus/issues/1383
This patch moves some logic into class fs_manager, where it belongs more naturally
than in other classes, and tidies up related code, including:
- remove `fs_manager::for_each_dir_node`
- narrow the scope of some locks
- rename `fs_manager::is_dir_node_available` to `fs_manager::is_dir_node_exist`
- move `get_disk_infos` code to class `fs_manager` and encapsulate it as a function
- move `validate_migrate_op` code to class `fs_manager` and encapsulate it as a function
- move `disk_status_to_error_code` from replica_2pc.cpp to class `fs_manager`
---
src/common/fs_manager.cpp | 146 +++++++++++++++++++++++++++++-----
src/common/fs_manager.h | 13 ++-
src/replica/replica_2pc.cpp | 15 ----
src/replica/replica_disk_migrator.cpp | 60 ++------------
src/replica/replica_stub.cpp | 37 ++-------
src/utils/filesystem.cpp | 1 +
6 files changed, 150 insertions(+), 122 deletions(-)
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 3d3eda94f..6fed98132 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -36,6 +36,7 @@
#include <algorithm>
#include <cmath>
+#include <cstdint>
#include <iosfwd>
#include <utility>
@@ -44,8 +45,8 @@
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
+#include "replica_admin_types.h"
#include "runtime/api_layer1.h"
-#include "runtime/rpc/rpc_address.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
@@ -69,6 +70,19 @@ DSN_DEFINE_bool(replication,
true,
"true means ignore broken data disk when initialize");
+error_code disk_status_to_error_code(disk_status::type ds)
+{
+    if (ds == disk_status::SPACE_INSUFFICIENT) {
+        return dsn::ERR_DISK_INSUFFICIENT;
+    }
+    if (ds == disk_status::IO_ERROR) {
+        return dsn::ERR_DISK_IO_ERROR;
+    }
+    // Every other status must be NORMAL, which translates to ERR_OK.
+    CHECK_EQ(disk_status::NORMAL, ds);
+    return dsn::ERR_OK;
+}
+
uint64_t dir_node::replicas_count() const
{
uint64_t sum = 0;
@@ -339,16 +353,6 @@ void fs_manager::remove_replica(const gpid &pid)
}
}
-bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &func) const
-{
- zauto_read_lock l(_lock);
- for (auto &n : _dir_nodes) {
- if (!func(*n))
- return false;
- }
- return true;
-}
-
void fs_manager::update_disk_stat()
{
zauto_write_lock l(_lock);
@@ -388,21 +392,25 @@ void fs_manager::update_disk_stat()
void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
- zauto_write_lock l(_lock);
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
- dir_node *n = new dir_node(tag, norm_path);
- _dir_nodes.emplace_back(n);
- LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
+    // Build the node outside the lock; only the insertion into _dir_nodes needs it.
+    auto dn = std::make_shared<dir_node>(tag, norm_path);
+    {
+        zauto_write_lock l(_lock);
+        _dir_nodes.emplace_back(std::move(dn));
+    }
+    LOG_INFO("add new data dir({}) and mark as tag({})", norm_path, tag);
}
-bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
+bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const
{
+ std::string norm_path;
+ utils::filesystem::get_normalized_path(data_dir, norm_path);
+
zauto_read_lock l(_lock);
- for (const auto &dir_node : _dir_nodes) {
- std::string norm_path;
- utils::filesystem::get_normalized_path(data_dir, norm_path);
- if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
+ for (const auto &dn : _dir_nodes) {
+ if (dn->full_dir == norm_path || dn->tag == tag) {
return true;
}
}
@@ -497,5 +505,103 @@ dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
return child_dn;
}
+std::vector<disk_info> fs_manager::get_disk_infos(int app_id) const
+{
+    std::vector<disk_info> disk_infos;
+    zauto_read_lock l(_lock);
+    disk_infos.reserve(_dir_nodes.size());
+    for (const auto &dn : _dir_nodes) {
+        disk_info di;
+        // Query all app info if 'app_id' is 0, which is not a valid app id.
+        if (app_id == 0) {
+            di.holding_primary_replicas = dn->holding_primary_replicas;
+            di.holding_secondary_replicas = dn->holding_secondary_replicas;
+        } else {
+            // Otherwise only report the replicas of 'app_id' held by this dir_node.
+            const auto primary_replicas = dn->holding_primary_replicas.find(app_id);
+            if (primary_replicas != dn->holding_primary_replicas.end()) {
+                di.holding_primary_replicas[app_id] = primary_replicas->second;
+            }
+            const auto secondary_replicas = dn->holding_secondary_replicas.find(app_id);
+            if (secondary_replicas != dn->holding_secondary_replicas.end()) {
+                di.holding_secondary_replicas[app_id] = secondary_replicas->second;
+            }
+        }
+        di.tag = dn->tag;
+        di.full_dir = dn->full_dir;
+        di.disk_capacity_mb = dn->disk_capacity_mb;
+        di.disk_available_mb = dn->disk_available_mb;
+        disk_infos.emplace_back(std::move(di));
+    }
+
+    return disk_infos;
+}
+
+error_code fs_manager::validate_migrate_op(gpid pid,
+                                           const std::string &origin_disk,
+                                           const std::string &target_disk,
+                                           std::string &err_msg) const
+{
+    bool origin_disk_exist = false;
+    bool target_disk_exist = false;
+    zauto_read_lock l(_lock);
+    for (const auto &dn : _dir_nodes) {
+        // Check if the origin directory is valid.
+        if (dn->tag == origin_disk) {
+            CHECK_FALSE(origin_disk_exist);
+            if (!dn->has(pid)) {
+                err_msg = fmt::format(
+                    "replica({}) doesn't exist on the origin disk({})", pid, origin_disk);
+                return ERR_OBJECT_NOT_FOUND;
+            }
+
+            // It's OK to migrate a replica from a dir_node which is NORMAL or even
+            // SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR.
+            if (dn->status == disk_status::IO_ERROR) {
+                err_msg = fmt::format(
+                    "replica({}) exists on an IO-Error origin disk({})", pid, origin_disk);
+                return ERR_DISK_IO_ERROR;
+            }
+
+            origin_disk_exist = true;
+        }
+
+        // Check if the target directory is valid.
+        if (dn->tag == target_disk) {
+            CHECK_FALSE(target_disk_exist);
+            if (dn->has(pid)) {
+                err_msg =
+                    fmt::format("replica({}) already exists on target disk({})", pid, target_disk);
+                return ERR_PATH_ALREADY_EXIST;
+            }
+
+            // It's not allowed to migrate a replica to a dir_node which is either
+            // SPACE_INSUFFICIENT or IO_ERROR; the message must name the target disk.
+            if (dn->status == disk_status::SPACE_INSUFFICIENT ||
+                dn->status == disk_status::IO_ERROR) {
+                err_msg = fmt::format("replica({}) target disk({}) is {}",
+                                      pid,
+                                      target_disk,
+                                      enum_to_string(dn->status));
+                return disk_status_to_error_code(dn->status);
+            }
+
+            target_disk_exist = true;
+        }
+    }
+
+    if (!origin_disk_exist) {
+        err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk);
+        return ERR_OBJECT_NOT_FOUND;
+    }
+
+    if (!target_disk_exist) {
+        err_msg = fmt::format("target disk({}) doesn't exist", target_disk);
+        return ERR_OBJECT_NOT_FOUND;
+    }
+
+    return ERR_OK;
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index e4c51667d..107366987 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -20,7 +20,6 @@
#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <atomic>
-#include <functional>
#include <map>
#include <memory>
#include <set>
@@ -30,6 +29,7 @@
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "perf_counter/perf_counter_wrapper.h"
+#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/string_view.h"
#include "utils/zlocks.h"
@@ -38,9 +38,12 @@ namespace dsn {
class gpid;
namespace replication {
+class disk_info;
DSN_DECLARE_int32(disk_min_available_space_ratio);
+error_code disk_status_to_error_code(disk_status::type ds);
+
struct dir_node
{
public:
@@ -116,16 +119,20 @@ public:
gpid child_pid,
const std::string &parent_dir);
void remove_replica(const dsn::gpid &pid);
- bool for_each_dir_node(const std::function<bool(const dir_node &)> &func) const;
void update_disk_stat();
void add_new_dir_node(const std::string &data_dir, const std::string &tag);
+ bool is_dir_node_exist(const std::string &data_dir, const std::string &tag) const;
const std::vector<std::shared_ptr<dir_node>> &get_dir_nodes() const
{
zauto_read_lock l(_lock);
return _dir_nodes;
}
- bool is_dir_node_available(const std::string &data_dir, const std::string &tag) const;
+ error_code validate_migrate_op(gpid pid,
+ const std::string &origin_disk,
+ const std::string &target_disk,
+ std::string &err_msg) const;
+ std::vector<disk_info> get_disk_infos(int app_id) const;
private:
void reset_disk_stat()
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 2a8e47787..e0cff3868 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -117,21 +117,6 @@ DSN_DEFINE_uint64(
DSN_DECLARE_int32(max_mutation_count_in_prepare_list);
DSN_DECLARE_int32(staleness_for_commit);
-namespace {
-error_code disk_status_to_error_code(disk_status::type ds)
-{
- switch (ds) {
- case disk_status::SPACE_INSUFFICIENT:
- return dsn::ERR_DISK_INSUFFICIENT;
- case disk_status::IO_ERROR:
- return dsn::ERR_DISK_IO_ERROR;
- default:
- CHECK_EQ(disk_status::NORMAL, ds);
- return dsn::ERR_OK;
- }
-}
-} // anonymous namespace
-
void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
{
_checker.only_one_thread_access();
diff --git a/src/replica/replica_disk_migrator.cpp b/src/replica/replica_disk_migrator.cpp
index 1afc5c62e..e1fecba80 100644
--- a/src/replica/replica_disk_migrator.cpp
+++ b/src/replica/replica_disk_migrator.cpp
@@ -19,10 +19,6 @@
#include <boost/algorithm/string/replace.hpp>
#include <fmt/core.h>
-#include <fmt/ostream.h>
-#include <iosfwd>
-#include <memory>
-#include <vector>
#include "common/fs_manager.h"
#include "common/gpid.h"
@@ -126,56 +122,12 @@ bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
return false;
}
- bool valid_origin_disk = false;
- bool valid_target_disk = false;
- // _dir_nodes: std::vector<std::shared_ptr<dir_node>>
- // TODO(yingchun): skip disks which are SPACE_INSUFFICIENT or IO_ERROR.
- for (const auto &dir_node : _replica->get_replica_stub()->_fs_manager._dir_nodes) {
- if (dir_node->tag == req.origin_disk) {
- valid_origin_disk = true;
- if (!dir_node->has(req.pid)) {
- std::string err_msg =
- fmt::format("Invalid replica(replica({}) doesn't exist on origin disk({}))",
- req.pid,
- req.origin_disk);
- LOG_ERROR_PREFIX(
- "received replica disk migrate request(origin={}, target={}), err = {}",
- req.origin_disk,
- req.target_disk,
- err_msg);
- resp.err = ERR_OBJECT_NOT_FOUND;
- resp.__set_hint(err_msg);
- return false;
- }
- }
-
- if (dir_node->tag == req.target_disk) {
- valid_target_disk = true;
- if (dir_node->has(get_gpid())) {
- std::string err_msg =
- fmt::format("Invalid replica(replica({}) has existed on target disk({}))",
- req.pid,
- req.target_disk);
- LOG_ERROR_PREFIX(
- "received replica disk migrate request(origin={}, target={}), err = {}",
- req.origin_disk,
- req.target_disk,
- err_msg);
- resp.err = ERR_PATH_ALREADY_EXIST;
- resp.__set_hint(err_msg);
- return false;
- }
- }
- }
-
- if (!valid_origin_disk || !valid_target_disk) {
- std::string invalid_disk_tag = !valid_origin_disk ? req.origin_disk : req.target_disk;
- std::string err_msg = fmt::format("Invalid disk tag({} doesn't exist)", invalid_disk_tag);
- LOG_ERROR_PREFIX("received replica disk migrate request(origin={}, target={}), err = {}",
- req.origin_disk,
- req.target_disk,
- err_msg);
- resp.err = ERR_OBJECT_NOT_FOUND;
+ std::string err_msg;
+ auto ec = _replica->get_replica_stub()->_fs_manager.validate_migrate_op(
+ req.pid, req.origin_disk, req.target_disk, err_msg);
+ if (ec != ERR_OK) {
+ LOG_ERROR_PREFIX(err_msg);
+ resp.err = ec;
resp.__set_hint(err_msg);
return false;
}
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index dcacf4baf..a3e16500a 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1105,34 +1105,10 @@ void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
}
}
- for (const auto &dir_node : _fs_manager._dir_nodes) {
- disk_info info;
- // app_name empty means query all app replica_count
- if (req.app_name.empty()) {
- info.holding_primary_replicas = dir_node->holding_primary_replicas;
- info.holding_secondary_replicas = dir_node->holding_secondary_replicas;
- } else {
- const auto &primary_iter = dir_node->holding_primary_replicas.find(app_id);
- if (primary_iter != dir_node->holding_primary_replicas.end()) {
- info.holding_primary_replicas[app_id] = primary_iter->second;
- }
-
- const auto &secondary_iter = dir_node->holding_secondary_replicas.find(app_id);
- if (secondary_iter != dir_node->holding_secondary_replicas.end()) {
- info.holding_secondary_replicas[app_id] = secondary_iter->second;
- }
- }
- info.tag = dir_node->tag;
- info.full_dir = dir_node->full_dir;
- info.disk_capacity_mb = dir_node->disk_capacity_mb;
- info.disk_available_mb = dir_node->disk_available_mb;
-
- resp.disk_infos.emplace_back(info);
- }
-
- resp.total_capacity_mb = _fs_manager._total_capacity_mb;
- resp.total_available_mb = _fs_manager._total_available_mb;
-
+ resp.disk_infos = _fs_manager.get_disk_infos(app_id);
+ // Get the statistics from fs_manager's metrics, they are thread-safe.
+ resp.total_capacity_mb = _fs_manager._counter_total_capacity_mb->get_integer_value();
+ resp.total_available_mb = _fs_manager._counter_total_available_mb->get_integer_value();
resp.err = ERR_OK;
}
@@ -1203,11 +1179,12 @@ void replica_stub::on_add_new_disk(add_new_disk_rpc rpc)
}
for (auto i = 0; i < data_dir_tags.size(); ++i) {
+ // TODO(yingchun): move the following code to fs_manager.
auto dir = data_dirs[i];
- if (_fs_manager.is_dir_node_available(dir, data_dir_tags[i])) {
+ if (_fs_manager.is_dir_node_exist(dir, data_dir_tags[i])) {
resp.err = ERR_NODE_ALREADY_EXIST;
resp.__set_err_hint(
- fmt::format("data_dir({}) tag({}) already available", dir, data_dir_tags[i]));
+ fmt::format("data_dir({}) tag({}) already exist", dir, data_dir_tags[i]));
return;
}
diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp
index ec4b28a73..1954f29b5 100644
--- a/src/utils/filesystem.cpp
+++ b/src/utils/filesystem.cpp
@@ -94,6 +94,7 @@ static inline int get_stat_internal(const std::string &npath, struct stat_ &st)
return err;
}
+// TODO(yingchun): remove the return value because it's always 0.
int get_normalized_path(const std::string &path, std::string &npath)
{
char sep;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org