You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by wa...@apache.org on 2023/03/08 07:54:12 UTC
[incubator-pegasus] branch master updated: refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 37434d28d refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
37434d28d is described below
commit 37434d28de9ab1cf70e2743dfc3f4eab7ab15730
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Wed Mar 8 15:54:05 2023 +0800
refactor: Move some functions from 'replica' to 'replica_stub' (#1384)
https://github.com/apache/incubator-pegasus/issues/1383
This is a refactoring patch in preparation for fixing #1383. It makes no functional changes; it only includes the following refactors:
1. Moves the functions `load()`, `newr()` and `clear_on_failure()` from class `replica` to class `replica_stub`; the first two are renamed to `load_replica()` and `new_replica()` respectively.
2. Encapsulates the duplicated logic that renames a replica directory to a `.err` path into a new function `move_to_err_path()`.
3. Makes some minor refactors, such as fixing typos.
---
src/replica/disk_cleaner.cpp | 13 ++-
src/replica/disk_cleaner.h | 2 +
src/replica/replica.h | 16 ---
src/replica/replica_config.cpp | 23 +++--
src/replica/replica_init.cpp | 132 ------------------------
src/replica/replica_stub.cpp | 189 +++++++++++++++++++++++++++--------
src/replica/replica_stub.h | 12 +++
src/replica/replication_app_base.cpp | 18 ++--
src/replica/test/replica_test.cpp | 10 +-
9 files changed, 198 insertions(+), 217 deletions(-)
diff --git a/src/replica/disk_cleaner.cpp b/src/replica/disk_cleaner.cpp
index 7c624036f..81ba82d82 100644
--- a/src/replica/disk_cleaner.cpp
+++ b/src/replica/disk_cleaner.cpp
@@ -21,7 +21,7 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "runtime/api_layer1.h"
-
+#include <fmt/format.h>
#include "disk_cleaner.h"
namespace dsn {
@@ -125,5 +125,16 @@ error_s disk_remove_useless_dirs(const std::vector<std::string> &data_dirs,
}
return error_s::ok();
}
+
+void move_to_err_path(const std::string &path, const std::string &log_prefix)
+{
+ const std::string new_path = fmt::format("{}.{}{}", path, dsn_now_us(), kFolderSuffixErr);
+ CHECK(dsn::utils::filesystem::rename_path(path, new_path),
+ "{}: failed to move directory from '{}' to '{}'",
+ log_prefix,
+ path,
+ new_path);
+ LOG_WARNING("{}: succeed to move directory from '{}' to '{}'", log_prefix, path, new_path);
+}
} // namespace replication
} // namespace dsn
diff --git a/src/replica/disk_cleaner.h b/src/replica/disk_cleaner.h
index 86d3f82cc..dbcc0a862 100644
--- a/src/replica/disk_cleaner.h
+++ b/src/replica/disk_cleaner.h
@@ -69,5 +69,7 @@ inline bool is_data_dir_invalid(const std::string &dir)
const std::string folder_suffix = dir.substr(dir.length() - 4);
return is_data_dir_removable(dir) || folder_suffix == kFolderSuffixBak;
}
+
+void move_to_err_path(const std::string &path, const std::string &log_prefix);
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica.h b/src/replica/replica.h
index 20c9877dd..6d23c54f5 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -125,18 +125,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
public:
~replica(void);
- //
- // routines for replica stub
- //
- static replica *load(replica_stub *stub, const char *dir);
- // {parent_dir} is used in partition split for get_child_dir in replica_stub
- static replica *newr(replica_stub *stub,
- gpid gpid,
- const app_info &app,
- bool restore_if_necessary,
- bool is_duplication_follower,
- const std::string &parent_dir = "");
-
// return true when the mutation is valid for the current replica
bool replay_mutation(mutation_ptr &mu, bool is_private);
void reset_prepare_list_after_replay();
@@ -491,10 +479,6 @@ private:
// path = "" means using the default directory (`_dir`/.app_info)
error_code store_app_info(app_info &info, const std::string &path = "");
- // clear replica if open failed
- static replica *
- clear_on_failure(replica_stub *stub, replica *rep, const std::string &path, const gpid &pid);
-
void update_app_max_replica_count(int32_t max_replica_count);
void update_app_name(const std::string &app_name);
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index cd463210b..10eaf4f0b 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -634,13 +634,14 @@ bool replica::update_local_configuration(const replica_configuration &config,
partition_status::type old_status = status();
ballot old_ballot = get_ballot();
- // skip unncessary configuration change
- if (old_status == config.status && old_ballot == config.ballot)
+ // skip unnecessary configuration change
+ if (old_status == config.status && old_ballot == config.ballot) {
return true;
+ }
// skip invalid change
// but do not disable transitions to partition_status::PS_ERROR as errors
- // must be handled immmediately
+ // must be handled immediately
switch (old_status) {
case partition_status::PS_ERROR: {
LOG_WARNING_PREFIX("status change from {} @ {} to {} @ {} is not allowed",
@@ -716,8 +717,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
break;
}
- bool r = false;
- uint64_t oldTs = _last_config_change_time_ms;
+ uint64_t old_ts = _last_config_change_time_ms;
_config = config;
// we should durable the new ballot to prevent the inconsistent state
if (_config.ballot > old_ballot) {
@@ -827,8 +827,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->truncate(_app->last_committed_decree());
// using force cleanup now as all tasks must be done already
- r = _potential_secondary_states.cleanup(true);
- CHECK(r, "{}: potential secondary context cleanup failed", name());
+ CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
+ "potential secondary context cleanup failed");
check_state_completeness();
break;
@@ -840,8 +840,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->reset(_app->last_committed_decree());
_potential_secondary_states.cleanup(false);
// => do this in close as it may block
- // r = _potential_secondary_states.cleanup(true);
- // CHECK(r, "{}: potential secondary context cleanup failed", name());
+ // CHECK_PREFIX_MSG(_potential_secondary_states.cleanup(true),
+ // "potential secondary context cleanup failed");
break;
default:
CHECK(false, "invalid execution path");
@@ -942,7 +942,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
_prepare_list->last_committed_decree(),
_app->last_committed_decree(),
_app->last_durable_decree(),
- _last_config_change_time_ms - oldTs,
+ _last_config_change_time_ms - old_ts,
boost::lexical_cast<std::string>(_config));
if (status() != old_status) {
@@ -985,8 +985,9 @@ bool replica::update_local_configuration(const replica_configuration &config,
bool replica::update_local_configuration_with_no_ballot_change(partition_status::type s)
{
- if (status() == s)
+ if (status() == s) {
return false;
+ }
auto config = _config;
config.status = s;
diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp
index b7e932e6c..637f2c130 100644
--- a/src/replica/replica_init.cpp
+++ b/src/replica/replica_init.cpp
@@ -76,61 +76,6 @@ error_code replica::initialize_on_new()
return init_app_and_prepare_list(true);
}
-/*static*/ replica *replica::newr(replica_stub *stub,
- gpid gpid,
- const app_info &app,
- bool restore_if_necessary,
- bool is_duplication_follower,
- const std::string &parent_dir)
-{
- std::string dir;
- if (parent_dir.empty()) {
- dir = stub->get_replica_dir(app.app_type.c_str(), gpid);
- } else {
- dir = stub->get_child_dir(app.app_type.c_str(), gpid, parent_dir);
- }
- replica *rep =
- new replica(stub, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
- error_code err;
- if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
- LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err.to_string());
- return clear_on_failure(stub, rep, dir, gpid);
- }
-
- if (is_duplication_follower &&
- (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
- LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
- "previous detail error log",
- rep->name(),
- err.to_string());
- return clear_on_failure(stub, rep, dir, gpid);
- }
-
- err = rep->initialize_on_new();
- if (err == ERR_OK) {
- LOG_DEBUG("{}: new replica succeed", rep->name());
- return rep;
- } else {
- LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err.to_string());
- return clear_on_failure(stub, rep, dir, gpid);
- }
-}
-
-/* static */ replica *replica::clear_on_failure(replica_stub *stub,
- replica *rep,
- const std::string &path,
- const gpid &pid)
-{
- rep->close();
- delete rep;
- rep = nullptr;
-
- // clear work on failure
- utils::filesystem::remove_path(path);
- stub->_fs_manager.remove_replica(pid);
- return nullptr;
-}
-
error_code replica::initialize_on_load()
{
LOG_INFO_PREFIX("initialize replica on load, dir = {}", _dir);
@@ -143,83 +88,6 @@ error_code replica::initialize_on_load()
return init_app_and_prepare_list(false);
}
-/*static*/ replica *replica::load(replica_stub *stub, const char *dir)
-{
- FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
-
- char splitters[] = {'\\', '/', 0};
- std::string name = utils::get_last_component(std::string(dir), splitters);
- if (name == "") {
- LOG_ERROR("invalid replica dir {}", dir);
- return nullptr;
- }
-
- char app_type[128];
- int32_t app_id, pidx;
- if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
- LOG_ERROR("invalid replica dir {}", dir);
- return nullptr;
- }
-
- gpid pid(app_id, pidx);
- if (!utils::filesystem::directory_exists(dir)) {
- LOG_ERROR("replica dir {} not exist", dir);
- return nullptr;
- }
-
- dsn::app_info info;
- replica_app_info info2(&info);
- std::string path = utils::filesystem::path_combine(dir, kAppInfo);
- auto err = info2.load(path);
- if (ERR_OK != err) {
- LOG_ERROR("load app-info from {} failed, err = {}", path, err);
- return nullptr;
- }
-
- if (info.app_type != app_type) {
- LOG_ERROR("unmatched app type {} for {}", info.app_type, path);
- return nullptr;
- }
-
- if (info.partition_count < pidx) {
- LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, "
- "ignore it",
- pid,
- info.partition_count);
- return nullptr;
- }
-
- replica *rep = new replica(stub, pid, info, dir, false);
-
- err = rep->initialize_on_load();
- if (err == ERR_OK) {
- LOG_INFO("{}: load replica succeed", rep->name());
- return rep;
- } else {
- LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
- rep->close();
- delete rep;
- rep = nullptr;
-
- // clear work on failure
- if (dsn::utils::filesystem::directory_exists(dir)) {
- char rename_dir[1024];
- sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us());
- CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir),
- "load_replica: failed to move directory '{}' to '{}'",
- dir,
- rename_dir);
- LOG_WARNING("load_replica: replica_dir_op succeed to move directory '{}' to '{}'",
- dir,
- rename_dir);
- stub->_counter_replicas_recent_replica_move_error_count->increment();
- stub->_fs_manager.remove_replica(pid);
- }
-
- return nullptr;
- }
-}
-
decree replica::get_replay_start_decree()
{
decree replay_start_decree = _app->last_committed_decree();
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index e38112867..13584f517 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -55,6 +55,7 @@
#include <vector>
#include <deque>
#include "utils/fmt_logging.h"
+#include "replica/duplication/replica_follower.h"
#ifdef DSN_ENABLE_GPERF
#include <gperftools/malloc_extension.h>
#elif defined(DSN_USE_JEMALLOC)
@@ -610,7 +611,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
[this, dir, &rps, &rps_lock] {
LOG_INFO("process dir {}", dir);
- auto r = replica::load(this, dir.c_str());
+ auto r = load_replica(dir.c_str());
if (r != nullptr) {
LOG_INFO("{}@{}: load replica '{}' success, <durable, "
"commit> = <{}, {}>, last_prepared_decree = {}",
@@ -682,17 +683,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f
// TODO: checkpoint latest state and update on meta server so learning is cheaper
for (auto it = rps.begin(); it != rps.end(); ++it) {
it->second->close();
- // move to '.err' directory
- const char *dir = it->second->dir().c_str();
- char rename_dir[1024];
- sprintf(rename_dir, "%s.%" PRIu64 ".err", dir, dsn_now_us());
- CHECK(dsn::utils::filesystem::rename_path(dir, rename_dir),
- "init_replica: failed to move directory '{}' to '{}'",
- dir,
- rename_dir);
- LOG_WARNING("init_replica: replica_dir_op succeed to move directory '{}' to '{}'",
- dir,
- rename_dir);
+ move_to_err_path(it->second->dir(), "initialize replica");
_counter_replicas_recent_replica_move_error_count->increment();
}
rps.clear();
@@ -2059,7 +2050,7 @@ void replica_stub::open_replica(
_primary_address_str,
group_check ? "with" : "without",
dir);
- rep = replica::load(this, dir.c_str());
+ rep = load_replica(dir.c_str());
// if load data failed, re-open the `*.ori` folder which is the origin replica dir of disk
// migration
@@ -2082,7 +2073,7 @@ void replica_stub::open_replica(
boost::replace_first(
origin_dir, replica_disk_migrator::kReplicaDirOriginSuffix, "");
dsn::utils::filesystem::rename_path(origin_tmp_dir, origin_dir);
- rep = replica::load(this, origin_dir.c_str());
+ rep = load_replica(origin_dir.c_str());
FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> void {});
}
@@ -2132,7 +2123,7 @@ void replica_stub::open_replica(
"remove useless directory({}) failed",
dir);
}
- rep = replica::newr(this, id, app, restore_if_necessary, is_duplication_follower);
+ rep = new_replica(id, app, restore_if_necessary, is_duplication_follower);
}
if (rep == nullptr) {
@@ -2177,6 +2168,127 @@ void replica_stub::open_replica(
}
}
+replica *replica_stub::new_replica(gpid gpid,
+ const app_info &app,
+ bool restore_if_necessary,
+ bool is_duplication_follower,
+ const std::string &parent_dir)
+{
+ std::string dir;
+ if (parent_dir.empty()) {
+ dir = get_replica_dir(app.app_type.c_str(), gpid);
+ } else {
+ dir = get_child_dir(app.app_type.c_str(), gpid, parent_dir);
+ }
+ auto *rep =
+ new replica(this, gpid, app, dir.c_str(), restore_if_necessary, is_duplication_follower);
+ error_code err;
+ if (restore_if_necessary && (err = rep->restore_checkpoint()) != dsn::ERR_OK) {
+ LOG_ERROR("{}: try to restore replica failed, error({})", rep->name(), err);
+ clear_on_failure(rep, dir, gpid);
+ return nullptr;
+ }
+
+ if (is_duplication_follower &&
+ (err = rep->get_replica_follower()->duplicate_checkpoint()) != dsn::ERR_OK) {
+ LOG_ERROR("{}: try to duplicate replica checkpoint failed, error({}) and please check "
+ "previous detail error log",
+ rep->name(),
+ err);
+ clear_on_failure(rep, dir, gpid);
+ return nullptr;
+ }
+
+ err = rep->initialize_on_new();
+ if (err != ERR_OK) {
+ LOG_ERROR("{}: new replica failed, err = {}", rep->name(), err);
+ clear_on_failure(rep, dir, gpid);
+ return nullptr;
+ }
+
+ LOG_DEBUG("{}: new replica succeed", rep->name());
+ return rep;
+}
+
+replica *replica_stub::load_replica(const char *dir)
+{
+ FAIL_POINT_INJECT_F("mock_replica_load", [&](string_view) -> replica * { return nullptr; });
+
+ char splitters[] = {'\\', '/', 0};
+ std::string name = utils::get_last_component(std::string(dir), splitters);
+ if (name.empty()) {
+ LOG_ERROR("invalid replica dir {}", dir);
+ return nullptr;
+ }
+
+ char app_type[128];
+ int32_t app_id, pidx;
+ if (3 != sscanf(name.c_str(), "%d.%d.%s", &app_id, &pidx, app_type)) {
+ LOG_ERROR("invalid replica dir {}", dir);
+ return nullptr;
+ }
+
+ gpid pid(app_id, pidx);
+ if (!utils::filesystem::directory_exists(dir)) {
+ LOG_ERROR("replica dir {} not exist", dir);
+ return nullptr;
+ }
+
+ dsn::app_info info;
+ replica_app_info info2(&info);
+ std::string path = utils::filesystem::path_combine(dir, replica::kAppInfo);
+ auto err = info2.load(path);
+ if (ERR_OK != err) {
+ LOG_ERROR("load app-info from {} failed, err = {}", path, err);
+ return nullptr;
+ }
+
+ if (info.app_type != app_type) {
+ LOG_ERROR("unmatched app type {} for {}", info.app_type, path);
+ return nullptr;
+ }
+
+ if (info.partition_count < pidx) {
+ LOG_ERROR("partition[{}], count={}, this replica may be partition split garbage partition, "
+ "ignore it",
+ pid,
+ info.partition_count);
+ return nullptr;
+ }
+
+ auto *rep = new replica(this, pid, info, dir, false);
+ err = rep->initialize_on_load();
+ if (err != ERR_OK) {
+ LOG_ERROR("{}: load replica failed, err = {}", rep->name(), err);
+ rep->close();
+ delete rep;
+ rep = nullptr;
+
+ // clear work on failure
+ if (dsn::utils::filesystem::directory_exists(dir)) {
+ move_to_err_path(dir, "load replica");
+ _counter_replicas_recent_replica_move_error_count->increment();
+ _fs_manager.remove_replica(pid);
+ }
+
+ return nullptr;
+ }
+
+ LOG_INFO("{}: load replica succeed", rep->name());
+ return rep;
+}
+
+void replica_stub::clear_on_failure(replica *rep, const std::string &path, const gpid &pid)
+{
+ rep->close();
+ delete rep;
+ rep = nullptr;
+
+ // clear work on failure
+ utils::filesystem::remove_path(path);
+ _fs_manager.remove_replica(pid);
+}
+
task_ptr replica_stub::begin_close_replica(replica_ptr r)
{
CHECK(r->status() == partition_status::PS_ERROR ||
@@ -2191,32 +2303,31 @@ task_ptr replica_stub::begin_close_replica(replica_ptr r)
gpid id = r->get_gpid();
zauto_write_lock l(_replicas_lock);
+ if (_replicas.erase(id) == 0) {
+ return nullptr;
+ }
- if (_replicas.erase(id) > 0) {
- _counter_replicas_count->decrement();
-
- int delay_ms = 0;
- if (r->status() == partition_status::PS_INACTIVE) {
- delay_ms = FLAGS_gc_memory_replica_interval_ms;
- LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
- r->name(),
- delay_ms);
- }
+ _counter_replicas_count->decrement();
- app_info a_info = *(r->get_app_info());
- replica_info r_info;
- get_replica_info(r_info, r);
- task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA,
- &_tracker,
- [=]() { close_replica(r); },
- 0,
- std::chrono::milliseconds(delay_ms));
- _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
- _counter_replicas_closing_count->increment();
- return task;
- } else {
- return nullptr;
+ int delay_ms = 0;
+ if (r->status() == partition_status::PS_INACTIVE) {
+ delay_ms = FLAGS_gc_memory_replica_interval_ms;
+ LOG_INFO("{}: delay {} milliseconds to close replica, status = PS_INACTIVE",
+ r->name(),
+ delay_ms);
}
+
+ app_info a_info = *(r->get_app_info());
+ replica_info r_info;
+ get_replica_info(r_info, r);
+ task_ptr task = tasking::enqueue(LPC_CLOSE_REPLICA,
+ &_tracker,
+ [=]() { close_replica(r); },
+ 0,
+ std::chrono::milliseconds(delay_ms));
+ _closing_replicas[id] = std::make_tuple(task, r, std::move(a_info), std::move(r_info));
+ _counter_replicas_closing_count->increment();
+ return task;
}
void replica_stub::close_replica(replica_ptr r)
@@ -2865,7 +2976,7 @@ replica_ptr replica_stub::create_child_replica_if_not_found(gpid child_pid,
LOG_WARNING("failed create child replica({}) because it is under close", child_pid);
return nullptr;
} else {
- replica *rep = replica::newr(this, child_pid, *app, false, false, parent_dir);
+ replica *rep = new_replica(child_pid, *app, false, false, parent_dir);
if (rep != nullptr) {
auto pr = _replicas.insert(replicas::value_type(child_pid, rep));
CHECK(pr.second, "child replica {} has been existed", rep->name());
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 693ed6a27..648e0c1c1 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -265,6 +265,17 @@ private:
gpid id,
const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<configuration_update_request> &req2);
+ // Create a new replica according to the parameters.
+ // 'parent_dir' is used in partition split for get_child_dir().
+ replica *new_replica(gpid gpid,
+ const app_info &app,
+ bool restore_if_necessary,
+ bool is_duplication_follower,
+ const std::string &parent_dir = "");
+ // Load an existing replica from 'dir'.
+ replica *load_replica(const char *dir);
+ // Clean up the memory state and on disk data if creating replica failed.
+ void clear_on_failure(replica *rep, const std::string &path, const gpid &pid);
task_ptr begin_close_replica(replica_ptr r);
void close_replica(replica_ptr r);
void notify_replica_state_update(const replica_configuration &config, bool is_closing);
@@ -338,6 +349,7 @@ private:
friend class replica_follower;
friend class replica_follower_test;
friend class replica_http_service_test;
+ FRIEND_TEST(replica_test, test_clear_on_failer);
typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp
index 281732246..e08fdd77d 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -413,15 +413,15 @@ error_code replication_app_base::apply_mutation(const mutation *mu)
if (perror != 0) {
LOG_ERROR_PREFIX("mutation {}: get internal error {}", mu->name(), perror);
- // for normal write requests, if got rocksdb error, this replica will be set error and evoke
- // learn for ingestion requests, should not do as normal write requests, there are two
- // reasons:
- // 1. all ingestion errors should be handled by meta server in function
- // `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in
- // structure `ingestion_response`, not in this function
- // 2. if replica apply ingestion mutation during learn, it may got error from rocksdb,
- // because the external sst files may not exist, in this case, we won't consider it as an
- // error
+ // For normal write requests, if got rocksdb error, this replica will be set error and evoke
+ // learn.
+ // For ingestion requests, should not do as normal write requests, there are two reasons:
+ // 1. All ingestion errors should be handled by meta server in function
+ // `on_partition_ingestion_reply`, rocksdb error will be returned to meta server in
+ // structure `ingestion_response`, not in this function.
+ // 2. If replica apply ingestion mutation during learn, it may get error from rocksdb,
+ // because the external sst files may not exist, in this case, we won't consider it as
+ // an error.
if (!has_ingestion_request) {
return ERR_LOCAL_APP_FAILURE;
}
diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp
index 545dff489..f1155155d 100644
--- a/src/replica/test/replica_test.cpp
+++ b/src/replica/test/replica_test.cpp
@@ -168,14 +168,6 @@ public:
bool is_checkpointing() { return _mock_replica->_is_manual_emergency_checkpointing; }
- replica *call_clear_on_failure(replica_stub *stub,
- replica *rep,
- const std::string &path,
- const gpid &gpid)
- {
- return replica::clear_on_failure(stub, rep, path, gpid);
- }
-
bool has_gpid(gpid &gpid) const
{
for (const auto &node : stub->_fs_manager._dir_nodes) {
@@ -445,7 +437,7 @@ TEST_F(replica_test, test_clear_on_failer)
ASSERT_TRUE(dsn::utils::filesystem::path_exists(path));
ASSERT_TRUE(has_gpid(pid));
- ASSERT_FALSE(call_clear_on_failure(stub.get(), rep, path, pid));
+ stub->clear_on_failure(rep, path, pid);
ASSERT_FALSE(dsn::utils::filesystem::path_exists(path));
ASSERT_FALSE(has_gpid(pid));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org