Posted to commits@mesos.apache.org by jp...@apache.org on 2019/08/25 03:59:01 UTC

[mesos] 02/02: Fixed recovery of agent resources and operations after crash.

This is an automated email from the ASF dual-hosted git repository.

jpeach pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 75a93fde264d2a591ec9c024354908e85012c271
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Sat Aug 24 18:29:28 2019 -0700

    Fixed recovery of agent resources and operations after crash.
    
    Fixes an issue where the agent may incorrectly send an
    OPERATION_FINISHED update for a failed offer operation
    following agent failover and recovery.
    
    The agent previously relied on the difference between the
    set of checkpointed operations and the set of operation
    IDs recovered from the operation status update manager to
    apply any operations which had not been applied due to an
    ill-timed agent failover.
    
    However, this approach did not work when a persistent
    volume could not be created by
    syncCheckpointedResources(). To handle this case, this
    patch retains the old approach of a two-phase commit of
    persistent volumes to disk, under which the agent fails
    to complete recovery if syncCheckpointedResources()
    cannot be executed successfully after failover.
    
    Review: https://reviews.apache.org/r/71285/
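
The two-phase commit described above amounts to: write the new state
to a separate target file, perform the side effects (such as creating
persistent volumes), then atomically rename the target over the
committed file. A minimal sketch of the pattern follows, with
hypothetical names; the agent itself uses state::checkpoint() and
stout's os::rename rather than these helpers:

    #include <filesystem>
    #include <fstream>
    #include <stdexcept>
    #include <string>

    namespace fs = std::filesystem;

    // Phase 1: persist the desired state to a target file, not to
    // the committed location.
    void writeTarget(const fs::path& target, const std::string& data)
    {
      std::ofstream out(target, std::ios::trunc);
      out << data;
      if (!out.flush()) {
        throw std::runtime_error("failed to write checkpoint target");
      }
    }

    // Phase 2: once the side effects succeed, rename the target over
    // the committed file. rename(2) is atomic on POSIX, so a crash at
    // any point leaves either the old committed state or the new one,
    // never a torn file; a leftover target file is the signal that
    // recovery must re-run the side effects before committing.
    void commit(const fs::path& target, const fs::path& committed)
    {
      fs::rename(target, committed);
    }
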
---
 src/slave/paths.cpp |  7 ++++++
 src/slave/paths.hpp |  4 +++
 src/slave/slave.cpp | 61 +++++++++++++++++++++++++++++++++++++++------
 src/slave/state.cpp | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 28a7cf9..5afcc7e 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -66,6 +66,7 @@ const char FORKED_PID_FILE[] = "forked.pid";
 const char TASK_INFO_FILE[] = "task.info";
 const char TASK_UPDATES_FILE[] = "task.updates";
 const char RESOURCE_STATE_FILE[] = "resources_and_operations.state";
+const char RESOURCE_STATE_TARGET_FILE[] = "resources_and_operations.target";
 const char RESOURCES_INFO_FILE[] = "resources.info";
 const char RESOURCES_TARGET_FILE[] = "resources.target";
 const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state";
@@ -645,6 +646,12 @@ string getResourceStatePath(const string& rootDir)
 }
 
 
+string getResourceStateTargetPath(const string& rootDir)
+{
+  return path::join(rootDir, "resources", RESOURCE_STATE_TARGET_FILE);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index e077587..d7fcb13 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -415,6 +415,10 @@ std::string getResourceStatePath(
     const std::string& rootDir);
 
 
+std::string getResourceStateTargetPath(
+    const std::string& rootDir);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);
 
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1d0ec9d..882040d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4381,22 +4381,34 @@ void Slave::checkpointResourceState(
     totalResources = _totalResources.get();
   }
 
-  // Store the target checkpoint resources. We commit the checkpoint
-  // only after all operations are successful. If any of the operations
-  // fail, the agent exits and the update to checkpointed resources
+  // Store the target checkpoint resources. We commit the checkpoint by renaming
+  // the target file only after all operations are successful. If any of the
+  // operations fail, the agent exits and the update to checkpointed resources
   // is re-attempted after the agent restarts before agent reregistration.
   //
   // Since we commit the checkpoint after all operations are successful,
   // we avoid a case of inconsistency between the master and the agent if
-  // the agent restarts during handling of CheckpointResourcesMessage.
+  // the agent restarts during handling of `CheckpointResourcesMessage`.
+  //
+  // NOTE: Since the addition of operation feedback on the agent, the resources
+  // are checkpointed in two formats:
+  // 1) Pre-operation-feedback, where only resources are written to a target
+  //    file, then moved to the final checkpoint location once any persistent
+  //    volumes have been committed to disk.
+  // 2) Post-operation-feedback, where both resources and operations are written
+  //    to a target file, then moved to the final checkpoint location once any
+  //    persistent volumes have been committed to disk.
+  //
+  // Both of these formats continue to be written to disk in order to permit
+  // agent downgrades.
 
   CHECK_SOME(state::checkpoint(
-      paths::getResourceStatePath(metaDir),
+      paths::getResourceStateTargetPath(metaDir),
       resourceState,
       false,
       false))
     << "Failed to checkpoint resources " << resourceState.resources()
-    << " and operations " << resourceState.operations();
+    << " and operations " << resourceState.operations() << "to target file";
 
   if (resourcesToCheckpoint != checkpointedResources) {
     CHECK_SOME(state::checkpoint(
@@ -4432,6 +4444,22 @@ void Slave::checkpointResourceState(
     checkpointedResources = std::move(resourcesToCheckpoint);
   }
 
+  // At this point, `syncCheckpointedResources()` has ensured that any change in
+  // checkpointed resources (e.g. persistent volumes) is now reflected on disk.
+  // We rename the target resource state file to the actual resource state file,
+  // which is our source of truth for the current state of the agent resources.
+  Try<Nothing> renameResult = os::rename(
+      paths::getResourceStateTargetPath(metaDir),
+      paths::getResourceStatePath(metaDir));
+
+  if (renameResult.isError()) {
+    // Exit the agent since the checkpoint could not be committed.
+    EXIT(EXIT_FAILURE)
+      << "Failed to move target resources " << resourceState.resources()
+      << " and operations " << resourceState.operations()
+      << ": " << renameResult.error();
+  }
+
   if (operationsToCheckpoint != checkpointedOperations) {
     LOG(INFO) << "Updated checkpointed operations from "
               << checkpointedOperations.values() << " to "
@@ -7557,14 +7585,31 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
             syncResult.error());
       }
 
-      // Rename the target checkpoint to the committed checkpoint.
+      // At this point, `syncCheckpointedResources()` has ensured that any
+      // change in checkpointed resources (e.g. persistent volumes) is now
+      // reflected on disk. We rename the target resource state file to the
+      // actual resource state file, which is our source of truth for the
+      // current state of the agent resources.
       Try<Nothing> renameResult = os::rename(
+          paths::getResourceStateTargetPath(metaDir),
+          paths::getResourceStatePath(metaDir));
+
+      if (renameResult.isError()) {
+        return Failure(
+            "Failed to rename target resources " +
+            stringify(targetResources) + " and associated operations: " +
+            renameResult.error());
+      }
+
+      // The following rename call handles the pre-operation-feedback
+      // checkpoint format for backward compatibility.
+      renameResult = os::rename(
           paths::getResourcesTargetPath(metaDir),
           paths::getResourcesInfoPath(metaDir));
 
       if (renameResult.isError()) {
         return Failure(
-            "Failed to checkpoint resources " +
+            "Failed to rename target resources " +
             stringify(targetResources) + ": " +
             renameResult.error());
       }
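
The hunks above order the commit as: checkpoint to the target files,
apply side effects via syncCheckpointedResources(), then rename the
targets over the committed files (one rename per checkpoint format).
A condensed sketch of that step, using a hypothetical helper and
std::filesystem in place of stout's os::rename:

    #include <filesystem>
    #include <system_error>

    namespace fs = std::filesystem;

    // `syncToDisk` stands in for syncCheckpointedResources(); it must
    // succeed before either rename, otherwise the target files are
    // left in place so the next (re)start retries the whole sequence.
    bool commitCheckpoints(const fs::path& metaDir, bool (*syncToDisk)())
    {
      if (!syncToDisk()) {
        return false; // Abort: targets stay on disk for retry.
      }

      const fs::path dir = metaDir / "resources";
      std::error_code ec;

      // Post-operation-feedback format (resources and operations).
      fs::rename(dir / "resources_and_operations.target",
                 dir / "resources_and_operations.state", ec);
      if (ec) {
        return false;
      }

      // Pre-operation-feedback format, still written for downgrades.
      fs::rename(dir / "resources.target", dir / "resources.info", ec);
      return !ec;
    }
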
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index cd3fac7..821132b 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -219,8 +219,39 @@ Try<SlaveState> SlaveState::recover(
     }
   }
 
+  // Operations might be checkpointed in either the target resource state file
+  // or in the final resource state checkpoint location. If the target file
+  // exists, then the agent must have crashed while attempting to sync those
+  // target resources to disk. Since agent recovery guarantees that the agent
+  // will not successfully recover until the target resources are synced to disk
+  // and moved to the final checkpoint location, here we can safely recover
+  // operations from the target file if it exists.
+
+  const string targetPath = paths::getResourceStateTargetPath(rootDir);
   const string resourceStatePath = paths::getResourceStatePath(rootDir);
-  if (os::exists(resourceStatePath)) {
+
+  if (os::exists(targetPath)) {
+    Result<ResourceState> target = state::read<ResourceState>(targetPath);
+    if (target.isError()) {
+      string message = "Failed to read resources and operations target file '" +
+                       targetPath + "': " + target.error();
+
+      if (strict) {
+        return Error(message);
+      } else {
+        LOG(WARNING) << message;
+        state.errors++;
+        return state;
+      }
+    }
+
+    if (target.isSome()) {
+      state.operations = std::vector<Operation>();
+      foreach (const Operation& operation, target->operations()) {
+        state.operations->push_back(operation);
+      }
+    }
+  } else if (os::exists(resourceStatePath)) {
     Result<ResourceState> resourceState =
       state::read<ResourceState>(resourceStatePath);
     if (resourceState.isError()) {
@@ -778,8 +809,20 @@ Try<ResourcesState> ResourcesState::recover(
 {
   ResourcesState state;
 
+  // The checkpointed resources may exist in one of two different formats:
+  // 1) Pre-operation-feedback, where only resources are written to a target
+  //    file, then moved to the final checkpoint location once any persistent
+  //    volumes have been committed to disk.
+  // 2) Post-operation-feedback, where both resources and operations are written
+  //    to a target file, then moved to the final checkpoint location once any
+  //    persistent volumes have been committed to disk.
+  //
+  // The post-operation-feedback agent writes both of these formats to disk to
+  // enable agent downgrades.
+
   const string resourceStatePath = paths::getResourceStatePath(rootDir);
   if (os::exists(resourceStatePath)) {
+    // The post-operation-feedback format was detected.
     Result<ResourceState> resourceState =
       state::read<ResourceState>(resourceStatePath);
     if (resourceState.isError()) {
@@ -799,9 +842,35 @@ Try<ResourcesState> ResourcesState::recover(
       state.resources = resourceState->resources();
     }
 
+    const string targetPath = paths::getResourceStateTargetPath(rootDir);
+    if (!os::exists(targetPath)) {
+      return state;
+    }
+
+    Result<ResourceState> target = state::read<ResourceState>(targetPath);
+    if (target.isError()) {
+      string message =
+        "Failed to read resources and operations target file '" +
+        targetPath + "': " + target.error();
+
+      if (strict) {
+        return Error(message);
+      } else {
+        LOG(WARNING) << message;
+        state.errors++;
+        return state;
+      }
+    }
+
+    if (target.isSome()) {
+      state.target = target->resources();
+    }
+
     return state;
   }
 
+  // Falling back to the pre-operation-feedback format.
+
   LOG(INFO) << "No committed checkpointed resources and operations found at '"
             << resourceStatePath << "'";
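
A leftover target file is how recovery detects that the agent crashed
between writing the target and committing it. The fallback order in
ResourcesState::recover() reduces to the sketch below, using an
example meta directory and existence checks only; the real code reads
the protobufs via state::read<ResourceState>():

    #include <filesystem>
    #include <iostream>

    namespace fs = std::filesystem;

    int main()
    {
      // Example meta directory; the agent derives this from its work
      // directory.
      const fs::path dir = fs::path("/var/lib/mesos/meta") / "resources";

      const fs::path committed = dir / "resources_and_operations.state";
      const fs::path target = dir / "resources_and_operations.target";
      const fs::path legacy = dir / "resources.info";

      if (fs::exists(committed)) {
        // Post-operation-feedback format: the committed file is the
        // source of truth; a leftover target file additionally holds
        // the in-flight checkpoint to re-apply.
        std::cout << "recover committed state" << std::endl;
        if (fs::exists(target)) {
          std::cout << "recover target state as well" << std::endl;
        }
      } else if (fs::exists(legacy)) {
        // Fall back to the pre-operation-feedback format.
        std::cout << "recover legacy resources.info" << std::endl;
      } else {
        std::cout << "no checkpointed resources" << std::endl;
      }

      return 0;
    }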