You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/05/17 09:15:40 UTC
[incubator-pegasus] branch master updated: refactor: minor refactor on class fs_manager (#1476)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 6b9fba392 refactor: minor refactor on class fs_manager (#1476)
6b9fba392 is described below
commit 6b9fba3928071e5b9ecf8e19790843a0b0a77ba9
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed May 17 17:15:34 2023 +0800
refactor: minor refactor on class fs_manager (#1476)
https://github.com/apache/incubator-pegasus/issues/1383
This is a minor refactoring of class fs_manager, including:
- use `uint64_t` instead of `unsigned` in the fs_manager module.
- remove the unneeded `for_test` parameters from the fs_manager constructor and `initialize()`.
---
src/common/fs_manager.cpp | 135 +++++++++++++++++++-----------------
src/common/fs_manager.h | 28 ++++----
src/common/test/fs_manager_test.cpp | 17 +++++
src/meta/test/misc/misc.cpp | 4 +-
src/replica/replica_stub.cpp | 5 +-
5 files changed, 105 insertions(+), 84 deletions(-)
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 5a460a896..2ba683da4 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -61,36 +61,39 @@ DSN_DEFINE_int32(replication,
"space insufficient");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
-unsigned dir_node::replicas_count() const
+uint64_t dir_node::replicas_count() const
{
- unsigned sum = 0;
+ uint64_t sum = 0;
for (const auto &s : holding_replicas) {
sum += s.second.size();
}
return sum;
}
-unsigned dir_node::replicas_count(app_id id) const
+uint64_t dir_node::replicas_count(app_id id) const
{
const auto iter = holding_replicas.find(id);
- if (iter == holding_replicas.end())
+ if (iter == holding_replicas.end()) {
return 0;
+ }
return iter->second.size();
}
bool dir_node::has(const gpid &pid) const
{
auto iter = holding_replicas.find(pid.get_app_id());
- if (iter == holding_replicas.end())
+ if (iter == holding_replicas.end()) {
return false;
+ }
return iter->second.find(pid) != iter->second.end();
}
-unsigned dir_node::remove(const gpid &pid)
+uint64_t dir_node::remove(const gpid &pid)
{
auto iter = holding_replicas.find(pid.get_app_id());
- if (iter == holding_replicas.end())
+ if (iter == holding_replicas.end()) {
return 0;
+ }
return iter->second.erase(pid);
}
@@ -134,68 +137,67 @@ bool dir_node::update_disk_stat(const bool update_disk_status)
return (old_status != new_status);
}
-fs_manager::fs_manager(bool for_test)
+fs_manager::fs_manager()
{
- if (!for_test) {
- _counter_total_capacity_mb.init_app_counter("eon.replica_stub",
- "disk.capacity.total(MB)",
+ _counter_total_capacity_mb.init_app_counter("eon.replica_stub",
+ "disk.capacity.total(MB)",
+ COUNTER_TYPE_NUMBER,
+ "total disk capacity in MB");
+ _counter_total_available_mb.init_app_counter("eon.replica_stub",
+ "disk.available.total(MB)",
+ COUNTER_TYPE_NUMBER,
+ "total disk available in MB");
+ _counter_total_available_ratio.init_app_counter("eon.replica_stub",
+ "disk.available.total.ratio",
COUNTER_TYPE_NUMBER,
- "total disk capacity in MB");
- _counter_total_available_mb.init_app_counter("eon.replica_stub",
- "disk.available.total(MB)",
- COUNTER_TYPE_NUMBER,
- "total disk available in MB");
- _counter_total_available_ratio.init_app_counter("eon.replica_stub",
- "disk.available.total.ratio",
- COUNTER_TYPE_NUMBER,
- "total disk available ratio");
- _counter_min_available_ratio.init_app_counter("eon.replica_stub",
- "disk.available.min.ratio",
- COUNTER_TYPE_NUMBER,
- "minimal disk available ratio in all disks");
- _counter_max_available_ratio.init_app_counter("eon.replica_stub",
- "disk.available.max.ratio",
- COUNTER_TYPE_NUMBER,
- "maximal disk available ratio in all disks");
- }
+ "total disk available ratio");
+ _counter_min_available_ratio.init_app_counter("eon.replica_stub",
+ "disk.available.min.ratio",
+ COUNTER_TYPE_NUMBER,
+ "minimal disk available ratio in all disks");
+ _counter_max_available_ratio.init_app_counter("eon.replica_stub",
+ "disk.available.max.ratio",
+ COUNTER_TYPE_NUMBER,
+ "maximal disk available ratio in all disks");
}
dir_node *fs_manager::get_dir_node(const std::string &subdir) const
{
- zauto_read_lock l(_lock);
std::string norm_subdir;
utils::filesystem::get_normalized_path(subdir, norm_subdir);
- for (auto &n : _dir_nodes) {
- // if input is a subdir of some dir_nodes
- const std::string &d = n->full_dir;
- if (norm_subdir.compare(0, d.size(), d) == 0 &&
- (norm_subdir.size() == d.size() || norm_subdir[d.size()] == '/')) {
- return n.get();
+
+ zauto_read_lock l(_lock);
+ for (const auto &dn : _dir_nodes) {
+ // Check if 'subdir' is a sub-directory of 'dn'.
+ const std::string &full_dir = dn->full_dir;
+ if (full_dir.size() > norm_subdir.size()) {
+ continue;
+ }
+
+ if ((norm_subdir.size() == full_dir.size() || norm_subdir[full_dir.size()] == '/') &&
+ norm_subdir.compare(0, full_dir.size(), full_dir) == 0) {
+ return dn.get();
}
}
return nullptr;
}
-// size of the two vectors should be equal
-dsn::error_code fs_manager::initialize(const std::vector<std::string> &data_dirs,
- const std::vector<std::string> &tags,
- bool for_test)
+void fs_manager::initialize(const std::vector<std::string> &data_dirs,
+ const std::vector<std::string> &data_dir_tags)
{
- // create all dir_nodes
- CHECK_EQ(data_dirs.size(), tags.size());
+ CHECK_EQ(data_dirs.size(), data_dir_tags.size());
for (unsigned i = 0; i < data_dirs.size(); ++i) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dirs[i], norm_path);
- dir_node *n = new dir_node(tags[i], norm_path);
+ dir_node *n = new dir_node(data_dir_tags[i], norm_path);
_dir_nodes.emplace_back(n);
- LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, tags[i]);
+ LOG_INFO(
+ "{}: mark data dir({}) as tag({})", dsn_primary_address(), norm_path, data_dir_tags[i]);
}
_available_data_dirs = data_dirs;
- if (!for_test) {
- update_disk_stat(false);
- }
- return dsn::ERR_OK;
+ // Update the disk statistics.
+ update_disk_stat();
}
dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
@@ -211,21 +213,26 @@ dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &ta
void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
- dir_node *n = get_dir_node(pid_dir);
- if (nullptr == n) {
+ const auto &dn = get_dir_node(pid_dir);
+ if (dsn_unlikely(nullptr == dn)) {
LOG_ERROR(
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
- } else {
+ return;
+ }
+
+ bool emplace_success = false;
+ {
zauto_write_lock l(_lock);
- std::set<dsn::gpid> &replicas_for_app = n->holding_replicas[pid.get_app_id()];
- auto result = replicas_for_app.emplace(pid);
- if (!result.second) {
- LOG_WARNING(
- "{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, n->tag);
- } else {
- LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, n->tag);
- }
+ auto &replicas_for_app = dn->holding_replicas[pid.get_app_id()];
+ emplace_success = replicas_for_app.emplace(pid).second;
+ }
+ if (!emplace_success) {
+ LOG_WARNING(
+ "{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
+ return;
}
+
+ LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}
void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ std::string &dir)
@@ -271,16 +278,16 @@ void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/
void fs_manager::remove_replica(const gpid &pid)
{
zauto_write_lock l(_lock);
- unsigned remove_count = 0;
- for (auto &n : _dir_nodes) {
- unsigned r = n->remove(pid);
+ uint64_t remove_count = 0;
+ for (auto &dn : _dir_nodes) {
+ uint64_t r = dn->remove(pid);
CHECK_LE_MSG(remove_count + r,
1,
"gpid({}) found in dir({}), which was removed before",
pid,
- n->tag);
+ dn->tag);
if (r != 0) {
- LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, n->tag);
+ LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, dn->tag);
}
remove_count += r;
}
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 75427cc89..ebdb3640a 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -17,9 +17,9 @@
#pragma once
+#include <gtest/gtest_prod.h>
#include <stdint.h>
#include <functional>
-#include <gtest/gtest_prod.h>
#include <map>
#include <memory>
#include <set>
@@ -37,15 +37,14 @@ namespace dsn {
class gpid;
namespace replication {
-class replication_options;
DSN_DECLARE_int32(disk_min_available_space_ratio);
struct dir_node
{
public:
- std::string tag;
- std::string full_dir;
+ const std::string tag;
+ const std::string full_dir;
int64_t disk_capacity_mb;
int64_t disk_available_mb;
int disk_available_ratio;
@@ -69,24 +68,24 @@ public:
status(status_)
{
}
- unsigned replicas_count(app_id id) const;
- unsigned replicas_count() const;
+ // All functions are not thread-safe. However, they are only used in fs_manager
+ // and protected by the lock in fs_manager.
+ uint64_t replicas_count(app_id id) const;
+ uint64_t replicas_count() const;
bool has(const dsn::gpid &pid) const;
- unsigned remove(const dsn::gpid &pid);
+ uint64_t remove(const dsn::gpid &pid);
bool update_disk_stat(const bool update_disk_status);
};
class fs_manager
{
public:
- fs_manager(bool for_test);
- ~fs_manager() {}
+ fs_manager();
- // this should be called before open/load any replicas
- dsn::error_code initialize(const replication_options &opts);
- dsn::error_code initialize(const std::vector<std::string> &data_dirs,
- const std::vector<std::string> &tags,
- bool for_test);
+ // Should be called before open/load any replicas.
+ // NOTE: 'data_dirs' and 'data_dir_tags' must have the same size and in the same order.
+ void initialize(const std::vector<std::string> &data_dirs,
+ const std::vector<std::string> &data_dir_tags);
dsn::error_code get_disk_tag(const std::string &dir, /*out*/ std::string &tag);
void allocate_dir(const dsn::gpid &pid,
@@ -148,6 +147,7 @@ private:
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
+ FRIEND_TEST(fs_manager, get_dir_node);
FRIEND_TEST(replica_test, test_auto_trash);
};
} // replication
diff --git a/src/common/test/fs_manager_test.cpp b/src/common/test/fs_manager_test.cpp
index 2ccd579f1..d6bbcfa31 100644
--- a/src/common/test/fs_manager_test.cpp
+++ b/src/common/test/fs_manager_test.cpp
@@ -60,5 +60,22 @@ TEST(fs_manager, dir_update_disk_status)
}
}
+TEST(fs_manager, get_dir_node)
+{
+ fs_manager fm;
+ fm.initialize({"/data1"}, {"data1"});
+
+ ASSERT_EQ(nullptr, fm.get_dir_node(""));
+ ASSERT_EQ(nullptr, fm.get_dir_node("/"));
+
+ ASSERT_NE(nullptr, fm.get_dir_node("/data1"));
+ ASSERT_NE(nullptr, fm.get_dir_node("/data1/"));
+ ASSERT_NE(nullptr, fm.get_dir_node("/data1/replica1"));
+
+ ASSERT_EQ(nullptr, fm.get_dir_node("/data2"));
+ ASSERT_EQ(nullptr, fm.get_dir_node("/data2/"));
+ ASSERT_EQ(nullptr, fm.get_dir_node("/data2/replica1"));
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index 5733dd651..aacf8c043 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -203,10 +203,10 @@ void generate_node_fs_manager(const app_mapper &apps,
for (const auto &kv : nodes) {
const node_state &ns = kv.second;
if (nfm.find(ns.addr()) == nfm.end()) {
- nfm.emplace(ns.addr(), std::make_shared<fs_manager>(true));
+ nfm.emplace(ns.addr(), std::make_shared<fs_manager>());
}
fs_manager &manager = *(nfm.find(ns.addr())->second);
- manager.initialize(data_dirs, tags, true);
+ manager.initialize(data_dirs, tags);
ns.for_each_partition([&](const dsn::gpid &pid) {
const config_context &cc = *get_config_context(apps, pid);
snprintf(pid_dir,
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index d5cf1f0bc..77a0d58f6 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -198,7 +198,6 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_mem_release_max_reserved_mem_percentage(10),
_max_concurrent_bulk_load_downloading_count(5),
_learn_app_concurrent_count(0),
- _fs_manager(false),
_bulk_load_downloading_count(0),
_manual_emergency_checkpointing_count(0),
_is_running(false)
@@ -858,9 +857,7 @@ void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_di
CHECK_GT_MSG(
available_dirs.size(), 0, "initialize fs manager failed, no available data directory");
- CHECK_EQ_MSG(_fs_manager.initialize(available_dirs, available_dir_tags, false),
- dsn::ERR_OK,
- "initialize fs manager failed");
+ _fs_manager.initialize(available_dirs, available_dir_tags);
}
void replica_stub::initialize_start()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org