Posted to commits@mesos.apache.org by ch...@apache.org on 2018/10/31 18:28:50 UTC

[mesos] 03/04: Synced SLRP checkpoints to the filesystem.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d48ec80a45ec824e53bf2de9f3b74793b754ac29
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Fri Oct 12 15:12:41 2018 -0700

    Synced SLRP checkpoints to the filesystem.
    
    Currently if a system crashes, SLRP checkpoints might not be synced to
    the filesystem, so it is possible that an old or empty checkpoint will
    be read upon recovery. Moreover, if a CSI call has been issued right
    before the crash, the recovered state may be inconsistent with the
    actual state reported by the plugin. For example, the plugin might have
    created a volume but the checkpointed state does not know about it.
    
    To avoid this inconsistency, we always call fsync() when checkpointing
    SLRP states.
    
    Review: https://reviews.apache.org/r/69010
---
 src/resource_provider/storage/provider.cpp | 102 ++++++++++++++++++-----------
 src/slave/state.hpp                        |  34 ++++++----
 2 files changed, 83 insertions(+), 53 deletions(-)
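
For background, the durability pattern this change relies on is "write to a temporary file, fsync it, rename it into place, then fsync the parent directory". Below is a minimal standalone sketch of that pattern in raw POSIX calls; `writeCheckpointSynced` is a hypothetical helper for illustration only, not the stout/Mesos code touched by the diff that follows.

#include <fcntl.h>
#include <unistd.h>

#include <cstddef>
#include <cstdio>
#include <string>

// Hypothetical helper: write `data` to `path` such that a crash cannot leave
// a stale or empty file behind. Returns false on any error. (A sketch only;
// a real implementation would also retry on EINTR and report errno.)
bool writeCheckpointSynced(const std::string& path, const std::string& data)
{
  const std::string temp = path + ".tmp";

  // 1. Write the new contents to a temporary file next to the target.
  int fd = ::open(temp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
  if (fd < 0) {
    return false;
  }

  const char* p = data.data();
  size_t remaining = data.size();
  while (remaining > 0) {
    ssize_t written = ::write(fd, p, remaining);
    if (written < 0) {
      ::close(fd);
      return false;
    }
    p += written;
    remaining -= static_cast<size_t>(written);
  }

  // 2. Flush the file data to stable storage *before* renaming, so the rename
  //    can never expose an empty or partially written checkpoint.
  if (::fsync(fd) != 0) {
    ::close(fd);
    return false;
  }
  ::close(fd);

  // 3. Atomically replace the old checkpoint (all-or-nothing).
  if (std::rename(temp.c_str(), path.c_str()) != 0) {
    return false;
  }

  // 4. Sync the parent directory so the rename itself survives a crash.
  std::string dir = ".";
  const size_t slash = path.find_last_of('/');
  if (slash != std::string::npos) {
    dir = (slash == 0) ? "/" : path.substr(0, slash);
  }

  int dirfd = ::open(dir.c_str(), O_RDONLY | O_DIRECTORY);
  if (dirfd < 0) {
    return false;
  }

  const bool synced = ::fsync(dirfd) == 0;
  ::close(dirfd);
  return synced;
}

The commit threads the same idea through the existing helpers instead, by passing a new `sync` flag to `os::write()`, `os::mkdir()`, and `os::rename()` (see `src/slave/state.hpp` below).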

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 7099a76..5f755bf 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -21,6 +21,7 @@
 #include <memory>
 #include <numeric>
 #include <utility>
+#include <vector>
 
 #include <glog/logging.h>
 
@@ -28,6 +29,7 @@
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
+#include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/loop.hpp>
 #include <process/process.hpp>
@@ -88,23 +90,24 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+using process::after;
+using process::await;
 using process::Break;
+using process::collect;
 using process::Continue;
 using process::ControlFlow;
+using process::defer;
+using process::delay;
+using process::dispatch;
 using process::Failure;
 using process::Future;
+using process::loop;
 using process::Owned;
 using process::Process;
 using process::Promise;
 using process::Sequence;
-using process::Timeout;
-
-using process::after;
-using process::await;
-using process::collect;
-using process::defer;
-using process::loop;
 using process::spawn;
+using process::Timeout;
 
 using process::http::authentication::Principal;
 
@@ -425,6 +428,8 @@ private:
       const id::UUID& operationUuid,
       const Try<vector<ResourceConversion>>& conversions);
 
+  void garbageCollectOperationPath(const id::UUID& operationUuid);
+
   void checkpointResourceProviderState();
   void checkpointVolumeState(const string& volumeId);
 
@@ -1150,7 +1155,16 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
           uuid.error());
     }
 
-    CHECK(operations.contains(uuid.get()));
+    // NOTE: This could happen if we failed to remove the operation path before.
+    if (!operations.contains(uuid.get())) {
+      LOG(WARNING)
+        << "Ignoring unknown operation (uuid: " << uuid.get()
+        << ") for resource provider " << info.id();
+
+      garbageCollectOperationPath(uuid.get());
+      continue;
+    }
+
     operationUuids.emplace_back(std::move(uuid.get()));
   }
 
@@ -1161,27 +1175,23 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
       using StreamState =
         typename OperationStatusUpdateManagerState::StreamState;
 
-      // Clean up the operations that are terminated.
+      // Clean up the operations that are completed.
+      vector<id::UUID> completedOperations;
       foreachpair (const id::UUID& uuid,
                    const Option<StreamState>& stream,
                    statusUpdateManagerState.streams) {
         if (stream.isSome() && stream->terminated) {
           operations.erase(uuid);
-
-          // Garbage collect the operation metadata.
-          const string path = slave::paths::getOperationPath(
-              slave::paths::getResourceProviderPath(
-                  metaDir, slaveId, info.type(), info.name(), info.id()),
-              uuid);
-
-          Try<Nothing> rmdir = os::rmdir(path);
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove directory '" + path + "': " + rmdir.error());
-          }
+          completedOperations.push_back(uuid);
         }
       }
 
+      // Garbage collect the operation streams after checkpointing.
+      checkpointResourceProviderState();
+      foreach (const id::UUID& uuid, completedOperations) {
+        garbageCollectOperationPath(uuid);
+      }
+
       // Send updates for all missing statuses.
       foreachpair (const id::UUID& uuid,
                    const Operation& operation,
@@ -1786,25 +1796,11 @@ void StorageLocalResourceProviderProcess::acknowledgeOperationStatus(
   // acknowledgement will be received. In this case, the following call
   // will fail, so we just leave an error log.
   statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get())
-    .then(defer(self(), [=](bool continuation) -> Future<Nothing> {
+    .then(defer(self(), [=](bool continuation) {
       if (!continuation) {
         operations.erase(operationUuid.get());
-
-        // Garbage collect the operation metadata.
-        const string path = slave::paths::getOperationPath(
-            slave::paths::getResourceProviderPath(
-                metaDir, slaveId, info.type(), info.name(), info.id()),
-            operationUuid.get());
-
-        // NOTE: We check if the path exists since we do not checkpoint
-        // some status updates, such as OPERATION_DROPPED.
-        if (os::exists(path)) {
-          Try<Nothing> rmdir = os::rmdir(path);
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove directory '" + path + "': " + rmdir.error());
-          }
-        }
+        checkpointResourceProviderState();
+        garbageCollectOperationPath(operationUuid.get());
       }
 
       return Nothing();
@@ -3432,6 +3428,28 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
 }
 
 
+void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
+    const id::UUID& operationUuid)
+{
+  CHECK(!operations.contains(operationUuid));
+
+  const string path = slave::paths::getOperationPath(
+      slave::paths::getResourceProviderPath(
+          metaDir, slaveId, info.type(), info.name(), info.id()),
+      operationUuid);
+
+  // NOTE: We check if the path exists since we do not checkpoint some status
+  // updates, such as OPERATION_DROPPED.
+  if (os::exists(path)) {
+    Try<Nothing> rmdir = os::rmdir(path);
+    if (rmdir.isError()) {
+      LOG(ERROR)
+        << "Failed to remove directory '" << path << "': " << rmdir.error();
+    }
+  }
+}
+
+
 void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 {
   ResourceProviderState state;
@@ -3472,7 +3490,9 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 
-  Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+  // NOTE: We ensure the checkpoint is synced to the filesystem so that a
+  // system crash cannot leave behind a stale or empty checkpoint.
+  Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state, true);
   CHECK_SOME(checkpoint)
     << "Failed to checkpoint resource provider state to '" << statePath << "': "
     << checkpoint.error();
@@ -3488,8 +3508,10 @@ void StorageLocalResourceProviderProcess::checkpointVolumeState(
       info.storage().plugin().name(),
       volumeId);
 
+  // NOTE: We ensure the checkpoint is synced to the filesystem so that a
+  // system crash cannot leave behind a stale or empty checkpoint.
   Try<Nothing> checkpoint =
-    slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+    slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
 
   CHECK_SOME(checkpoint)
     << "Failed to checkpoint volume state to '" << statePath << "':"
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 003211e..4f3d4ce 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -122,9 +122,10 @@ namespace internal {
 
 inline Try<Nothing> checkpoint(
     const std::string& path,
-    const std::string& message)
+    const std::string& message,
+    bool sync)
 {
-  return ::os::write(path, message);
+  return ::os::write(path, message, sync);
 }
 
 
@@ -133,7 +134,7 @@ template <
     typename std::enable_if<
         std::is_convertible<T*, google::protobuf::Message*>::value,
         int>::type = 0>
-inline Try<Nothing> checkpoint(const std::string& path, T message)
+inline Try<Nothing> checkpoint(const std::string& path, T message, bool sync)
 {
   // If the `Try` from `downgradeResources` returns an `Error`, we currently
   // continue to checkpoint the resources in a partially downgraded state.
@@ -144,13 +145,14 @@ inline Try<Nothing> checkpoint(const std::string& path, T message)
   // TODO(mpark): Do something smarter with the result once
   // something like an agent recovery capability is introduced.
   downgradeResources(&message);
-  return ::protobuf::write(path, message);
+  return ::protobuf::write(path, message, sync);
 }
 
 
 inline Try<Nothing> checkpoint(
     const std::string& path,
-    google::protobuf::RepeatedPtrField<Resource> resources)
+    google::protobuf::RepeatedPtrField<Resource> resources,
+    bool sync)
 {
   // If the `Try` from `downgradeResources` returns an `Error`, we currently
   // continue to checkpoint the resources in a partially downgraded state.
@@ -161,16 +163,17 @@ inline Try<Nothing> checkpoint(
   // TODO(mpark): Do something smarter with the result once
   // something like an agent recovery capability is introduced.
   downgradeResources(&resources);
-  return ::protobuf::write(path, resources);
+  return ::protobuf::write(path, resources, sync);
 }
 
 
 inline Try<Nothing> checkpoint(
     const std::string& path,
-    const Resources& resources)
+    const Resources& resources,
+    bool sync)
 {
   const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
-  return checkpoint(path, messages);
+  return checkpoint(path, messages, sync);
 }
 
 }  // namespace internal {
@@ -187,14 +190,19 @@ inline Try<Nothing> checkpoint(
 //
 // NOTE: We provide atomic (all-or-nothing) semantics here by always
 // writing to a temporary file first then using os::rename to atomically
-// move it to the desired path.
+// move it to the desired path. If `sync` is set to true, this call succeeds
+// only if `fsync` is supported and successfully commits the changes to the
+// filesystem for the checkpoint file and each created directory.
+//
+// TODO(chhsiao): Consider enabling syncing by default after evaluating its
+// performance impact.
 template <typename T>
-Try<Nothing> checkpoint(const std::string& path, const T& t)
+Try<Nothing> checkpoint(const std::string& path, const T& t, bool sync = false)
 {
   // Create the base directory.
   std::string base = Path(path).dirname();
 
-  Try<Nothing> mkdir = os::mkdir(base);
+  Try<Nothing> mkdir = os::mkdir(base, true, sync);
   if (mkdir.isError()) {
     return Error("Failed to create directory '" + base + "': " + mkdir.error());
   }
@@ -211,7 +219,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t)
   }
 
   // Now checkpoint the instance of T to the temporary file.
-  Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t);
+  Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t, sync);
   if (checkpoint.isError()) {
     // Try removing the temporary file on error.
     os::rm(temp.get());
@@ -221,7 +229,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t)
   }
 
   // Rename the temporary file to the path.
-  Try<Nothing> rename = os::rename(temp.get(), path);
+  Try<Nothing> rename = os::rename(temp.get(), path, sync);
   if (rename.isError()) {
     // Try removing the temporary file on error.
     os::rm(temp.get());
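
For reference, a hedged caller-side sketch of the updated template above. The fully qualified namespace and the string overload are taken from src/slave/state.hpp; the path, the payload, and the surrounding function are placeholders, and building this requires the Mesos source tree's include paths.

#include <string>

#include <glog/logging.h>

#include <stout/nothing.hpp>
#include <stout/try.hpp>

#include "slave/state.hpp"

void checkpointExample()
{
  // Placeholder path and payload; real callers checkpoint protobuf state
  // under the agent's meta directory.
  const std::string path = "/tmp/mesos-meta/example/state";
  const std::string data = "serialized state";

  // Default behavior is unchanged: no fsync(), same as before this commit.
  Try<Nothing> fast = mesos::internal::slave::state::checkpoint(path, data);

  // Passing `sync = true` additionally fsync()s the checkpoint file and each
  // directory created along the way, so a crash cannot surface a stale or
  // empty checkpoint on recovery.
  Try<Nothing> durable =
    mesos::internal::slave::state::checkpoint(path, data, true);

  if (fast.isError() || durable.isError()) {
    LOG(ERROR) << "Failed to checkpoint to '" << path << "'";
  }
}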