You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/11/29 18:31:29 UTC

[mesos] 05/14: Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f11de38a492669f38b8df006b25f504ae2bea4d3
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Nov 15 12:25:18 2018 -0800

    Implemented the new `CREATE_DISK`/`DESTROY_DISK` semantics in SLRP.
    
    The default mount/block volume capabilities is removed from SLRP.
    Instead, `CREATE_DISK` will convert a preprovisioned RAW disk to a
    profile disk, and `DESTROY_DISK` will always deprovision a profile disk
    as long as the CSI plugin is capable of deprovisioning volumes.
    
    Review: https://reviews.apache.org/r/69361
---
 src/csi/utils.hpp                          |   8 +
 src/resource_provider/storage/provider.cpp | 235 ++++++++++++-----------------
 2 files changed, 103 insertions(+), 140 deletions(-)

diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index 5ce318e..9145c67 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -45,6 +45,14 @@ bool operator==(
 bool operator==(const VolumeCapability& left, const VolumeCapability& right);
 
 
+inline bool operator!=(
+    const VolumeCapability& left,
+    const VolumeCapability& right)
+{
+  return !(left == right);
+}
+
+
 std::ostream& operator<<(
     std::ostream& stream,
     const ControllerServiceCapability::RPC::Type& type);
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index c137fa4..ebd5a88 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -402,7 +402,7 @@ private:
       const string& name,
       const Bytes& capacity,
       const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
+  Future<bool> deleteVolume(const string& volumeId);
   Future<string> validateCapability(
       const string& volumeId,
       const Option<Labels>& metadata,
@@ -420,7 +420,8 @@ private:
   Future<vector<ResourceConversion>> applyCreateDisk(
       const Resource& resource,
       const id::UUID& operationUuid,
-      const Resource::DiskInfo::Source::Type& type);
+      const Resource::DiskInfo::Source::Type& targetType,
+      const Option<string>& targetProfile);
   Future<vector<ResourceConversion>> applyDestroyDisk(
       const Resource& resource);
 
@@ -460,8 +461,6 @@ private:
 
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  csi::v0::VolumeCapability defaultMountCapability;
-  csi::v0::VolumeCapability defaultBlockCapability;
   string bootId;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
@@ -590,14 +589,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  // Default mount and block capabilities for pre-existing volumes.
-  defaultMountCapability.mutable_mount();
-  defaultMountCapability.mutable_access_mode()
-    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-  defaultBlockCapability.mutable_block();
-  defaultBlockCapability.mutable_access_mode()
-    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
     LOG(ERROR) << "Failed to get boot ID: " << _bootId.error();
@@ -2650,6 +2641,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 
 
 // Returns a CSI volume ID.
+//
 // NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
@@ -2680,9 +2672,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
           const csi::v0::Volume& volume = response.volume();
 
           if (volumes.contains(volume.id())) {
-            // The resource provider failed over after the last
-            // `CreateVolume` call, but before the operation status was
-            // checkpointed.
+            // The resource provider failed over after the last `createVolume`
+            // call, but before the operation status was checkpointed.
             CHECK_EQ(VolumeState::CREATED,
                      volumes.at(volume.id()).state.state());
           } else {
@@ -2702,19 +2693,13 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 }
 
 
+// Returns true if the volume has been deprovisioned.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
-    const string& volumeId,
-    bool preExisting)
+Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
+    const string& volumeId)
 {
-  // We do not need the capability for pre-existing volumes since no
-  // actual `DeleteVolume` call will be made.
-  if (!preExisting && !controllerCapabilities.createDeleteVolume) {
-    return Failure(
-        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
-  }
-
   CHECK_SOME(controllerContainerId);
 
   const string volumePath = csi::paths::getVolumePath(
@@ -2724,11 +2709,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       volumeId);
 
   if (!volumes.contains(volumeId)) {
-    // The resource provider failed over after the last `DeleteVolume`
-    // call, but before the operation status was checkpointed.
+    // The resource provider failed over after the last `deleteVolume` call, but
+    // before the operation status was checkpointed.
     CHECK(!os::exists(volumePath));
 
-    return Nothing();
+    return controllerCapabilities.createDeleteVolume;
   }
 
   const VolumeData& volume = volumes.at(volumeId);
@@ -2766,7 +2751,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       // state once the above is done.
     }
     case VolumeState::CREATED: {
-      if (!preExisting) {
+      // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
+      // supported. Otherwise, we simply leave it as a preprovisioned volume.
+      if (controllerCapabilities.createDeleteVolume) {
         deleted = deleted
           .then(defer(self(), &Self::getService, controllerContainerId.get()))
           .then(defer(self(), [this, volumeId](csi::v0::Client client) {
@@ -2804,7 +2791,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       volumes.erase(volumeId);
       CHECK_SOME(os::rmdir(volumePath));
 
-      return Nothing();
+      return controllerCapabilities.createDeleteVolume;
     }));
 }
 
@@ -2994,7 +2981,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
       conversions = applyCreateDisk(
           operation.info().create_disk().source(),
           operationUuid,
-          operation.info().create_disk().target_type());
+          operation.info().create_disk().target_type(),
+          operation.info().create_disk().has_target_profile()
+            ? operation.info().create_disk().target_profile()
+            : Option<string>::none());
 
       break;
     }
@@ -3098,120 +3088,50 @@ Future<vector<ResourceConversion>>
 StorageLocalResourceProviderProcess::applyCreateDisk(
     const Resource& resource,
     const id::UUID& operationUuid,
-    const Resource::DiskInfo::Source::Type& type)
+    const Resource::DiskInfo::Source::Type& targetType,
+    const Option<string>& targetProfile)
 {
   CHECK_EQ(Resource::DiskInfo::Source::RAW, resource.disk().source().type());
 
-  // NOTE: Currently we only support two type of RAW disk resources:
+  // NOTE: Currently we only support two types of RAW disk resources:
   //   1. RAW disk from `GetCapacity` with a profile but no volume ID.
-  //   2. RAW disk from `ListVolumes` for a pre-existing volume, which
-  //      has a volume ID but no profile.
+  //   2. RAW disk from `ListVolumes` for a preprovisioned volume, which has a
+  //      volume ID but no profile.
   //
   // For 1, we check if its profile is mount or block capable, then
-  // call `CreateVolume` with the operation UUID as the name (so that
+  // call `createVolume` with the operation UUID as the name (so that
   // the same volume will be returned when recovering from a failover).
   //
-  // For 2, there are two scenarios:
+  // For 2, the target profile will be specified, so we first check if the
+  // profile is mount or block capable. Then, there are two scenarios:
   //   a. If the volume has a checkpointed state (because it was created
-  //      by a previous resource provider), we simply check if its
-  //      checkpointed capability supports the conversion.
+  //      by a previous resource provider), we simply check if its checkpointed
+  //      capability and parameters match the profile.
   //   b. If the volume is newly discovered, `ValidateVolumeCapabilities`
-  //      is called with a default mount or block capability.
+  //      is called with the capability of the profile.
   CHECK_NE(resource.disk().source().has_profile(),
-           resource.disk().source().has_id());
+           resource.disk().source().has_id() && targetProfile.isSome());
 
-  Future<string> created;
-
-  switch (type) {
-    case Resource::DiskInfo::Source::MOUNT: {
-      if (resource.disk().source().has_profile()) {
-        // The profile exists since any operation with a stale profile must have
-        // been dropped for a mismatched resource version or a reconciliation.
-        CHECK(profileInfos.contains(resource.disk().source().profile()))
-          << "Profile '" << resource.disk().source().profile() << "' not found";
+  const string profile =
+    targetProfile.getOrElse(resource.disk().source().profile());
 
-        const DiskProfileAdaptor::ProfileInfo& profileInfo =
-          profileInfos.at(resource.disk().source().profile());
-
-        if (!profileInfo.capability.has_mount()) {
-          return Failure(
-              "Profile '" + resource.disk().source().profile() +
-              "' cannot be used to create a MOUNT disk");
-        }
-
-        // TODO(chhsiao): Call `CreateVolume` sequentially with other
-        // create or delete operations, and send an `UPDATE_STATE` for
-        // RAW profiled resources afterward.
-        created = createVolume(
-            operationUuid.toString(),
-            Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-            profileInfo);
-      } else {
-        const string& volumeId = resource.disk().source().id();
-
-        if (volumes.contains(volumeId)) {
-          if (!volumes.at(volumeId).state.volume_capability().has_mount()) {
-            return Failure(
-                "Volume '" + volumeId +
-                "' cannot be converted to a MOUNT disk");
-          }
+  if (!profileInfos.contains(profile)) {
+    return Failure("Profile '" + profile + "' not found");
+  }
 
-          created = volumeId;
-        } else {
-          // No need to call `ValidateVolumeCapabilities` sequentially
-          // since the volume is not used and thus not in `volumes` yet.
-          created = validateCapability(
-              volumeId,
-              resource.disk().source().has_metadata()
-                ? resource.disk().source().metadata() : Option<Labels>::none(),
-              defaultMountCapability);
-        }
+  const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile);
+  switch (targetType) {
+    case Resource::DiskInfo::Source::MOUNT: {
+      if (!profileInfo.capability.has_mount()) {
+        return Failure(
+            "Profile '" + profile + "' cannot be used to create a MOUNT disk");
       }
       break;
     }
     case Resource::DiskInfo::Source::BLOCK: {
-      if (resource.disk().source().has_profile()) {
-        // The profile exists since any operation with a stale profile must have
-        // been dropped for a mismatched resource version or a reconciliation.
-        CHECK(profileInfos.contains(resource.disk().source().profile()))
-          << "Profile '" << resource.disk().source().profile() << "' not found";
-
-        const DiskProfileAdaptor::ProfileInfo& profileInfo =
-          profileInfos.at(resource.disk().source().profile());
-
-        if (!profileInfo.capability.has_block()) {
-          return Failure(
-              "Profile '" + resource.disk().source().profile() +
-              "' cannot be used to create a BLOCK disk");
-        }
-
-        // TODO(chhsiao): Call `CreateVolume` sequentially with other
-        // create or delete operations, and send an `UPDATE_STATE` for
-        // RAW profiled resources afterward.
-        created = createVolume(
-            operationUuid.toString(),
-            Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-            profileInfo);
-      } else {
-        const string& volumeId = resource.disk().source().id();
-
-        if (volumes.contains(volumeId)) {
-          if (!volumes.at(volumeId).state.volume_capability().has_block()) {
-            return Failure(
-                "Volume '" + volumeId +
-                "' cannot be converted to a BLOCK disk");
-          }
-
-          created = volumeId;
-        } else {
-          // No need to call `ValidateVolumeCapabilities` sequentially
-          // since the volume is not used and thus not in `volumes` yet.
-          created = validateCapability(
-              volumeId,
-              resource.disk().source().has_metadata()
-                ? resource.disk().source().metadata() : Option<Labels>::none(),
-              defaultBlockCapability);
-        }
+      if (!profileInfo.capability.has_block()) {
+        return Failure(
+            "Profile '" + profile + "' cannot be used to create a BLOCK disk");
       }
       break;
     }
@@ -3222,6 +3142,40 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
     }
   }
 
+  Future<string> created;
+  if (resource.disk().source().has_id()) {
+    const string& volumeId = resource.disk().source().id();
+
+    if (volumes.contains(volumeId)) {
+      const VolumeState& volumeState = volumes.at(volumeId).state;
+
+      // TODO(chhsiao): Validate the volume against the parameters of the
+      // profile once they are checkpointed.
+      if (volumeState.volume_capability() != profileInfo.capability) {
+        return Failure(
+            "Profile '" + profile + "' cannot be applied to volume '" +
+            volumeId + "'");
+      }
+
+      created = volumeId;
+    } else {
+      created = validateCapability(
+          volumeId,
+          resource.disk().source().has_metadata()
+            ? resource.disk().source().metadata()
+            : Option<Labels>::none(),
+          profileInfo.capability);
+    }
+  } else {
+    // TODO(chhsiao): Consider calling `CreateVolume` sequentially with other
+    // create or delete operations, and send an `UPDATE_STATE` for storage pools
+    // afterward. See MESOS-9254.
+    created = createVolume(
+        operationUuid.toString(),
+        Bytes(resource.scalar().value() * Bytes::MEGABYTES),
+        profileInfo);
+  }
+
   return created
     .then(defer(self(), [=](const string& volumeId) {
       CHECK(volumes.contains(volumeId));
@@ -3229,7 +3183,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
 
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_id(volumeId);
-      converted.mutable_disk()->mutable_source()->set_type(type);
+      converted.mutable_disk()->mutable_source()->set_type(targetType);
+      converted.mutable_disk()->mutable_source()->set_profile(profile);
 
       if (!volumeState.volume_attributes().empty()) {
         converted.mutable_disk()->mutable_source()->mutable_metadata()
@@ -3241,7 +3196,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
           info.storage().plugin().type(),
           info.storage().plugin().name());
 
-      switch (type) {
+      switch (targetType) {
         case Resource::DiskInfo::Source::MOUNT: {
           // Set the root path relative to agent work dir.
           converted.mutable_disk()->mutable_source()->mutable_mount()
@@ -3274,24 +3229,22 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
   CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT ||
         resource.disk().source().type() == Resource::DiskInfo::Source::BLOCK);
   CHECK(resource.disk().source().has_id());
-  CHECK(volumes.contains(resource.disk().source().id()));
+
+  const string& volumeId = resource.disk().source().id();
+  CHECK(volumes.contains(volumeId));
 
   // Sequentialize the deletion with other operation on the same volume.
-  return volumes.at(resource.disk().source().id()).sequence->add(
-      std::function<Future<Nothing>()>(defer(
-          self(),
-          &Self::deleteVolume,
-          resource.disk().source().id(),
-          !resource.disk().source().has_profile())))
-    .then(defer(self(), [=]() {
+  return volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
+      defer(self(), &Self::deleteVolume, volumeId)))
+    .then(defer(self(), [=](bool deprovisioned) {
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_mount();
 
-      // We only clear the volume ID and metadata if the destroyed volume is not
-      // a pre-existing volume.
-      if (resource.disk().source().has_profile()) {
+      // We clear the volume ID and metadata if the volume has been
+      // deprovisioned. Otherwise, we clear the profile.
+      if (deprovisioned) {
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();
 
@@ -3321,6 +3274,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
                 defer(self(), &Self::reconcileStoragePools)));
           }
         }
+      } else {
+        converted.mutable_disk()->mutable_source()->clear_profile();
       }
 
       vector<ResourceConversion> conversions;