You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/14 05:03:19 UTC
[4/7] mesos git commit: Supporting pre-existing volumes with no
profile in SLRP.
Supporting pre-existing volumes with no profile in SLRP.
The storage local resource provider will now report two categories of
RAW resources: resources that represent storage pools with profiles,
and resources that represent pre-existing volumes with IDs but no
profile. When applying `CREATE_VOLUME` or `CREATE_BLOCK` on pre-existing
volumes, we issue a `ValidateVolumeCapabilities` CSI call with the
default Mount or Block capabilities and convert the RAW resources into
MOUNT, PATH or BLOCK resources. When applying `DESTROY_VOLUME` or
`DESTROY_BLOCK` on these resources, they will be converted back to RAW
resources with IDs, but the pre-existing volumes won't be deleted.
Review: https://reviews.apache.org/r/64583/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/673b421d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/673b421d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/673b421d
Branch: refs/heads/master
Commit: 673b421d414f2c8855ffb7ca47e0dc23e52af3ce
Parents: dbd4596
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Dec 13 20:46:56 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 13 20:46:56 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 431 ++++++++++++++----------
1 file changed, 257 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/673b421d/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 03a12c7..e239317 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -351,22 +351,22 @@ private:
Future<Nothing> prepareControllerService();
Future<Nothing> prepareNodeService();
- Future<Resources> importResources();
+ Future<Resources> discoverResources();
Future<Nothing> controllerPublish(const string& volumeId);
Future<Nothing> controllerUnpublish(const string& volumeId);
Future<Nothing> nodePublish(const string& volumeId);
Future<Nothing> nodeUnpublish(const string& volumeId);
-
- // Returns a CSI volume ID.
Future<string> createVolume(
const string& name,
const Bytes& capacity,
const ProfileData& profile);
- Future<Nothing> deleteVolume(const string& volumeId);
+ Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
+ Future<string> validateCapability(
+ const string& volumeId,
+ const Option<Labels>& metadata,
+ const csi::VolumeCapability& capability);
+ Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles);
- // Applies the offer operation. Conventional operations will be
- // synchronously applied. Do nothing if the operation is already in a
- // terminal state.
Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
@@ -376,9 +376,6 @@ private:
Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock(
const Resource& resource);
- // Synchronously updates `totalResources` and the offer operation
- // status and then asks the status update manager to send status
- // updates.
Try<Nothing> updateOfferOperationStatus(
const UUID& operationUuid,
const Try<vector<ResourceConversion>>& conversions);
@@ -412,6 +409,8 @@ private:
const bool strict;
csi::Version csiVersion;
+ csi::VolumeCapability defaultMountCapability;
+ csi::VolumeCapability defaultBlockCapability;
string bootId;
hashmap<string, ProfileData> profiles;
process::grpc::client::Runtime runtime;
@@ -437,6 +436,9 @@ private:
LinkedHashMap<UUID, OfferOperation> offerOperations;
Resources totalResources;
UUID resourceVersion;
+
+ // We maintain the state of a CSI volume if and only if its
+ // corresponding resource is not RAW.
hashmap<string, VolumeData> volumes;
};
@@ -506,6 +508,14 @@ void StorageLocalResourceProviderProcess::initialize()
csiVersion.set_minor(1);
csiVersion.set_patch(0);
+ // Default mount and block capabilities for pre-existing volumes.
+ defaultMountCapability.mutable_mount();
+ defaultMountCapability.mutable_access_mode()
+ ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+ defaultBlockCapability.mutable_block();
+ defaultBlockCapability.mutable_access_mode()
+ ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+
Try<string> _bootId = os::bootId();
if (_bootId.isError()) {
return fatal("Failed to get boot ID", _bootId.error());
@@ -537,10 +547,8 @@ void StorageLocalResourceProviderProcess::initialize()
}
}
- // NOTE: The name of the default profile is an empty string, which is
- // the default value for `Resource.disk.source.profile` when unset.
// TODO(chhsiao): Use the volume profile module.
- ProfileData& defaultProfile = profiles[""];
+ ProfileData& defaultProfile = profiles["default"];
defaultProfile.capability.mutable_mount();
defaultProfile.capability.mutable_access_mode()
->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
@@ -999,17 +1007,17 @@ Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return recoverStatusUpdates()
- .then(defer(self(), &Self::importResources))
- .then(defer(self(), [=](Resources importedResources) {
+ .then(defer(self(), &Self::discoverResources))
+ .then(defer(self(), [=](Resources discoveredResources) {
// NODE: If a resource in the checkpointed total resources is
- // missing in the imported resources, we will still keep it if it
- // is converted by an offer operation before (i.e., has extra info
- // other than the default reservations). The reason is that we
- // want to maintain a consistent view with frameworks, and do not
- // want to lose any data on persistent volumes due to some
+ // missing in the discovered resources, we will still keep it if
+ // it is converted by an offer operation before (i.e., has extra
+ // info other than the default reservations). The reason is that
+ // we want to maintain a consistent view with frameworks, and do
+ // not want to lose any data on persistent volumes due to some
// temporarily CSI plugin faults. Other missing resources that are
// "unconverted" by any framework will be removed from the total
- // resources. Then, any new imported resource will be reported
+ // resources. Then, any newly discovered resource will be reported
// under the default reservations.
Resources result;
@@ -1025,8 +1033,8 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
? resource.disk().source().id() : Option<string>::none(),
resource.disk().source().has_metadata()
? resource.disk().source().metadata() : Option<Labels>::none());
- if (importedResources.contains(unconverted)) {
- // The checkponited resource appears in the imported resources.
+ if (discoveredResources.contains(unconverted)) {
+ // The checkpointed resource appears in the discovered resources.
result += resource;
unconvertedTotal += unconverted;
} else if (!totalResources.contains(unconverted)) {
@@ -1040,30 +1048,15 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
}
}
- foreach (Resource resource, importedResources - unconvertedTotal) {
- if (resource.disk().source().has_id() &&
- !volumes.contains(resource.disk().source().id())) {
- csi::state::VolumeState volumeState;
- volumeState.set_state(csi::state::VolumeState::CREATED);
-
- // The default profile is used if `profile` is unset.
- volumeState.mutable_volume_capability()->CopyFrom(
- profiles.at(resource.disk().source().profile()).capability);
+ // NOTE: The states of newly discovered pre-existing volumes will
+ // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK`
+ // operations are applied.
+ const Resources newResources = discoveredResources - unconvertedTotal;
+ result += newResources;
- if (resource.disk().source().has_metadata()) {
- volumeState.mutable_volume_attributes()->swap(
- convertLabelsToStringMap(
- resource.disk().source().metadata()).get());
- }
+ LOG(INFO) << "Adding new resources '" << newResources << "'";
- volumes.put(resource.disk().source().id(), std::move(volumeState));
- checkpointVolumeState(resource.disk().source().id());
- }
-
- result += resource;
-
- LOG(INFO) << "Adding new resource '" << resource << "'";
- }
+ // TODO(chhsiao): Check that all profiles exist.
if (result != totalResources) {
totalResources = result;
@@ -1754,17 +1747,18 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
// Returns resources reported by the CSI plugin, which are unreserved
// raw disk resources without any persistent volume.
-Future<Resources> StorageLocalResourceProviderProcess::importResources()
+Future<Resources> StorageLocalResourceProviderProcess::discoverResources()
{
// NOTE: This can only be called after `prepareControllerService` and
// the resource provider ID has been obtained.
CHECK_SOME(controllerCapabilities);
CHECK(info.has_id());
- Future<Resources> preprovisioned;
+ list<Future<Resources>> futures;
+ futures.push_back(getCapacities(profiles));
if (controllerCapabilities->listVolumes) {
- preprovisioned = getService(controllerContainerId)
+ futures.push_back(getService(controllerContainerId)
.then(defer(self(), [=](csi::Client client) {
// TODO(chhsiao): Set the max entries and use a loop to do
// mutliple `ListVolumes` calls.
@@ -1780,8 +1774,9 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
foreach (const Resource& resource, totalResources) {
if (resource.disk().source().has_id() &&
resource.disk().source().has_profile()) {
- volumesToProfiles[resource.disk().source().id()] =
- resource.disk().source().profile();
+ volumesToProfiles.put(
+ resource.disk().source().id(),
+ resource.disk().source().profile());
}
}
@@ -1801,78 +1796,12 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
return resources;
}));
- }));
- } else {
- preprovisioned = Resources();
+ })));
}
- return preprovisioned
- .then(defer(self(), [=](const Resources& preprovisioned) {
- list<Future<Resources>> futures;
-
- foreach (const Resource& resource, preprovisioned) {
- futures.push_back(getService(controllerContainerId)
- .then(defer(self(), [=](csi::Client client) {
- csi::ValidateVolumeCapabilitiesRequest request;
- request.mutable_version()->CopyFrom(csiVersion);
- request.set_volume_id(resource.disk().source().id());
-
- // The default profile is used if `profile` is unset.
- request.add_volume_capabilities()->CopyFrom(
- profiles.at(resource.disk().source().profile()).capability);
-
- if (resource.disk().source().has_metadata()) {
- request.mutable_volume_attributes()->swap(
- convertLabelsToStringMap(
- resource.disk().source().metadata()).get());
- }
-
- return client.ValidateVolumeCapabilities(request)
- .then(defer(self(), [=](
- const csi::ValidateVolumeCapabilitiesResponse& response)
- -> Future<Resources> {
- if (!response.supported()) {
- return Failure(
- "Unsupported volume capability for resource " +
- stringify(resource) + ": " + response.message());
- }
-
- return resource;
- }));
- })));
- }
-
- if (controllerCapabilities->getCapacity) {
- foreachkey (const string& profile, profiles) {
- futures.push_back(getService(controllerContainerId)
- .then(defer(self(), [=](csi::Client client) {
- csi::GetCapacityRequest request;
- request.mutable_version()->CopyFrom(csiVersion);
- request.add_volume_capabilities()
- ->CopyFrom(profiles.at(profile).capability);
- *request.mutable_parameters() = profiles.at(profile).parameters;
-
- return client.GetCapacity(request)
- .then(defer(self(), [=](
- const csi::GetCapacityResponse& response)
- -> Future<Resources> {
- if (response.available_capacity() == 0) {
- return Resources();
- }
-
- return createRawDiskResource(
- info,
- response.available_capacity(),
- profile.empty() ? Option<string>::none() : profile);
- }));
- })));
- }
- }
-
- return collect(futures)
- .then(defer(self(), [=](const list<Resources>& resources) {
- return accumulate(resources.begin(), resources.end(), Resources());
- }));
+ return collect(futures)
+ .then(defer(self(), [=](const list<Resources>& resources) {
+ return accumulate(resources.begin(), resources.end(), Resources());
}));
}
@@ -2131,6 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
}
+// Returns a CSI volume ID.
Future<string> StorageLocalResourceProviderProcess::createVolume(
const string& name,
const Bytes& capacity,
@@ -2170,8 +2100,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
volumeState.set_state(csi::state::VolumeState::CREATED);
volumeState.mutable_volume_capability()
->CopyFrom(profile.capability);
- *volumeState.mutable_volume_attributes() =
- volumeInfo.attributes();
+ *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
volumes.put(volumeInfo.id(), std::move(volumeState));
checkpointVolumeState(volumeInfo.id());
@@ -2184,14 +2113,17 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
- const string& volumeId)
+ const string& volumeId,
+ bool preExisting)
{
// NOTE: This can only be called after `prepareControllerService` and
// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
CHECK_SOME(controllerCapabilities);
CHECK_SOME(nodeId);
- if (!controllerCapabilities->createDeleteVolume) {
+ // We do not need the capability for pre-existing volumes since no
+ // actual `DeleteVolume` call will be made.
+ if (!preExisting && !controllerCapabilities->createDeleteVolume) {
return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
}
@@ -2216,21 +2148,26 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
.then(defer(self(), &Self::controllerUnpublish, volumeId));
}
case csi::state::VolumeState::CREATED: {
+ if (!preExisting) {
+ deleted = deleted
+ .then(defer(self(), &Self::getService, controllerContainerId))
+ .then(defer(self(), [=](csi::Client client) {
+ csi::DeleteVolumeRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.set_volume_id(volumeId);
+
+ return client.DeleteVolume(request)
+ .then([] { return Nothing(); });
+ }));
+ }
+
deleted = deleted
- .then(defer(self(), &Self::getService, controllerContainerId))
- .then(defer(self(), [=](csi::Client client) {
- csi::DeleteVolumeRequest request;
- request.mutable_version()->CopyFrom(csiVersion);
- request.set_volume_id(volumeId);
-
- return client.DeleteVolume(request)
- .then(defer(self(), [=] {
- // NOTE: This will destruct the volume's sequence!
- volumes.erase(volumeId);
- CHECK_SOME(os::rmdir(volumePath));
-
- return Nothing();
- }));
+ .then(defer(self(), [=] {
+ // NOTE: This will destruct the volume's sequence!
+ volumes.erase(volumeId);
+ CHECK_SOME(os::rmdir(volumePath));
+
+ return Nothing();
}));
break;
}
@@ -2257,6 +2194,110 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
}
+// Validates if a volume has the specified capability. This is called
+// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing
+// volume, so we make it return a volume ID, similar to `createVolume`.
+Future<string> StorageLocalResourceProviderProcess::validateCapability(
+ const string& volumeId,
+ const Option<Labels>& metadata,
+ const csi::VolumeCapability& capability)
+{
+ return getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ google::protobuf::Map<string, string> volumeAttributes;
+
+ if (metadata.isSome()) {
+ volumeAttributes.swap(convertLabelsToStringMap(metadata.get()).get());
+ }
+
+ csi::ValidateVolumeCapabilitiesRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.set_volume_id(volumeId);
+ request.add_volume_capabilities()->CopyFrom(capability);
+ *request.mutable_volume_attributes() = volumeAttributes;
+
+ return client.ValidateVolumeCapabilities(request)
+ .then(defer(self(), [=](
+ const csi::ValidateVolumeCapabilitiesResponse& response)
+ -> Future<string> {
+ if (!response.supported()) {
+ return Failure(
+ "Unsupported volume capability for volume '" + volumeId +
+ "': " + response.message());
+ }
+
+ if (volumes.contains(volumeId)) {
+ // The resource provider failed over after the last
+ // `ValidateVolumeCapabilities` call, but before the offer
+ // operation status was checkpointed.
+ CHECK_EQ(csi::state::VolumeState::CREATED,
+ volumes.at(volumeId).state.state());
+ } else {
+ csi::state::VolumeState volumeState;
+ volumeState.set_state(csi::state::VolumeState::CREATED);
+ volumeState.mutable_volume_capability()->CopyFrom(capability);
+ *volumeState.mutable_volume_attributes() = volumeAttributes;
+
+ volumes.put(volumeId, std::move(volumeState));
+ checkpointVolumeState(volumeId);
+ }
+
+ return volumeId;
+ }));
+ }));
+}
+
+
+// Returns RAW disk resources for specified profiles.
+Future<Resources> StorageLocalResourceProviderProcess::getCapacities(
+ const hashmap<string, ProfileData>& profiles)
+{
+ // NOTE: This can only be called after `prepareControllerService` and
+ // the resource provider ID has been obtained.
+ CHECK_SOME(controllerCapabilities);
+ CHECK(info.has_id());
+
+ // We do not return a failure because this is always called when a
+ // profile is added or a `CreateVolume` CSI call is made.
+ if (!controllerCapabilities->getCapacity) {
+ return Resources();
+ }
+
+ return getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ list<Future<Resources>> futures;
+
+ foreachpair (const string& profile, const ProfileData& data, profiles) {
+ csi::GetCapacityRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.add_volume_capabilities()->CopyFrom(data.capability);
+ *request.mutable_parameters() = data.parameters;
+
+ futures.push_back(client.GetCapacity(request)
+ .then(defer(self(), [=](
+ const csi::GetCapacityResponse& response) -> Resources {
+ if (response.available_capacity() == 0) {
+ return Resources();
+ }
+
+ return createRawDiskResource(
+ info,
+ response.available_capacity(),
+ profile);
+ })));
+ }
+
+ return collect(futures)
+ .then([](const list<Resources>& resources) {
+ return accumulate(resources.begin(), resources.end(), Resources());
+ });
+ }));
+}
+
+
+// Applies the offer operation. Conventional operations will be
+// synchronously applied. Do nothing if the operation is already in a
+// terminal state.
Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
const UUID& operationUuid)
{
@@ -2358,27 +2399,78 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
{
if (resource.disk().source().type() != Resource::DiskInfo::Source::RAW) {
return Failure(
- "Cannot create volume from source of " +
+ "Cannot create volume or block from source of " +
stringify(resource.disk().source().type()) + " type");
}
+ // NOTE: Currently we only support two types of RAW disk resources:
+ // 1. RAW disk from `GetCapacity` with a profile but no volume ID.
+ // 2. RAW disk from `ListVolumes` for a pre-existing volume, which
+ // has a volume ID but no profile.
+ //
+ // For 1, we check if its profile is mount or block capable, then
+ // call `CreateVolume` with the operation UUID as the name (so that
+ // the same volume will be returned when recovering from a failover).
+ // For 2, we call `ValidateVolumeCapabilities` with a default mount or
+ // block capability.
+ CHECK_NE(resource.disk().source().has_profile(),
+ resource.disk().source().has_id());
+
+ Future<string> created;
+
switch (type) {
case Resource::DiskInfo::Source::PATH:
case Resource::DiskInfo::Source::MOUNT: {
- if (!profiles.at(resource.disk().source().profile())
- .capability.has_mount()) {
- return Failure(
- "Profile '" + resource.disk().source().profile() +
- "' cannot be used for CREATE_VOLUME operation");
+ if (resource.disk().source().has_profile()) {
+ if (!profiles.at(resource.disk().source().profile())
+ .capability.has_mount()) {
+ return Failure(
+ "Profile '" + resource.disk().source().profile() +
+ "' cannot be used for CREATE_VOLUME operation");
+ }
+
+ // TODO(chhsiao): Call `CreateVolume` sequentially with other
+ // create or delete operations, and send an `UPDATE_STATE` for
+ // RAW profiled resources afterward.
+ created = createVolume(
+ operationUuid.toString(),
+ resource.scalar().value(),
+ profiles.at(resource.disk().source().profile()));
+ } else {
+ // No need to call `ValidateVolumeCapabilities` sequentially
+ // since the volume is not used and thus not in `volumes` yet.
+ created = validateCapability(
+ resource.disk().source().id(),
+ resource.disk().source().has_metadata()
+ ? resource.disk().source().metadata() : Option<Labels>::none(),
+ defaultMountCapability);
}
break;
}
case Resource::DiskInfo::Source::BLOCK: {
- if (!profiles.at(resource.disk().source().profile())
- .capability.has_block()) {
- return Failure(
- "Profile '" + resource.disk().source().profile() +
- "' cannot be used for CREATE_BLOCK operation");
+ if (resource.disk().source().has_profile()) {
+ if (!profiles.at(resource.disk().source().profile())
+ .capability.has_block()) {
+ return Failure(
+ "Profile '" + resource.disk().source().profile() +
+ "' cannot be used for CREATE_BLOCK operation");
+ }
+
+ // TODO(chhsiao): Call `CreateVolume` sequentially with other
+ // create or delete operations, and send an `UPDATE_STATE` for
+ // RAW profiled resources afterward.
+ created = createVolume(
+ operationUuid.toString(),
+ resource.scalar().value(),
+ profiles.at(resource.disk().source().profile()));
+ } else {
+ // No need to call `ValidateVolumeCapabilities` sequentially
+ // since the volume is not used and thus not in `volumes` yet.
+ created = validateCapability(
+ resource.disk().source().id(),
+ resource.disk().source().has_metadata()
+ ? resource.disk().source().metadata() : Option<Labels>::none(),
+ defaultBlockCapability);
}
break;
}
@@ -2388,26 +2480,6 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
}
}
- Future<string> created;
-
- if (resource.disk().source().has_id()) {
- // Preprovisioned volumes with RAW type.
- // TODO(chhsiao): Call `ValidateVolumeCapabilities` sequentially
- // once we use the profile module and make profile optional.
- CHECK(volumes.contains(resource.disk().source().id()));
- created = resource.disk().source().id();
- } else {
- // We use the operation UUID as the name of the volume, so the same
- // offer operation will create the same volume after recovery.
- // TODO(chhsiao): Call `CreateVolume` sequentially with other create
- // or delete operations.
- // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
- created = createVolume(
- operationUuid.toString(),
- resource.scalar().value(),
- profiles.at(resource.disk().source().profile()));
- }
-
return created
.then(defer(self(), [=](const string& volumeId) {
CHECK(volumes.contains(volumeId));
@@ -2471,7 +2543,7 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
case Resource::DiskInfo::Source::UNKNOWN:
case Resource::DiskInfo::Source::RAW: {
return Failure(
- "Cannot delete volume of " +
+ "Cannot destroy volume or block of " +
stringify(resource.disk().source().type()) + " type");
break;
}
@@ -2481,18 +2553,27 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
CHECK(volumes.contains(resource.disk().source().id()));
// Sequentialize the deletion with other operation on the same volume.
+ // NOTE: A resource has no profile iff it is a pre-existing volume.
return volumes.at(resource.disk().source().id()).sequence->add(
- std::function<Future<Nothing>()>(
- defer(self(), &Self::deleteVolume, resource.disk().source().id())))
+ std::function<Future<Nothing>()>(defer(
+ self(),
+ &Self::deleteVolume,
+ resource.disk().source().id(),
+ !resource.disk().source().has_profile())))
.then(defer(self(), [=]() {
Resource converted = resource;
- converted.mutable_disk()->mutable_source()->clear_id();
- converted.mutable_disk()->mutable_source()->clear_metadata();
converted.mutable_disk()->mutable_source()->set_type(
Resource::DiskInfo::Source::RAW);
converted.mutable_disk()->mutable_source()->clear_path();
converted.mutable_disk()->mutable_source()->clear_mount();
+ // NOTE: We keep the source ID and metadata if it is a
+ // pre-existing volume, which has no profile.
+ if (resource.disk().source().has_profile()) {
+ converted.mutable_disk()->mutable_source()->clear_id();
+ converted.mutable_disk()->mutable_source()->clear_metadata();
+ }
+
vector<ResourceConversion> conversions;
conversions.emplace_back(resource, std::move(converted));
@@ -2501,6 +2582,8 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
}
+// Synchronously updates `totalResources` and the offer operation status
+// and then asks the status update manager to send status updates.
Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
const UUID& operationUuid,
const Try<vector<ResourceConversion>>& conversions)