You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:04 UTC

[mesos] 07/15: Cleanup volume creation, validation and deletion for SLRP.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 75cae10dbca0e3d4146ed1a75d939b1e29b194d7
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:51 2019 -0700

    Cleanup volume creation, validation and deletion for SLRP.
    
    This patch introduces methods in SLRP for volume creation, validation
    and deletion that conform to `VolumeManager`'s public interface, and
    cleans up SLRP to use these methods. They will be moved out of SLRP
    into the v0 `VolumeManager` later.
    
    Notably, volume deletion now supports deleting untracked volumes.
    
    Review: https://reviews.apache.org/r/70217/
---
 src/resource_provider/storage/provider.cpp         | 350 +++++++++++----------
 src/resource_provider/storage/provider_process.hpp |  22 +-
 2 files changed, 200 insertions(+), 172 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 84e5557..5a63e7b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -81,6 +81,7 @@
 #include "csi/service_manager.hpp"
 #include "csi/state.hpp"
 #include "csi/utils.hpp"
+#include "csi/volume_manager.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -109,6 +110,8 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+using google::protobuf::Map;
+
 using process::after;
 using process::await;
 using process::Break;
@@ -1965,45 +1968,51 @@ Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume(
 }
 
 
-Future<string> StorageLocalResourceProviderProcess::createVolume(
+Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
-    const DiskProfileAdaptor::ProfileInfo& profileInfo)
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
 {
   if (!controllerCapabilities->createDeleteVolume) {
     return Failure(
-        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
+        "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
+        "plugin type '" + info.type() + "' and name '" + info.name());
   }
 
-  csi::v0::CreateVolumeRequest request;
+  LOG(INFO) << "Creating volume with name '" << name << "'";
+
+  CreateVolumeRequest request;
   request.set_name(name);
   request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
   request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
-  *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability);
-  *request.mutable_parameters() = profileInfo.parameters;
+  *request.add_volume_capabilities() = csi::v0::evolve(capability);
+  *request.mutable_parameters() = parameters;
 
-  return call<csi::v0::CREATE_VOLUME>(
-      csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
-    .then(defer(self(), [=](
-        const csi::v0::CreateVolumeResponse& response) -> string {
-      const csi::v0::Volume& volume = response.volume();
+  // We retry the `CreateVolume` call for MESOS-9517.
+  return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then(process::defer(self(), [=](
+        const CreateVolumeResponse& response) -> Future<VolumeInfo> {
+      const string& volumeId = response.volume().id();
 
-      if (volumes.contains(volume.id())) {
-        // The resource provider failed over after the last `createVolume` call,
-        // but before the operation status was checkpointed.
-        CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
-      } else {
-        VolumeState volumeState;
-        volumeState.set_state(VolumeState::CREATED);
-        *volumeState.mutable_volume_capability() = profileInfo.capability;
-        *volumeState.mutable_parameters() = profileInfo.parameters;
-        *volumeState.mutable_volume_attributes() = volume.attributes();
-
-        volumes.put(volume.id(), std::move(volumeState));
-        checkpointVolumeState(volume.id());
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeId)) {
+        return Failure("Volume with name '" + name + "' already exists");
       }
 
-      return volume.id();
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = response.volume().attributes();
+
+      volumes.put(volumeId, std::move(volumeState));
+      checkpointVolumeState(volumeId);
+
+      return VolumeInfo{capacity, volumeId, response.volume().attributes()};
     }));
 }
 
@@ -2011,145 +2020,162 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
-  Future<Nothing> deleted = Nothing();
-
-  // If the volume has a checkpointed state, we transition it to `CREATED` state
-  // before deleting. Otherwise, we have to delete it directly because we have
-  // no idea what state this volume is in.
-  if (volumes.contains(volumeId)) {
-    VolumeData& volume = volumes.at(volumeId);
-
-    if (volume.state.node_publish_required()) {
-      CHECK_EQ(VolumeState::PUBLISHED, volume.state.state());
-
-      const string targetPath = csi::paths::getMountTargetPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
-
-      // NOTE: Normally the volume should have been cleaned up before
-      // `deleteVolume` is called. However this may not be true for
-      // preprovisioned volumes (e.g., leftover from a previous resource
-      // provider instance). To prevent data leakage in such cases, we clean up
-      // the data (but not the target path) here.
-      Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
-      if (rmdir.isError()) {
-        return Failure(
-            "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
-      }
+  if (!volumes.contains(volumeId)) {
+    return __deleteVolume(volumeId);
+  }
 
-      volume.state.set_node_publish_required(false);
-      checkpointVolumeState(volumeId);
+  VolumeData& volume = volumes.at(volumeId);
+
+  LOG(INFO) << "Deleting volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
+
+  // Volume deletion is sequentialized with other operations on the same volume
+  // to avoid races.
+  return volume.sequence->add(std::function<Future<bool>()>(
+      process::defer(self(), &Self::_deleteVolume, volumeId)));
+}
+
+
+Future<bool> StorageLocalResourceProviderProcess::_deleteVolume(
+    const std::string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.node_publish_required()) {
+    CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+
+    const string targetPath = paths::getMountTargetPath(
+        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
+        volumeId);
+
+    // NOTE: Normally the volume should have been cleaned up. However this may
+    // not be true for preprovisioned volumes (e.g., leftover from a previous
+    // resource provider instance). To prevent data leakage in such cases, we
+    // clean up the data (but not the target path) here.
+    Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+    if (rmdir.isError()) {
+      return Failure(
+          "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
     }
 
-    deleted = _detachVolume(volumeId);
+    volumeState.set_node_publish_required(false);
+    checkpointVolumeState(volumeId);
   }
 
-  // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
-  // supported. Otherwise, we simply leave it as a preprovisioned volume.
-  if (controllerCapabilities->createDeleteVolume) {
-    deleted = deleted
-      .then(defer(self(), [this, volumeId] {
-        csi::v0::DeleteVolumeRequest request;
-        request.set_volume_id(volumeId);
-
-        return call<csi::v0::DELETE_VOLUME>(
-            csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
-          .then([] { return Nothing(); });
-      }));
+  if (volumeState.state() != VolumeState::CREATED) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_deleteVolume, volumeId));
   }
 
-  // NOTE: The last asynchronous continuation of `deleteVolume`, which is
-  // supposed to be run in the volume's sequence if it exists, would cause the
-  // sequence to be destructed, which would in turn discard the returned future.
-  // However, since the continuation would have already been run, the returned
-  // future will become ready.
-  return deleted
-    .then(defer(self(), [this, volumeId] {
-      if (volumes.contains(volumeId)) {
-        volumes.erase(volumeId);
+  // NOTE: The last asynchronous continuation, which is supposed to be run in
+  // the volume's sequence, would cause the sequence to be destructed, which
+  // would in turn discard the returned future. However, since the continuation
+  // would have already been run, the returned future will become ready, making
+  // the future returned by the sequence ready as well.
+  return __deleteVolume(volumeId)
+    .then(process::defer(self(), [this, volumeId](bool deleted) {
+      volumes.erase(volumeId);
 
-        const string volumePath = csi::paths::getVolumePath(
-            slave::paths::getCsiRootDir(workDir),
-            info.storage().plugin().type(),
-            info.storage().plugin().name(),
-            volumeId);
+      const string volumePath = paths::getVolumePath(
+          rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
 
-        Try<Nothing> rmdir = os::rmdir(volumePath);
-        CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
-                          << volumePath << "': " << rmdir.error();
+      Try<Nothing> rmdir = os::rmdir(volumePath);
+      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                        << volumePath << "': " << rmdir.error();
 
-        garbageCollectMountPath(volumeId);
-      }
+      garbageCollectMountPath(volumeId);
 
-      return controllerCapabilities->createDeleteVolume;
+      return deleted;
     }));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
-    const string& volumeId,
-    const Option<Labels>& metadata,
-    const DiskProfileAdaptor::ProfileInfo& profileInfo)
+Future<bool> StorageLocalResourceProviderProcess::__deleteVolume(
+    const string& volumeId)
+{
+  if (!controllerCapabilities->createDeleteVolume) {
+    return false;
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
+            << volumeId << "'";
+
+  DeleteVolumeRequest request;
+  request.set_volume_id(volumeId);
+
+  // We retry the `DeleteVolume` call for MESOS-9517.
+  return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then([] { return true; });
+}
+
+
+Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
+    const VolumeInfo& volumeInfo,
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
 {
-  // If the volume has a checkpointed state, the validation succeeds only if the
+  // If the volume has been checkpointed, the validation succeeds only if the
   // capability and parameters of the specified profile are the same as those in
   // the checkpoint.
-  if (volumes.contains(volumeId)) {
-    const VolumeState& volumeState = volumes.at(volumeId).state;
+  if (volumes.contains(volumeInfo.id)) {
+    const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
 
-    if (volumeState.volume_capability() != profileInfo.capability) {
-      return Failure("Invalid volume capability for volume '" + volumeId + "'");
+    if (volumeState.volume_capability() != capability) {
+      return Some(
+          Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
     }
 
-    if (volumeState.parameters() != profileInfo.parameters) {
-      return Failure("Invalid parameters for volume '" + volumeId + "'");
+    if (volumeState.parameters() != parameters) {
+      return Some(
+          Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
     }
 
-    return Nothing();
+    return None();
   }
 
-  if (!pluginCapabilities->controllerService) {
-    return Failure(
-        "Plugin capability 'CONTROLLER_SERVICE' is not supported");
+  if (!parameters.empty()) {
+    LOG(WARNING)
+      << "Validating volumes against parameters is not supported in CSI v0";
   }
 
-  google::protobuf::Map<string, string> volumeAttributes;
+  LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
 
-  if (metadata.isSome()) {
-    volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
-  }
+  ValidateVolumeCapabilitiesRequest request;
+  request.set_volume_id(volumeInfo.id);
+  *request.add_volume_capabilities() = csi::v0::evolve(capability);
+  *request.mutable_volume_attributes() = volumeInfo.context;
 
-  // TODO(chhsiao): Validate the volume against the parameters of the profile
-  // once we get CSI v1.
-  csi::v0::ValidateVolumeCapabilitiesRequest request;
-  request.set_volume_id(volumeId);
-  *request.add_volume_capabilities() = csi::v0::evolve(profileInfo.capability);
-  *request.mutable_volume_attributes() = volumeAttributes;
-
-  return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
-      csi::CONTROLLER_SERVICE, std::move(request))
-    .then(defer(self(), [=](
-        const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-        -> Future<Nothing> {
+  return call<VALIDATE_VOLUME_CAPABILITIES>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [=](
+        const ValidateVolumeCapabilitiesResponse& response)
+        -> Future<Option<Error>> {
       if (!response.supported()) {
-        return Failure(
-            "Unsupported volume capability for volume '" + volumeId + "': " +
-            response.message());
+        return Error(
+            "Unsupported volume capability for volume '" + volumeInfo.id +
+            "': " + response.message());
+      }
+
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeInfo.id)) {
+        return Failure("Volume '" + volumeInfo.id + "' already validated");
       }
 
       VolumeState volumeState;
       volumeState.set_state(VolumeState::CREATED);
-      *volumeState.mutable_volume_capability() = profileInfo.capability;
-      *volumeState.mutable_parameters() = profileInfo.parameters;
-      *volumeState.mutable_volume_attributes() = volumeAttributes;
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = volumeInfo.context;
 
-      volumes.put(volumeId, std::move(volumeState));
-      checkpointVolumeState(volumeId);
+      volumes.put(volumeInfo.id, std::move(volumeState));
+      checkpointVolumeState(volumeInfo.id);
 
-      return Nothing();
+      return None();
     }));
 }
 
@@ -2471,32 +2497,45 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
   // TODO(chhsiao): Consider calling `createVolume` sequentially with other
   // create or delete operations, and send an `UPDATE_STATE` for storage pools
   // afterward. See MESOS-9254.
-  Future<string> created = resource.disk().source().has_profile()
-    ? createVolume(
-          operationUuid.toString(),
-          Bytes(resource.scalar().value() * Bytes::MEGABYTES),
-          profileInfo)
-    : validateVolume(
-          resource.disk().source().id(),
-          resource.disk().source().has_metadata()
-            ? resource.disk().source().metadata()
-            : Option<Labels>::none(),
-          profileInfo)
-        .then([=]() -> string { return resource.disk().source().id(); });
+  Future<VolumeInfo> created;
+  if (resource.disk().source().has_profile()) {
+    created = createVolume(
+        operationUuid.toString(),
+        resource.scalar().value() * Bytes::MEGABYTES,
+        profileInfo.capability,
+        profileInfo.parameters);
+  } else {
+    VolumeInfo volumeInfo = {
+      resource.scalar().value() * Bytes::MEGABYTES,
+      resource.disk().source().id(),
+      CHECK_NOTERROR(convertLabelsToStringMap(
+          resource.disk().source().metadata()))
+    };
 
-  return created
-    .then(defer(self(), [=](const string& volumeId) {
-      CHECK(volumes.contains(volumeId));
-      const VolumeState& volumeState = volumes.at(volumeId).state;
+    created = validateVolume(
+        volumeInfo, profileInfo.capability, profileInfo.parameters)
+      .then([resource, profile, volumeInfo](
+          const Option<Error>& error) -> Future<VolumeInfo> {
+        if (error.isSome()) {
+          return Failure(
+              "Cannot apply profile '" + profile + "' to resource '" +
+              stringify(resource) + "': " + error->message);
+        }
+
+        return volumeInfo;
+      });
+  }
 
+  return created
+    .then(defer(self(), [=](const VolumeInfo& volumeInfo) {
       Resource converted = resource;
-      converted.mutable_disk()->mutable_source()->set_id(volumeId);
+      converted.mutable_disk()->mutable_source()->set_id(volumeInfo.id);
       converted.mutable_disk()->mutable_source()->set_type(targetType);
       converted.mutable_disk()->mutable_source()->set_profile(profile);
 
-      if (!volumeState.volume_attributes().empty()) {
-        converted.mutable_disk()->mutable_source()->mutable_metadata()
-          ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
+      if (!volumeInfo.context.empty()) {
+        *converted.mutable_disk()->mutable_source()->mutable_metadata() =
+          convertStringMapToLabels(volumeInfo.context);
       }
 
       const string mountRootDir = csi::paths::getMountRootDir(
@@ -2507,8 +2546,8 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
       switch (targetType) {
         case Resource::DiskInfo::Source::MOUNT: {
           // Set the root path relative to agent work dir.
-          converted.mutable_disk()->mutable_source()->mutable_mount()
-            ->set_root(mountRootDir);
+          converted.mutable_disk()->mutable_source()->mutable_mount()->set_root(
+              mountRootDir);
 
           break;
         }
@@ -2537,16 +2576,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
   CHECK(!Resources::isPersistentVolume(resource));
   CHECK(resource.disk().source().has_id());
 
-  const string& volumeId = resource.disk().source().id();
-
-  Future<bool> deleted =
-    volumes.contains(volumeId)
-      ? volumes.at(volumeId).sequence->add(std::function<Future<bool>()>(
-            defer(self(), &Self::deleteVolume, volumeId)))
-      : deleteVolume(volumeId);
-
-  // Sequentialize the deletion with other operation on the same volume.
-  return deleted
+  return deleteVolume(resource.disk().source().id())
     .then(defer(self(), [=](bool deprovisioned) {
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 0b73531..b298a8e 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -56,6 +56,7 @@
 #include "csi/service_manager.hpp"
 #include "csi/state.hpp"
 #include "csi/utils.hpp"
+#include "csi/volume_manager.hpp"
 
 #include "status_update_manager/operation.hpp"
 
@@ -229,26 +230,23 @@ private:
   // Transition a volume to `VOL_READY` state from any state below.
   process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
 
-  // Returns a CSI volume ID.
-  //
   // NOTE: This can only be called after `prepareServices`.
-  process::Future<std::string> createVolume(
+  process::Future<csi::VolumeInfo> createVolume(
       const std::string& name,
       const Bytes& capacity,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+      const csi::types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
 
-  // Returns true if the volume has been deprovisioned.
-  //
   // NOTE: This can only be called after `prepareServices`.
   process::Future<bool> deleteVolume(const std::string& volumeId);
+  process::Future<bool> _deleteVolume(const std::string& volumeId);
+  process::Future<bool> __deleteVolume(const std::string& volumeId);
 
-  // Validates if a volume supports the capability of the specified profile.
-  //
   // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> validateVolume(
-      const std::string& volumeId,
-      const Option<Labels>& metadata,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+  process::Future<Option<Error>> validateVolume(
+      const csi::VolumeInfo& volumeInfo,
+      const csi::types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
 
   // NOTE: This can only be called after `prepareServices`.
   process::Future<Resources> listVolumes();