You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pegasus.apache.org by GitBox <gi...@apache.org> on 2022/08/24 03:26:44 UTC

[GitHub] [incubator-pegasus] foreverneverer commented on a diff in pull request #1129: feat(backup): 6. meta handle backup_response during checkpointing

foreverneverer commented on code in PR #1129:
URL: https://github.com/apache/incubator-pegasus/pull/1129#discussion_r953286428


##########
src/rdsn/src/meta/meta_backup_engine.cpp:
##########
@@ -179,6 +179,86 @@ void meta_backup_engine::backup_app_partition(const gpid &pid)
         });
 }
 
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::on_backup_reply(const error_code err,
+                                         const backup_response &response,
+                                         const gpid &pid,
+                                         const rpc_address &primary)
+{
+    {
+        zauto_read_lock l(_lock);
+        if (_is_backup_failed) {
+            derror_f("partition[{}] handle backup failed", pid);
+            return;
+        }
+
+        // TODO(heyuchen): check if backup canceled
+    }
+
+    auto rep_error = err == ERR_OK ? response.err : err;
+    if (rep_error != ERR_OK) {
+        derror_f(
+            "backup_id({}): receive backup response for partition {} from server {}, error = {}",
+            _cur_backup.backup_id,
+            pid.to_string(),
+            primary.to_string(),
+            rep_error);
+        handle_replica_backup_failed(pid.get_app_id());
+        return;
+    }
+
+    if (response.backup_id != _cur_backup.backup_id) {
+        dwarn_f("backup_id({}): receive outdated backup response(backup_id={}) for partition {} "
+                "from server {}, ignore it",
+                _cur_backup.backup_id,
+                response.backup_id,
+                pid.to_string(),
+                primary.to_string());
+        retry_backup(pid);
+        return;
+    }
+
+    if (response.__isset.checkpoint_upload_err) {
+        auto type = response.status == backup_status::UPLOADING ? "upload" : "checkpoint";

Review Comment:
   It seems the `enum` can be converted to a `string` directly?



##########
src/rdsn/src/meta/meta_backup_engine.cpp:
##########
@@ -179,6 +179,86 @@ void meta_backup_engine::backup_app_partition(const gpid &pid)
         });
 }
 
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::on_backup_reply(const error_code err,
+                                         const backup_response &response,
+                                         const gpid &pid,
+                                         const rpc_address &primary)
+{
+    {
+        zauto_read_lock l(_lock);
+        if (_is_backup_failed) {
+            derror_f("partition[{}] handle backup failed", pid);
+            return;
+        }
+
+        // TODO(heyuchen): check if backup canceled
+    }
+
+    auto rep_error = err == ERR_OK ? response.err : err;
+    if (rep_error != ERR_OK) {
+        derror_f(
+            "backup_id({}): receive backup response for partition {} from server {}, error = {}",
+            _cur_backup.backup_id,
+            pid.to_string(),
+            primary.to_string(),
+            rep_error);
+        handle_replica_backup_failed(pid.get_app_id());
+        return;
+    }
+
+    if (response.backup_id != _cur_backup.backup_id) {
+        dwarn_f("backup_id({}): receive outdated backup response(backup_id={}) for partition {} "
+                "from server {}, ignore it",
+                _cur_backup.backup_id,
+                response.backup_id,
+                pid.to_string(),
+                primary.to_string());
+        retry_backup(pid);
+        return;
+    }
+
+    if (response.__isset.checkpoint_upload_err) {
+        auto type = response.status == backup_status::UPLOADING ? "upload" : "checkpoint";
+        derror_f("backup_id({}): receive backup response for partition {} from server {}, meet {} "
+                 "error = {}",
+                 _cur_backup.backup_id,
+                 pid.to_string(),
+                 primary.to_string(),
+                 type,
+                 response.checkpoint_upload_err);
+        handle_replica_backup_failed(pid.get_app_id());
+        retry_backup(pid);
+        return;
+    }
+
+    if (response.status == backup_status::CHECKPOINTED) {
+        ddebug_f("backup_id({}): backup for partition {} from server {} finish checkpoint",
+                 _cur_backup.backup_id,
+                 pid.to_string(),
+                 primary.to_string());
+        {
+            zauto_write_lock l(_lock);
+            _backup_status[pid.get_partition_index()] = backup_status::UPLOADING;
+        }
+        if (check_partition_backup_status(backup_status::UPLOADING)) {

Review Comment:
   So only the `uploading` status is stored in zk?



##########
src/rdsn/src/meta/meta_backup_engine.cpp:
##########
@@ -179,6 +179,86 @@ void meta_backup_engine::backup_app_partition(const gpid &pid)
         });
 }
 
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::on_backup_reply(const error_code err,
+                                         const backup_response &response,
+                                         const gpid &pid,
+                                         const rpc_address &primary)
+{
+    {
+        zauto_read_lock l(_lock);
+        if (_is_backup_failed) {
+            derror_f("partition[{}] handle backup failed", pid);
+            return;
+        }
+
+        // TODO(heyuchen): check if backup canceled
+    }
+
+    auto rep_error = err == ERR_OK ? response.err : err;
+    if (rep_error != ERR_OK) {
+        derror_f(
+            "backup_id({}): receive backup response for partition {} from server {}, error = {}",
+            _cur_backup.backup_id,
+            pid.to_string(),
+            primary.to_string(),
+            rep_error);
+        handle_replica_backup_failed(pid.get_app_id());
+        return;
+    }
+
+    if (response.backup_id != _cur_backup.backup_id) {
+        dwarn_f("backup_id({}): receive outdated backup response(backup_id={}) for partition {} "
+                "from server {}, ignore it",
+                _cur_backup.backup_id,
+                response.backup_id,
+                pid.to_string(),
+                primary.to_string());
+        retry_backup(pid);
+        return;
+    }
+
+    if (response.__isset.checkpoint_upload_err) {

Review Comment:
   Is `response.__isset.checkpoint_upload_err` always `true`?



##########
src/rdsn/src/meta/meta_backup_engine.h:
##########
@@ -119,6 +124,17 @@ class meta_backup_engine
 
     void update_backup_item_on_remote_storage(backup_status::type new_status, int64_t end_time = 0);
 
+    bool check_partition_backup_status(backup_status::type expected_status) const
+    {

Review Comment:
   Consider using `std::all_of`?



##########
src/rdsn/src/meta/meta_backup_engine.cpp:
##########
@@ -179,6 +179,86 @@ void meta_backup_engine::backup_app_partition(const gpid &pid)
         });
 }
 
+// ThreadPool: THREAD_POOL_DEFAULT
+void meta_backup_engine::on_backup_reply(const error_code err,
+                                         const backup_response &response,
+                                         const gpid &pid,
+                                         const rpc_address &primary)
+{
+    {
+        zauto_read_lock l(_lock);
+        if (_is_backup_failed) {

Review Comment:
   Is there a status of BACKUP_FAILED?  Why do we still need a flag variable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pegasus.apache.org
For additional commands, e-mail: dev-help@pegasus.apache.org