You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/31 09:45:30 UTC
[incubator-pegasus] branch master updated: refactor: minor refactor on replica module (#1423)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 039e2fd3f refactor: minor refactor on replica module (#1423)
039e2fd3f is described below
commit 039e2fd3f54a84198440f7b55f47f295e7a975b5
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Fri Mar 31 17:45:24 2023 +0800
refactor: minor refactor on replica module (#1423)
https://github.com/apache/incubator-pegasus/issues/1383
This patch fixes some minor issues, including:
- returns early if `FLAGS_fd_disabled` is true in `remove_replica_on_meta_server`
to avoid running unnecessary logic
- encapsulates a new function `wait_closing_replicas_finished()` in `replica_stub`
- marks some functions as `const` or `override`
- marks some parameters or variables as `const`
- adds a missing lock in `fs_manager::get_dir_node()`
- fixes some typos
- uses early-return (guard clause) style
---
src/common/fs_manager.cpp | 3 +-
src/common/fs_manager.h | 2 +-
src/replica/duplication/mutation_batch.h | 2 +-
src/replica/prepare_list.h | 3 +-
src/replica/replica_learn.cpp | 26 ++++++++-------
src/replica/replica_stub.cpp | 52 ++++++++++++++++++-----------
src/replica/replica_stub.h | 11 ++++--
src/replica/replication_app_base.h | 4 +++
src/replica/split/replica_split_manager.cpp | 6 ++--
src/replica/test/mock_utils.h | 9 ++---
src/replica/test/replica_test.cpp | 9 +++--
11 files changed, 79 insertions(+), 48 deletions(-)
diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 51c65a0fa..5a460a896 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -160,8 +160,9 @@ fs_manager::fs_manager(bool for_test)
}
}
-dir_node *fs_manager::get_dir_node(const std::string &subdir)
+dir_node *fs_manager::get_dir_node(const std::string &subdir) const
{
+ zauto_read_lock l(_lock);
std::string norm_subdir;
utils::filesystem::get_normalized_path(subdir, norm_subdir);
for (auto &n : _dir_nodes) {
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 4e92316a9..3c29973c2 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -115,7 +115,7 @@ private:
_status_updated_dir_nodes.clear();
}
- dir_node *get_dir_node(const std::string &subdir);
+ dir_node *get_dir_node(const std::string &subdir) const;
// when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock.
// but when visit the holding_replicas, you must take care.
diff --git a/src/replica/duplication/mutation_batch.h b/src/replica/duplication/mutation_batch.h
index 0f11d5681..fb5efa123 100644
--- a/src/replica/duplication/mutation_batch.h
+++ b/src/replica/duplication/mutation_batch.h
@@ -42,7 +42,7 @@ public:
int max_count,
mutation_committer committer);
- void commit(decree d, commit_type ct);
+ void commit(decree d, commit_type ct) override;
private:
perf_counter_wrapper _counter_dulication_mutation_loss_count;
diff --git a/src/replica/prepare_list.h b/src/replica/prepare_list.h
index da5429536..96e4d653f 100644
--- a/src/replica/prepare_list.h
+++ b/src/replica/prepare_list.h
@@ -70,8 +70,9 @@ public:
//
// if pop_all_committed_mutations = true, pop all committed mutations, will only used during
// bulk load ingestion
- // if secondary_commit = true, and status is secondary or protential secondary, previous logs
+ // if secondary_commit = true, and status is secondary or potential secondary, previous logs
// will be committed
+ // TODO(yingchun): should check return values for all callers by adding WARN_UNUSED_RESULT.
error_code prepare(mutation_ptr &mu,
partition_status::type status,
bool pop_all_committed_mutations = false,
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 92da593dc..1d1abadfc 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -1435,7 +1435,7 @@ void replica::on_add_learner(const group_check_request &request)
error_code replica::apply_learned_state_from_private_log(learn_state &state)
{
bool duplicating = is_duplication_master();
- // if no dunplicate, learn_start_decree=last_commit decree, step_back means whether
+ // if no duplicate, learn_start_decree=last_commit decree, step_back means whether
// `learn_start_decree`should be stepped back to include all the
// unconfirmed when duplicating in this round of learn. default is false
bool step_back = false;
@@ -1490,17 +1490,19 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this, duplicating, step_back](mutation_ptr &mu) {
- if (mu->data.header.decree == _app->last_committed_decree() + 1) {
- // TODO: assign the returned error_code to err and check it
- _app->apply_mutation(mu);
-
- // appends logs-in-cache into plog to ensure them can be duplicated.
- // if current case is step back, it means the logs has been reserved
- // through `reset_form` above
- if (duplicating && !step_back) {
- _private_log->append(
- mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
- }
+ if (mu->data.header.decree != _app->last_committed_decree() + 1) {
+ return;
+ }
+
+ // TODO: assign the returned error_code to err and check it
+ _app->apply_mutation(mu);
+
+ // appends logs-in-cache into plog to ensure them can be duplicated.
+ // if current case is step back, it means the logs has been reserved
+ // through `reset_form` above
+ if (duplicating && !step_back) {
+ _private_log->append(
+ mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
}
});
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 3a38b5723..2b3030295 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -826,8 +826,8 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
}
}
-void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,
- std::vector<std::string> &data_dir_tags)
+void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_dirs,
+ const std::vector<std::string> &data_dir_tags)
{
std::string cdir;
std::string err_msg;
@@ -835,7 +835,7 @@ void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,
std::vector<std::string> available_dirs;
std::vector<std::string> available_dir_tags;
for (auto i = 0; i < data_dir_tags.size(); ++i) {
- std::string &dir = data_dirs[i];
+ const auto &dir = data_dirs[i];
if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
!utils::filesystem::check_dir_rw(dir, err_msg))) {
if (FLAGS_ignore_broken_disk) {
@@ -1345,7 +1345,7 @@ void replica_stub::on_add_learner(const group_check_request &request)
return;
}
- LOG_INFO("{}@{}: received add learner, primary = {}, ballot ={}, status = {}, "
+ LOG_INFO("{}@{}: received add learner, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}",
request.config.pid,
_primary_address_str,
@@ -1629,6 +1629,10 @@ void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id)
void replica_stub::remove_replica_on_meta_server(const app_info &info,
const partition_configuration &config)
{
+ if (FLAGS_fd_disabled) {
+ return;
+ }
+
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_UPDATE_PARTITION_CONFIGURATION);
std::shared_ptr<configuration_update_request> request(new configuration_update_request);
@@ -2818,24 +2822,10 @@ void replica_stub::close()
_mem_release_timer_task = nullptr;
}
+ wait_closing_replicas_finished();
+
{
zauto_write_lock l(_replicas_lock);
- while (!_closing_replicas.empty()) {
- task_ptr task = std::get<0>(_closing_replicas.begin()->second);
- gpid tmp_gpid = _closing_replicas.begin()->first;
- _replicas_lock.unlock_write();
-
- task->wait();
-
- _replicas_lock.lock_write();
- // task will automatically remove this replica from _closing_replicas
- if (!_closing_replicas.empty()) {
- CHECK_NE_MSG(tmp_gpid,
- _closing_replicas.begin()->first,
- "this replica '{}' should has been removed",
- tmp_gpid.to_string());
- }
- }
while (!_opening_replicas.empty()) {
task_ptr task = _opening_replicas.begin()->second;
@@ -3200,5 +3190,27 @@ void replica_stub::update_config(const std::string &name)
UPDATE_CONFIG(_config_sync_timer_task->update_interval, config_sync_interval_ms, name);
}
+void replica_stub::wait_closing_replicas_finished()
+{ // Blocks (holding '_replicas_lock' except while waiting) until '_closing_replicas' is empty.
+ zauto_write_lock l(_replicas_lock);
+ while (!_closing_replicas.empty()) {
+ auto task = std::get<0>(_closing_replicas.begin()->second);
+ auto first_gpid = _closing_replicas.begin()->first;
+
+ // TODO(yingchun): improve the code. The write lock is dropped while waiting on 'task' (copied out above) and re-taken afterwards, so '_closing_replicas' may change in between.
+ _replicas_lock.unlock_write();
+ task->wait();
+ _replicas_lock.lock_write();
+
+ // The finished task is expected to have removed 'first_gpid' from '_closing_replicas'; if entries remain, the first one must now differ.
+ if (!_closing_replicas.empty()) {
+ CHECK_NE_MSG(first_gpid,
+ _closing_replicas.begin()->first,
+ "this replica '{}' should has been removed",
+ first_gpid);
+ }
+ }
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 55fa29055..af04af6c0 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -131,8 +131,8 @@ public:
//
void initialize(const replication_options &opts, bool clear = false);
void initialize(bool clear = false);
- void initialize_fs_manager(std::vector<std::string> &data_dirs,
- std::vector<std::string> &data_dir_tags);
+ void initialize_fs_manager(const std::vector<std::string> &data_dirs,
+ const std::vector<std::string> &data_dir_tags);
void set_options(const replication_options &opts) { _options = opts; }
void open_service();
void close();
@@ -264,6 +264,8 @@ public:
void update_config(const std::string &name);
+ fs_manager *get_fs_manager() { return &_fs_manager; } // Returns the address of this stub's '_fs_manager'; the caller must not delete it.
+
private:
enum replica_node_state
{
@@ -356,6 +358,9 @@ private:
void register_jemalloc_ctrl_command();
#endif
+ // Waits until all replicas in the closing state have finished closing.
+ void wait_closing_replicas_finished();
+
private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::replica;
@@ -381,7 +386,7 @@ private:
friend class replica_follower;
friend class replica_follower_test;
friend class replica_http_service_test;
- FRIEND_TEST(replica_test, test_clear_on_failer);
+ FRIEND_TEST(replica_test, test_clear_on_failure);
typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h
index e0e292d6d..29006a956 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -135,6 +135,10 @@ public:
error_code close(bool clear_state);
error_code apply_checkpoint(chkpt_apply_mode mode, const learn_state &state);
+
+ // Return code:
+ // - ERR_OK: everything is OK.
+ // - ERR_LOCAL_APP_FAILURE: other type of errors.
error_code apply_mutation(const mutation *mu);
// methods need to implement on storage engine side
diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp
index 20a974a43..f366407c1 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -431,10 +431,12 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
_replica->_app->last_committed_decree(),
FLAGS_max_mutation_count_in_prepare_list,
[this](mutation_ptr &mu) {
- if (mu->data.header.decree ==
+ if (mu->data.header.decree !=
_replica->_app->last_committed_decree() + 1) {
- _replica->_app->apply_mutation(mu);
+ return;
}
+
+ _replica->_app->apply_mutation(mu);
});
// replay private log
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 4961b2eb3..e01c5612f 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -313,10 +313,11 @@ public:
config.pid = pid;
config.status = status;
- auto data_dirs = std::vector<std::string>{"./"};
- auto data_dirs_tag = std::vector<std::string>{"tag"};
- initialize_fs_manager(data_dirs, data_dirs_tag);
- auto *rep = new mock_replica(this, pid, info, "./", need_restore, is_duplication_follower);
+ // TODO(yingchun): should refactor to move to cstor or initializer.
+ initialize_fs_manager({"./"}, {"tag"});
+ std::string dir = get_replica_dir("test", pid);
+ auto *rep =
+ new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
rep->set_replica_config(config);
return rep;
}
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 69404b6a5..42be2a2ae 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -62,6 +62,7 @@
namespace dsn {
namespace replication {
+DSN_DECLARE_bool(fd_disabled);
DSN_DECLARE_string(cold_backup_root);
class replica_test : public replica_test_base
@@ -462,13 +463,15 @@ TEST_F(replica_test, test_query_last_checkpoint_info)
ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100");
}
-TEST_F(replica_test, test_clear_on_failer)
+TEST_F(replica_test, test_clear_on_failure)
{
+ // Disable failure detector to avoid connecting with meta server which is not started.
+ FLAGS_fd_disabled = true;
+
replica *rep =
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
- auto path = stub->get_replica_dir(_app_info.app_type.c_str(), pid);
+ auto path = rep->dir();
dsn::utils::filesystem::create_directory(path);
- ASSERT_TRUE(dsn::utils::filesystem::path_exists(path));
ASSERT_TRUE(has_gpid(pid));
stub->clear_on_failure(rep, path, pid);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org