You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/10/31 18:28:21 UTC
[mesos] 03/06: Synced SLRP checkpoints to the filesystem.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 16bcf61231b6d14019b1d703d887c55b01b85aee
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Fri Oct 12 15:12:41 2018 -0700
Synced SLRP checkpoints to the filesystem.
Currently, if a system crashes, SLRP checkpoints might not have been synced
to the filesystem, so an old or empty checkpoint may be read upon recovery.
Moreover, if a CSI call was issued right before the crash, the recovered
state may be inconsistent with the actual state reported by the plugin.
For example, the plugin might have created a volume that the checkpointed
state does not know about.
To avoid this inconsistency, we now always call fsync() when checkpointing
SLRP states.
Review: https://reviews.apache.org/r/69010
---
src/resource_provider/storage/provider.cpp | 102 ++++++++++++++++++-----------
src/slave/state.hpp | 34 ++++++----
2 files changed, 83 insertions(+), 53 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index db783b5..025b13b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -21,6 +21,7 @@
#include <memory>
#include <numeric>
#include <utility>
+#include <vector>
#include <glog/logging.h>
@@ -28,6 +29,7 @@
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
+#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
@@ -88,23 +90,24 @@ using std::shared_ptr;
using std::string;
using std::vector;
+using process::after;
+using process::await;
using process::Break;
+using process::collect;
using process::Continue;
using process::ControlFlow;
+using process::defer;
+using process::delay;
+using process::dispatch;
using process::Failure;
using process::Future;
+using process::loop;
using process::Owned;
using process::Process;
using process::Promise;
using process::Sequence;
-using process::Timeout;
-
-using process::after;
-using process::await;
-using process::collect;
-using process::defer;
-using process::loop;
using process::spawn;
+using process::Timeout;
using process::http::authentication::Principal;
@@ -425,6 +428,8 @@ private:
const id::UUID& operationUuid,
const Try<vector<ResourceConversion>>& conversions);
+ void garbageCollectOperationPath(const id::UUID& operationUuid);
+
void checkpointResourceProviderState();
void checkpointVolumeState(const string& volumeId);
@@ -1154,7 +1159,16 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
uuid.error());
}
- CHECK(operations.contains(uuid.get()));
+ // NOTE: This could happen if we failed to remove the operation path before.
+ if (!operations.contains(uuid.get())) {
+ LOG(WARNING)
+ << "Ignoring unknown operation (uuid: " << uuid.get()
+ << ") for resource provider " << info.id();
+
+ garbageCollectOperationPath(uuid.get());
+ continue;
+ }
+
operationUuids.emplace_back(std::move(uuid.get()));
}
@@ -1165,27 +1179,23 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
using StreamState =
typename OperationStatusUpdateManagerState::StreamState;
- // Clean up the operations that are terminated.
+ // Clean up the operations that are completed.
+ vector<id::UUID> completedOperations;
foreachpair (const id::UUID& uuid,
const Option<StreamState>& stream,
statusUpdateManagerState.streams) {
if (stream.isSome() && stream->terminated) {
operations.erase(uuid);
-
- // Garbage collect the operation metadata.
- const string path = slave::paths::getOperationPath(
- slave::paths::getResourceProviderPath(
- metaDir, slaveId, info.type(), info.name(), info.id()),
- uuid);
-
- Try<Nothing> rmdir = os::rmdir(path);
- if (rmdir.isError()) {
- return Failure(
- "Failed to remove directory '" + path + "': " + rmdir.error());
- }
+ completedOperations.push_back(uuid);
}
}
+ // Garbage collect the operation streams after checkpointing.
+ checkpointResourceProviderState();
+ foreach (const id::UUID& uuid, completedOperations) {
+ garbageCollectOperationPath(uuid);
+ }
+
// Send updates for all missing statuses.
foreachpair (const id::UUID& uuid,
const Operation& operation,
@@ -1790,25 +1800,11 @@ void StorageLocalResourceProviderProcess::acknowledgeOperationStatus(
// acknowledgement will be received. In this case, the following call
// will fail, so we just leave an error log.
statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get())
- .then(defer(self(), [=](bool continuation) -> Future<Nothing> {
+ .then(defer(self(), [=](bool continuation) {
if (!continuation) {
operations.erase(operationUuid.get());
-
- // Garbage collect the operation metadata.
- const string path = slave::paths::getOperationPath(
- slave::paths::getResourceProviderPath(
- metaDir, slaveId, info.type(), info.name(), info.id()),
- operationUuid.get());
-
- // NOTE: We check if the path exists since we do not checkpoint
- // some status updates, such as OPERATION_DROPPED.
- if (os::exists(path)) {
- Try<Nothing> rmdir = os::rmdir(path);
- if (rmdir.isError()) {
- return Failure(
- "Failed to remove directory '" + path + "': " + rmdir.error());
- }
- }
+ checkpointResourceProviderState();
+ garbageCollectOperationPath(operationUuid.get());
}
return Nothing();
@@ -3436,6 +3432,28 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
}
+void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
+ const id::UUID& operationUuid)
+{
+ CHECK(!operations.contains(operationUuid));
+
+ const string path = slave::paths::getOperationPath(
+ slave::paths::getResourceProviderPath(
+ metaDir, slaveId, info.type(), info.name(), info.id()),
+ operationUuid);
+
+ // NOTE: We check if the path exists since we do not checkpoint some status
+ // updates, such as OPERATION_DROPPED.
+ if (os::exists(path)) {
+ Try<Nothing> rmdir = os::rmdir(path);
+ if (rmdir.isError()) {
+ LOG(ERROR)
+ << "Failed to remove directory '" << path << "': " << rmdir.error();
+ }
+ }
+}
+
+
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
@@ -3476,7 +3494,9 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
- Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+ // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
+ // resulting in a stale or empty checkpoint when a system crash happens.
+ Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state, true);
CHECK_SOME(checkpoint)
<< "Failed to checkpoint resource provider state to '" << statePath << "': "
<< checkpoint.error();
@@ -3492,8 +3512,10 @@ void StorageLocalResourceProviderProcess::checkpointVolumeState(
info.storage().plugin().name(),
volumeId);
+ // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
+ // resulting in a stale or empty checkpoint when a system crash happens.
Try<Nothing> checkpoint =
- slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+ slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
CHECK_SOME(checkpoint)
<< "Failed to checkpoint volume state to '" << statePath << "':"
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 003211e..4f3d4ce 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -122,9 +122,10 @@ namespace internal {
inline Try<Nothing> checkpoint(
const std::string& path,
- const std::string& message)
+ const std::string& message,
+ bool sync)
{
- return ::os::write(path, message);
+ return ::os::write(path, message, sync);
}
@@ -133,7 +134,7 @@ template <
typename std::enable_if<
std::is_convertible<T*, google::protobuf::Message*>::value,
int>::type = 0>
-inline Try<Nothing> checkpoint(const std::string& path, T message)
+inline Try<Nothing> checkpoint(const std::string& path, T message, bool sync)
{
// If the `Try` from `downgradeResources` returns an `Error`, we currently
// continue to checkpoint the resources in a partially downgraded state.
@@ -144,13 +145,14 @@ inline Try<Nothing> checkpoint(const std::string& path, T message)
// TODO(mpark): Do something smarter with the result once
// something like an agent recovery capability is introduced.
downgradeResources(&message);
- return ::protobuf::write(path, message);
+ return ::protobuf::write(path, message, sync);
}
inline Try<Nothing> checkpoint(
const std::string& path,
- google::protobuf::RepeatedPtrField<Resource> resources)
+ google::protobuf::RepeatedPtrField<Resource> resources,
+ bool sync)
{
// If the `Try` from `downgradeResources` returns an `Error`, we currently
// continue to checkpoint the resources in a partially downgraded state.
@@ -161,16 +163,17 @@ inline Try<Nothing> checkpoint(
// TODO(mpark): Do something smarter with the result once
// something like an agent recovery capability is introduced.
downgradeResources(&resources);
- return ::protobuf::write(path, resources);
+ return ::protobuf::write(path, resources, sync);
}
inline Try<Nothing> checkpoint(
const std::string& path,
- const Resources& resources)
+ const Resources& resources,
+ bool sync)
{
const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
- return checkpoint(path, messages);
+ return checkpoint(path, messages, sync);
}
} // namespace internal {
@@ -187,14 +190,19 @@ inline Try<Nothing> checkpoint(
//
// NOTE: We provide atomic (all-or-nothing) semantics here by always
// writing to a temporary file first then using os::rename to atomically
-// move it to the desired path.
+// move it to the desired path. If `sync` is set to true, this call succeeds
+// only if `fsync` is supported and successfully commits the changes to the
+// filesystem for the checkpoint file and each created directory.
+//
+// TODO(chhsiao): Consider enabling syncing by default after evaluating its
+// performance impact.
template <typename T>
-Try<Nothing> checkpoint(const std::string& path, const T& t)
+Try<Nothing> checkpoint(const std::string& path, const T& t, bool sync = false)
{
// Create the base directory.
std::string base = Path(path).dirname();
- Try<Nothing> mkdir = os::mkdir(base);
+ Try<Nothing> mkdir = os::mkdir(base, true, sync);
if (mkdir.isError()) {
return Error("Failed to create directory '" + base + "': " + mkdir.error());
}
@@ -211,7 +219,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t)
}
// Now checkpoint the instance of T to the temporary file.
- Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t);
+ Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t, sync);
if (checkpoint.isError()) {
// Try removing the temporary file on error.
os::rm(temp.get());
@@ -221,7 +229,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t)
}
// Rename the temporary file to the path.
- Try<Nothing> rename = os::rename(temp.get(), path);
+ Try<Nothing> rename = os::rename(temp.get(), path, sync);
if (rename.isError()) {
// Try removing the temporary file on error.
os::rm(temp.get());