You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:04 UTC
[mesos] 07/15: Clean up volume creation,
validation and deletion for SLRP.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 75cae10dbca0e3d4146ed1a75d939b1e29b194d7
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:51 2019 -0700
Clean up volume creation, validation and deletion for SLRP.
This patch introduces methods for volume creation, validation and
deletion that conform to `VolumeManager`'s public interface in SLRP, and
cleans up SLRP based on these functions. They will be moved out from
SLRP to v0 `VolumeManager` later.
Specifically, volume deletion now supports deleting untracked volumes.
Review: https://reviews.apache.org/r/70217/
---
src/resource_provider/storage/provider.cpp | 350 +++++++++++----------
src/resource_provider/storage/provider_process.hpp | 22 +-
2 files changed, 200 insertions(+), 172 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 84e5557..5a63e7b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -81,6 +81,7 @@
#include "csi/service_manager.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"
+#include "csi/volume_manager.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
@@ -109,6 +110,8 @@ using std::shared_ptr;
using std::string;
using std::vector;
+using google::protobuf::Map;
+
using process::after;
using process::await;
using process::Break;
@@ -1965,45 +1968,51 @@ Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume(
}
-Future<string> StorageLocalResourceProviderProcess::createVolume(
+// Creates a volume with the given name, capacity, capability and parameters
+// through the CSI controller service, checkpoints its state as `CREATED`, and
+// returns its `VolumeInfo`. Fails if the plugin does not support the
+// `CREATE_DELETE_VOLUME` controller capability, or if the volume returned by
+// the plugin is already tracked (so this call is not idempotent).
+Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume(
const string& name,
const Bytes& capacity,
- const DiskProfileAdaptor::ProfileInfo& profileInfo)
+ const types::VolumeCapability& capability,
+ const Map<string, string>& parameters)
{
if (!controllerCapabilities->createDeleteVolume) {
return Failure(
- "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
+ "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
+ "plugin type '" + pluginInfo.type() + "' and name '" +
+ pluginInfo.name() + "'");
}
- csi::v0::CreateVolumeRequest request;
+ LOG(INFO) << "Creating volume with name '" << name << "'";
+
+ CreateVolumeRequest request;
request.set_name(name);
request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
- *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability);
- *request.mutable_parameters() = profileInfo.parameters;
+ *request.add_volume_capabilities() = csi::v0::evolve(capability);
+ *request.mutable_parameters() = parameters;
- return call<csi::v0::CREATE_VOLUME>(
- csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
- .then(defer(self(), [=](
- const csi::v0::CreateVolumeResponse& response) -> string {
- const csi::v0::Volume& volume = response.volume();
+ // We retry the `CreateVolume` call for MESOS-9517.
+ return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+ .then(process::defer(self(), [=](
+ const CreateVolumeResponse& response) -> Future<VolumeInfo> {
+ const string& volumeId = response.volume().id();
- if (volumes.contains(volume.id())) {
- // The resource provider failed over after the last `createVolume` call,
- // but before the operation status was checkpointed.
- CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
- } else {
- VolumeState volumeState;
- volumeState.set_state(VolumeState::CREATED);
- *volumeState.mutable_volume_capability() = profileInfo.capability;
- *volumeState.mutable_parameters() = profileInfo.parameters;
- *volumeState.mutable_volume_attributes() = volume.attributes();
-
- volumes.put(volume.id(), std::move(volumeState));
- checkpointVolumeState(volume.id());
+ // NOTE: If the volume is already tracked, there might already be
+ // operations running in its sequence. Since this continuation runs
+ // outside the sequence, we fail the call here to avoid any race issue.
+ // This also means that this call is not idempotent.
+ if (volumes.contains(volumeId)) {
+ return Failure("Volume with name '" + name + "' already exists");
}
- return volume.id();
+ VolumeState volumeState;
+ volumeState.set_state(VolumeState::CREATED);
+ *volumeState.mutable_volume_capability() = capability;
+ *volumeState.mutable_parameters() = parameters;
+ *volumeState.mutable_volume_attributes() = response.volume().attributes();
+
+ volumes.put(volumeId, std::move(volumeState));
+ checkpointVolumeState(volumeId);
+
+ return VolumeInfo{capacity, volumeId, response.volume().attributes()};
}));
}
@@ -2011,145 +2020,162 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
const string& volumeId)
{
- Future<Nothing> deleted = Nothing();
-
- // If the volume has a checkpointed state, we transition it to `CREATED` state
- // before deleting. Otherwise, we have to delete it directly because we have
- // no idea what state this volume is in.
- if (volumes.contains(volumeId)) {
- VolumeData& volume = volumes.at(volumeId);
-
- if (volume.state.node_publish_required()) {
- CHECK_EQ(VolumeState::PUBLISHED, volume.state.state());
-
- const string targetPath = csi::paths::getMountTargetPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
-
- // NOTE: Normally the volume should have been cleaned up before
- // `deleteVolume` is called. However this may not be true for
- // preprovisioned volumes (e.g., leftover from a previous resource
- // provider instance). To prevent data leakage in such cases, we clean up
- // the data (but not the target path) here.
- Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
- if (rmdir.isError()) {
- return Failure(
- "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
- }
+ // An untracked volume has no checkpointed state and no sequence to run in,
+ // so we delete it directly through the plugin.
+ if (!volumes.contains(volumeId)) {
+ return __deleteVolume(volumeId);
+ }
- volume.state.set_node_publish_required(false);
- checkpointVolumeState(volumeId);
+ VolumeData& volume = volumes.at(volumeId);
+
+ LOG(INFO) << "Deleting volume '" << volumeId << "' in "
+ << volume.state.state() << " state";
+
+ // Volume deletion is sequentialized with other operations on the same volume
+ // to avoid races.
+ return volume.sequence->add(std::function<Future<bool>()>(
+ process::defer(self(), &Self::_deleteVolume, volumeId)));
+}
+
+
+// Deletes a tracked volume; meant to run inside the volume's sequence (see
+// `deleteVolume`). It first wipes leftover data of a volume that still requires
+// `NodePublishVolume`, then transitions the volume back to `CREATED` through
+// `_detachVolume` (re-entering this function afterward), deletes it through
+// `__deleteVolume`, and finally drops its tracked and checkpointed state.
+Future<bool> StorageLocalResourceProviderProcess::_deleteVolume(
+ const std::string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.node_publish_required()) {
+ CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+
+ const string targetPath = paths::getMountTargetPath(
+ paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
+ volumeId);
+
+ // NOTE: Normally the volume should have been cleaned up. However this may
+ // not be true for preprovisioned volumes (e.g., leftover from a previous
+ // resource provider instance). To prevent data leakage in such cases, we
+ // clean up the data (but not the target path) here.
+ Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+ if (rmdir.isError()) {
+ return Failure(
+ "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
}
- deleted = _detachVolume(volumeId);
+ volumeState.set_node_publish_required(false);
+ checkpointVolumeState(volumeId);
}
- // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
- // supported. Otherwise, we simply leave it as a preprovisioned volume.
- if (controllerCapabilities->createDeleteVolume) {
- deleted = deleted
- .then(defer(self(), [this, volumeId] {
- csi::v0::DeleteVolumeRequest request;
- request.set_volume_id(volumeId);
-
- return call<csi::v0::DELETE_VOLUME>(
- csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
- .then([] { return Nothing(); });
- }));
+ if (volumeState.state() != VolumeState::CREATED) {
+ // Retry after transitioning the volume to `CREATED` state.
+ return _detachVolume(volumeId)
+ .then(process::defer(self(), &Self::_deleteVolume, volumeId));
}
- // NOTE: The last asynchronous continuation of `deleteVolume`, which is
- // supposed to be run in the volume's sequence if it exists, would cause the
- // sequence to be destructed, which would in turn discard the returned future.
- // However, since the continuation would have already been run, the returned
- // future will become ready.
- return deleted
- .then(defer(self(), [this, volumeId] {
- if (volumes.contains(volumeId)) {
- volumes.erase(volumeId);
+ // NOTE: The last asynchronous continuation, which is supposed to be run in
+ // the volume's sequence, would cause the sequence to be destructed, which
+ // would in turn discard the returned future. However, since the continuation
+ // would have already been run, the returned future will become ready, making
+ // the future returned by the sequence ready as well.
+ return __deleteVolume(volumeId)
+ .then(process::defer(self(), [this, volumeId](bool deleted) {
+ // Stop tracking the volume; this also destroys its sequence.
+ volumes.erase(volumeId);
- const string volumePath = csi::paths::getVolumePath(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name(),
- volumeId);
+ const string volumePath = paths::getVolumePath(
+ rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
- Try<Nothing> rmdir = os::rmdir(volumePath);
- CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
- << volumePath << "': " << rmdir.error();
+ Try<Nothing> rmdir = os::rmdir(volumePath);
+ CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+ << volumePath << "': " << rmdir.error();
- garbageCollectMountPath(volumeId);
- }
+ garbageCollectMountPath(volumeId);
- return controllerCapabilities->createDeleteVolume;
+ return deleted;
}));
}
-Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
- const string& volumeId,
- const Option<Labels>& metadata,
- const DiskProfileAdaptor::ProfileInfo& profileInfo)
+// Deletes a volume through the CSI controller's `DeleteVolume` call without
+// touching any tracked or checkpointed state. Returns false without calling the
+// plugin if it lacks the `CREATE_DELETE_VOLUME` capability (the volume is then
+// left in place), and true once the `DeleteVolume` call succeeds.
+Future<bool> StorageLocalResourceProviderProcess::__deleteVolume(
+ const string& volumeId)
+{
+ if (!controllerCapabilities->createDeleteVolume) {
+ return false;
+ }
+
+ LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
+ << volumeId << "'";
+
+ DeleteVolumeRequest request;
+ request.set_volume_id(volumeId);
+
+ // We retry the `DeleteVolume` call for MESOS-9517.
+ return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+ .then([] { return true; });
+}
+
+
+// Validates a volume against the given capability and parameters. Returns
+// `None` if the volume is valid, or an `Error` describing why it is not. A
+// tracked volume is compared against its checkpointed state; an untracked one
+// is checked through `ValidateVolumeCapabilities` (parameters cannot be checked
+// in CSI v0) and, if supported, checkpointed as `CREATED`. Fails if the volume
+// became tracked while the call was in flight.
+Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
+ const VolumeInfo& volumeInfo,
+ const types::VolumeCapability& capability,
+ const Map<string, string>& parameters)
{
- // If the volume has a checkpointed state, the validation succeeds only if the
+ // If the volume has been checkpointed, the validation succeeds only if the
// capability and parameters of the specified profile are the same as those in
// the checkpoint.
- if (volumes.contains(volumeId)) {
- const VolumeState& volumeState = volumes.at(volumeId).state;
+ if (volumes.contains(volumeInfo.id)) {
+ const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
- if (volumeState.volume_capability() != profileInfo.capability) {
- return Failure("Invalid volume capability for volume '" + volumeId + "'");
+ if (volumeState.volume_capability() != capability) {
+ return Some(
+ Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
}
- if (volumeState.parameters() != profileInfo.parameters) {
- return Failure("Invalid parameters for volume '" + volumeId + "'");
+ if (volumeState.parameters() != parameters) {
+ return Some(
+ Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
}
- return Nothing();
+ return None();
}
- if (!pluginCapabilities->controllerService) {
- return Failure(
- "Plugin capability 'CONTROLLER_SERVICE' is not supported");
+ if (!parameters.empty()) {
+ LOG(WARNING)
+ << "Validating volumes against parameters is not supported in CSI v0";
}
- google::protobuf::Map<string, string> volumeAttributes;
+ LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
- if (metadata.isSome()) {
- volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
- }
+ ValidateVolumeCapabilitiesRequest request;
+ request.set_volume_id(volumeInfo.id);
+ *request.add_volume_capabilities() = csi::v0::evolve(capability);
+ *request.mutable_volume_attributes() = volumeInfo.context;
- // TODO(chhsiao): Validate the volume against the parameters of the profile
- // once we get CSI v1.
- csi::v0::ValidateVolumeCapabilitiesRequest request;
- request.set_volume_id(volumeId);
- *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability);
- *request.mutable_volume_attributes() = volumeAttributes;
-
- return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
- csi::CONTROLLER_SERVICE, std::move(request))
- .then(defer(self(), [=](
- const csi::v0::ValidateVolumeCapabilitiesResponse& response)
- -> Future<Nothing> {
+ return call<VALIDATE_VOLUME_CAPABILITIES>(
+ CONTROLLER_SERVICE, std::move(request))
+ .then(process::defer(self(), [=](
+ const ValidateVolumeCapabilitiesResponse& response)
+ -> Future<Option<Error>> {
if (!response.supported()) {
- return Failure(
- "Unsupported volume capability for volume '" + volumeId + "': " +
- response.message());
+ // Wrapped explicitly in `Some` (as in the mismatch checks above) so it
+ // is unambiguous that this is a validation result, not a failure.
+ return Some(Error(
+ "Unsupported volume capability for volume '" + volumeInfo.id +
+ "': " + response.message()));
+ }
+
+ // NOTE: If the volume is already tracked, there might already be
+ // operations running in its sequence. Since this continuation runs
+ // outside the sequence, we fail the call here to avoid any race issue.
+ // This also means that this call is not idempotent.
+ if (volumes.contains(volumeInfo.id)) {
+ return Failure("Volume '" + volumeInfo.id + "' already validated");
}
VolumeState volumeState;
volumeState.set_state(VolumeState::CREATED);
- *volumeState.mutable_volume_capability() = profileInfo.capability;
- *volumeState.mutable_parameters() = profileInfo.parameters;
- *volumeState.mutable_volume_attributes() = volumeAttributes;
+ *volumeState.mutable_volume_capability() = capability;
+ *volumeState.mutable_parameters() = parameters;
+ *volumeState.mutable_volume_attributes() = volumeInfo.context;
- volumes.put(volumeId, std::move(volumeState));
- checkpointVolumeState(volumeId);
+ volumes.put(volumeInfo.id, std::move(volumeState));
+ checkpointVolumeState(volumeInfo.id);
- return Nothing();
+ return None();
}));
}
@@ -2471,32 +2497,45 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
// TODO(chhsiao): Consider calling `createVolume` sequentially with other
// create or delete operations, and send an `UPDATE_STATE` for storage pools
// afterward. See MESOS-9254.
- Future<string> created = resource.disk().source().has_profile()
- ? createVolume(
- operationUuid.toString(),
- Bytes(resource.scalar().value() * Bytes::MEGABYTES),
- profileInfo)
- : validateVolume(
- resource.disk().source().id(),
- resource.disk().source().has_metadata()
- ? resource.disk().source().metadata()
- : Option<Labels>::none(),
- profileInfo)
- .then([=]() -> string { return resource.disk().source().id(); });
+ // A resource with a profile gets a new volume created for it. Otherwise the
+ // existing volume identified by the resource's disk source ID is validated
+ // against the profile, using the resource's metadata as the volume context.
+ Future<VolumeInfo> created;
+ if (resource.disk().source().has_profile()) {
+ created = createVolume(
+ operationUuid.toString(),
+ resource.scalar().value() * Bytes::MEGABYTES,
+ profileInfo.capability,
+ profileInfo.parameters);
+ } else {
+ VolumeInfo volumeInfo = {
+ resource.scalar().value() * Bytes::MEGABYTES,
+ resource.disk().source().id(),
+ CHECK_NOTERROR(convertLabelsToStringMap(
+ resource.disk().source().metadata()))
+ };
- return created
- .then(defer(self(), [=](const string& volumeId) {
- CHECK(volumes.contains(volumeId));
- const VolumeState& volumeState = volumes.at(volumeId).state;
+ // A validation error is turned into an operation failure here.
+ created = validateVolume(
+ volumeInfo, profileInfo.capability, profileInfo.parameters)
+ .then([resource, profile, volumeInfo](
+ const Option<Error>& error) -> Future<VolumeInfo> {
+ if (error.isSome()) {
+ return Failure(
+ "Cannot apply profile '" + profile + "' to resource '" +
+ stringify(resource) + "': " + error->message);
+ }
+
+ return volumeInfo;
+ });
+ }
+ return created
+ .then(defer(self(), [=](const VolumeInfo& volumeInfo) {
Resource converted = resource;
- converted.mutable_disk()->mutable_source()->set_id(volumeId);
+ converted.mutable_disk()->mutable_source()->set_id(volumeInfo.id);
converted.mutable_disk()->mutable_source()->set_type(targetType);
converted.mutable_disk()->mutable_source()->set_profile(profile);
- if (!volumeState.volume_attributes().empty()) {
- converted.mutable_disk()->mutable_source()->mutable_metadata()
- ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
+ if (!volumeInfo.context.empty()) {
+ *converted.mutable_disk()->mutable_source()->mutable_metadata() =
+ convertStringMapToLabels(volumeInfo.context);
}
const string mountRootDir = csi::paths::getMountRootDir(
@@ -2507,8 +2546,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
switch (targetType) {
case Resource::DiskInfo::Source::MOUNT: {
// Set the root path relative to agent work dir.
- converted.mutable_disk()->mutable_source()->mutable_mount()
- ->set_root(mountRootDir);
+ converted.mutable_disk()->mutable_source()->mutable_mount()->set_root(
+ mountRootDir);
break;
}
@@ -2537,16 +2576,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
CHECK(!Resources::isPersistentVolume(resource));
CHECK(resource.disk().source().has_id());
- const string& volumeId = resource.disk().source().id();
-
- Future<bool> deleted =
- volumes.contains(volumeId)
- ? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
- defer(self(), &Self::deleteVolume, volumeId)))
- : deleteVolume(volumeId);
-
- // Sequentialize the deletion with other operation on the same volume.
- return deleted
+ // `deleteVolume` itself sequentializes the deletion with other operations on
+ // a tracked volume. `deprovisioned` is false when the plugin does not support
+ // volume deletion and the volume is left in place.
+ return deleteVolume(resource.disk().source().id())
.then(defer(self(), [=](bool deprovisioned) {
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_type(
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 0b73531..b298a8e 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -56,6 +56,7 @@
#include "csi/service_manager.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"
+#include "csi/volume_manager.hpp"
#include "status_update_manager/operation.hpp"
@@ -229,26 +230,23 @@ private:
// Transition a volume to `VOL_READY` state from any state below.
process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
- // Returns a CSI volume ID.
- //
+ // Creates a volume and returns its `VolumeInfo`. Fails if the volume returned
+ // by the plugin is already tracked, so this call is not idempotent.
+ //
// NOTE: This can only be called after `prepareServices`.
- process::Future<std::string> createVolume(
+ process::Future<csi::VolumeInfo> createVolume(
const std::string& name,
const Bytes& capacity,
- const DiskProfileAdaptor::ProfileInfo& profileInfo);
+ const csi::types::VolumeCapability& capability,
+ const google::protobuf::Map<std::string, std::string>& parameters);
- // Returns true if the volume has been deprovisioned.
- //
+ // Returns true if the volume has been deprovisioned through the plugin, or
+ // false if it is left in place because the plugin does not support
+ // `CREATE_DELETE_VOLUME`. Untracked volumes can be deleted as well.
+ //
// NOTE: This can only be called after `prepareServices`.
process::Future<bool> deleteVolume(const std::string& volumeId);
+ // Deletes a tracked volume within its sequence and removes its state.
+ process::Future<bool> _deleteVolume(const std::string& volumeId);
+ // Calls `DeleteVolume` if supported, without touching any tracked state.
+ process::Future<bool> __deleteVolume(const std::string& volumeId);
- // Validates if a volume supports the capability of the specified profile.
- //
+ // Validates a volume against the given capability and parameters. Returns
+ // `None` if the volume is valid, or an `Error` describing why it is not.
+ //
// NOTE: This can only be called after `prepareServices`.
- process::Future<Nothing> validateVolume(
- const std::string& volumeId,
- const Option<Labels>& metadata,
- const DiskProfileAdaptor::ProfileInfo& profileInfo);
+ process::Future<Option<Error>> validateVolume(
+ const csi::VolumeInfo& volumeInfo,
+ const csi::types::VolumeCapability& capability,
+ const google::protobuf::Map<std::string, std::string>& parameters);
// NOTE: This can only be called after `prepareServices`.
process::Future<Resources> listVolumes();