You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by yu...@apache.org on 2022/01/18 07:44:03 UTC

[incubator-pegasus] branch master updated: fix(bulk_load): add ballot check before executing ingestion (#881)

This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ead6c8a  fix(bulk_load): add ballot check before executing ingestion (#881)
ead6c8a is described below

commit ead6c8a9f51373aa92aa83940b7b88469fccd6d0
Author: HeYuchen <he...@xiaomi.com>
AuthorDate: Tue Jan 18 15:42:32 2022 +0800

    fix(bulk_load): add ballot check before executing ingestion (#881)
---
 rdsn                                    |  2 +-
 src/server/pegasus_write_service.cpp    | 14 ++++++++------
 src/server/pegasus_write_service_impl.h | 20 ++++++++++++++++----
 3 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/rdsn b/rdsn
index 93fe4c4..0cb88e2 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 93fe4c4f4cfd8d85bf0925dccc398e3eec9a8999
+Subproject commit 0cb88e20076936da62bc245451d26fc354cbd4b2
diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp
index 7658cad..48eaeb7 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -390,13 +390,15 @@ int pegasus_write_service::ingest_files(int64_t decree,
     // ingest files asynchronously
     _server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
     dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
-        dsn::error_code err =
-            _impl->ingest_files(decree, _server->bulk_load_dir(), req.metadata, req.ingest_behind);
-        if (err == dsn::ERR_OK) {
-            _server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED);
-        } else {
-            _server->set_ingestion_status(dsn::replication::ingestion_status::IS_FAILED);
+        const auto &err =
+            _impl->ingest_files(decree, _server->bulk_load_dir(), req, _server->get_ballot());
+        auto status = dsn::replication::ingestion_status::IS_SUCCEED;
+        if (err == dsn::ERR_INVALID_VERSION) {
+            status = dsn::replication::ingestion_status::IS_INVALID;
+        } else if (err != dsn::ERR_OK) {
+            status = dsn::replication::ingestion_status::IS_FAILED;
         }
+        _server->set_ingestion_status(status);
     });
     return rocksdb::Status::kOk;
 }
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 6f96763..2f79953 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -480,23 +480,35 @@ public:
         return 0;
     }
 
+    // \return ERR_INVALID_VERSION: replaying or committing an outdated ingestion request
     // \return ERR_WRONG_CHECKSUM: verify files failed
     // \return ERR_INGESTION_FAILED: rocksdb ingestion failed
     // \return ERR_OK: rocksdb ingestion succeed
     dsn::error_code ingest_files(const int64_t decree,
                                  const std::string &bulk_load_dir,
-                                 const dsn::replication::bulk_load_metadata &metadata,
-                                 const bool ingest_behind)
+                                 const dsn::replication::ingestion_request &req,
+                                 const int64_t current_ballot)
     {
+        const auto &req_ballot = req.ballot;
+
+        // if ballot updated, ignore this request
+        if (req_ballot < current_ballot) {
+            dwarn_replica("out-dated ingestion request, ballot changed, request({}) vs "
+                          "current({}), ignore it",
+                          req_ballot,
+                          current_ballot);
+            return dsn::ERR_INVALID_VERSION;
+        }
+
         // verify external files before ingestion
         std::vector<std::string> sst_file_list;
-        dsn::error_code err = get_external_files_path(bulk_load_dir, metadata, sst_file_list);
+        const auto &err = get_external_files_path(bulk_load_dir, req.metadata, sst_file_list);
         if (err != dsn::ERR_OK) {
             return err;
         }
 
         // ingest external files
-        if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, ingest_behind) !=
+        if (dsn_unlikely(_rocksdb_wrapper->ingest_files(decree, sst_file_list, req.ingest_behind) !=
                          0)) {
             return dsn::ERR_INGESTION_FAILED;
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org