You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/14 05:03:19 UTC

[4/7] mesos git commit: Supporting pre-existing volumes with no profile in SLRP.

Supporting pre-existing volumes with no profile in SLRP.

The storage local resource provider will now report two catagories of
RAW resources: resources that represents storage pools with profiles,
and resources that represents pre-existing volumes with IDs but no
profile. When applying `CREATE_VOLUME` or `CREATE_BLOCK` on pre-existing
volumes, we issue a `ValicateVolumeCapabilities` CSI call with the
default Mount or Block capabilities and convert the RAW resources into
MOUNT, PATH or BLOCK resources. When applying `DESTROY_VOLUME` or
`DESTROY_BLOCK` on these resources, they will be converted back to RAW
resources with IDs, but the pre-existing volumes won't be deleted.

Review: https://reviews.apache.org/r/64583/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/673b421d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/673b421d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/673b421d

Branch: refs/heads/master
Commit: 673b421d414f2c8855ffb7ca47e0dc23e52af3ce
Parents: dbd4596
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Dec 13 20:46:56 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 13 20:46:56 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 431 ++++++++++++++----------
 1 file changed, 257 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/673b421d/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 03a12c7..e239317 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -351,22 +351,22 @@ private:
 
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
-  Future<Resources> importResources();
+  Future<Resources> discoverResources();
   Future<Nothing> controllerPublish(const string& volumeId);
   Future<Nothing> controllerUnpublish(const string& volumeId);
   Future<Nothing> nodePublish(const string& volumeId);
   Future<Nothing> nodeUnpublish(const string& volumeId);
-
-  // Returns a CSI volume ID.
   Future<string> createVolume(
       const string& name,
       const Bytes& capacity,
       const ProfileData& profile);
-  Future<Nothing> deleteVolume(const string& volumeId);
+  Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
+  Future<string> validateCapability(
+      const string& volumeId,
+      const Option<Labels>& metadata,
+      const csi::VolumeCapability& capability);
+  Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles);
 
-  // Applies the offer operation. Conventional operations will be
-  // synchronously applied. Do nothing if the operation is already in a
-  // terminal state.
   Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
@@ -376,9 +376,6 @@ private:
   Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock(
       const Resource& resource);
 
-  // Synchronously updates `totalResources` and the offer operation
-  // status and then asks the status update manager to send status
-  // updates.
   Try<Nothing> updateOfferOperationStatus(
       const UUID& operationUuid,
       const Try<vector<ResourceConversion>>& conversions);
@@ -412,6 +409,8 @@ private:
   const bool strict;
 
   csi::Version csiVersion;
+  csi::VolumeCapability defaultMountCapability;
+  csi::VolumeCapability defaultBlockCapability;
   string bootId;
   hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
@@ -437,6 +436,9 @@ private:
   LinkedHashMap<UUID, OfferOperation> offerOperations;
   Resources totalResources;
   UUID resourceVersion;
+
+  // We maintain the state of a CSI volume if and only if its
+  // corresponding resource is not RAW.
   hashmap<string, VolumeData> volumes;
 };
 
@@ -506,6 +508,14 @@ void StorageLocalResourceProviderProcess::initialize()
   csiVersion.set_minor(1);
   csiVersion.set_patch(0);
 
+  // Default mount and block capabilities for pre-existing volumes.
+  defaultMountCapability.mutable_mount();
+  defaultMountCapability.mutable_access_mode()
+    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+  defaultBlockCapability.mutable_block();
+  defaultBlockCapability.mutable_access_mode()
+    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
     return fatal("Failed to get boot ID", _bootId.error());
@@ -537,10 +547,8 @@ void StorageLocalResourceProviderProcess::initialize()
     }
   }
 
-  // NOTE: The name of the default profile is an empty string, which is
-  // the default value for `Resource.disk.source.profile` when unset.
   // TODO(chhsiao): Use the volume profile module.
-  ProfileData& defaultProfile = profiles[""];
+  ProfileData& defaultProfile = profiles["default"];
   defaultProfile.capability.mutable_mount();
   defaultProfile.capability.mutable_access_mode()
     ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
@@ -999,17 +1007,17 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return recoverStatusUpdates()
-    .then(defer(self(), &Self::importResources))
-    .then(defer(self(), [=](Resources importedResources) {
+    .then(defer(self(), &Self::discoverResources))
+    .then(defer(self(), [=](Resources discoveredResources) {
       // NODE: If a resource in the checkpointed total resources is
-      // missing in the imported resources, we will still keep it if it
-      // is converted by an offer operation before (i.e., has extra info
-      // other than the default reservations). The reason is that we
-      // want to maintain a consistent view with frameworks, and do not
-      // want to lose any data on persistent volumes due to some
+      // missing in the discovered resources, we will still keep it if
+      // it is converted by an offer operation before (i.e., has extra
+      // info other than the default reservations). The reason is that
+      // we want to maintain a consistent view with frameworks, and do
+      // not want to lose any data on persistent volumes due to some
       // temporarily CSI plugin faults. Other missing resources that are
       // "unconverted" by any framework will be removed from the total
-      // resources. Then, any new imported resource will be reported
+      // resources. Then, any newly discovered resource will be reported
       // under the default reservations.
 
       Resources result;
@@ -1025,8 +1033,8 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
               ? resource.disk().source().id() : Option<string>::none(),
             resource.disk().source().has_metadata()
               ? resource.disk().source().metadata() : Option<Labels>::none());
-        if (importedResources.contains(unconverted)) {
-          // The checkponited resource appears in the imported resources.
+        if (discoveredResources.contains(unconverted)) {
+          // The checkponited resource appears in the discovered resources.
           result += resource;
           unconvertedTotal += unconverted;
         } else if (!totalResources.contains(unconverted)) {
@@ -1040,30 +1048,15 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
         }
       }
 
-      foreach (Resource resource, importedResources - unconvertedTotal) {
-        if (resource.disk().source().has_id() &&
-            !volumes.contains(resource.disk().source().id())) {
-          csi::state::VolumeState volumeState;
-          volumeState.set_state(csi::state::VolumeState::CREATED);
-
-          // The default profile is used if `profile` is unset.
-          volumeState.mutable_volume_capability()->CopyFrom(
-              profiles.at(resource.disk().source().profile()).capability);
+      // NOTE: The states of newly discovered pre-existing volumes will
+      // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK`
+      // operations are applied.
+      const Resources newResources = discoveredResources - unconvertedTotal;
+      result += newResources;
 
-          if (resource.disk().source().has_metadata()) {
-            volumeState.mutable_volume_attributes()->swap(
-                convertLabelsToStringMap(
-                    resource.disk().source().metadata()).get());
-          }
+      LOG(INFO) << "Adding new resources '" << newResources << "'";
 
-          volumes.put(resource.disk().source().id(), std::move(volumeState));
-          checkpointVolumeState(resource.disk().source().id());
-        }
-
-        result += resource;
-
-        LOG(INFO) << "Adding new resource '" << resource << "'";
-      }
+      // TODO(chhsiao): Check that all profiles exist.
 
       if (result != totalResources) {
         totalResources = result;
@@ -1754,17 +1747,18 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 
 // Returns resources reported by the CSI plugin, which are unreserved
 // raw disk resources without any persistent volume.
-Future<Resources> StorageLocalResourceProviderProcess::importResources()
+Future<Resources> StorageLocalResourceProviderProcess::discoverResources()
 {
   // NOTE: This can only be called after `prepareControllerService` and
   // the resource provider ID has been obtained.
   CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
-  Future<Resources> preprovisioned;
+  list<Future<Resources>> futures;
+  futures.push_back(getCapacities(profiles));
 
   if (controllerCapabilities->listVolumes) {
-    preprovisioned = getService(controllerContainerId)
+    futures.push_back(getService(controllerContainerId)
       .then(defer(self(), [=](csi::Client client) {
         // TODO(chhsiao): Set the max entries and use a loop to do
         // mutliple `ListVolumes` calls.
@@ -1780,8 +1774,9 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
             foreach (const Resource& resource, totalResources) {
               if (resource.disk().source().has_id() &&
                   resource.disk().source().has_profile()) {
-                volumesToProfiles[resource.disk().source().id()] =
-                  resource.disk().source().profile();
+                volumesToProfiles.put(
+                    resource.disk().source().id(),
+                    resource.disk().source().profile());
               }
             }
 
@@ -1801,78 +1796,12 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
 
             return resources;
           }));
-      }));
-  } else {
-    preprovisioned = Resources();
+      })));
   }
 
-  return preprovisioned
-    .then(defer(self(), [=](const Resources& preprovisioned) {
-      list<Future<Resources>> futures;
-
-      foreach (const Resource& resource, preprovisioned) {
-        futures.push_back(getService(controllerContainerId)
-          .then(defer(self(), [=](csi::Client client) {
-            csi::ValidateVolumeCapabilitiesRequest request;
-            request.mutable_version()->CopyFrom(csiVersion);
-            request.set_volume_id(resource.disk().source().id());
-
-            // The default profile is used if `profile` is unset.
-            request.add_volume_capabilities()->CopyFrom(
-                profiles.at(resource.disk().source().profile()).capability);
-
-            if (resource.disk().source().has_metadata()) {
-              request.mutable_volume_attributes()->swap(
-                  convertLabelsToStringMap(
-                      resource.disk().source().metadata()).get());
-            }
-
-            return client.ValidateVolumeCapabilities(request)
-              .then(defer(self(), [=](
-                  const csi::ValidateVolumeCapabilitiesResponse& response)
-                  -> Future<Resources> {
-                if (!response.supported()) {
-                  return Failure(
-                      "Unsupported volume capability for resource " +
-                      stringify(resource) + ": " + response.message());
-                }
-
-                return resource;
-              }));
-          })));
-      }
-
-      if (controllerCapabilities->getCapacity) {
-        foreachkey (const string& profile, profiles) {
-          futures.push_back(getService(controllerContainerId)
-            .then(defer(self(), [=](csi::Client client) {
-              csi::GetCapacityRequest request;
-              request.mutable_version()->CopyFrom(csiVersion);
-              request.add_volume_capabilities()
-                ->CopyFrom(profiles.at(profile).capability);
-              *request.mutable_parameters() = profiles.at(profile).parameters;
-
-              return client.GetCapacity(request)
-                .then(defer(self(), [=](
-                    const csi::GetCapacityResponse& response)
-                    -> Future<Resources> {
-                  if (response.available_capacity() == 0) {
-                    return Resources();
-                  }
-
-                  return createRawDiskResource(
-                      info,
-                      response.available_capacity(),
-                      profile.empty() ? Option<string>::none() : profile);
-              }));
-            })));
-        }
-      }
-
-      return collect(futures)
-        .then(defer(self(), [=](const list<Resources>& resources) {
-          return accumulate(resources.begin(), resources.end(), Resources());
-        }));
+  return collect(futures)
+    .then(defer(self(), [=](const list<Resources>& resources) {
+      return accumulate(resources.begin(), resources.end(), Resources());
     }));
 }
 
@@ -2131,6 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 }
 
 
+// Returns a CSI volume ID.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
@@ -2170,8 +2100,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
               ->CopyFrom(profile.capability);
-            *volumeState.mutable_volume_attributes() =
-              volumeInfo.attributes();
+            *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
 
             volumes.put(volumeInfo.id(), std::move(volumeState));
             checkpointVolumeState(volumeInfo.id());
@@ -2184,14 +2113,17 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 
 
 Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
-    const string& volumeId)
+    const string& volumeId,
+    bool preExisting)
 {
   // NOTE: This can only be called after `prepareControllerService` and
   // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
   CHECK_SOME(controllerCapabilities);
   CHECK_SOME(nodeId);
 
-  if (!controllerCapabilities->createDeleteVolume) {
+  // We do not need the capability for pre-existing volumes since no
+  // actual `DeleteVolume` call will be made.
+  if (!preExisting && !controllerCapabilities->createDeleteVolume) {
     return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
@@ -2216,21 +2148,26 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
           .then(defer(self(), &Self::controllerUnpublish, volumeId));
       }
       case csi::state::VolumeState::CREATED: {
+        if (!preExisting) {
+          deleted = deleted
+            .then(defer(self(), &Self::getService, controllerContainerId))
+            .then(defer(self(), [=](csi::Client client) {
+              csi::DeleteVolumeRequest request;
+              request.mutable_version()->CopyFrom(csiVersion);
+              request.set_volume_id(volumeId);
+
+              return client.DeleteVolume(request)
+                .then([] { return Nothing(); });
+            }));
+        }
+
         deleted = deleted
-          .then(defer(self(), &Self::getService, controllerContainerId))
-          .then(defer(self(), [=](csi::Client client) {
-            csi::DeleteVolumeRequest request;
-            request.mutable_version()->CopyFrom(csiVersion);
-            request.set_volume_id(volumeId);
-
-            return client.DeleteVolume(request)
-              .then(defer(self(), [=] {
-                // NOTE: This will destruct the volume's sequence!
-                volumes.erase(volumeId);
-                CHECK_SOME(os::rmdir(volumePath));
-
-                return Nothing();
-              }));
+          .then(defer(self(), [=] {
+            // NOTE: This will destruct the volume's sequence!
+            volumes.erase(volumeId);
+            CHECK_SOME(os::rmdir(volumePath));
+
+            return Nothing();
           }));
         break;
       }
@@ -2257,6 +2194,110 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
+// Validates if a volume has the specified capability. This is called
+// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing
+// volume, so we make it returns a volume ID, similar to `createVolume`.
+Future<string> StorageLocalResourceProviderProcess::validateCapability(
+    const string& volumeId,
+    const Option<Labels>& metadata,
+    const csi::VolumeCapability& capability)
+{
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      google::protobuf::Map<string, string> volumeAttributes;
+
+      if (metadata.isSome()) {
+        volumeAttributes.swap(convertLabelsToStringMap(metadata.get()).get());
+      }
+
+      csi::ValidateVolumeCapabilitiesRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+      request.set_volume_id(volumeId);
+      request.add_volume_capabilities()->CopyFrom(capability);
+      *request.mutable_volume_attributes() = volumeAttributes;
+
+      return client.ValidateVolumeCapabilities(request)
+        .then(defer(self(), [=](
+            const csi::ValidateVolumeCapabilitiesResponse& response)
+            -> Future<string> {
+          if (!response.supported()) {
+            return Failure(
+                "Unsupported volume capability for volume '" + volumeId +
+                "': " + response.message());
+          }
+
+          if (volumes.contains(volumeId)) {
+            // The resource provider failed over after the last
+            // `ValidateVolumeCapability` call, but before the offer
+            // operation status was checkpointed.
+            CHECK_EQ(csi::state::VolumeState::CREATED,
+                     volumes.at(volumeId).state.state());
+          } else {
+            csi::state::VolumeState volumeState;
+            volumeState.set_state(csi::state::VolumeState::CREATED);
+            volumeState.mutable_volume_capability()->CopyFrom(capability);
+            *volumeState.mutable_volume_attributes() = volumeAttributes;
+
+            volumes.put(volumeId, std::move(volumeState));
+            checkpointVolumeState(volumeId);
+          }
+
+          return volumeId;
+        }));
+    }));
+}
+
+
+// Returns RAW disk resources for specified profiles.
+Future<Resources> StorageLocalResourceProviderProcess::getCapacities(
+    const hashmap<string, ProfileData>& profiles)
+{
+  // NOTE: This can only be called after `prepareControllerService` and
+  // the resource provider ID has been obtained.
+  CHECK_SOME(controllerCapabilities);
+  CHECK(info.has_id());
+
+  // We do not return a failure because this is always called when a
+  // profile is added or a `CreateVolume` CSI call is made.
+  if (!controllerCapabilities->getCapacity) {
+    return Resources();
+  }
+
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      list<Future<Resources>> futures;
+
+      foreachpair (const string& profile, const ProfileData& data, profiles) {
+        csi::GetCapacityRequest request;
+        request.mutable_version()->CopyFrom(csiVersion);
+        request.add_volume_capabilities()->CopyFrom(data.capability);
+        *request.mutable_parameters() = data.parameters;
+
+        futures.push_back(client.GetCapacity(request)
+          .then(defer(self(), [=](
+              const csi::GetCapacityResponse& response) -> Resources {
+            if (response.available_capacity() == 0) {
+              return Resources();
+            }
+
+            return createRawDiskResource(
+                info,
+                response.available_capacity(),
+                profile);
+          })));
+      }
+
+      return collect(futures)
+        .then([](const list<Resources>& resources) {
+          return accumulate(resources.begin(), resources.end(), Resources());
+        });
+    }));
+}
+
+
+// Applies the offer operation. Conventional operations will be
+// synchronously applied. Do nothing if the operation is already in a
+// terminal state.
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     const UUID& operationUuid)
 {
@@ -2358,27 +2399,78 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
 {
   if (resource.disk().source().type() != Resource::DiskInfo::Source::RAW) {
     return Failure(
-        "Cannot create volume from source of " +
+        "Cannot create volume or block from source of " +
         stringify(resource.disk().source().type()) + " type");
   }
 
+  // NOTE: Currently we only support two type of RAW disk resources:
+  //   1. RAW disk from `GetCapacity` with a profile but no volume ID.
+  //   2. RAW disk from `ListVolumes` for a pre-existing volume, which
+  //      has a volume ID but no profile.
+  //
+  // For 1, we check if its profile is mount or block capable, then
+  // call `CreateVolume` with the operation UUID as the name (so that
+  // the same volume will be returned when recovering from a failover).
+  // For 2, we call `ValidateVolumeCapabilities` with a default mount or
+  // block capability.
+  CHECK_NE(resource.disk().source().has_profile(),
+           resource.disk().source().has_id());
+
+  Future<string> created;
+
   switch (type) {
     case Resource::DiskInfo::Source::PATH:
     case Resource::DiskInfo::Source::MOUNT: {
-      if (!profiles.at(resource.disk().source().profile())
-             .capability.has_mount()) {
-        return Failure(
-            "Profile '" + resource.disk().source().profile() +
-            "' cannot be used for CREATE_VOLUME operation");
+      if (resource.disk().source().has_profile()) {
+        if (!profiles.at(resource.disk().source().profile())
+               .capability.has_mount()) {
+          return Failure(
+              "Profile '" + resource.disk().source().profile() +
+              "' cannot be used for CREATE_VOLUME operation");
+        }
+
+        // TODO(chhsiao): Call `CreateVolume` sequentially with other
+        // create or delete operations, and send an `UPDATE_STATE` for
+        // RAW profiled resources afterward.
+        created = createVolume(
+            operationUuid.toString(),
+            resource.scalar().value(),
+            profiles.at(resource.disk().source().profile()));
+      } else {
+        // No need to call `ValidateVolumeCapabilities` sequentially
+        // since the volume is not used and thus not in `volumes` yet.
+        created = validateCapability(
+            resource.disk().source().id(),
+            resource.disk().source().has_metadata()
+              ? resource.disk().source().metadata() : Option<Labels>::none(),
+            defaultMountCapability);
       }
       break;
     }
     case Resource::DiskInfo::Source::BLOCK: {
-      if (!profiles.at(resource.disk().source().profile())
-             .capability.has_block()) {
-        return Failure(
-            "Profile '" + resource.disk().source().profile() +
-            "' cannot be used for CREATE_BLOCK operation");
+      if (resource.disk().source().has_profile()) {
+        if (!profiles.at(resource.disk().source().profile())
+               .capability.has_block()) {
+          return Failure(
+              "Profile '" + resource.disk().source().profile() +
+              "' cannot be used for CREATE_BLOCK operation");
+        }
+
+        // TODO(chhsiao): Call `CreateVolume` sequentially with other
+        // create or delete operations, and send an `UPDATE_STATE` for
+        // RAW profiled resources afterward.
+        created = createVolume(
+            operationUuid.toString(),
+            resource.scalar().value(),
+            profiles.at(resource.disk().source().profile()));
+      } else {
+        // No need to call `ValidateVolumeCapabilities` sequentially
+        // since the volume is not used and thus not in `volumes` yet.
+        created = validateCapability(
+            resource.disk().source().id(),
+            resource.disk().source().has_metadata()
+              ? resource.disk().source().metadata() : Option<Labels>::none(),
+            defaultBlockCapability);
       }
       break;
     }
@@ -2388,26 +2480,6 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     }
   }
 
-  Future<string> created;
-
-  if (resource.disk().source().has_id()) {
-    // Preprovisioned volumes with RAW type.
-    // TODO(chhsiao): Call `ValidateVolumeCapabilities` sequentially
-    // once we use the profile module and make profile optional.
-    CHECK(volumes.contains(resource.disk().source().id()));
-    created = resource.disk().source().id();
-  } else {
-    // We use the operation UUID as the name of the volume, so the same
-    // offer operation will create the same volume after recovery.
-    // TODO(chhsiao): Call `CreateVolume` sequentially with other create
-    // or delete operations.
-    // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
-    created = createVolume(
-        operationUuid.toString(),
-        resource.scalar().value(),
-        profiles.at(resource.disk().source().profile()));
-  }
-
   return created
     .then(defer(self(), [=](const string& volumeId) {
       CHECK(volumes.contains(volumeId));
@@ -2471,7 +2543,7 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
     case Resource::DiskInfo::Source::UNKNOWN:
     case Resource::DiskInfo::Source::RAW: {
       return Failure(
-          "Cannot delete volume of " +
+          "Cannot destroy volume or block of " +
           stringify(resource.disk().source().type()) + " type");
       break;
     }
@@ -2481,18 +2553,27 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
   CHECK(volumes.contains(resource.disk().source().id()));
 
   // Sequentialize the deletion with other operation on the same volume.
+  // NOTE: A resource has no profile iff it is a pre-existing volume.
   return volumes.at(resource.disk().source().id()).sequence->add(
-      std::function<Future<Nothing>()>(
-          defer(self(), &Self::deleteVolume, resource.disk().source().id())))
+      std::function<Future<Nothing>()>(defer(
+          self(),
+          &Self::deleteVolume,
+          resource.disk().source().id(),
+          !resource.disk().source().has_profile())))
     .then(defer(self(), [=]() {
       Resource converted = resource;
-      converted.mutable_disk()->mutable_source()->clear_id();
-      converted.mutable_disk()->mutable_source()->clear_metadata();
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_path();
       converted.mutable_disk()->mutable_source()->clear_mount();
 
+      // NOTE: We keep the source ID and metadata if it is a
+      // pre-existing volume, which has no profile.
+      if (resource.disk().source().has_profile()) {
+        converted.mutable_disk()->mutable_source()->clear_id();
+        converted.mutable_disk()->mutable_source()->clear_metadata();
+      }
+
       vector<ResourceConversion> conversions;
       conversions.emplace_back(resource, std::move(converted));
 
@@ -2501,6 +2582,8 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
 }
 
 
+// Synchronously updates `totalResources` and the offer operation status
+// and then asks the status update manager to send status updates.
 Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
     const UUID& operationUuid,
     const Try<vector<ResourceConversion>>& conversions)