You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2016/07/18 17:01:54 UTC

mesos git commit: Consistency in persistent volumes between master and agent on failure.

Repository: mesos
Updated Branches:
  refs/heads/master f1f3851ef -> 8a8286ccd


Consistency in persistent volumes between master and agent on failure.

When the agent receives a CheckpointResourcesMessage, we store the
target checkpoint on disk. Once the persistent volumes have been
successfully created and destroyed while handling this message, we
commit the checkpoint by renaming the target checkpoint over the
committed one, which also clears the target checkpoint.

However, if any of these operations fails, we do not commit the
checkpoint, and the agent exits. When the agent restarts and finds a
target checkpoint on disk that differs from the committed checkpoint,
it retries syncing the committed checkpoint to the target. On success,
the checkpoint is committed and the agent reregisters with the master;
on failure, the checkpoint is not committed and the agent exits again.

Review: https://reviews.apache.org/r/48313/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8a8286cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8a8286cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8a8286cc

Branch: refs/heads/master
Commit: 8a8286ccda5b7883ace45662f5b16c525f506d2c
Parents: f1f3851
Author: Anindya Sinha <an...@apple.com>
Authored: Mon Jul 18 10:00:05 2016 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Mon Jul 18 10:00:05 2016 -0700

----------------------------------------------------------------------
 src/slave/paths.cpp |   8 +++
 src/slave/paths.hpp |  11 +++-
 src/slave/slave.cpp | 149 +++++++++++++++++++++++++++++++++++++----------
 src/slave/slave.hpp |   7 +++
 src/slave/state.cpp |  56 ++++++++++++++----
 src/slave/state.hpp |   6 ++
 6 files changed, 193 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 03157f9..1e9f3d2 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -56,6 +56,7 @@ const char FORKED_PID_FILE[] = "forked.pid";
 const char TASK_INFO_FILE[] = "task.info";
 const char TASK_UPDATES_FILE[] = "task.updates";
 const char RESOURCES_INFO_FILE[] = "resources.info";
+const char RESOURCES_TARGET_FILE[] = "resources.target";
 
 
 const char SLAVES_DIR[] = "slaves";
@@ -437,6 +438,13 @@ string getResourcesInfoPath(
 }
 
 
+string getResourcesTargetPath(
+    const string& rootDir)
+{
+  return path::join(rootDir, "resources", RESOURCES_TARGET_FILE);
+}
+
+
 string getPersistentVolumePath(
     const string& rootDir,
     const string& role,

http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index 339e539..ebff3a9 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -61,6 +61,10 @@ namespace paths {
 //   |                           |-- latest (symlink)
 //   |                           |-- <container_id> (sandbox)
 //   |-- meta
+//   |   |-- boot_id
+//   |   |-- resources
+//   |   |   |-- resources.info
+//   |   |   |-- resources.target
 //   |   |-- slaves
 //   |       |-- latest (symlink)
 //   |       |-- <slave_id>
@@ -83,9 +87,6 @@ namespace paths {
 //   |                                       |-- <task_id>
 //   |                                           |-- task.info
 //   |                                           |-- task.updates
-//   |-- boot_id
-//   |-- resources
-//   |   |-- resources.info
 //   |-- volumes
 //   |   |-- roles
 //   |       |-- <role>
@@ -283,6 +284,10 @@ std::string getResourcesInfoPath(
     const std::string& rootDir);
 
 
+std::string getResourcesTargetPath(
+    const std::string& rootDir);
+
+
 std::string getPersistentVolumePath(
     const std::string& rootDir,
     const std::string& role,

http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 02982d5..dc619eb 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2520,11 +2520,53 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
   //      offers they get.
   Resources newCheckpointedResources = _checkpointedResources;
 
+  // Store the target checkpoint resources. We commit the checkpoint
+  // only after all operations are successful. If any of the operations
+  // fail, the agent exits and the update to checkpointed resources
+  // is re-attempted after the agent restarts before agent reregistration.
+  //
+  // Since we commit the checkpoint after all operations are successful,
+  // we avoid a case of inconsistency between the master and the agent if
+  // the agent restarts during handling of CheckpointResourcesMessage.
   CHECK_SOME(state::checkpoint(
-      paths::getResourcesInfoPath(metaDir),
+      paths::getResourcesTargetPath(metaDir),
       newCheckpointedResources))
-    << "Failed to checkpoint resources " << newCheckpointedResources;
+    << "Failed to checkpoint resources target " << newCheckpointedResources;
+
+  Try<Nothing> syncResult = syncCheckpointedResources(
+      newCheckpointedResources);
+
+  if (syncResult.isError()) {
+    // Exit the agent (without committing the checkpoint) on failure.
+    EXIT(EXIT_FAILURE)
+      << "Failed to sync checkpointed resources: "
+      << syncResult.error();
+  }
+
+  // Rename the target checkpoint to the committed checkpoint.
+  Try<Nothing> renameResult = os::rename(
+      paths::getResourcesTargetPath(metaDir),
+      paths::getResourcesInfoPath(metaDir));
+
+  if (renameResult.isError()) {
+    // Exit the agent since the checkpoint could not be committed.
+    EXIT(EXIT_FAILURE)
+      << "Failed to checkpoint resources " << newCheckpointedResources
+      << ": " << renameResult.error();
+  }
+
+  LOG(INFO) << "Updated checkpointed resources from "
+            << checkpointedResources << " to "
+            << newCheckpointedResources;
 
+  checkpointedResources = newCheckpointedResources;
+}
+
+
+Try<Nothing> Slave::syncCheckpointedResources(
+    const Resources& newCheckpointedResources)
+{
+  Resources oldVolumes = checkpointedResources.persistentVolumes();
   Resources newVolumes = newCheckpointedResources.persistentVolumes();
 
   // Create persistent volumes that do not already exist.
@@ -2537,13 +2579,24 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
     // This is validated in master.
     CHECK_NE(volume.role(), "*");
 
+    if (oldVolumes.contains(volume)) {
+      continue;
+    }
+
     string path = paths::getPersistentVolumePath(flags.work_dir, volume);
 
+    // If creation of persistent volume fails, the agent exits.
+    string volumeDescription = "persistent volume " +
+      volume.disk().persistence().id() + " at '" + path + "'";
+
     if (!os::exists(path)) {
-      CHECK_SOME(os::mkdir(path, true))
-        << "Failed to create persistent volume '"
-        << volume.disk().persistence().id()
-        << "' at '" << path << "'";
+      // If the directory does not exist, we should proceed only if the
+      // target directory is successfully created.
+      Try<Nothing> result = os::mkdir(path, true);
+      if (result.isError()) {
+        return Error("Failed to create the " +
+            volumeDescription + ": " + result.error());
+      }
     }
   }
 
@@ -2553,18 +2606,6 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
   // remove the filesystem objects for the removed volume. Note that
   // for MOUNT disks, we don't remove the root directory (mount point)
   // of the volume.
-  //
-  // TODO(neilc): There is a window during which the filesystem
-  // content for destroyed persistent volumes might be orphaned if we
-  // crash after checkpointing the new resources to disk but before we
-  // finish removing the associated filesystem objects. Particularly
-  // for MOUNT disks, this might result in exposing data from a
-  // previous persistent volume on the disk to a framework that
-  // creates a new volume. We might address this by doing a "cleanup"
-  // operation to remove data from MOUNT disks before creating
-  // persistent volumes on them.
-  Resources oldVolumes = checkpointedResources.persistentVolumes();
-
   foreach (const Resource& volume, oldVolumes) {
     if (newVolumes.contains(volume)) {
       continue;
@@ -2588,20 +2629,19 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
         removeRoot = false;
       }
 
+      // We should proceed only if the directory is removed.
       Try<Nothing> result = os::rmdir(path, true, removeRoot);
+
       if (result.isError()) {
-        LOG(ERROR) << "Failed to remove persistent volume '"
-                   << volume.disk().persistence().id()
-                   << "' at '" << path << "'";
+        return Error(
+            "Failed to remove persistent volume '" +
+            stringify(volume.disk().persistence().id()) +
+            "' at '" + path + "': " + result.error());
       }
     }
   }
 
-  LOG(INFO) << "Updated checkpointed resources from "
-            << checkpointedResources << " to "
-            << newCheckpointedResources;
-
-  checkpointedResources = newCheckpointedResources;
+  return Nothing();
 }
 
 
@@ -4700,23 +4740,70 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
       metrics.recovery_errors += resourcesState.get().errors;
     }
 
+    checkpointedResources = resourcesState.get().resources;
+
+    if (resourcesState.get().target.isSome()) {
+      Resources targetResources = resourcesState.get().target.get();
+
+      // If targetResources does not match with resources, then we attempt
+      // to sync the checkpointed resources from the target. If
+      // there is any failure, the checkpoint is not committed and the
+      // agent exits. In that case, sync of checkpoints will be reattempted
+      // on the next agent restart (before agent reregistration).
+      if (checkpointedResources != targetResources) {
+        Try<Nothing> syncResult = syncCheckpointedResources(targetResources);
+
+        if (syncResult.isError()) {
+          return Failure(
+              "Target checkpointed resources " +
+              stringify(targetResources) +
+              " failed to sync from current checkpointed resources " +
+              stringify(checkpointedResources) + ": " +
+              syncResult.error());
+        }
+
+        // Rename the target checkpoint to the committed checkpoint.
+        Try<Nothing> renameResult = os::rename(
+            paths::getResourcesTargetPath(metaDir),
+            paths::getResourcesInfoPath(metaDir));
+
+        if (renameResult.isError()) {
+          return Failure(
+              "Failed to checkpoint resources " +
+              stringify(targetResources) + ": " +
+              renameResult.error());
+        }
+
+        // Since we synced the target resources to the committed resources, we
+        // check resource compatibility with `--resources` command line flag
+        // based on the committed checkpoint.
+        checkpointedResources = targetResources;
+      } else {
+        // Remove the target checkpoint resources that are no longer needed
+        // since they are same as the committed checkpointed resources.
+        Try<Nothing> rmResult = os::rm(paths::getResourcesTargetPath(metaDir));
+
+        if (rmResult.isError()) {
+          LOG(ERROR) << "Failed to clean up target checkpointed resources: "
+                     << rmResult.error();
+        }
+      }
+    }
+
     // This is to verify that the checkpointed resources are
     // compatible with the slave resources specified through the
     // '--resources' command line flag.
     Try<Resources> totalResources = applyCheckpointedResources(
-        info.resources(),
-        resourcesState.get().resources);
+        info.resources(), checkpointedResources);
 
     if (totalResources.isError()) {
       return Failure(
           "Checkpointed resources " +
-          stringify(resourcesState.get().resources) +
+          stringify(checkpointedResources) +
           " are incompatible with agent resources " +
           stringify(info.resources()) + ": " +
           totalResources.error());
     }
-
-    checkpointedResources = resourcesState.get().resources;
   }
 
   if (slaveState.isSome() && slaveState.get().info.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 9864cf4..ffe4220 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -421,6 +421,13 @@ private:
   // exited.
   void _shutdownExecutor(Framework* framework, Executor* executor);
 
+  // Create persistent volumes (for CREATE) and/or remove persistent
+  // volumes (for DESTROY) so that the on-disk volumes reflect
+  // `newCheckpointedResources`. Returns an Error on the first failure;
+  // the caller is responsible for committing the checkpoint on success.
+  Try<Nothing> syncCheckpointedResources(
+      const Resources& newCheckpointedResources);
+
   process::Future<bool> authorizeLogAccess(
       const Option<std::string>& principal);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 9cec086..91dee11 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -692,12 +692,49 @@ Try<ResourcesState> ResourcesState::recover(
 {
   ResourcesState state;
 
-  const string& path = paths::getResourcesInfoPath(rootDir);
-  if (!os::exists(path)) {
-    LOG(INFO) << "No checkpointed resources found at '" << path << "'";
+  // Process the committed resources.
+  const string& infoPath = paths::getResourcesInfoPath(rootDir);
+  if (!os::exists(infoPath)) {
+    LOG(INFO) << "No committed checkpointed resources found at '"
+              << infoPath << "'";
+    return state;
+  }
+
+  Try<Resources> resources = ResourcesState::recoverResources(
+      infoPath, strict, state.errors);
+
+  if (resources.isError()) {
+    return Error(resources.error());
+  }
+
+  state.resources = resources.get();
+
+  // Process the target resources.
+  const string& targetPath = paths::getResourcesTargetPath(rootDir);
+  if (!os::exists(targetPath)) {
     return state;
   }
 
+  Try<Resources> target = ResourcesState::recoverResources(
+      targetPath, strict, state.errors);
+
+  if (target.isError()) {
+    return Error(target.error());
+  }
+
+  state.target = target.get();
+
+  return state;
+}
+
+
+Try<Resources> ResourcesState::recoverResources(
+    const string& path,
+    bool strict,
+    unsigned int& errors)
+{
+  Resources resources;
+
   Try<int> fd = os::open(path, O_RDWR | O_CLOEXEC);
   if (fd.isError()) {
     string message =
@@ -707,8 +744,8 @@ Try<ResourcesState> ResourcesState::recover(
       return Error(message);
     } else {
       LOG(WARNING) << message;
-      state.errors++;
-      return state;
+      errors++;
+      return resources;
     }
   }
 
@@ -721,7 +758,7 @@ Try<ResourcesState> ResourcesState::recover(
       break;
     }
 
-    state.resources += resource.get();
+    resources += resource.get();
   }
 
   off_t offset = lseek(fd.get(), 0, SEEK_CUR);
@@ -755,17 +792,16 @@ Try<ResourcesState> ResourcesState::recover(
       return Error(message);
     } else {
       LOG(WARNING) << message;
-      state.errors++;
-      return state;
+      errors++;
+      return resources;
     }
   }
 
   os::close(fd.get());
 
-  return state;
+  return resources;
 }
 
-
 } // namespace state {
 } // namespace slave {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8a8286cc/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 0de2a4e..a1c8496 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -274,7 +274,13 @@ struct ResourcesState
       const std::string& rootDir,
       bool strict);
 
+  static Try<Resources> recoverResources(
+      const std::string& path,
+      bool strict,
+      unsigned int& errors);
+
   Resources resources;
+  Option<Resources> target;
   unsigned int errors;
 };