You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:37 UTC

[07/12] mesos git commit: Modified SLRP to use the VolumeProfileAdaptor module.

Modified SLRP to use the VolumeProfileAdaptor module.

This changes the Storage Local Resource Provider's source of profile
information from only using the default to using a VolumeProfileAdaptor
module.

This patch is based on https://reviews.apache.org/r/64616.

Review: https://reviews.apache.org/r/64658/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0f3bdd2e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0f3bdd2e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0f3bdd2e

Branch: refs/heads/master
Commit: 0f3bdd2ee12574111d4ba2938a6d70fd0a3ee4dc
Parents: 28bf089
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:10 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 476 ++++++++++++++++++------
 1 file changed, 356 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0f3bdd2e/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 8510a31..a6e4a0f 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -297,7 +297,9 @@ public:
       slaveId(_slaveId),
       authToken(_authToken),
       strict(_strict),
-      resourceVersion(id::UUID::random())
+      reconciling(false),
+      resourceVersion(id::UUID::random()),
+      offerOperationSequence("offer-operation-sequence")
   {
     volumeProfileAdaptor = VolumeProfileAdaptor::getAdaptor();
     CHECK_NOTNULL(volumeProfileAdaptor.get());
@@ -314,12 +316,6 @@ public:
   void received(const Event& event);
 
 private:
-  struct ProfileData
-  {
-    csi::VolumeCapability capability;
-    google::protobuf::Map<string, string> parameters;
-  };
-
   struct VolumeData
   {
     VolumeData(const csi::state::VolumeState& _state)
@@ -338,14 +334,22 @@ private:
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
-  Future<Nothing> recoverResources();
-  Future<Nothing> recoverStatusUpdates();
+  Future<Nothing> recoverResourceProviderState();
+  Future<Nothing> recoverProfiles();
   void doReliableRegistration();
   Future<Nothing> reconcileResourceProviderState();
+  Future<Nothing> reconcileStatusUpdates();
   ResourceConversion reconcileResources(
       const Resources& checkpointed,
       const Resources& discovered);
 
+  // Helper for updating the profiles mapping upon receiving an updated
+  // set of profiles from the VolumeProfileAdaptor module.
+  Future<Nothing> updateProfiles();
+
+  // Reconcile the storage pools upon profile updates.
+  Future<Nothing> reconcileProfileUpdates();
+
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
   void applyOfferOperation(const Event::ApplyOfferOperation& operation);
@@ -368,7 +372,7 @@ private:
   Future<string> createVolume(
       const string& name,
       const Bytes& capacity,
-      const ProfileData& profile);
+      const VolumeProfileAdaptor::ProfileInfo& profileInfo);
   Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
   Future<string> validateCapability(
       const string& volumeId,
@@ -424,11 +428,19 @@ private:
   csi::VolumeCapability defaultMountCapability;
   csi::VolumeCapability defaultBlockCapability;
   string bootId;
-  hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
   OfferOperationStatusUpdateManager statusUpdateManager;
 
+  // The mapping of known profiles fetched from the VolumeProfileAdaptor.
+  hashmap<string, VolumeProfileAdaptor::ProfileInfo> profileInfos;
+
+  // The last set of profile names fetched from the VolumeProfileAdaptor.
+  hashset<string> knownProfiles;
+
+  // True if a reconciliation of storage pools is happening.
+  bool reconciling;
+
   ContainerID controllerContainerId;
   ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
@@ -449,6 +461,11 @@ private:
   Resources totalResources;
   id::UUID resourceVersion;
   hashmap<string, VolumeData> volumes;
+
+  // We maintain a sequence to keep track of ongoing volume/block
+  // creation or destruction. These operations will not be serialized
+  // through the sequence. It is simply used to wait for them to finish.
+  Sequence offerOperationSequence;
 };
 
 
@@ -456,6 +473,8 @@ void StorageLocalResourceProviderProcess::connected()
 {
   CHECK_EQ(DISCONNECTED, state);
 
+  LOG(INFO) << "Connected to resource provider manager";
+
   state = CONNECTED;
 
   doReliableRegistration();
@@ -469,6 +488,8 @@ void StorageLocalResourceProviderProcess::disconnected()
   LOG(INFO) << "Disconnected from resource provider manager";
 
   state = DISCONNECTED;
+
+  statusUpdateManager.pause();
 }
 
 
@@ -557,12 +578,6 @@ void StorageLocalResourceProviderProcess::initialize()
     }
   }
 
-  // TODO(chhsiao): Use the volume profile module.
-  ProfileData& defaultProfile = profiles["default"];
-  defaultProfile.capability.mutable_mount();
-  defaultProfile.capability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Failed to recover resource provider with type '" << info.type()
@@ -594,10 +609,43 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
   return recoverServices()
     .then(defer(self(), &Self::recoverVolumes))
-    .then(defer(self(), &Self::recoverResources))
+    .then(defer(self(), &Self::recoverResourceProviderState))
+    .then(defer(self(), &Self::recoverProfiles))
     .then(defer(self(), [=]() -> Future<Nothing> {
+      LOG(INFO)
+        << "Finished recovery for resource provider with type '" << info.type()
+        << "' and name '" << info.name();
+
       state = DISCONNECTED;
 
+      statusUpdateManager.pause();
+
+      auto err = [](const string& message) {
+        LOG(ERROR)
+          << "Failed to watch for VolumeprofileAdaptor: " << message;
+      };
+
+      // Start watching the VolumeProfileAdaptor.
+      // TODO(chhsiao): Consider retrying with backoff.
+      loop(
+          self(),
+          [=] {
+            return volumeProfileAdaptor->watch(
+                knownProfiles,
+                info.storage().plugin().type())
+              .then(defer(self(), [=](const hashset<string>& profiles) {
+                // Save the returned set of profiles so that we
+                // can watch the module for changes to it.
+                knownProfiles = profiles;
+
+                return updateProfiles()
+                  .then(defer(self(), &Self::reconcileProfileUpdates));
+              }));
+          },
+          [](Nothing) -> ControlFlow<Nothing> { return Continue(); })
+        .onFailed(std::bind(err, lambda::_1))
+        .onDiscarded(std::bind(err, "future discarded"));
+
       driver.reset(new Driver(
           Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
           contentType,
@@ -814,7 +862,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverResources()
+Future<Nothing>
+StorageLocalResourceProviderProcess::recoverResourceProviderState()
 {
   // Recover the resource provider ID and state from the latest
   // symlink. If the symlink does not exist, this is a new resource
@@ -864,7 +913,134 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverResources()
   return Nothing();
 }
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+
+// NOTE: Currently we need to recover profiles for replaying pending
+// `CREATE_VOLUME` or `CREATE_BLOCK` operations after failover. Consider
+// either checkpointing the required profiles for these calls, or
+// checkpointing CSI volume states by volume names instead of IDs.
+Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles()
+{
+  // Rebuild the set of required profiles from the checkpointed storage
+  // pools (i.e., RAW resources that have no volume ID). We do not need
+  // to resolve profiles for resources that have volume IDs, since their
+  // volume capabilities are already checkpointed.
+  hashset<string> requiredProfiles;
+  foreach (const Resource& resource, totalResources) {
+    if (!resource.disk().source().has_id()) {
+      requiredProfiles.insert(resource.disk().source().profile());
+    }
+  }
+
+  // If no pending offer operation uses any profile, there is no need
+  // to recover any profile. Watching the VolumeProfileAdaptor will be
+  // initiated later.
+  if (requiredProfiles.empty()) {
+    return Nothing();
+  }
+
+  LOG(INFO)
+    << "Waiting for VolumeProfileAdaptor to recover profiles: "
+    << stringify(requiredProfiles);
+
+  // The VolumeProfileAdaptor module must at least have knowledge of
+  // the required profiles. Because the module is initialized separately
+  // from this resource provider, we must watch the module until all
+  // required profiles have been recovered.
+  return loop(
+      self(),
+      [=] {
+        return volumeProfileAdaptor->watch(
+            knownProfiles,
+            info.storage().plugin().type());
+      },
+      [=](const hashset<string>& profiles) -> ControlFlow<Nothing> {
+        // Save the returned set of profiles so that we can watch the
+        // module for changes to it, both in this loop and after
+        // recovery completes.
+        knownProfiles = profiles;
+
+        foreach (const string& profile, requiredProfiles) {
+          if (!knownProfiles.contains(profile)) {
+            return Continue();
+          }
+        }
+
+        return Break();
+      })
+    .then(defer(self(), &Self::updateProfiles));
+}
+
+
+void StorageLocalResourceProviderProcess::doReliableRegistration()
+{
+  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
+    return;
+  }
+
+  CHECK_EQ(CONNECTED, state);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+  auto err = [](const ResourceProviderInfo& info, const string& message) {
+    LOG(ERROR)
+      << "Failed to subscribe resource provider with type '" << info.type()
+      << "' and name '" << info.name() << "': " << message;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, info, lambda::_1))
+    .onDiscarded(std::bind(err, info, "future discarded"));
+
+  // TODO(chhsiao): Consider doing an exponential backoff.
+  delay(Seconds(1), self(), &Self::doReliableRegistration);
+}
+
+
+Future<Nothing>
+StorageLocalResourceProviderProcess::reconcileResourceProviderState()
+{
+  return reconcileStatusUpdates()
+    .then(defer(self(), [=] {
+      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
+        .then(defer(self(), [=](const list<Resources>& discovered) {
+          ResourceConversion conversion = reconcileResources(
+              totalResources,
+              accumulate(discovered.begin(), discovered.end(), Resources()));
+
+          Try<Resources> result = totalResources.apply(conversion);
+          CHECK_SOME(result);
+
+          if (result.get() != totalResources) {
+            LOG(INFO)
+              << "Removing '" << conversion.consumed << "' and adding '"
+              << conversion.converted << "' to the total resources";
+
+            totalResources = result.get();
+            checkpointResourceProviderState();
+          }
+
+          // NOTE: Since this is the first `UPDATE_STATE` call of the
+          // current subscription, there must be no racing speculative
+          // operation, thus no need to update the resource version.
+          sendResourceProviderStateUpdate();
+          statusUpdateManager.resume();
+
+          LOG(INFO)
+            << "Resource provider " << info.id() << " is in READY state";
+
+          state = READY;
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileStatusUpdates()
 {
   CHECK(info.has_id());
 
@@ -878,8 +1054,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
           resourceProviderDir,
           lambda::_1));
 
-  statusUpdateManager.pause();
-
   Try<list<string>> operationPaths = slave::paths::getOfferOperationPaths(
       slave::paths::getResourceProviderPath(
           metaDir, slaveId, info.type(), info.name(), info.id()));
@@ -999,65 +1173,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
 }
 
 
-void StorageLocalResourceProviderProcess::doReliableRegistration()
-{
-  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
-    return;
-  }
-
-  CHECK_EQ(CONNECTED, state);
-
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
-
-  Call::Subscribe* subscribe = call.mutable_subscribe();
-  subscribe->mutable_resource_provider_info()->CopyFrom(info);
-
-  auto err = [](const ResourceProviderInfo& info, const string& message) {
-    LOG(ERROR)
-      << "Failed to subscribe resource provider with type '" << info.type()
-      << "' and name '" << info.name() << "': " << message;
-  };
-
-  driver->send(evolve(call))
-    .onFailed(std::bind(err, info, lambda::_1))
-    .onDiscarded(std::bind(err, info, "future discarded"));
-
-  // TODO(chhsiao): Consider doing an exponential backoff.
-  delay(Seconds(1), self(), &Self::doReliableRegistration);
-}
-
-
-Future<Nothing>
-StorageLocalResourceProviderProcess::reconcileResourceProviderState()
-{
-  return recoverStatusUpdates()
-    .then(defer(self(), [=] {
-      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
-        .then(defer(self(), [=](const list<Resources>& discovered) {
-          ResourceConversion conversion = reconcileResources(
-              totalResources,
-              accumulate(discovered.begin(), discovered.end(), Resources()));
-
-          Try<Resources> result = totalResources.apply(conversion);
-          CHECK_SOME(result);
-
-          if (result.get() != totalResources) {
-            totalResources = result.get();
-            checkpointResourceProviderState();
-          }
-
-          sendResourceProviderStateUpdate();
-          statusUpdateManager.resume();
-
-          state = READY;
-
-          return Nothing();
-        }));
-    }));
-}
-
-
 ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
     const Resources& checkpointed,
     const Resources& discovered)
@@ -1106,6 +1221,97 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
 }
 
 
+Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles()
+{
+  LOG(INFO)
+    << "Updating metadata for profiles: " << stringify(knownProfiles);
+
+  list<Future<Nothing>> futures;
+  foreach (const string& profile, knownProfiles) {
+    // Since profiles are immutable after creation and cannot be
+    // deleted, we do not need to update any profile that is already in
+    // the mapping.
+    // TODO(chhsiao): Handle profile deactivation.
+    if (profileInfos.contains(profile)) {
+      continue;
+    }
+
+    futures.push_back(volumeProfileAdaptor->translate(
+        profile, info.storage().plugin().type())
+      .then(defer(self(), [=](const VolumeProfileAdaptor::ProfileInfo& info) {
+        profileInfos.put(profile, info);
+        return Nothing();
+      })));
+  }
+
+  return collect(futures).then([] { return Nothing(); });
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileProfileUpdates()
+{
+  // Do nothing if the resource provider ID is not known yet, since it
+  // is used to construct the resource metadata of storage pools. The
+  // metadata will be constructed in `reconcileResourceProviderState`.
+  if (!info.has_id()) {
+    return Nothing();
+  }
+
+  CHECK(!reconciling);
+
+  LOG(INFO) << "Reconciling storage pools for resource provider " << info.id();
+
+  reconciling = true;
+
+  // We add a lambda into `offerOperationSequence` so that it will
+  // return after waiting for all pending operations in the sequence.
+  return offerOperationSequence.add(
+      std::function<Future<Nothing>()>([] { return Nothing(); }))
+    .then(defer(self(), &Self::getCapacities))
+    .then(defer(self(), [=](const Resources& discovered) {
+      auto isStoragePool = [](const Resource& r) {
+        return !r.disk().source().has_id();
+      };
+
+      ResourceConversion conversion = reconcileResources(
+          totalResources.filter(isStoragePool),
+          discovered);
+
+      Try<Resources> result = totalResources.apply(conversion);
+      CHECK_SOME(result);
+
+      if (result.get() != totalResources) {
+        LOG(INFO)
+          << "Removing '" << conversion.consumed << "' and adding '"
+          << conversion.converted << "' to the total resources";
+
+        totalResources = result.get();
+        checkpointResourceProviderState();
+
+        // NOTE: We ensure that the first `UPDATE_STATE` of the current
+        // subscription is sent by `reconcileResourceProviderState`, so
+        // that the total resources contain existing volumes.
+        if (state == READY) {
+          // NOTE: We always update the resource version before sending
+          // an `UPDATE_STATE`, so that any racing speculative operation
+          // will be rejected. Otherwise, the speculative resource
+          // conversion done on the master will be cancelled out.
+          resourceVersion = id::UUID::random();
+          sendResourceProviderStateUpdate();
+        }
+      }
+
+      LOG(INFO)
+        << "Finished reconciliation of storage pools for resource provider "
+        << info.id();
+
+      reconciling = false;
+
+      return Nothing();
+    }));
+}
+
+
 void StorageLocalResourceProviderProcess::subscribed(
     const Event::Subscribed& subscribed)
 {
@@ -1144,9 +1350,6 @@ void StorageLocalResourceProviderProcess::subscribed(
 void StorageLocalResourceProviderProcess::applyOfferOperation(
     const Event::ApplyOfferOperation& operation)
 {
-  // NOTE: If we receive an offer operation in SUBSCRIBED state, there
-  // must be a resource version mismatch since the current resource
-  // version is not reported yet.
   CHECK(state == SUBSCRIBED || state == READY);
 
   Try<id::UUID> uuid = id::UUID::fromBytes(operation.operation_uuid().value());
@@ -1177,7 +1380,13 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
 
   CHECK_SOME(operationVersion);
 
-  if (operationVersion.get() != resourceVersion) {
+  if (state == SUBSCRIBED) {
+    result = updateOfferOperationStatus(uuid.get(), Error(
+        "Cannot apply offer operation in SUBSCRIBED state"));
+  } else if (reconciling) {
+    result = updateOfferOperationStatus(uuid.get(), Error(
+        "Cannot apply offer operation when reconciling storage pools"));
+  } else if (operationVersion.get() != resourceVersion) {
     result = updateOfferOperationStatus(uuid.get(), Error(
         "Mismatched resource version " + stringify(operationVersion.get()) +
         " (expected: " + stringify(resourceVersion) + ")"));
@@ -2059,7 +2268,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
-    const ProfileData& profile)
+    const VolumeProfileAdaptor::ProfileInfo& profileInfo)
 {
   // NOTE: This can only be called after `prepareControllerService`.
   CHECK_SOME(controllerCapabilities);
@@ -2077,8 +2286,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
         ->set_required_bytes(capacity.bytes());
       request.mutable_capacity_range()
         ->set_limit_bytes(capacity.bytes());
-      request.add_volume_capabilities()->CopyFrom(profile.capability);
-      *request.mutable_parameters() = profile.parameters;
+      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
+      *request.mutable_parameters() = profileInfo.parameters;
 
       return client.CreateVolume(request)
         .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
@@ -2094,7 +2303,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
             csi::state::VolumeState volumeState;
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
-              ->CopyFrom(profile.capability);
+              ->CopyFrom(profileInfo.capability);
             *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
 
             volumes.put(volumeInfo.id(), std::move(volumeState));
@@ -2308,11 +2517,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
     .then(defer(self(), [=](csi::Client client) {
       list<Future<Resources>> futures;
 
-      foreachpair (const string& profile, const ProfileData& data, profiles) {
+      foreach (const string& profile, knownProfiles) {
+        CHECK(profileInfos.contains(profile));
+
+        // TODO(chhsiao): Skip inactive profiles.
+
+        const VolumeProfileAdaptor::ProfileInfo& profileInfo =
+          profileInfos.at(profile);
+
         csi::GetCapacityRequest request;
         request.mutable_version()->CopyFrom(csiVersion);
-        request.add_volume_capabilities()->CopyFrom(data.capability);
-        *request.mutable_parameters() = data.parameters;
+        request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
+        *request.mutable_parameters() = profileInfo.parameters;
 
         futures.push_back(client.GetCapacity(request)
           .then(defer(self(), [=](
@@ -2348,6 +2564,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
   CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
 
   Future<vector<ResourceConversion>> conversions;
+  Option<Resource> source;
 
   switch (operation.info().type()) {
     case Offer::Operation::RESERVE:
@@ -2364,8 +2581,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::CREATE_VOLUME: {
       CHECK(operation.info().has_create_volume());
 
+      source = operation.info().create_volume().source();
       conversions = applyCreateVolumeOrBlock(
-          operation.info().create_volume().source(),
+          source.get(),
           operationUuid,
           operation.info().create_volume().target_type());
 
@@ -2374,16 +2592,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::DESTROY_VOLUME: {
       CHECK(operation.info().has_destroy_volume());
 
-      conversions = applyDestroyVolumeOrBlock(
-          operation.info().destroy_volume().volume());
+      source = operation.info().destroy_volume().volume();
+      conversions = applyDestroyVolumeOrBlock(source.get());
 
       break;
     }
     case Offer::Operation::CREATE_BLOCK: {
       CHECK(operation.info().has_create_block());
 
+      source = operation.info().create_block().source();
       conversions = applyCreateVolumeOrBlock(
-          operation.info().create_block().source(),
+          source.get(),
           operationUuid,
           Resource::DiskInfo::Source::BLOCK);
 
@@ -2392,8 +2611,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::DESTROY_BLOCK: {
       CHECK(operation.info().has_destroy_block());
 
-      conversions = applyDestroyVolumeOrBlock(
-          operation.info().destroy_block().block());
+      source = operation.info().destroy_block().block();
+      conversions = applyDestroyVolumeOrBlock(source.get());
 
       break;
     }
@@ -2428,7 +2647,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
           updateOfferOperationStatus(operationUuid, conversions));
     }));
 
-  return promise->future();
+  Future<Nothing> future = promise->future();
+
+  CHECK_SOME(source);
+  if (source->disk().source().has_profile()) {
+    // We place the future in `offerOperationSequence` so it can be
+    // waited for during reconciliation upon profile updates.
+    offerOperationSequence.add(
+        std::function<Future<Nothing>()>([future] { return future; }));
+  }
+
+  return future;
 }
 
 
@@ -2468,8 +2697,11 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     case Resource::DiskInfo::Source::PATH:
     case Resource::DiskInfo::Source::MOUNT: {
       if (resource.disk().source().has_profile()) {
-        CHECK(profiles.contains(resource.disk().source().profile()));
-        if (!profiles.at(resource.disk().source().profile())
+        CHECK(profileInfos.contains(resource.disk().source().profile()));
+
+        // TODO(chhsiao): Reject if the source has an inactive profile.
+
+        if (!profileInfos.at(resource.disk().source().profile())
                .capability.has_mount()) {
           return Failure(
               "Profile '" + resource.disk().source().profile() +
@@ -2482,7 +2714,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
         created = createVolume(
             operationUuid.toString(),
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            profiles.at(resource.disk().source().profile()));
+            profileInfos.at(resource.disk().source().profile()));
       } else {
         const string& volumeId = resource.disk().source().id();
 
@@ -2508,8 +2740,11 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     }
     case Resource::DiskInfo::Source::BLOCK: {
       if (resource.disk().source().has_profile()) {
-        CHECK(profiles.contains(resource.disk().source().profile()));
-        if (!profiles.at(resource.disk().source().profile())
+        CHECK(profileInfos.contains(resource.disk().source().profile()));
+
+        // TODO(chhsiao): Reject if the source has an inactive profile.
+
+        if (!profileInfos.at(resource.disk().source().profile())
                .capability.has_block()) {
           return Failure(
               "Profile '" + resource.disk().source().profile() +
@@ -2522,7 +2757,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
         created = createVolume(
             operationUuid.toString(),
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            profiles.at(resource.disk().source().profile()));
+            profileInfos.at(resource.disk().source().profile()));
       } else {
         const string& volumeId = resource.disk().source().id();
 
@@ -2633,6 +2868,8 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
           resource.disk().source().id(),
           !resource.disk().source().has_profile())))
     .then(defer(self(), [=]() {
+      // TODO(chhsiao): Convert to an empty resource and update all
+      // storage pools if the profile has been deactivated.
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
@@ -2698,20 +2935,6 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
 
   operation.add_statuses()->CopyFrom(operation.latest_status());
 
-  if (error.isSome()) {
-    // We only update the resource version for failed conventional
-    // operations, which are speculatively executed on the master.
-    if (operation.info().type() == Offer::Operation::RESERVE ||
-        operation.info().type() == Offer::Operation::UNRESERVE ||
-        operation.info().type() == Offer::Operation::CREATE ||
-        operation.info().type() == Offer::Operation::DESTROY) {
-      resourceVersion = id::UUID::random();
-
-      // Send an `UPDATE_STATE` after we finish the current operation.
-      dispatch(self(), &Self::sendResourceProviderStateUpdate);
-    }
-  }
-
   checkpointResourceProviderState();
 
   // Send out the status update for the offer operation.
@@ -2735,7 +2958,20 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
     .onFailed(defer(self(), std::bind(die, lambda::_1)))
     .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 
-  return error.isNone() ? Nothing() : Try<Nothing>::error(error.get());
+  if (error.isSome()) {
+    // We only send `UPDATE_STATE` for failed speculative operations.
+    if (operation.info().type() == Offer::Operation::RESERVE ||
+        operation.info().type() == Offer::Operation::UNRESERVE ||
+        operation.info().type() == Offer::Operation::CREATE ||
+        operation.info().type() == Offer::Operation::DESTROY) {
+      resourceVersion = id::UUID::random();
+      sendResourceProviderStateUpdate();
+    }
+
+    return error.get();
+  }
+
+  return Nothing();
 }