You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/03/08 07:54:12 UTC

[incubator-pegasus] branch master updated: refactor: Move some functions from 'replica' to 'replica_stub' (#1384)

This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 37434d28d refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
37434d28d is described below

commit 37434d28de9ab1cf70e2743dfc3f4eab7ab15730
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed Mar 8 15:54:05 2023 +0800

    refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    This is a refactor patch before fixing #1383. This patch has no functionality changes, but just including refactors:
    1. Moves the functions `load()`, `newr()` and `clear_on_failure()` from class `replica` to class `replica_stub`; the first two are renamed to `load_replica()` and `new_replica()` respectively.
    2. Encapsulates the duplicated logic that renames a directory to a timestamped '.err' path into a new function `move_to_err_path`.
    3. Some minor refactors, such as fixing typos.
---
 src/replica/disk_cleaner.cpp         |  13 ++-
 src/replica/disk_cleaner.h           |   2 +
 src/replica/replica.h                |  16 ---
 src/replica/replica_config.cpp       |  23 +++--
 src/replica/replica_init.cpp         | 132 ------------------------
 src/replica/replica_stub.cpp         | 189 +++++++++++++++++++++++++++--------
 src/replica/replica_stub.h           |  12 +++
 src/replica/replication_app_base.cpp |  18 ++--
 src/replica/test/replica_test.cpp    |  10 +-
 9 files changed, 198 insertions(+), 217 deletions(-)

diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index 7c624036f..81ba82d82 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -21,7 +21,7 @@
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
 #include "runtime/api_layer1.h"
-
+#include <fmt/format.h>
 #include "disk_cleaner.h"
 
 namespace dsn {
@@ -125,5 +125,16 @@ error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
     }
     return error_s::ok();
 }
+
+void move_to_err_path(const std::string &path, const std::string &log_prefix)
+{
+    const std::string new_path = fmt::format("{}.{}{}", path, dsn_now_us(), kFolderSuffixErr);
+    CHECK(dsn::utils::filesystem::rename_path(path, new_path),
+          "{}: failed to move directory from '{}' to '{}'",
+          log_prefix,
+          path,
+          new_path);
+    LOG_WARNING("{}: succeed to move directory from '{}' to '{}'", log_prefix, path, new_path);
+}
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h
index 86d3f82cc..dbcc0a862 100644
--- a/src/replica/disk_cleaner.h
+++ b/src/replica/disk_cleaner.h
@@ -69,5 +69,7 @@ inline bool is_data_dir_invalid(const std::string &dir)
     const std::string folder_suffix = dir.substr(dir.length() - 4);
     return is_data_dir_removable(dir) || folder_suffix == kFolderSuffixBak;
 }
+
+void move_to_err_path(const std::string &path, const std::string &log_prefix);
 } // namespace replication
 } // namespace dsn
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 20c9877dd..6d23c54f5 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -125,18 +125,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
 public:
     ~replica(void);
 
-    //
-    //    routines for replica stub
-    //
-    static replica *load(replica_stub *stub, const char *dir);
-    // {parent_dir} is used in partition split for get_child_dir in replica_stub
-    static replica *newr(replica_stub *stub,
-                         gpid gpid,
-                         const app_info &app,
-                         bool restore_if_necessary,
-                         bool is_duplication_follower,
-                         const std::string &parent_dir = "");
-
     // return true when the mutation is valid for the current replica
     bool replay_mutation(mutation_ptr &mu, bool is_private);
     void reset_prepare_list_after_replay();
@@ -491,10 +479,6 @@ private:
     // path = "" means using the default directory (`_dir`/.app_info)
     error_code store_app_info(app_info &info, const std::string &path = "");
 
-    // clear replica if open failed
-    static replica *
-    clear_on_failure(replica_stub *stub, replica *rep, const std::string &path, const gpid &pid);
-
     void update_app_max_replica_count(int32_t max_replica_count);
     void update_app_name(const std::string &app_name);
 
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index cd463210b..10eaf4f0b 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -634,13 +634,14 @@ bool replica::update_local_configuration(const replica_configuration &config,
     partition_status::type old_status = status();
     ballot old_ballot = get_ballot();
 
-    // skip unncessary configuration change
-    if (old_status == config.status && old_ballot == config.ballot)
+    // skip unnecessary configuration change
+    if (old_status == config.status && old_ballot == config.ballot) {
         return true;
+    }
 
     // skip invalid change
     // but do not disable transitions to partition_status::PS_ERROR as errors
-    // must be handled immmediately
+    // must be handled immediately
     switch (old_status) {
     case partition_status::PS_ERROR: {
         LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
@@ -716,8 +717,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
         break;
     }
 
-    bool r = false;
-    uint64_t oldTs = _last_config_change_time_ms;
+    uint64_t old_ts = _last_config_change_time_ms;
     _config = config;
     // we should durable the new ballot to prevent the inconsistent state
     if (_config.ballot > old_ballot) {
@@ -827,8 +827,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
             _prepare_list->truncate(_app->last_committed_decree());
 
             // using force cleanup now as all tasks must be done already
-            r = _potential_secondary_states.cleanup(true);
-            CHECK(r, "{}: potential secondary context cleanup failed", name());
+            CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
+                             "potential secondary context cleanup failed");
 
             check_state_completeness();
             break;
@@ -840,8 +840,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
             _prepare_list->reset(_app->last_committed_decree());
             _potential_secondary_states.cleanup(false);
             // => do this in close as it may block
-            // r = _potential_secondary_states.cleanup(true);
-            // CHECK(r, "{}: potential secondary context cleanup failed", name());
+            // CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
+            //                  "potential secondary context cleanup failed");
             break;
         default:
             CHECK(false, "invalid execution path");
@@ -942,7 +942,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
         _prepare_list->last_committed_decree(),
         _app->last_committed_decree(),
         _app->last_durable_decree(),
-        _last_config_change_time_ms - oldTs,
+        _last_config_change_time_ms - old_ts,
         boost::lexical_cast<std::string>(_config));
 
     if (status() != old_status) {
@@ -985,8 +985,9 @@ bool replica::update_local_configuration(const replica_configuration &config,
 
 bool replica::update_local_configuration_with_no_ballot_change(partition_status::type s)
 {
-    if (status() == s)
+    if (status() == s) {
         return false;
+    }
 
     auto config = _config;
     config.status = s;
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index b7e932e6c..637f2c130 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -76,61 +76,6 @@ error_code replica::initialize_on_new()
     return init_app_and_prepare_list(true);
 }
 
-/*static*/ replica *replica::newr(replica_stub *stub,
-                                  gpid gpid,
-                                  const app_info &app,
-                                  bool restore_if_necessary,
-                                  bool is_duplication_follower,
-                                  const std::string &parent_dir)
-{
-    std::string dir;
-    if (parent_dir.empty()) {
-        dir = stub->get_replica_dir(app.app_type.c_str(), gpid);
-    } else {
-        dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir);
-    }
-    replica *rep =
-        new replica(stub, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
-    error_code err;
-    if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
-        LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err.to_string());
-        return clear_on_failure(stub, rep, dir, gpid);
-    }
-
-    if (is_duplication_follower &&
-        (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
-        LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
-                  "previous detail error log",
-                  rep->name(),
-                  err.to_string());
-        return clear_on_failure(stub, rep, dir, gpid);
-    }
-
-    err = rep->initialize_on_new();
-    if (err == ERR_OK) {
-        LOG_DEBUG("{}: new replica succeed", rep->name());
-        return rep;
-    } else {
-        LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err.to_string());
-        return clear_on_failure(stub, rep, dir, gpid);
-    }
-}
-
-/* static */ replica *replica::clear_on_failure(replica_stub *stub,
-                                                replica *rep,
-                                                const std::string &path,
-                                                const gpid &pid)
-{
-    rep->close();
-    delete rep;
-    rep = nullptr;
-
-    // clear work on failure
-    utils::filesystem::remove_path(path);
-    stub->_fs_manager.remove_replica(pid);
-    return nullptr;
-}
-
 error_code replica::initialize_on_load()
 {
     LOG_INFO_PREFIX("initialize replica on load, dir = {}", _dir);
@@ -143,83 +88,6 @@ error_code replica::initialize_on_load()
     return init_app_and_prepare_list(false);
 }
 
-/*static*/ replica *replica::load(replica_stub *stub, const char *dir)
-{
-    FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
-
-    char splitters[] = {'\\', '/', 0};
-    std::string name = utils::get_last_component(std::string(dir), splitters);
-    if (name == "") {
-        LOG_ERROR("invalid replica dir {}", dir);
-        return nullptr;
-    }
-
-    char app_type[128];
-    int32_t app_id, pidx;
-    if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
-        LOG_ERROR("invalid replica dir {}", dir);
-        return nullptr;
-    }
-
-    gpid pid(app_id, pidx);
-    if (!utils::filesystem::directory_exists(dir)) {
-        LOG_ERROR("replica dir {} not exist", dir);
-        return nullptr;
-    }
-
-    dsn::app_info info;
-    replica_app_info info2(&info);
-    std::string path = utils::filesystem::path_combine(dir, kAppInfo);
-    auto err = info2.load(path);
-    if (ERR_OK != err) {
-        LOG_ERROR("load app-info from {} failed, err = {}", path, err);
-        return nullptr;
-    }
-
-    if (info.app_type != app_type) {
-        LOG_ERROR("unmatched app type {} for {}", info.app_type, path);
-        return nullptr;
-    }
-
-    if (info.partition_count < pidx) {
-        LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, "
-                  "ignore it",
-                  pid,
-                  info.partition_count);
-        return nullptr;
-    }
-
-    replica *rep = new replica(stub, pid, info, dir, false);
-
-    err = rep->initialize_on_load();
-    if (err == ERR_OK) {
-        LOG_INFO("{}: load replica succeed", rep->name());
-        return rep;
-    } else {
-        LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
-        rep->close();
-        delete rep;
-        rep = nullptr;
-
-        // clear work on failure
-        if (dsn::utils::filesystem::directory_exists(dir)) {
-            char rename_dir[1024];
-            sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us());
-            CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir),
-                  "load_replica: failed to move directory '{}' to '{}'",
-                  dir,
-                  rename_dir);
-            LOG_WARNING("load_replica: replica_dir_op succeed to move directory '{}' to '{}'",
-                        dir,
-                        rename_dir);
-            stub->_counter_replicas_recent_replica_move_error_count->increment();
-            stub->_fs_manager.remove_replica(pid);
-        }
-
-        return nullptr;
-    }
-}
-
 decree replica::get_replay_start_decree()
 {
     decree replay_start_decree = _app->last_committed_decree();
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index e38112867..13584f517 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -55,6 +55,7 @@
 #include <vector>
 #include <deque>
 #include "utils/fmt_logging.h"
+#include "replica/duplication/replica_follower.h"
 #ifdef DSN_ENABLE_GPERF
 #include <gperftools/malloc_extension.h>
 #elif defined(DSN_USE_JEMALLOC)
@@ -610,7 +611,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
                                  [this, dir, &rps, &rps_lock] {
                                      LOG_INFO("process dir {}", dir);
 
-                                     auto r = replica::load(this, dir.c_str());
+                                     auto r = load_replica(dir.c_str());
                                      if (r != nullptr) {
                                          LOG_INFO("{}@{}: load replica '{}' success, <durable, "
                                                   "commit> = <{}, {}>, last_prepared_decree = {}",
@@ -682,17 +683,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
         // TODO: checkpoint latest state and update on meta server so learning is cheaper
         for (auto it = rps.begin(); it != rps.end(); ++it) {
             it->second->close();
-            // move to '.err' directory
-            const char *dir = it->second->dir().c_str();
-            char rename_dir[1024];
-            sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us());
-            CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir),
-                  "init_replica: failed to move directory '{}' to '{}'",
-                  dir,
-                  rename_dir);
-            LOG_WARNING("init_replica: replica_dir_op succeed to move directory '{}' to '{}'",
-                        dir,
-                        rename_dir);
+            move_to_err_path(it->second->dir(), "initialize replica");
             _counter_replicas_recent_replica_move_error_count->increment();
         }
         rps.clear();
@@ -2059,7 +2050,7 @@ void replica_stub::open_replica(
                  _primary_address_str,
                  group_check ? "with" : "without",
                  dir);
-        rep = replica::load(this, dir.c_str());
+        rep = load_replica(dir.c_str());
 
         // if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
         // migration
@@ -2082,7 +2073,7 @@ void replica_stub::open_replica(
                 boost::replace_first(
                     origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
                 dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
-                rep = replica::load(this, origin_dir.c_str());
+                rep = load_replica(origin_dir.c_str());
 
                 FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {});
             }
@@ -2132,7 +2123,7 @@ void replica_stub::open_replica(
                   "remove useless directory({}) failed",
                   dir);
         }
-        rep = replica::newr(this, id, app, restore_if_necessary, is_duplication_follower);
+        rep = new_replica(id, app, restore_if_necessary, is_duplication_follower);
     }
 
     if (rep == nullptr) {
@@ -2177,6 +2168,127 @@ void replica_stub::open_replica(
     }
 }
 
+replica *replica_stub::new_replica(gpid gpid,
+                                   const app_info &app,
+                                   bool restore_if_necessary,
+                                   bool is_duplication_follower,
+                                   const std::string &parent_dir)
+{
+    std::string dir;
+    if (parent_dir.empty()) {
+        dir = get_replica_dir(app.app_type.c_str(), gpid);
+    } else {
+        dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir);
+    }
+    auto *rep =
+        new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
+    error_code err;
+    if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
+        LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err);
+        clear_on_failure(rep, dir, gpid);
+        return nullptr;
+    }
+
+    if (is_duplication_follower &&
+        (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
+        LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
+                  "previous detail error log",
+                  rep->name(),
+                  err);
+        clear_on_failure(rep, dir, gpid);
+        return nullptr;
+    }
+
+    err = rep->initialize_on_new();
+    if (err != ERR_OK) {
+        LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err);
+        clear_on_failure(rep, dir, gpid);
+        return nullptr;
+    }
+
+    LOG_DEBUG("{}: new replica succeed", rep->name());
+    return rep;
+}
+
+replica *replica_stub::load_replica(const char *dir)
+{
+    FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
+
+    char splitters[] = {'\\', '/', 0};
+    std::string name = utils::get_last_component(std::string(dir), splitters);
+    if (name.empty()) {
+        LOG_ERROR("invalid replica dir {}", dir);
+        return nullptr;
+    }
+
+    char app_type[128];
+    int32_t app_id, pidx;
+    if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+        LOG_ERROR("invalid replica dir {}", dir);
+        return nullptr;
+    }
+
+    gpid pid(app_id, pidx);
+    if (!utils::filesystem::directory_exists(dir)) {
+        LOG_ERROR("replica dir {} not exist", dir);
+        return nullptr;
+    }
+
+    dsn::app_info info;
+    replica_app_info info2(&info);
+    std::string path = utils::filesystem::path_combine(dir, replica::kAppInfo);
+    auto err = info2.load(path);
+    if (ERR_OK != err) {
+        LOG_ERROR("load app-info from {} failed, err = {}", path, err);
+        return nullptr;
+    }
+
+    if (info.app_type != app_type) {
+        LOG_ERROR("unmatched app type {} for {}", info.app_type, path);
+        return nullptr;
+    }
+
+    if (info.partition_count < pidx) {
+        LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, "
+                  "ignore it",
+                  pid,
+                  info.partition_count);
+        return nullptr;
+    }
+
+    auto *rep = new replica(this, pid, info, dir, false);
+    err = rep->initialize_on_load();
+    if (err != ERR_OK) {
+        LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
+        rep->close();
+        delete rep;
+        rep = nullptr;
+
+        // clear work on failure
+        if (dsn::utils::filesystem::directory_exists(dir)) {
+            move_to_err_path(dir, "load replica");
+            _counter_replicas_recent_replica_move_error_count->increment();
+            _fs_manager.remove_replica(pid);
+        }
+
+        return nullptr;
+    }
+
+    LOG_INFO("{}: load replica succeed", rep->name());
+    return rep;
+}
+
+void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid)
+{
+    rep->close();
+    delete rep;
+    rep = nullptr;
+
+    // clear work on failure
+    utils::filesystem::remove_path(path);
+    _fs_manager.remove_replica(pid);
+}
+
 task_ptr replica_stub::begin_close_replica(replica_ptr r)
 {
     CHECK(r->status() == partition_status::PS_ERROR ||
@@ -2191,32 +2303,31 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
     gpid id = r->get_gpid();
 
     zauto_write_lock l(_replicas_lock);
+    if (_replicas.erase(id) == 0) {
+        return nullptr;
+    }
 
-    if (_replicas.erase(id) > 0) {
-        _counter_replicas_count->decrement();
-
-        int delay_ms = 0;
-        if (r->status() == partition_status::PS_INACTIVE) {
-            delay_ms = FLAGS_gc_memory_replica_interval_ms;
-            LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
-                     r->name(),
-                     delay_ms);
-        }
+    _counter_replicas_count->decrement();
 
-        app_info a_info = *(r->get_app_info());
-        replica_info r_info;
-        get_replica_info(r_info, r);
-        task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA,
-                                         &_tracker,
-                                         [=]() { close_replica(r); },
-                                         0,
-                                         std::chrono::milliseconds(delay_ms));
-        _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
-        _counter_replicas_closing_count->increment();
-        return task;
-    } else {
-        return nullptr;
+    int delay_ms = 0;
+    if (r->status() == partition_status::PS_INACTIVE) {
+        delay_ms = FLAGS_gc_memory_replica_interval_ms;
+        LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
+                 r->name(),
+                 delay_ms);
     }
+
+    app_info a_info = *(r->get_app_info());
+    replica_info r_info;
+    get_replica_info(r_info, r);
+    task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA,
+                                     &_tracker,
+                                     [=]() { close_replica(r); },
+                                     0,
+                                     std::chrono::milliseconds(delay_ms));
+    _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
+    _counter_replicas_closing_count->increment();
+    return task;
 }
 
 void replica_stub::close_replica(replica_ptr r)
@@ -2865,7 +2976,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
             LOG_WARNING("failed create child replica({}) because it is under close", child_pid);
             return nullptr;
         } else {
-            replica *rep = replica::newr(this, child_pid, *app, false, false, parent_dir);
+            replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
             if (rep != nullptr) {
                 auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
                 CHECK(pr.second, "child replica {} has been existed", rep->name());
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 693ed6a27..648e0c1c1 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -265,6 +265,17 @@ private:
                       gpid id,
                       const std::shared_ptr<group_check_request> &req,
                       const std::shared_ptr<configuration_update_request> &req2);
+    // Create a new replica according to the parameters.
+    // 'parent_dir' is used in partition split for get_child_dir().
+    replica *new_replica(gpid gpid,
+                         const app_info &app,
+                         bool restore_if_necessary,
+                         bool is_duplication_follower,
+                         const std::string &parent_dir = "");
+    // Load an existing replica from 'dir'.
+    replica *load_replica(const char *dir);
+    // Clean up the memory state and on disk data if creating replica failed.
+    void clear_on_failure(replica *rep, const std::string &path, const gpid &pid);
     task_ptr begin_close_replica(replica_ptr r);
     void close_replica(replica_ptr r);
     void notify_replica_state_update(const replica_configuration &config, bool is_closing);
@@ -338,6 +349,7 @@ private:
     friend class replica_follower;
     friend class replica_follower_test;
     friend class replica_http_service_test;
+    FRIEND_TEST(replica_test, test_clear_on_failer);
 
     typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
     typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp
index 281732246..e08fdd77d 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -413,15 +413,15 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
 
     if (perror != 0) {
         LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
-        // for normal write requests, if got rocksdb error, this replica will be set error and evoke
-        // learn for ingestion requests, should not do as normal write requests, there are two
-        // reasons:
-        // 1. all ingestion errors should be handled by meta server in function
-        // `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in
-        // structure `ingestion_response`, not in this function
-        // 2. if replica apply ingestion mutation during learn, it may got error from rocksdb,
-        // because the external sst files may not exist, in this case, we won't consider it as an
-        // error
+        // For normal write requests, if got rocksdb error, this replica will be set error and evoke
+        // learn.
+        // For ingestion requests, should not do as normal write requests, there are two reasons:
+        //   1. All ingestion errors should be handled by meta server in function
+        //      `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in
+        //      structure `ingestion_response`, not in this function.
+        //   2. If replica apply ingestion mutation during learn, it may get error from rocksdb,
+        //      because the external sst files may not exist, in this case, we won't consider it as
+        //      an error.
         if (!has_ingestion_request) {
             return ERR_LOCAL_APP_FAILURE;
         }
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 545dff489..f1155155d 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -168,14 +168,6 @@ public:
 
     bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; }
 
-    replica *call_clear_on_failure(replica_stub *stub,
-                                   replica *rep,
-                                   const std::string &path,
-                                   const gpid &gpid)
-    {
-        return replica::clear_on_failure(stub, rep, path, gpid);
-    }
-
     bool has_gpid(gpid &gpid) const
     {
         for (const auto &node : stub->_fs_manager._dir_nodes) {
@@ -445,7 +437,7 @@ TEST_F(replica_test, test_clear_on_failer)
     ASSERT_TRUE(dsn::utils::filesystem::path_exists(path));
     ASSERT_TRUE(has_gpid(pid));
 
-    ASSERT_FALSE(call_clear_on_failure(stub.get(), rep, path, pid));
+    stub->clear_on_failure(rep, path, pid);
 
     ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
     ASSERT_FALSE(has_gpid(pid));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org