You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2023/03/31 09:45:30 UTC

[incubator-pegasus] branch master updated: refactor: minor refactor on replica module (#1423)

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 039e2fd3f refactor: minor refactor on replica module (#1423)
039e2fd3f is described below

commit 039e2fd3f54a84198440f7b55f47f295e7a975b5
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Fri Mar 31 17:45:24 2023 +0800

    refactor: minor refactor on replica module (#1423)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This patch fixes some minor issues, including:
    - return early if `FLAGS_fd_disabled` is true in `remove_replica_on_meta_server`
      to avoid running meaningless logic
    - encapsulate a new function `wait_closing_replicas_finished()` in `replica_stub`
    - mark some functions as `const` or `override`
    - mark some parameters or variables as `const`
    - add a missing lock
    - fix some typos
    - use short-circuit return style
---
 src/common/fs_manager.cpp                   |  3 +-
 src/common/fs_manager.h                     |  2 +-
 src/replica/duplication/mutation_batch.h    |  2 +-
 src/replica/prepare_list.h                  |  3 +-
 src/replica/replica_learn.cpp               | 26 ++++++++-------
 src/replica/replica_stub.cpp                | 52 ++++++++++++++++++-----------
 src/replica/replica_stub.h                  | 11 ++++--
 src/replica/replication_app_base.h          |  4 +++
 src/replica/split/replica_split_manager.cpp |  6 ++--
 src/replica/test/mock_utils.h               |  9 ++---
 src/replica/test/replica_test.cpp           |  9 +++--
 11 files changed, 79 insertions(+), 48 deletions(-)

diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp
index 51c65a0fa..5a460a896 100644
--- a/src/common/fs_manager.cpp
+++ b/src/common/fs_manager.cpp
@@ -160,8 +160,9 @@ fs_manager::fs_manager(bool for_test)
     }
 }
 
-dir_node *fs_manager::get_dir_node(const std::string &subdir)
+dir_node *fs_manager::get_dir_node(const std::string &subdir) const
 {
+    zauto_read_lock l(_lock);
     std::string norm_subdir;
     utils::filesystem::get_normalized_path(subdir, norm_subdir);
     for (auto &n : _dir_nodes) {
diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 4e92316a9..3c29973c2 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -115,7 +115,7 @@ private:
         _status_updated_dir_nodes.clear();
     }
 
-    dir_node *get_dir_node(const std::string &subdir);
+    dir_node *get_dir_node(const std::string &subdir) const;
 
     // when visit the tag/storage of the _dir_nodes map, there's no need to protect by the lock.
     // but when visit the holding_replicas, you must take care.
diff --git a/src/replica/duplication/mutation_batch.h b/src/replica/duplication/mutation_batch.h
index 0f11d5681..fb5efa123 100644
--- a/src/replica/duplication/mutation_batch.h
+++ b/src/replica/duplication/mutation_batch.h
@@ -42,7 +42,7 @@ public:
                     int max_count,
                     mutation_committer committer);
 
-    void commit(decree d, commit_type ct);
+    void commit(decree d, commit_type ct) override;
 
 private:
     perf_counter_wrapper _counter_dulication_mutation_loss_count;
diff --git a/src/replica/prepare_list.h b/src/replica/prepare_list.h
index da5429536..96e4d653f 100644
--- a/src/replica/prepare_list.h
+++ b/src/replica/prepare_list.h
@@ -70,8 +70,9 @@ public:
     //
     // if pop_all_committed_mutations = true, pop all committed mutations, will only used during
     // bulk load ingestion
-    // if secondary_commit = true, and status is secondary or protential secondary, previous logs
+    // if secondary_commit = true, and status is secondary or potential secondary, previous logs
     // will be committed
+    // TODO(yingchun): should check return values for all callers by adding WARN_UNUSED_RESULT.
     error_code prepare(mutation_ptr &mu,
                        partition_status::type status,
                        bool pop_all_committed_mutations = false,
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 92da593dc..1d1abadfc 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -1435,7 +1435,7 @@ void replica::on_add_learner(const group_check_request &request)
 error_code replica::apply_learned_state_from_private_log(learn_state &state)
 {
     bool duplicating = is_duplication_master();
-    // if no dunplicate, learn_start_decree=last_commit decree, step_back means whether
+    // if no duplicate, learn_start_decree=last_commit decree, step_back means whether
     // `learn_start_decree`should be stepped back to include all the
     // unconfirmed when duplicating in this round of learn. default is false
     bool step_back = false;
@@ -1490,17 +1490,19 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state)
                        _app->last_committed_decree(),
                        FLAGS_max_mutation_count_in_prepare_list,
                        [this, duplicating, step_back](mutation_ptr &mu) {
-                           if (mu->data.header.decree == _app->last_committed_decree() + 1) {
-                               // TODO: assign the returned error_code to err and check it
-                               _app->apply_mutation(mu);
-
-                               // appends logs-in-cache into plog to ensure them can be duplicated.
-                               // if current case is step back, it means the logs has been reserved
-                               // through `reset_form` above
-                               if (duplicating && !step_back) {
-                                   _private_log->append(
-                                       mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
-                               }
+                           if (mu->data.header.decree != _app->last_committed_decree() + 1) {
+                               return;
+                           }
+
+                           // TODO: assign the returned error_code to err and check it
+                           _app->apply_mutation(mu);
+
+                           // appends logs-in-cache into plog to ensure them can be duplicated.
+                           // if current case is step back, it means the logs has been reserved
+                           // through `reset_form` above
+                           if (duplicating && !step_back) {
+                               _private_log->append(
+                                   mu, LPC_WRITE_REPLICATION_LOG_COMMON, &_tracker, nullptr);
                            }
                        });
 
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 3a38b5723..2b3030295 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -826,8 +826,8 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
     }
 }
 
-void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,
-                                         std::vector<std::string> &data_dir_tags)
+void replica_stub::initialize_fs_manager(const std::vector<std::string> &data_dirs,
+                                         const std::vector<std::string> &data_dir_tags)
 {
     std::string cdir;
     std::string err_msg;
@@ -835,7 +835,7 @@ void replica_stub::initialize_fs_manager(std::vector<std::string> &data_dirs,
     std::vector<std::string> available_dirs;
     std::vector<std::string> available_dir_tags;
     for (auto i = 0; i < data_dir_tags.size(); ++i) {
-        std::string &dir = data_dirs[i];
+        const auto &dir = data_dirs[i];
         if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
                          !utils::filesystem::check_dir_rw(dir, err_msg))) {
             if (FLAGS_ignore_broken_disk) {
@@ -1345,7 +1345,7 @@ void replica_stub::on_add_learner(const group_check_request &request)
         return;
     }
 
-    LOG_INFO("{}@{}: received add learner, primary = {}, ballot ={}, status = {}, "
+    LOG_INFO("{}@{}: received add learner, primary = {}, ballot = {}, status = {}, "
              "last_committed_decree = {}",
              request.config.pid,
              _primary_address_str,
@@ -1629,6 +1629,10 @@ void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id)
 void replica_stub::remove_replica_on_meta_server(const app_info &info,
                                                  const partition_configuration &config)
 {
+    if (FLAGS_fd_disabled) {
+        return;
+    }
+
     dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_UPDATE_PARTITION_CONFIGURATION);
 
     std::shared_ptr<configuration_update_request> request(new configuration_update_request);
@@ -2818,24 +2822,10 @@ void replica_stub::close()
         _mem_release_timer_task = nullptr;
     }
 
+    wait_closing_replicas_finished();
+
     {
         zauto_write_lock l(_replicas_lock);
-        while (!_closing_replicas.empty()) {
-            task_ptr task = std::get<0>(_closing_replicas.begin()->second);
-            gpid tmp_gpid = _closing_replicas.begin()->first;
-            _replicas_lock.unlock_write();
-
-            task->wait();
-
-            _replicas_lock.lock_write();
-            // task will automatically remove this replica from _closing_replicas
-            if (!_closing_replicas.empty()) {
-                CHECK_NE_MSG(tmp_gpid,
-                             _closing_replicas.begin()->first,
-                             "this replica '{}' should has been removed",
-                             tmp_gpid.to_string());
-            }
-        }
 
         while (!_opening_replicas.empty()) {
             task_ptr task = _opening_replicas.begin()->second;
@@ -3200,5 +3190,27 @@ void replica_stub::update_config(const std::string &name)
     UPDATE_CONFIG(_config_sync_timer_task->update_interval, config_sync_interval_ms, name);
 }
 
+void replica_stub::wait_closing_replicas_finished()
+{
+    zauto_write_lock l(_replicas_lock);
+    while (!_closing_replicas.empty()) {
+        auto task = std::get<0>(_closing_replicas.begin()->second);
+        auto first_gpid = _closing_replicas.begin()->first;
+
+        // TODO(yingchun): improve the code
+        _replicas_lock.unlock_write();
+        task->wait();
+        _replicas_lock.lock_write();
+
+        // task will automatically remove this replica from '_closing_replicas'
+        if (!_closing_replicas.empty()) {
+            CHECK_NE_MSG(first_gpid,
+                         _closing_replicas.begin()->first,
+                         "this replica '{}' should has been removed",
+                         first_gpid);
+        }
+    }
+}
+
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 55fa29055..af04af6c0 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -131,8 +131,8 @@ public:
     //
     void initialize(const replication_options &opts, bool clear = false);
     void initialize(bool clear = false);
-    void initialize_fs_manager(std::vector<std::string> &data_dirs,
-                               std::vector<std::string> &data_dir_tags);
+    void initialize_fs_manager(const std::vector<std::string> &data_dirs,
+                               const std::vector<std::string> &data_dir_tags);
     void set_options(const replication_options &opts) { _options = opts; }
     void open_service();
     void close();
@@ -264,6 +264,8 @@ public:
 
     void update_config(const std::string &name);
 
+    fs_manager *get_fs_manager() { return &_fs_manager; }
+
 private:
     enum replica_node_state
     {
@@ -356,6 +358,9 @@ private:
     void register_jemalloc_ctrl_command();
 #endif
 
+    // Wait all replicas in closing state to be finished.
+    void wait_closing_replicas_finished();
+
 private:
     friend class ::dsn::replication::test::test_checker;
     friend class ::dsn::replication::replica;
@@ -381,7 +386,7 @@ private:
     friend class replica_follower;
     friend class replica_follower_test;
     friend class replica_http_service_test;
-    FRIEND_TEST(replica_test, test_clear_on_failer);
+    FRIEND_TEST(replica_test, test_clear_on_failure);
 
     typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
     typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
diff --git a/src/replica/replication_app_base.h b/src/replica/replication_app_base.h
index e0e292d6d..29006a956 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -135,6 +135,10 @@ public:
     error_code close(bool clear_state);
 
     error_code apply_checkpoint(chkpt_apply_mode mode, const learn_state &state);
+
+    // Return code:
+    //   - ERR_OK: everything is OK.
+    //   - ERR_LOCAL_APP_FAILURE: other type of errors.
     error_code apply_mutation(const mutation *mu);
 
     // methods need to implement on storage engine side
diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp
index 20a974a43..f366407c1 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -431,10 +431,12 @@ replica_split_manager::child_apply_private_logs(std::vector<std::string> plog_fi
                        _replica->_app->last_committed_decree(),
                        FLAGS_max_mutation_count_in_prepare_list,
                        [this](mutation_ptr &mu) {
-                           if (mu->data.header.decree ==
+                           if (mu->data.header.decree !=
                                _replica->_app->last_committed_decree() + 1) {
-                               _replica->_app->apply_mutation(mu);
+                               return;
                            }
+
+                           _replica->_app->apply_mutation(mu);
                        });
 
     // replay private log
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index 4961b2eb3..e01c5612f 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -313,10 +313,11 @@ public:
         config.pid = pid;
         config.status = status;
 
-        auto data_dirs = std::vector<std::string>{"./"};
-        auto data_dirs_tag = std::vector<std::string>{"tag"};
-        initialize_fs_manager(data_dirs, data_dirs_tag);
-        auto *rep = new mock_replica(this, pid, info, "./", need_restore, is_duplication_follower);
+        // TODO(yingchun): should refactor to move to cstor or initializer.
+        initialize_fs_manager({"./"}, {"tag"});
+        std::string dir = get_replica_dir("test", pid);
+        auto *rep =
+            new mock_replica(this, pid, info, dir.c_str(), need_restore, is_duplication_follower);
         rep->set_replica_config(config);
         return rep;
     }
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 69404b6a5..42be2a2ae 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -62,6 +62,7 @@
 
 namespace dsn {
 namespace replication {
+DSN_DECLARE_bool(fd_disabled);
 DSN_DECLARE_string(cold_backup_root);
 
 class replica_test : public replica_test_base
@@ -462,13 +463,15 @@ TEST_F(replica_test, test_query_last_checkpoint_info)
     ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100");
 }
 
-TEST_F(replica_test, test_clear_on_failer)
+TEST_F(replica_test, test_clear_on_failure)
 {
+    // Disable failure detector to avoid connecting with meta server which is not started.
+    FLAGS_fd_disabled = true;
+
     replica *rep =
         stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1, false, true);
-    auto path = stub->get_replica_dir(_app_info.app_type.c_str(), pid);
+    auto path = rep->dir();
     dsn::utils::filesystem::create_directory(path);
-    ASSERT_TRUE(dsn::utils::filesystem::path_exists(path));
     ASSERT_TRUE(has_gpid(pid));
 
     stub->clear_on_failure(rep, path, pid);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org