You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/13 21:41:09 UTC
[04/17] mesos git commit: Reconciled storage pools when destroying
volumes with stale profiles.
Reconciled storage pools when destroying volumes with stale profiles.
The storage pools need to be reconciled in the following two scenarios:
1. When there is a change in the set of known profiles.
2. When a volume/block of a disappeared profile is destroyed, because
the disk space being freed up may belong to another appeared profile.
This patch sequentializes reconciliations triggered by the above two
cases.
Review: https://reviews.apache.org/r/65975
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16d19ac3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16d19ac3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16d19ac3
Branch: refs/heads/master
Commit: 16d19ac37dc3b7954bba5b7860d0899a5dc468e6
Parents: 1418686
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Mar 7 20:35:57 2018 -0800
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Fri Jul 13 14:30:06 2018 -0700
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 388 ++++++++++++++----------
1 file changed, 232 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/16d19ac3/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index b90a4b8..07645fc 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -98,6 +98,7 @@ using process::Sequence;
using process::Timeout;
using process::after;
+using process::await;
using process::collect;
using process::defer;
using process::loop;
@@ -308,9 +309,8 @@ public:
slaveId(_slaveId),
authToken(_authToken),
strict(_strict),
- reconciling(false),
resourceVersion(id::UUID::random()),
- operationSequence("operation-sequence"),
+ sequence("storage-local-resource-provider-sequence"),
metrics("resource_providers/" + info.type() + "." + info.name() + "/")
{
diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
@@ -361,12 +361,23 @@ private:
const Resources& checkpointed,
const Resources& discovered);
- // Helper for updating the profiles mapping upon receiving an updated
- // set of profiles from the DiskProfileAdaptor module.
- Future<Nothing> updateProfiles();
+ // Spawns a loop to watch for changes in the set of known profiles and update
+ // the profile mapping and storage pools accordingly.
+ void watchProfiles();
- // Reconcile the storage pools upon profile updates.
- Future<Nothing> reconcileProfileUpdates();
+ // Update the profile mapping when the set of known profiles changes.
+ // NOTE: This function never fails. If it fails to translate a new
+ // profile, the resource provider will continue to operate with the
+ // set of profiles it knows about.
+ Future<Nothing> updateProfiles(const hashset<string>& profiles);
+
+ // Reconcile the storage pools when the set of known profiles changes,
+ // or a volume with an unknown profile is destroyed.
+ Future<Nothing> reconcileStoragePools();
+
+ // Returns true if the storage pools are allowed to be reconciled when
+ // the operation is being applied.
+ static bool allowsReconciliation(const Offer::Operation& operation);
// Functions for received events.
void subscribed(const Event::Subscribed& subscribed);
@@ -465,12 +476,6 @@ private:
// The mapping of known profiles fetched from the DiskProfileAdaptor.
hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
- // The last set of profile names fetched from the DiskProfileAdaptor.
- hashset<string> knownProfiles;
-
- // True if a reconciliation of storage pools is happening.
- bool reconciling;
-
hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
@@ -493,10 +498,14 @@ private:
id::UUID resourceVersion;
hashmap<string, VolumeData> volumes;
- // We maintain a sequence to keep track of ongoing volume/block
- // creation or destroy. These operations will not be sequentialized
- // through the sequence. It is simply used to wait for them to finish.
- Sequence operationSequence;
+ // If pending, it means that the storage pools are being reconciled, and all
+ // incoming operations that disallow reconciliation will be dropped.
+ Future<Nothing> reconciled;
+
+ // We maintain a sequence to coordinate reconciliations of storage pools. It
+ // keeps track of pending operations that disallow reconciliation, and ensures
+ // that any reconciliation waits for these operations to finish.
+ Sequence sequence;
struct Metrics
{
@@ -664,30 +673,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
statusUpdateManager.pause();
- auto err = [](const string& message) {
- LOG(ERROR)
- << "Failed to watch for DiskProfileAdaptor: " << message;
- };
-
- // Start watching the DiskProfileAdaptor.
- // TODO(chhsiao): Consider retrying with backoff.
- loop(
- self(),
- [=] {
- return diskProfileAdaptor->watch(knownProfiles, info)
- .then(defer(self(), [=](const hashset<string>& profiles) {
- // Save the returned set of profiles so that we
- // can watch the module for changes to it.
- knownProfiles = profiles;
-
- return updateProfiles()
- .then(defer(self(), &Self::reconcileProfileUpdates));
- }));
- },
- [](Nothing) -> ControlFlow<Nothing> { return Continue(); })
- .onFailed(std::bind(err, lambda::_1))
- .onDiscarded(std::bind(err, "future discarded"));
-
driver.reset(new Driver(
Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
contentType,
@@ -1271,6 +1256,92 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
}
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
+{
+ CHECK_PENDING(reconciled);
+
+ auto die = [=](const string& message) {
+ LOG(ERROR)
+ << "Failed to reconcile storage pools for resource provider " << info.id()
+ << ": " << message;
+ fatal();
+ };
+
+ return getCapacities()
+ .then(defer(self(), [=](const Resources& discovered) {
+ ResourceConversion conversion = reconcileResources(
+ totalResources.filter(
+ [](const Resource& r) { return !r.disk().source().has_id(); }),
+ discovered);
+
+ Try<Resources> result = totalResources.apply(conversion);
+ CHECK_SOME(result);
+
+ if (result.get() != totalResources) {
+ LOG(INFO)
+ << "Removing '" << conversion.consumed << "' and adding '"
+ << conversion.converted << "' to the total resources";
+
+ totalResources = result.get();
+ checkpointResourceProviderState();
+
+ // NOTE: We always update the resource version before sending
+ // an `UPDATE_STATE`, so that any racing speculative operation
+ // will be rejected. Otherwise, the speculative resource
+ // conversion done on the master will be cancelled out.
+ resourceVersion = id::UUID::random();
+ sendResourceProviderStateUpdate();
+ }
+
+ return Nothing();
+ }))
+ .onFailed(defer(self(), std::bind(die, lambda::_1)))
+ .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+}
+
+
+bool StorageLocalResourceProviderProcess::allowsReconciliation(
+ const Offer::Operation& operation)
+{
+ switch (operation.type()) {
+ case Offer::Operation::RESERVE:
+ case Offer::Operation::UNRESERVE: {
+ Resources consumedStoragePools =
+ CHECK_NOTERROR(protobuf::getConsumedResources(operation))
+ .filter([](const Resource& r) {
+ return r.disk().source().has_profile() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+ });
+
+ return consumedStoragePools.empty();
+ }
+ case Offer::Operation::CREATE:
+ case Offer::Operation::DESTROY: {
+ return true;
+ }
+ case Offer::Operation::CREATE_VOLUME:
+ case Offer::Operation::DESTROY_VOLUME:
+ case Offer::Operation::CREATE_BLOCK:
+ case Offer::Operation::DESTROY_BLOCK: {
+ return false;
+ }
+ case Offer::Operation::GROW_VOLUME:
+ case Offer::Operation::SHRINK_VOLUME: {
+ // TODO(chhsiao): These operations are currently not supported for
+ // resource providers, and should have been validated by the master.
+ UNREACHABLE();
+ }
+ case Offer::Operation::UNKNOWN:
+ case Offer::Operation::LAUNCH:
+ case Offer::Operation::LAUNCH_GROUP: {
+ UNREACHABLE();
+ }
+ }
+
+ UNREACHABLE();
+}
+
+
ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
const Resources& checkpointed,
const Resources& discovered)
@@ -1319,93 +1390,80 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
}
-Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles()
+void StorageLocalResourceProviderProcess::watchProfiles()
{
- LOG(INFO)
- << "Updating metadata for profiles: " << stringify(knownProfiles);
+ auto err = [](const string& message) {
+ LOG(ERROR) << "Failed to watch for DiskProfileAdaptor: " << message;
+ };
+
+ // TODO(chhsiao): Consider retrying with backoff.
+ loop(
+ self(),
+ [=] {
+ return diskProfileAdaptor->watch(profileInfos.keys(), info);
+ },
+ [=](const hashset<string>& profiles) {
+ CHECK(info.has_id());
+
+ LOG(INFO)
+ << "Updating profiles " << stringify(profiles)
+ << " for resource provider " << info.id();
+
+ std::function<Future<Nothing>()> update = defer(self(), [=] {
+ return updateProfiles(profiles)
+ .then(defer(self(), &Self::reconcileStoragePools));
+ });
+ // Update the profile mapping and storage pools in `sequence` to wait
+ // for any pending operation that disallows reconciliation or the last
+ // reconciliation (if any) to finish, and set up `reconciled` to drop
+ // incoming operations that disallow reconciliation until the storage
+ // pools are reconciled.
+ reconciled = sequence.add(update);
+
+ return reconciled
+ .then(defer(self(), [=]() -> ControlFlow<Nothing> {
+ return Continue();
+ }));
+ })
+ .onFailed(std::bind(err, lambda::_1))
+ .onDiscarded(std::bind(err, "future discarded"));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles(
+ const hashset<string>& profiles)
+{
vector<Future<Nothing>> futures;
- foreach (const string& profile, knownProfiles) {
- // Since profiles are immutable after creation and cannot be
- // deleted, we do not need to update any profile that is already in
- // the mapping.
+ foreach (const string& profile, profiles) {
+ // Since profiles are immutable after creation, we do not need to
+ // translate any profile that is already in the mapping.
// TODO(chhsiao): Handle profile deactivation.
if (profileInfos.contains(profile)) {
continue;
}
+ auto err = [](const string& profile, const string& message) {
+ LOG(ERROR)
+ << "Failed to translate profile '" << profile << "': " << message;
+ };
+
futures.push_back(diskProfileAdaptor->translate(profile, info)
- .then(defer(self(), [=](const DiskProfileAdaptor::ProfileInfo& info) {
- profileInfos.put(profile, info);
+ .then(defer(self(), [=](
+ const DiskProfileAdaptor::ProfileInfo& profileInfo) {
+ profileInfos.put(profile, profileInfo);
return Nothing();
- })));
+ }))
+ .onFailed(std::bind(err, profile, lambda::_1))
+ .onDiscarded(std::bind(err, profile, "future discarded")));
}
- return collect(futures).then([] { return Nothing(); });
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileProfileUpdates()
-{
- // Do nothing if the resource provider ID is not known yet, since it
- // is used to construct the resource metadata of storage pools. The
- // metadata will be constructed in `reconcileResourceProviderState`.
- if (!info.has_id()) {
- return Nothing();
- }
-
- CHECK(!reconciling);
-
- LOG(INFO) << "Reconciling storage pools for resource provider " << info.id();
-
- reconciling = true;
-
- // We add a lambda into `OperationSequence` so that it will return
- // after waiting for all pending operations in the sequence.
- return operationSequence.add(
- std::function<Future<Nothing>()>([] { return Nothing(); }))
- .then(defer(self(), &Self::getCapacities))
- .then(defer(self(), [=](const Resources& discovered) {
- auto isStoragePool = [](const Resource& r) {
- return !r.disk().source().has_id();
- };
-
- ResourceConversion conversion = reconcileResources(
- totalResources.filter(isStoragePool),
- discovered);
-
- Try<Resources> result = totalResources.apply(conversion);
- CHECK_SOME(result);
-
- if (result.get() != totalResources) {
- LOG(INFO)
- << "Removing '" << conversion.consumed << "' and adding '"
- << conversion.converted << "' to the total resources";
-
- totalResources = result.get();
- checkpointResourceProviderState();
-
- // NOTE: We ensure that the first `UPDATE_STATE` of the current
- // subscription is sent by `reconcileResourceProviderState`, so
- // that the total resources contain existing volumes.
- if (state == READY) {
- // NOTE: We always update the resource version before sending
- // an `UPDATE_STATE`, so that any racing speculative operation
- // will be rejected. Otherwise, the speculative resource
- // conversion done on the master will be cancelled out.
- resourceVersion = id::UUID::random();
- sendResourceProviderStateUpdate();
- }
- }
-
- LOG(INFO)
- << "Finished reconciliation of storage pools for resource provider "
- << info.id();
-
- reconciling = false;
-
- return Nothing();
- }));
+ // We use `await` here to return a future that never fails, so the loop in
+ // `watchProfiles` will continue to watch for profile changes. If any profile
+ // translation fails, the profile will not be added to the set of known
+ // profiles and thus the disk profile adaptor will notify the resource
+ // provider again.
+ return await(futures).then([] { return Nothing(); });
}
@@ -1436,9 +1494,11 @@ void StorageLocalResourceProviderProcess::subscribed(
fatal();
};
- // Reconcile resources after obtaining the resource provider ID.
- // TODO(chhsiao): Do the reconciliation early.
- reconcileResourceProviderState()
+ // Reconcile resources after obtaining the resource provider ID and start
+ // watching for profile changes after the reconciliation.
+ // TODO(chhsiao): Reconcile and watch for profile changes early.
+ reconciled = reconcileResourceProviderState()
+ .onReady(defer(self(), &Self::watchProfiles))
.onFailed(defer(self(), std::bind(die, lambda::_1)))
.onDiscarded(defer(self(), std::bind(die, "future discarded")));
}
@@ -1467,7 +1527,7 @@ void StorageLocalResourceProviderProcess::applyOperation(
"Cannot apply operation in SUBSCRIBED state");
}
- if (reconciling) {
+ if (reconciled.isPending() && !allowsReconciliation(operation.info())) {
return dropOperation(
uuid.get(),
frameworkId,
@@ -2813,14 +2873,9 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
.then(defer(self(), [=](csi::v0::Client client) {
vector<Future<Resources>> futures;
- foreach (const string& profile, knownProfiles) {
- CHECK(profileInfos.contains(profile));
-
- // TODO(chhsiao): Skip inactive profiles.
-
- const DiskProfileAdaptor::ProfileInfo& profileInfo =
- profileInfos.at(profile);
-
+ foreachpair (const string& profile,
+ const DiskProfileAdaptor::ProfileInfo& profileInfo,
+ profileInfos) {
csi::v0::GetCapacityRequest request;
request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
*request.mutable_parameters() = profileInfo.parameters;
@@ -2859,7 +2914,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
Future<vector<ResourceConversion>> conversions;
- Option<Resource> source;
switch (operation.info().type()) {
case Offer::Operation::RESERVE:
@@ -2876,9 +2930,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
case Offer::Operation::CREATE_VOLUME: {
CHECK(operation.info().has_create_volume());
- source = operation.info().create_volume().source();
conversions = applyCreateVolumeOrBlock(
- source.get(),
+ operation.info().create_volume().source(),
operationUuid,
operation.info().create_volume().target_type());
@@ -2887,17 +2940,16 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
case Offer::Operation::DESTROY_VOLUME: {
CHECK(operation.info().has_destroy_volume());
- source = operation.info().destroy_volume().volume();
- conversions = applyDestroyVolumeOrBlock(source.get());
+ conversions = applyDestroyVolumeOrBlock(
+ operation.info().destroy_volume().volume());
break;
}
case Offer::Operation::CREATE_BLOCK: {
CHECK(operation.info().has_create_block());
- source = operation.info().create_block().source();
conversions = applyCreateVolumeOrBlock(
- source.get(),
+ operation.info().create_block().source(),
operationUuid,
Resource::DiskInfo::Source::BLOCK);
@@ -2906,8 +2958,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
case Offer::Operation::DESTROY_BLOCK: {
CHECK(operation.info().has_destroy_block());
- source = operation.info().destroy_block().block();
- conversions = applyDestroyVolumeOrBlock(source.get());
+ conversions = applyDestroyVolumeOrBlock(
+ operation.info().destroy_block().block());
break;
}
@@ -2924,7 +2976,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
}
}
- // NOTE: The code below is executed only when applying a storage operation.
+ CHECK(!protobuf::isSpeculativeOperation(operation.info()))
+ << "Unexpected speculative operation: " << operation.info().type();
+
shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
conversions
@@ -2946,12 +3000,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
Future<Nothing> future = promise->future();
- CHECK_SOME(source);
- if (source->disk().source().has_profile()) {
- // We place the future in `operationSequence` so it can be waited
- // for during reconciliation upon profile updates.
- operationSequence.add(
- std::function<Future<Nothing>()>([future] { return future; }));
+ if (!allowsReconciliation(operation.info())) {
+ // We place the future in `sequence` so it can be waited for before reconciling
+ // storage pools.
+ sequence.add(std::function<Future<Nothing>()>([future] { return future; }));
}
return future;
@@ -3199,7 +3251,6 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
CHECK(volumes.contains(resource.disk().source().id()));
// Sequentialize the deletion with other operation on the same volume.
- // NOTE: A resource has no profile iff it is a pre-existing volume.
return volumes.at(resource.disk().source().id()).sequence->add(
std::function<Future<Nothing>()>(defer(
self(),
@@ -3207,19 +3258,44 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
resource.disk().source().id(),
!resource.disk().source().has_profile())))
.then(defer(self(), [=]() {
- // TODO(chhsiao): Convert to an empty resource and update all
- // storage pools if the profile has been deactivated.
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_type(
Resource::DiskInfo::Source::RAW);
converted.mutable_disk()->mutable_source()->clear_path();
converted.mutable_disk()->mutable_source()->clear_mount();
- // NOTE: We keep the source ID and metadata if it is a
- // pre-existing volume, which has no profile.
+ // We only clear the volume ID and metadata if the destroyed volume is
+ // not a pre-existing volume.
if (resource.disk().source().has_profile()) {
converted.mutable_disk()->mutable_source()->clear_id();
converted.mutable_disk()->mutable_source()->clear_metadata();
+
+ if (!profileInfos.contains(resource.disk().source().profile())) {
+ // The destroyed volume is converted into an empty resource to prevent
+ // the freed disk from being sent out with a disappeared profile.
+ converted.mutable_scalar()->set_value(0);
+
+ // Since the profile has disappeared, the freed disk might be claimed by
+ // other appeared profiles. If there is an ongoing reconciliation, it
+ // is waiting for this operation to finish and will recover the freed
+ // disk, so no reconciliation should be done here. Otherwise, we
+ // reconcile the storage pools to recover the freed disk.
+ if (!reconciled.isPending()) {
+ CHECK(info.has_id());
+
+ LOG(INFO)
+ << "Reconciling storage pools for resource provider " << info.id()
+ << " after the disk with profile '"
+ << resource.disk().source().profile() << "' has been freed";
+
+ // Reconcile the storage pools in `sequence` to wait for any other
+ // pending operation that disallows reconciliation to finish, and set
+ // up `reconciled` to drop incoming operations that disallow
+ // reconciliation until the storage pools are reconciled.
+ reconciled = sequence.add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::reconcileStoragePools)));
+ }
+ }
}
vector<ResourceConversion> conversions;
@@ -3319,10 +3395,7 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
if (error.isSome()) {
// We only send `UPDATE_STATE` for failed speculative operations.
- if (operation.info().type() == Offer::Operation::RESERVE ||
- operation.info().type() == Offer::Operation::UNRESERVE ||
- operation.info().type() == Offer::Operation::CREATE ||
- operation.info().type() == Offer::Operation::DESTROY) {
+ if (protobuf::isSpeculativeOperation(operation.info())) {
resourceVersion = id::UUID::random();
sendResourceProviderStateUpdate();
}
@@ -3419,15 +3492,18 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
<< "' and " << update->operations_size() << " operations to agent "
<< slaveId;
- auto err = [](const ResourceProviderID& id, const string& message) {
+ // NOTE: We terminate the resource provider here if the state cannot be
+ // updated, so that the state is in sync with the agent's view.
+ auto die = [=](const ResourceProviderID& id, const string& message) {
LOG(ERROR)
<< "Failed to update state for resource provider " << id << ": "
<< message;
+ fatal();
};
driver->send(evolve(call))
- .onFailed(std::bind(err, info.id(), lambda::_1))
- .onDiscarded(std::bind(err, info.id(), "future discarded"));
+ .onFailed(std::bind(die, info.id(), lambda::_1))
+ .onDiscarded(std::bind(die, info.id(), "future discarded"));
}