You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/11/29 18:32:04 UTC
[mesos] 05/09: Implemented the new `CREATE_DISK`/`DESTROY_DISK`
semantics in SLRP.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1c7be01b2b3a5f577b12713ea319c79769e55803
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 12:25:18 2018 -0800
Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.
The default mount/block volume capabilities is removed from SLRP.
Instead, `CREATE_DISK` will convert a preprovisioned RAW disk to a
profile disk, and `DESTROY_DISK` will always deprovision a profile disk
as long as the CSI plugin is capable of deprovisioning volumes.
Review: https://reviews.apache.org/r/69361
---
src/csi/utils.hpp | 8 +
src/resource_provider/storage/provider.cpp | 235 ++++++++++++-----------------
2 files changed, 103 insertions(+), 140 deletions(-)
diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index 5ce318e..9145c67 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -45,6 +45,14 @@ bool operator==(
bool operator==(const VolumeCapability& left, const VolumeCapability& right);
+inline bool operator!=(
+ const VolumeCapability& left,
+ const VolumeCapability& right)
+{
+ return !(left == right);
+}
+
+
std::ostream& operator<<(
std::ostream& stream,
const ControllerServiceCapability::RPC::Type& type);
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 5f755bf..97aa340 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -402,7 +402,7 @@ private:
const string& name,
const Bytes& capacity,
const DiskProfileAdaptor::ProfileInfo& profileInfo);
- Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
+ Future<bool> deleteVolume(const string& volumeId);
Future<string> validateCapability(
const string& volumeId,
const Option<Labels>& metadata,
@@ -420,7 +420,8 @@ private:
Future<vector<ResourceConversion>> applyCreateDisk(
const Resource& resource,
const id::UUID& operationUuid,
- const Resource::DiskInfo::Source::Type& type);
+ const Resource::DiskInfo::Source::Type& targetType,
+ const Option<string>& targetProfile);
Future<vector<ResourceConversion>> applyDestroyDisk(
const Resource& resource);
@@ -460,8 +461,6 @@ private:
shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
- csi::v0::VolumeCapability defaultMountCapability;
- csi::v0::VolumeCapability defaultBlockCapability;
string bootId;
process::grpc::client::Runtime runtime;
Owned<v1::resource_provider::Driver> driver;
@@ -586,14 +585,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
void StorageLocalResourceProviderProcess::initialize()
{
- // Default mount and block capabilities for pre-existing volumes.
- defaultMountCapability.mutable_mount();
- defaultMountCapability.mutable_access_mode()
- ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
- defaultBlockCapability.mutable_block();
- defaultBlockCapability.mutable_access_mode()
- ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-
Try<string> _bootId = os::bootId();
if (_bootId.isError()) {
LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
@@ -2646,6 +2637,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
// Returns a CSI volume ID.
+//
// NOTE: This can only be called after `prepareControllerService`.
Future<string> StorageLocalResourceProviderProcess::createVolume(
const string& name,
@@ -2676,9 +2668,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
const csi::v0::Volume& volume = response.volume();
if (volumes.contains(volume.id())) {
- // The resource provider failed over after the last
- // `CreateVolume` call, but before the operation status was
- // checkpointed.
+ // The resource provider failed over after the last `createVolume`
+ // call, but before the operation status was checkpointed.
CHECK_EQ(VolumeState::CREATED,
volumes.at(volume.id()).state.state());
} else {
@@ -2698,19 +2689,13 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
}
+// Returns true if the volume has been deprovisioned.
+//
// NOTE: This can only be called after `prepareControllerService` and
// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
- const string& volumeId,
- bool preExisting)
+Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
+ const string& volumeId)
{
- // We do not need the capability for pre-existing volumes since no
- // actual `DeleteVolume` call will be made.
- if (!preExisting && !controllerCapabilities.createDeleteVolume) {
- return Failure(
- "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
- }
-
CHECK_SOME(controllerContainerId);
const string volumePath = csi::paths::getVolumePath(
@@ -2720,11 +2705,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
volumeId);
if (!volumes.contains(volumeId)) {
- // The resource provider failed over after the last `DeleteVolume`
- // call, but before the operation status was checkpointed.
+ // The resource provider failed over after the last `deleteVolume` call, but
+ // before the operation status was checkpointed.
CHECK(!os::exists(volumePath));
- return Nothing();
+ return controllerCapabilities.createDeleteVolume;
}
const VolumeData& volume = volumes.at(volumeId);
@@ -2762,7 +2747,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
// state once the above is done.
}
case VolumeState::CREATED: {
- if (!preExisting) {
+ // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
+ // supported. Otherwise, we simply leave it as a preprovisioned volume.
+ if (controllerCapabilities.createDeleteVolume) {
deleted = deleted
.then(defer(self(), &Self::getService, controllerContainerId.get()))
.then(defer(self(), [this, volumeId](csi::v0::Client client) {
@@ -2800,7 +2787,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
volumes.erase(volumeId);
CHECK_SOME(os::rmdir(volumePath));
- return Nothing();
+ return controllerCapabilities.createDeleteVolume;
}));
}
@@ -2990,7 +2977,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
conversions = applyCreateDisk(
operation.info().create_disk().source(),
operationUuid,
- operation.info().create_disk().target_type());
+ operation.info().create_disk().target_type(),
+ operation.info().create_disk().has_target_profile()
+ ? operation.info().create_disk().target_profile()
+ : Option<string>::none());
break;
}
@@ -3094,120 +3084,50 @@ Future<vector<ResourceConversion>>
StorageLocalResourceProviderProcess::applyCreateDisk(
const Resource& resource,
const id::UUID& operationUuid,
- const Resource::DiskInfo::Source::Type& type)
+ const Resource::DiskInfo::Source::Type& targetType,
+ const Option<string>& targetProfile)
{
CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type());
- // NOTE: Currently we only support two type of RAW disk resources:
+ // NOTE: Currently we only support two types of RAW disk resources:
// 1. RAW disk from `GetCapacity` with a profile but no volume ID.
- // 2. RAW disk from `ListVolumes` for a pre-existing volume, which
- // has a volume ID but no profile.
+ // 2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a
+ // volume ID but no profile.
//
// For 1, we check if its profile is mount or block capable, then
- // call `CreateVolume` with the operation UUID as the name (so that
+ // call `createVolume` with the operation UUID as the name (so that
// the same volume will be returned when recovering from a failover).
//
- // For 2, there are two scenarios:
+ // For 2, the target profile will be specified, so we first check if the
+ // profile is mount or block capable. Then, there are two scenarios:
// a. If the volume has a checkpointed state (because it was created
- // by a previous resource provider), we simply check if its
- // checkpointed capability supports the conversion.
+ // by a previous resource provider), we simply check if its checkpointed
+ // capability and parameters match the profile.
// b. If the volume is newly discovered, `ValidateVolumeCapabilities`
- // is called with a default mount or block capability.
+ // is called with the capability of the profile.
CHECK_NE(resource.disk().source().has_profile(),
- resource.disk().source().has_id());
+ resource.disk().source().has_id() && targetProfile.isSome());
- Future<string> created;
-
- switch (type) {
- case Resource::DiskInfo::Source::MOUNT: {
- if (resource.disk().source().has_profile()) {
- // The profile exists since any operation with a stale profile must have
- // been dropped for a mismatched resource version or a reconciliation.
- CHECK(profileInfos.contains(resource.disk().source().profile()))
- << "Profile '" << resource.disk().source().profile() << "' not found";
+ const string profile =
+ targetProfile.getOrElse(resource.disk().source().profile());
- const DiskProfileAdaptor::ProfileInfo& profileInfo =
- profileInfos.at(resource.disk().source().profile());
-
- if (!profileInfo.capability.has_mount()) {
- return Failure(
- "Profile '" + resource.disk().source().profile() +
- "' cannot be used to create a MOUNT disk");
- }
-
- // TODO(chhsiao): Call `CreateVolume` sequentially with other
- // create or delete operations, and send an `UPDATE_STATE` for
- // RAW profiled resources afterward.
- created = createVolume(
- operationUuid.toString(),
- Bytes(resource.scalar().value() * Bytes::MEGABYTES),
- profileInfo);
- } else {
- const string& volumeId = resource.disk().source().id();
-
- if (volumes.contains(volumeId)) {
- if (!volumes.at(volumeId).state.volume_capability().has_mount()) {
- return Failure(
- "Volume '" + volumeId +
- "' cannot be converted to a MOUNT disk");
- }
+ if (!profileInfos.contains(profile)) {
+ return Failure("Profile '" + profile + "' not found");
+ }
- created = volumeId;
- } else {
- // No need to call `ValidateVolumeCapabilities` sequentially
- // since the volume is not used and thus not in `volumes` yet.
- created = validateCapability(
- volumeId,
- resource.disk().source().has_metadata()
- ? resource.disk().source().metadata() : Option<Labels>::none(),
- defaultMountCapability);
- }
+ const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile);
+ switch (targetType) {
+ case Resource::DiskInfo::Source::MOUNT: {
+ if (!profileInfo.capability.has_mount()) {
+ return Failure(
+ "Profile '" + profile + "' cannot be used to create a MOUNT disk");
}
break;
}
case Resource::DiskInfo::Source::BLOCK: {
- if (resource.disk().source().has_profile()) {
- // The profile exists since any operation with a stale profile must have
- // been dropped for a mismatched resource version or a reconciliation.
- CHECK(profileInfos.contains(resource.disk().source().profile()))
- << "Profile '" << resource.disk().source().profile() << "' not found";
-
- const DiskProfileAdaptor::ProfileInfo& profileInfo =
- profileInfos.at(resource.disk().source().profile());
-
- if (!profileInfo.capability.has_block()) {
- return Failure(
- "Profile '" + resource.disk().source().profile() +
- "' cannot be used to create a BLOCK disk");
- }
-
- // TODO(chhsiao): Call `CreateVolume` sequentially with other
- // create or delete operations, and send an `UPDATE_STATE` for
- // RAW profiled resources afterward.
- created = createVolume(
- operationUuid.toString(),
- Bytes(resource.scalar().value() * Bytes::MEGABYTES),
- profileInfo);
- } else {
- const string& volumeId = resource.disk().source().id();
-
- if (volumes.contains(volumeId)) {
- if (!volumes.at(volumeId).state.volume_capability().has_block()) {
- return Failure(
- "Volume '" + volumeId +
- "' cannot be converted to a BLOCK disk");
- }
-
- created = volumeId;
- } else {
- // No need to call `ValidateVolumeCapabilities` sequentially
- // since the volume is not used and thus not in `volumes` yet.
- created = validateCapability(
- volumeId,
- resource.disk().source().has_metadata()
- ? resource.disk().source().metadata() : Option<Labels>::none(),
- defaultBlockCapability);
- }
+ if (!profileInfo.capability.has_block()) {
+ return Failure(
+ "Profile '" + profile + "' cannot be used to create a BLOCK disk");
}
break;
}
@@ -3218,6 +3138,40 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
}
}
+ Future<string> created;
+ if (resource.disk().source().has_id()) {
+ const string& volumeId = resource.disk().source().id();
+
+ if (volumes.contains(volumeId)) {
+ const VolumeState& volumeState = volumes.at(volumeId).state;
+
+ // TODO(chhsiao): Validate the volume against the parameters of the
+ // profile once they are checkpointed.
+ if (volumeState.volume_capability() != profileInfo.capability) {
+ return Failure(
+ "Profile '" + profile + "' cannot be applied to volume '" +
+ volumeId + "'");
+ }
+
+ created = volumeId;
+ } else {
+ created = validateCapability(
+ volumeId,
+ resource.disk().source().has_metadata()
+ ? resource.disk().source().metadata()
+ : Option<Labels>::none(),
+ profileInfo.capability);
+ }
+ } else {
+ // TODO(chhsiao): Consider calling `CreateVolume` sequentially with other
+ // create or delete operations, and send an `UPDATE_STATE` for storage pools
+ // afterward. See MESOS-9254.
+ created = createVolume(
+ operationUuid.toString(),
+ Bytes(resource.scalar().value() * Bytes::MEGABYTES),
+ profileInfo);
+ }
+
return created
.then(defer(self(), [=](const string& volumeId) {
CHECK(volumes.contains(volumeId));
@@ -3225,7 +3179,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_id(volumeId);
- converted.mutable_disk()->mutable_source()->set_type(type);
+ converted.mutable_disk()->mutable_source()->set_type(targetType);
+ converted.mutable_disk()->mutable_source()->set_profile(profile);
if (!volumeState.volume_attributes().empty()) {
converted.mutable_disk()->mutable_source()->mutable_metadata()
@@ -3237,7 +3192,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
info.storage().plugin().type(),
info.storage().plugin().name());
- switch (type) {
+ switch (targetType) {
case Resource::DiskInfo::Source::MOUNT: {
// Set the root path relative to agent work dir.
converted.mutable_disk()->mutable_source()->mutable_mount()
@@ -3270,24 +3225,22 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT ||
resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK);
CHECK(resource.disk().source().has_id());
- CHECK(volumes.contains(resource.disk().source().id()));
+
+ const string& volumeId = resource.disk().source().id();
+ CHECK(volumes.contains(volumeId));
// Sequentialize the deletion with other operation on the same volume.
- return volumes.at(resource.disk().source().id()).sequence->add(
- std::function<Future<Nothing>()>(defer(
- self(),
- &Self::deleteVolume,
- resource.disk().source().id(),
- !resource.disk().source().has_profile())))
- .then(defer(self(), [=]() {
+ return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
+ defer(self(), &Self::deleteVolume, volumeId)))
+ .then(defer(self(), [=](bool deprovisioned) {
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_type(
Resource::DiskInfo::Source::RAW);
converted.mutable_disk()->mutable_source()->clear_mount();
- // We only clear the volume ID and metadata if the destroyed volume is not
- // a pre-existing volume.
- if (resource.disk().source().has_profile()) {
+ // We clear the volume ID and metadata if the volume has been
+ // deprovisioned. Otherwise, we clear the profile.
+ if (deprovisioned) {
converted.mutable_disk()->mutable_source()->clear_id();
converted.mutable_disk()->mutable_source()->clear_metadata();
@@ -3317,6 +3270,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
defer(self(), &Self::reconcileStoragePools)));
}
}
+ } else {
+ converted.mutable_disk()->mutable_source()->clear_profile();
}
vector<ResourceConversion> conversions;