You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:36 UTC
[06/12] mesos git commit: Refactored and fixed bugs for SLRP resource
reconciliation.
Refactored and fixed bugs for SLRP resource reconciliation.
Review: https://reviews.apache.org/r/64644/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28bf0891
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28bf0891
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28bf0891
Branch: refs/heads/master
Commit: 28bf0891bf0b985153fa129e69fda0b7fd97d456
Parents: b36152d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:06 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 261 ++++++++++++------------
1 file changed, 133 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/28bf0891/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a103494..8510a31 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -342,6 +342,9 @@ private:
Future<Nothing> recoverStatusUpdates();
void doReliableRegistration();
Future<Nothing> reconcileResourceProviderState();
+ ResourceConversion reconcileResources(
+ const Resources& checkpointed,
+ const Resources& discovered);
// Functions for received events.
void subscribed(const Event::Subscribed& subscribed);
@@ -358,7 +361,6 @@ private:
Future<Nothing> prepareControllerService();
Future<Nothing> prepareNodeService();
- Future<Resources> discoverResources();
Future<Nothing> controllerPublish(const string& volumeId);
Future<Nothing> controllerUnpublish(const string& volumeId);
Future<Nothing> nodePublish(const string& volumeId);
@@ -372,7 +374,8 @@ private:
const string& volumeId,
const Option<Labels>& metadata,
const csi::VolumeCapability& capability);
- Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles);
+ Future<Resources> listVolumes();
+ Future<Resources> getCapacities();
Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
@@ -969,6 +972,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
// We replay all pending operations here, so that if a volume is
// created or deleted before the last failover, the result will be
// reflected in the total resources before reconciliation.
+ list<Future<Nothing>> futures;
+
foreachpair (const id::UUID& uuid,
const OfferOperation& operation,
offerOperations) {
@@ -982,12 +987,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
<< message;
};
- _applyOfferOperation(uuid)
+ futures.push_back(_applyOfferOperation(uuid)
.onFailed(std::bind(err, uuid, lambda::_1))
- .onDiscarded(std::bind(err, uuid, "future discarded"));
+ .onDiscarded(std::bind(err, uuid, "future discarded")));
}
- return Nothing();
+ // We await the futures instead of collecting them because it is OK
+ // for offer operations to fail.
+ return await(futures).then([] { return Nothing(); });
}));
}
@@ -1025,69 +1032,77 @@ Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return recoverStatusUpdates()
- .then(defer(self(), &Self::discoverResources))
- .then(defer(self(), [=](Resources discoveredResources) {
- // NODE: If a resource in the checkpointed total resources is
- // missing in the discovered resources, we will still keep it if
- // it is converted by an offer operation before (i.e., has extra
- // info other than the default reservations). The reason is that
- // we want to maintain a consistent view with frameworks, and do
- // not want to lose any data on persistent volumes due to some
- // temporarily CSI plugin faults. Other missing resources that are
- // "unconverted" by any framework will be removed from the total
- // resources. Then, any newly discovered resource will be reported
- // under the default reservations.
-
- Resources result;
- Resources unconvertedTotal;
-
- foreach (const Resource& resource, totalResources) {
- Resource unconverted = createRawDiskResource(
- info,
- Bytes(resource.scalar().value(), Bytes::MEGABYTES),
- resource.disk().source().has_profile()
- ? resource.disk().source().profile() : Option<string>::none(),
- resource.disk().source().has_id()
- ? resource.disk().source().id() : Option<string>::none(),
- resource.disk().source().has_metadata()
- ? resource.disk().source().metadata() : Option<Labels>::none());
- if (discoveredResources.contains(unconverted)) {
- // The checkponited resource appears in the discovered resources.
- result += resource;
- unconvertedTotal += unconverted;
- } else if (!totalResources.contains(unconverted)) {
- // The checkpointed resource is missing but converted by a
- // framework or the operator before, so we keep it.
- result += resource;
-
- LOG(WARNING)
- << "Missing converted resource '" << resource
- << "'. This might cause further offer operations to fail.";
- }
- }
-
- // NOTE: The states of newly discovered pre-existing volumes will
- // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK`
- // operations are applied.
- const Resources newResources = discoveredResources - unconvertedTotal;
- result += newResources;
+ .then(defer(self(), [=] {
+ return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
+ .then(defer(self(), [=](const list<Resources>& discovered) {
+ ResourceConversion conversion = reconcileResources(
+ totalResources,
+ accumulate(discovered.begin(), discovered.end(), Resources()));
+
+ Try<Resources> result = totalResources.apply(conversion);
+ CHECK_SOME(result);
+
+ if (result.get() != totalResources) {
+ totalResources = result.get();
+ checkpointResourceProviderState();
+ }
- LOG(INFO) << "Adding new resources '" << newResources << "'";
+ sendResourceProviderStateUpdate();
+ statusUpdateManager.resume();
- // TODO(chhsiao): Check that all profiles exist.
+ state = READY;
- if (result != totalResources) {
- totalResources = result;
- checkpointResourceProviderState();
- }
+ return Nothing();
+ }));
+ }));
+}
- sendResourceProviderStateUpdate();
- statusUpdateManager.resume();
- state = READY;
+ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
+ const Resources& checkpointed,
+ const Resources& discovered)
+{
+ // NOTE: If a resource in the checkpointed resources is missing in the
+ // discovered resources, we will still keep it if it is converted by
+ // an offer operation before (i.e., has extra info other than the
+ // default reservations). The reason is that we want to maintain a
+ // consistent view with frameworks, and do not want to lose any data on
+ // persistent volumes due to some temporary CSI plugin faults. Other
+ // missing resources that are "unconverted" by any framework will be
+ // removed. Then, any newly discovered resource will be added.
+ Resources toRemove;
+ Resources toAdd = discovered;
+
+ foreach (const Resource& resource, checkpointed) {
+ Resource unconverted = createRawDiskResource(
+ info,
+ Bytes(resource.scalar().value(), Bytes::MEGABYTES),
+ resource.disk().source().has_profile()
+ ? resource.disk().source().profile() : Option<string>::none(),
+ resource.disk().source().has_id()
+ ? resource.disk().source().id() : Option<string>::none(),
+ resource.disk().source().has_metadata()
+ ? resource.disk().source().metadata() : Option<Labels>::none());
+
+ if (toAdd.contains(unconverted)) {
+ // If the remainder of the discovered resources contains the
+ // "unconverted" version of a checkpointed resource, this is not a
+ // new resource.
+ toAdd -= unconverted;
+ } else if (checkpointed.contains(unconverted)) {
+ // If the remainder of the discovered resources does not contain
+ // the "unconverted" version of the checkpointed resource, the
+ // resource is missing. However, if it remains unconverted in the
+ // checkpoint, we can safely remove it from the total resources.
+ toRemove += unconverted;
+ } else {
+ LOG(WARNING)
+ << "Missing converted resource '" << resource
+ << "'. This might cause further offer operations to fail.";
+ }
+ }
- return Nothing();
- }));
+ return ResourceConversion(std::move(toRemove), std::move(toAdd));
}
@@ -1786,67 +1801,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
}
-// Returns resources reported by the CSI plugin, which are unreserved
-// raw disk resources without any persistent volume.
-Future<Resources> StorageLocalResourceProviderProcess::discoverResources()
-{
- // NOTE: This can only be called after `prepareControllerService` and
- // the resource provider ID has been obtained.
- CHECK_SOME(controllerCapabilities);
- CHECK(info.has_id());
-
- list<Future<Resources>> futures;
- futures.push_back(getCapacities(profiles));
-
- if (controllerCapabilities->listVolumes) {
- futures.push_back(getService(controllerContainerId)
- .then(defer(self(), [=](csi::Client client) {
- // TODO(chhsiao): Set the max entries and use a loop to do
- // mutliple `ListVolumes` calls.
- csi::ListVolumesRequest request;
- request.mutable_version()->CopyFrom(csiVersion);
-
- return client.ListVolumes(request)
- .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
- Resources resources;
-
- // Recover volume profiles from the checkpointed state.
- hashmap<string, string> volumesToProfiles;
- foreach (const Resource& resource, totalResources) {
- if (resource.disk().source().has_id() &&
- resource.disk().source().has_profile()) {
- volumesToProfiles.put(
- resource.disk().source().id(),
- resource.disk().source().profile());
- }
- }
-
- foreach (const auto& entry, response.entries()) {
- resources += createRawDiskResource(
- info,
- Bytes(entry.volume_info().capacity_bytes()),
- volumesToProfiles.contains(entry.volume_info().id())
- ? volumesToProfiles.at(entry.volume_info().id())
- : Option<string>::none(),
- entry.volume_info().id(),
- entry.volume_info().attributes().empty()
- ? Option<Labels>::none()
- : convertStringMapToLabels(
- entry.volume_info().attributes()));
- }
-
- return resources;
- }));
- })));
- }
-
- return collect(futures)
- .then(defer(self(), [=](const list<Resources>& resources) {
- return accumulate(resources.begin(), resources.end(), Resources());
- }));
-}
-
-
Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
const string& volumeId)
{
@@ -2284,17 +2238,68 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
}
-// Returns RAW disk resources for specified profiles.
-Future<Resources> StorageLocalResourceProviderProcess::getCapacities(
- const hashmap<string, ProfileData>& profiles)
+Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
+{
+ // NOTE: This can only be called after `prepareControllerService` and
+ // the resource provider ID has been obtained.
+ CHECK_SOME(controllerCapabilities);
+ CHECK(info.has_id());
+
+ // This is only used for reconciliation so no failure is returned.
+ if (!controllerCapabilities->listVolumes) {
+ return Resources();
+ }
+
+ return getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ // TODO(chhsiao): Set the max entries and use a loop to do
+ // multiple `ListVolumes` calls.
+ csi::ListVolumesRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+
+ return client.ListVolumes(request)
+ .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+ Resources resources;
+
+ // Recover volume profiles from the checkpointed state.
+ hashmap<string, string> volumesToProfiles;
+ foreach (const Resource& resource, totalResources) {
+ if (resource.disk().source().has_id() &&
+ resource.disk().source().has_profile()) {
+ volumesToProfiles.put(
+ resource.disk().source().id(),
+ resource.disk().source().profile());
+ }
+ }
+
+ foreach (const auto& entry, response.entries()) {
+ resources += createRawDiskResource(
+ info,
+ Bytes(entry.volume_info().capacity_bytes()),
+ volumesToProfiles.contains(entry.volume_info().id())
+ ? volumesToProfiles.at(entry.volume_info().id())
+ : Option<string>::none(),
+ entry.volume_info().id(),
+ entry.volume_info().attributes().empty()
+ ? Option<Labels>::none()
+ : convertStringMapToLabels(
+ entry.volume_info().attributes()));
+ }
+
+ return resources;
+ }));
+ }));
+}
+
+
+Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
{
// NOTE: This can only be called after `prepareControllerService` and
// the resource provider ID has been obtained.
CHECK_SOME(controllerCapabilities);
CHECK(info.has_id());
- // We do not return a failure because this is always called when a
- // profile is added or a `CreateVolume` CSI call is made.
+ // This is only used for reconciliation so no failure is returned.
if (!controllerCapabilities->getCapacity) {
return Resources();
}