You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/01/18 07:44:03 UTC
[incubator-pegasus] branch master updated: fix(bulk_load): add ballot check before executing ingestion (#881)
This is an automated email from the ASF dual-hosted git repository.
yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ead6c8a fix(bulk_load): add ballot check before executing ingestion (#881)
ead6c8a is described below
commit ead6c8a9f51373aa92aa83940b7b88469fccd6d0
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Tue Jan 18 15:42:32 2022 +0800
fix(bulk_load): add ballot check before executing ingestion (#881)
---
rdsn | 2 +-
src/server/pegasus_write_service.cpp | 14 ++++++++------
src/server/pegasus_write_service_impl.h | 20 ++++++++++++++++----
3 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/rdsn b/rdsn
index 93fe4c4..0cb88e2 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 93fe4c4f4cfd8d85bf0925dccc398e3eec9a8999
+Subproject commit 0cb88e20076936da62bc245451d26fc354cbd4b2
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 7658cad..48eaeb7 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -390,13 +390,15 @@ int pegasus_write_service::ingest_files(int64_t decree,
// ingest files asynchronously
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
- dsn::error_code err =
- _impl->ingest_files(decree, _server->bulk_load_dir(), req.metadata, req.ingest_behind);
- if (err == dsn::ERR_OK) {
- _server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED);
- } else {
- _server->set_ingestion_status(dsn::replication::ingestion_status::IS_FAILED);
+ const auto &err =
+ _impl->ingest_files(decree, _server->bulk_load_dir(), req, _server->get_ballot());
+ auto status = dsn::replication::ingestion_status::IS_SUCCEED;
+ if (err == dsn::ERR_INVALID_VERSION) {
+ status = dsn::replication::ingestion_status::IS_INVALID;
+ } else if (err != dsn::ERR_OK) {
+ status = dsn::replication::ingestion_status::IS_FAILED;
}
+ _server->set_ingestion_status(status);
});
return rocksdb::Status::kOk;
}
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 6f96763..2f79953 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -480,23 +480,35 @@ public:
return 0;
}
+ // \return ERR_INVALID_VERSION: replaying or committing an out-dated ingest request
// \return ERR_WRONG_CHECKSUM: verify files failed
// \return ERR_INGESTION_FAILED: rocksdb ingestion failed
// \return ERR_OK: rocksdb ingestion succeed
dsn::error_code ingest_files(const int64_t decree,
const std::string &bulk_load_dir,
- const dsn::replication::bulk_load_metadata &metadata,
- const bool ingest_behind)
+ const dsn::replication::ingestion_request &req,
+ const int64_t current_ballot)
{
+ const auto &req_ballot = req.ballot;
+
+ // if the current ballot is newer than the request's ballot, the request is out-dated: ignore it
+ if (req_ballot < current_ballot) {
+ dwarn_replica("out-dated ingestion request, ballot changed, request({}) vs "
+ "current({}), ignore it",
+ req_ballot,
+ current_ballot);
+ return dsn::ERR_INVALID_VERSION;
+ }
+
// verify external files before ingestion
std::vector<std::string> sst_file_list;
- dsn::error_code err = get_external_files_path(bulk_load_dir, metadata, sst_file_list);
+ const auto &err = get_external_files_path(bulk_load_dir, req.metadata, sst_file_list);
if (err != dsn::ERR_OK) {
return err;
}
// ingest external files
- if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, ingest_behind) !=
+ if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
0)) {
return dsn::ERR_INGESTION_FAILED;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org