You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/07 23:46:00 UTC

[1/3] mesos git commit: Handling offer operations in storage local resource provider.

Repository: mesos
Updated Branches:
  refs/heads/master 31d718e39 -> ea98a3440


Handling offer operations in storage local resource provider.

For legacy operations, we just call Resources::apply(). New operations
CREATE_VOLUME, DESTROY_VOLUME, CREATE_BLOCK, DESTROY_BLOCK are
implemented through CSI. In particular, DESTROY_* requires unpublishing
the resources first.

Review: https://reviews.apache.org/r/63388


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31197766
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31197766
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31197766

Branch: refs/heads/master
Commit: 31197766768b3122807ea6dcff4cd313a0bbd5dc
Parents: 31d718e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu Nov 30 16:37:05 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:53 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 580 +++++++++++++++++++++++-
 1 file changed, 569 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/31197766/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a2794ac..a029421 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -41,6 +41,7 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 
@@ -48,6 +49,7 @@
 
 #include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
 
 #include "csi/client.hpp"
 #include "csi/paths.hpp"
@@ -70,7 +72,9 @@ using std::accumulate;
 using std::find;
 using std::list;
 using std::queue;
+using std::shared_ptr;
 using std::string;
+using std::vector;
 
 using process::Break;
 using process::Continue;
@@ -88,6 +92,7 @@ using process::collect;
 using process::defer;
 using process::loop;
 using process::spawn;
+using process::undiscardable;
 
 using process::http::authentication::Principal;
 
@@ -343,6 +348,29 @@ private:
   Future<Nothing> nodePublish(const string& volumeId);
   Future<Nothing> nodeUnpublish(const string& volumeId);
 
+  // Returns a CSI volume ID.
+  Future<string> createVolume(
+      const string& name,
+      const Bytes& capacity,
+      const ProfileData& profile);
+  Future<Nothing> deleteVolume(const string& volumeId);
+
+  // Applies the offer operation. Conventional operations will be
+  // synchronously applied.
+  Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+
+  Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
+      const Resource& resource,
+      const UUID& operationUuid,
+      const Resource::DiskInfo::Source::Type& type);
+  Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock(
+      const Resource& resource);
+
+  // Synchronously update `totalResources` and the offer operation status.
+  Try<Nothing> applyResourceConversions(
+      const UUID& operationUuid,
+      const Try<vector<ResourceConversion>>& conversions);
+
   void checkpointResourceProviderState();
   void sendResourceProviderStateUpdate();
   void checkpointVolumeState(const string& volumeId);
@@ -380,7 +408,10 @@ private:
   Option<csi::ControllerCapabilities> controllerCapabilities;
   Option<string> nodeId;
 
-  list<Event::Operation> pendingOperations;
+  // NOTE: We store the list of pending operations in a `LinkedHashMap`
+  // to preserve the order we receive the operations. This is useful
+  // when we replay dependent operations during recovery.
+  LinkedHashMap<UUID, Event::Operation> pendingOperations;
   Resources totalResources;
   Option<UUID> resourceVersion;
   hashmap<string, VolumeData> volumes;
@@ -558,7 +589,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
         if (resourceProviderState.isSome()) {
           foreach (const Event::Operation& operation,
                    resourceProviderState->operations()) {
-            pendingOperations.push_back(operation);
+            Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+            CHECK_SOME(uuid);
+
+            pendingOperations[uuid.get()] = operation;
           }
 
           totalResources = resourceProviderState->resources();
@@ -571,6 +605,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
         }
       }
 
+      // We replay all pending operations here, so that if a volume is
+      // actually created before the last failover, it will be reflected
+      // in the updated total resources before we do the reconciliation.
+      // NOTE: `applyOfferOperation` will remove the applied operation
+      // from the list of pending operations, so we make a copy of keys
+      // here.
+      foreach (const UUID& uuid, pendingOperations.keys()) {
+        applyOfferOperation(uuid)
+          .onAny(defer(self(), [=](const Future<Nothing>& future) {
+            if (!future.isReady()) {
+              LOG(ERROR)
+                << "Failed to apply operation " << uuid << ": "
+                << (future.isFailed() ? future.failure() : "future discarded");
+            }
+          }));
+      }
+
       state = DISCONNECTED;
 
       driver.reset(new Driver(
@@ -917,6 +968,8 @@ void StorageLocalResourceProviderProcess::subscribed(
 void StorageLocalResourceProviderProcess::operation(
     const Event::Operation& operation)
 {
+  Future<Resources> converted;
+
   if (state == SUBSCRIBED) {
     // TODO(chhsiao): Reject this operation.
     return;
@@ -924,8 +977,23 @@ void StorageLocalResourceProviderProcess::operation(
 
   CHECK_EQ(READY, state);
 
-  pendingOperations.push_back(operation);
+  LOG(INFO) << "Received " << operation.info().type() << " operation";
+
+  Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(!pendingOperations.contains(uuid.get()));
+  pendingOperations[uuid.get()] = operation;
   checkpointResourceProviderState();
+
+  applyOfferOperation(uuid.get())
+    .onAny(defer(self(), [=](const Future<Nothing>& future) {
+      if (!future.isReady()) {
+        LOG(ERROR)
+          << "Failed to apply " << operation.info().type() << " operation: "
+          << (future.isFailed() ? future.failure() : "future discarded");
+      }
+    }));
 }
 
 
@@ -1065,14 +1133,14 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
 void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
     const Event::AcknowledgeOfferOperation& acknowledge)
 {
-  CHECK_EQ(SUBSCRIBED, state);
+  CHECK_EQ(READY, state);
 }
 
 
 void StorageLocalResourceProviderProcess::reconcileOfferOperations(
     const Event::ReconcileOfferOperations& reconcile)
 {
-  CHECK_EQ(SUBSCRIBED, state);
+  CHECK_EQ(READY, state);
 }
 
 
@@ -1822,11 +1890,502 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 }
 
 
+Future<string> StorageLocalResourceProviderProcess::createVolume(
+    const string& name,
+    const Bytes& capacity,
+    const ProfileData& profile)
+{
+  // NOTE: This can only be called after `prepareControllerService`.
+  CHECK_SOME(controllerCapabilities);
+
+  if (!controllerCapabilities->createDeleteVolume) {
+    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  }
+
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      csi::CreateVolumeRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+      request.set_name(name);
+      request.mutable_capacity_range()
+        ->set_required_bytes(capacity.bytes());
+      request.mutable_capacity_range()
+        ->set_limit_bytes(capacity.bytes());
+      request.add_volume_capabilities()->CopyFrom(profile.capability);
+      *request.mutable_parameters() = profile.parameters;
+
+      return client.CreateVolume(request)
+        .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
+          const csi::VolumeInfo& volumeInfo = response.volume_info();
+
+          if (volumes.contains(volumeInfo.id())) {
+            // The resource provider failed over after the last
+            // `CreateVolume` call, but before the operation status
+            // was checkpointed.
+            CHECK_EQ(csi::state::VolumeState::CREATED,
+                     volumes.at(volumeInfo.id()).state.state());
+          } else {
+            csi::state::VolumeState volumeState;
+            volumeState.set_state(csi::state::VolumeState::CREATED);
+            volumeState.mutable_volume_capability()
+              ->CopyFrom(profile.capability);
+            *volumeState.mutable_volume_attributes() =
+              volumeInfo.attributes();
+
+            volumes.put(volumeInfo.id(), std::move(volumeState));
+            checkpointVolumeState(volumeInfo.id());
+          }
+
+          return volumeInfo.id();
+        }));
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
+    const string& volumeId)
+{
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+  CHECK_SOME(controllerCapabilities);
+  CHECK_SOME(nodeId);
+
+  if (!controllerCapabilities->createDeleteVolume) {
+    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  }
+
+  const string volumePath = csi::paths::getVolumePath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      volumeId);
+
+  Future<Nothing> deleted = Nothing();
+
+  if (volumes.contains(volumeId)) {
+    // NOTE: We don't break for `PUBLISHED` and `NODE_READY` as deleting
+    // the volume in these states needs all operations beneath it.
+    switch (volumes.at(volumeId).state.state()) {
+      case csi::state::VolumeState::PUBLISHED: {
+        deleted = deleted
+          .then(defer(self(), &Self::nodeUnpublish, volumeId));
+      }
+      case csi::state::VolumeState::NODE_READY: {
+        deleted = deleted
+          .then(defer(self(), &Self::controllerUnpublish, volumeId));
+      }
+      case csi::state::VolumeState::CREATED: {
+        deleted = deleted
+          .then(defer(self(), &Self::getService, controllerContainerId))
+          .then(defer(self(), [=](csi::Client client) {
+            csi::DeleteVolumeRequest request;
+            request.mutable_version()->CopyFrom(csiVersion);
+            request.set_volume_id(volumeId);
+
+            return client.DeleteVolume(request)
+              .then(defer(self(), [=] {
+                // NOTE: This will destruct the volume's sequence!
+                volumes.erase(volumeId);
+                CHECK_SOME(os::rmdir(volumePath));
+
+                return Nothing();
+              }));
+          }));
+        break;
+      }
+      case csi::state::VolumeState::UNKNOWN:
+      case csi::state::VolumeState::CONTROLLER_PUBLISH:
+      case csi::state::VolumeState::CONTROLLER_UNPUBLISH:
+      case csi::state::VolumeState::NODE_PUBLISH:
+      case csi::state::VolumeState::NODE_UNPUBLISH:
+      case google::protobuf::kint32min:
+      case google::protobuf::kint32max: {
+        UNREACHABLE();
+      }
+    }
+  } else {
+    // The resource provider failed over after the last `DeleteVolume`
+    // call, but before the operation status was checkpointed.
+    CHECK(!os::exists(volumePath));
+  }
+
+  // NOTE: We make the returned future undiscardable because the
+  // deletion may cause the volume's sequence to be destructed, which
+  // will in turn discard all futures in the sequence.
+  return undiscardable(deleted);
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+    const UUID& operationUuid)
+{
+  Future<vector<ResourceConversion>> conversions;
+  Option<Error> error;
+
+  CHECK(pendingOperations.contains(operationUuid));
+  const Event::Operation& operation = pendingOperations.at(operationUuid);
+
+  Try<UUID> operationVersion =
+    UUID::fromBytes(operation.resource_version_uuid());
+  CHECK_SOME(operationVersion);
+
+  if (resourceVersion.get() != operationVersion.get()) {
+    error = Error(
+        "Mismatched resource version " + stringify(operationVersion.get()) +
+        " (expected: " + stringify(resourceVersion.get()) + ")");
+  }
+
+  switch (operation.info().type()) {
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE:
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY: {
+      // Synchronously apply the conventional operations.
+      return applyResourceConversions(
+          operationUuid,
+          error.isNone()
+            ? getResourceConversions(operation.info())
+            : Try<vector<ResourceConversion>>::error(error.get()));
+    }
+    case Offer::Operation::CREATE_VOLUME: {
+      CHECK(operation.info().has_create_volume());
+
+      if (error.isNone()) {
+        conversions = applyCreateVolumeOrBlock(
+            operation.info().create_volume().source(),
+            operationUuid,
+            operation.info().create_volume().target_type());
+      } else {
+        conversions = Failure(error.get());
+      }
+      break;
+    }
+    case Offer::Operation::DESTROY_VOLUME: {
+      CHECK(operation.info().has_destroy_volume());
+
+      if (error.isNone()) {
+        conversions = applyDestroyVolumeOrBlock(
+            operation.info().destroy_volume().volume());
+      } else {
+        conversions = Failure(error.get());
+      }
+      break;
+    }
+    case Offer::Operation::CREATE_BLOCK: {
+      CHECK(operation.info().has_create_block());
+
+      if (error.isNone()) {
+        conversions = applyCreateVolumeOrBlock(
+            operation.info().create_block().source(),
+            operationUuid,
+            Resource::DiskInfo::Source::BLOCK);
+      } else {
+        conversions = Failure(error.get());
+      }
+      break;
+    }
+    case Offer::Operation::DESTROY_BLOCK: {
+      CHECK(operation.info().has_destroy_block());
+
+      if (error.isNone()) {
+        conversions = applyDestroyVolumeOrBlock(
+            operation.info().destroy_block().block());
+      } else {
+        conversions = Failure(error.get());
+      }
+      break;
+    }
+    case Offer::Operation::UNKNOWN:
+    case Offer::Operation::LAUNCH:
+    case Offer::Operation::LAUNCH_GROUP: {
+      UNREACHABLE();
+    }
+  }
+
+  // NOTE: The code below is executed only when applying a storage operation.
+  shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
+
+  conversions
+    .onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& future) {
+      Option<Error> error;
+      if (!future.isReady()) {
+        error =
+          Error(future.isFailed() ? future.failure() : "future discarded");
+      }
+
+      if (future.isReady()) {
+        LOG(INFO)
+          << "Applying conversion from '" << future->at(0).consumed << "' to '"
+          << future->at(0).converted << "'";
+      } else {
+        LOG(ERROR)
+          << "Failed to apply " << operation.info().type() << " operation: "
+          << error->message;
+      }
+
+      promise->associate(applyResourceConversions(
+          operationUuid,
+          error.isNone()
+            ? future.get()
+            : Try<vector<ResourceConversion>>::error(error.get())));
+    }));
+
+  return promise->future();
+}
+
+
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
+    const Resource& resource,
+    const UUID& operationUuid,
+    const Resource::DiskInfo::Source::Type& type)
+{
+  if (resource.disk().source().type() != Resource::DiskInfo::Source::RAW) {
+    return Failure(
+        "Cannot create volume from source of " +
+        stringify(resource.disk().source().type()) + " type");
+  }
+
+  switch (type) {
+    case Resource::DiskInfo::Source::PATH:
+    case Resource::DiskInfo::Source::MOUNT: {
+      if (!profiles.at(resource.disk().source().profile())
+             .capability.has_mount()) {
+        return Failure(
+            "Profile '" + resource.disk().source().profile() +
+            "' cannot be used for CREATE_VOLUME operation");
+      }
+      break;
+    }
+    case Resource::DiskInfo::Source::BLOCK: {
+      if (!profiles.at(resource.disk().source().profile())
+             .capability.has_block()) {
+        return Failure(
+            "Profile '" + resource.disk().source().profile() +
+            "' cannot be used for CREATE_BLOCK operation");
+      }
+      break;
+    }
+    case Resource::DiskInfo::Source::UNKNOWN:
+    case Resource::DiskInfo::Source::RAW: {
+      UNREACHABLE();
+    }
+  }
+
+  Future<string> created;
+
+  if (resource.disk().source().has_id()) {
+    // Preprovisioned volumes with RAW type.
+    // TODO(chhsiao): Call `ValidateVolumeCapabilities` sequentially
+    // once we use the profile module and make profile optional.
+    CHECK(volumes.contains(resource.disk().source().id()));
+    created = resource.disk().source().id();
+  } else {
+    // We use the operation UUID as the name of the volume, so the same
+    // operation will create the same volume after recovery.
+    // TODO(chhsiao): Call `CreateVolume` sequentially with other create
+    // or delete operations.
+    // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
+    created = createVolume(
+        operationUuid.toString(),
+        resource.scalar().value(),
+        profiles.at(resource.disk().source().profile()));
+  }
+
+  return created
+    .then(defer(self(), [=](const string& volumeId) {
+      CHECK(volumes.contains(volumeId));
+      const csi::state::VolumeState& volumeState = volumes.at(volumeId).state;
+
+      Resource converted = resource;
+      converted.mutable_disk()->mutable_source()->set_id(volumeId);
+      converted.mutable_disk()->mutable_source()->set_type(type);
+
+      if (!volumeState.volume_attributes().empty()) {
+        converted.mutable_disk()->mutable_source()->mutable_metadata()
+          ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
+      }
+
+      const string mountPath = csi::paths::getMountPath(
+          slave::paths::getCsiRootDir("."),
+          info.storage().plugin().type(),
+          info.storage().plugin().name(),
+          volumeId);
+
+      switch (type) {
+        case Resource::DiskInfo::Source::PATH: {
+          // Set the root path relative to agent work dir.
+          converted.mutable_disk()->mutable_source()->mutable_path()
+            ->set_root(mountPath);
+          break;
+        }
+        case Resource::DiskInfo::Source::MOUNT: {
+          // Set the root path relative to agent work dir.
+          converted.mutable_disk()->mutable_source()->mutable_mount()
+            ->set_root(mountPath);
+          break;
+        }
+        case Resource::DiskInfo::Source::BLOCK: {
+          break;
+        }
+        case Resource::DiskInfo::Source::UNKNOWN:
+        case Resource::DiskInfo::Source::RAW: {
+          UNREACHABLE();
+        }
+      }
+
+      vector<ResourceConversion> conversions;
+      conversions.emplace_back(resource, std::move(converted));
+
+      return conversions;
+    }));
+}
+
+
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
+    const Resource& resource)
+{
+  switch (resource.disk().source().type()) {
+    case Resource::DiskInfo::Source::PATH:
+    case Resource::DiskInfo::Source::MOUNT:
+    case Resource::DiskInfo::Source::BLOCK: {
+      break;
+    }
+    case Resource::DiskInfo::Source::UNKNOWN:
+    case Resource::DiskInfo::Source::RAW: {
+      return Failure(
+          "Cannot delete volume of " +
+          stringify(resource.disk().source().type()) + " type");
+      break;
+    }
+  }
+
+  CHECK(resource.disk().source().has_id());
+  CHECK(volumes.contains(resource.disk().source().id()));
+
+  // Sequentialize the deletion with other operations on the same volume.
+  return volumes.at(resource.disk().source().id()).sequence->add(
+      std::function<Future<Nothing>()>(
+          defer(self(), &Self::deleteVolume, resource.disk().source().id())))
+    .then(defer(self(), [=]() {
+      Resource converted = resource;
+      converted.mutable_disk()->mutable_source()->clear_id();
+      converted.mutable_disk()->mutable_source()->clear_metadata();
+      converted.mutable_disk()->mutable_source()->set_type(
+          Resource::DiskInfo::Source::RAW);
+      converted.mutable_disk()->mutable_source()->clear_path();
+      converted.mutable_disk()->mutable_source()->clear_mount();
+
+      vector<ResourceConversion> conversions;
+      conversions.emplace_back(resource, std::move(converted));
+
+      return conversions;
+    }));
+}
+
+
+Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
+    const UUID& operationUuid,
+    const Try<vector<ResourceConversion>>& conversions)
+{
+  Option<Error> error;
+
+  CHECK(pendingOperations.contains(operationUuid));
+  const Event::Operation& operation = pendingOperations.at(operationUuid);
+
+  if (conversions.isSome()) {
+    // Strip away the allocation info when applying the conversion to
+    // the total resources.
+    vector<ResourceConversion> _conversions;
+    foreach (ResourceConversion conversion, conversions.get()) {
+      conversion.consumed.unallocate();
+      conversion.converted.unallocate();
+      _conversions.push_back(std::move(conversion));
+    }
+
+    Try<Resources> result = totalResources.apply(_conversions);
+    if (result.isSome()) {
+      totalResources = result.get();
+    } else {
+      error = result.error();
+    }
+  } else {
+    error = conversions.error();
+  }
+
+  // We first ask the status update manager to checkpoint the operation
+  // status, then checkpoint the resource provider state.
+  // TODO(chhsiao): Use the status update manager.
+  Call call;
+  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+  Call::UpdateOfferOperationStatus* update =
+    call.mutable_update_offer_operation_status();
+  update->mutable_framework_id()->CopyFrom(operation.framework_id());
+  update->set_operation_uuid(operation.operation_uuid());
+
+  OfferOperationStatus* status = update->mutable_status();
+  status->set_status_uuid(UUID::random().toBytes());
+
+  if (operation.info().has_id()) {
+    status->mutable_operation_id()->CopyFrom(operation.info().id());
+  }
+
+  if (error.isSome()) {
+    // We only update the resource version for failed conventional
+    // operations, which are speculatively executed on the master.
+    if (operation.info().type() == Offer::Operation::RESERVE ||
+        operation.info().type() == Offer::Operation::UNRESERVE ||
+        operation.info().type() == Offer::Operation::CREATE ||
+        operation.info().type() == Offer::Operation::DESTROY) {
+      resourceVersion = UUID::random();
+
+      // Send an `UPDATE_STATE` after we finish the current operation.
+      dispatch(self(), &Self::sendResourceProviderStateUpdate);
+    }
+
+    status->set_state(OFFER_OPERATION_FAILED);
+    status->set_message(error->message);
+  } else {
+    status->set_state(OFFER_OPERATION_FINISHED);
+
+    foreach (const ResourceConversion& conversion, conversions.get()) {
+      foreach (const Resource& resource, conversion.converted) {
+        status->add_converted_resources()->CopyFrom(resource);
+      }
+    }
+  }
+
+  update->mutable_latest_status()->CopyFrom(*status);
+
+  auto err = [](const UUID& operationUuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to send status update for offer operation " << operationUuid
+      << ": " << message;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, operationUuid, lambda::_1))
+    .onDiscarded(std::bind(err, operationUuid, "future discarded"));
+
+  pendingOperations.erase(operationUuid);
+  checkpointResourceProviderState();
+
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  return Nothing();
+}
+
+
 void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 {
   ResourceProviderState state;
 
-  foreach (const Event::Operation& operation, pendingOperations) {
+  foreachvalue (const Event::Operation& operation, pendingOperations) {
     state.add_operations()->CopyFrom(operation);
   }
 
@@ -1851,10 +2410,9 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 
   Call::UpdateState* update = call.mutable_update_state();
 
-  foreach (const Event::Operation& operation, pendingOperations) {
-    Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
-    CHECK_SOME(operationUuid);
-
+  foreachpair (const UUID& uuid,
+               const Event::Operation& operation,
+               pendingOperations) {
     // TODO(chhsiao): Maintain a list of terminated but unacknowledged
     // offer operations in memory and reconstruc that during recovery
     // by querying status update manager.
@@ -1868,7 +2426,7 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
                   : None()),
             operation.framework_id(),
             slaveId,
-            operationUuid.get()));
+            uuid));
   }
 
   update->mutable_resources()->CopyFrom(totalResources);


[2/3] mesos git commit: Publish resource provider resources before container launch or update.

Posted by ji...@apache.org.
Publish resource provider resources before container launch or update.

`Slave::publishAllocatedResources()` will compute the total allocated
resources for all currently running executor containers, and takes an
`extra` argument for resources that will be used by the executor that
is about to launch, then sums them up and asks the resource provider
manager to publish the resources.

Review: https://reviews.apache.org/r/63555


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e715399
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e715399
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e715399

Branch: refs/heads/master
Commit: 4e71539976013b76e85ce79d2a123a02adfb832a
Parents: 3119776
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Nov 3 18:43:30 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800

----------------------------------------------------------------------
 src/internal/devolve.cpp          |  13 +--
 src/internal/devolve.hpp          |   2 +-
 src/internal/evolve.cpp           |  10 +++
 src/internal/evolve.hpp           |   1 +
 src/resource_provider/manager.cpp |   9 ++
 src/slave/slave.cpp               |  72 ++++++++++++---
 src/slave/slave.hpp               |   8 ++
 src/tests/mesos.hpp               |  37 +++++++-
 src/tests/slave_tests.cpp         | 158 +++++++++++++++++++++++++++++++++
 9 files changed, 288 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 289c6e3..60a768d 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -116,16 +116,19 @@ Resource devolve(const v1::Resource& resource)
 }
 
 
-Resources devolve(const v1::Resources& resources)
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
 {
-  return devolve<Resource>(
-      static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
+  // NOTE: We do not use the common 'devolve' call for performance.
+  ResourceProviderID id;
+  id.set_value(resourceProviderId.value());
+  return id;
 }
 
 
-ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
+Resources devolve(const v1::Resources& resources)
 {
-  return devolve<ResourceProviderID>(resourceProviderId);
+  return devolve<Resource>(
+      static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 17ab76e..9e56fd9 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -62,8 +62,8 @@ InverseOffer devolve(const v1::InverseOffer& inverseOffer);
 Offer devolve(const v1::Offer& offer);
 OfferOperationStatus devolve(const v1::OfferOperationStatus& status);
 Resource devolve(const v1::Resource& resource);
-Resources devolve(const v1::Resources& resources);
 ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId);
+Resources devolve(const v1::Resources& resources);
 SlaveID devolve(const v1::AgentID& agentId);
 SlaveInfo devolve(const v1::AgentInfo& agentInfo);
 TaskID devolve(const v1::TaskID& taskId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index f46f864..6ce6150 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -164,6 +164,16 @@ v1::Resource evolve(const Resource& resource)
 }
 
 
+v1::ResourceProviderID evolve(
+    const ResourceProviderID& resourceProviderId)
+{
+  // NOTE: We do not use the common 'evolve' call for performance.
+  v1::ResourceProviderID id;
+  id.set_value(resourceProviderId.value());
+  return id;
+}
+
+
 v1::Resources evolve(const Resources& resources)
 {
   return evolve<v1::Resource>(

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index d796f32..77b7172 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -75,6 +75,7 @@ v1::MasterInfo evolve(const MasterInfo& masterInfo);
 v1::Offer evolve(const Offer& offer);
 v1::OfferID evolve(const OfferID& offerId);
 v1::Resource evolve(const Resource& resource);
+v1::ResourceProviderID evolve(const ResourceProviderID& resourceProviderId);
 v1::Resources evolve(const Resources& resources);
 v1::Task evolve(const Task& task);
 v1::TaskID evolve(const TaskID& taskId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2aee46e..e75d528 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -538,6 +538,10 @@ Future<Nothing> ResourceProviderManagerProcess::publish(
     ResourceProvider* resourceProvider =
       resourceProviders.subscribed.at(resourceProviderId).get();
 
+    LOG(INFO)
+      << "Sending PUBLISH event " << uuid << " with resources '" << resources
+      << "' to resource provider " << resourceProviderId;
+
     if (!resourceProvider->http.send(event)) {
       return Failure(
           "Failed to send PUBLISH event to resource provider " +
@@ -677,6 +681,11 @@ void ResourceProviderManagerProcess::updatePublishStatus(
     return;
   }
 
+  LOG(INFO)
+    << "Received UPDATE_PUBLISH_STATUS call for PUBLISH event " << uuid.get()
+    << " with " << update.status() << " status from resource provider "
+    << resourceProvider->info.id();
+
   if (update.status() == Call::UpdatePublishStatus::OK) {
     resourceProvider->publishes.at(uuid.get())->set(Nothing());
   } else {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fb077b7..1bdc9d8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2554,9 +2554,12 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -2998,11 +3001,19 @@ void Slave::launchExecutor(
             << "' of framework " << framework->id();
 
   // Launch the container.
-  containerizer->launch(
-      executor->containerId,
-      containerConfig,
-      environment,
-      pidCheckpointPath)
+  // NOTE: Since we modify the ExecutorInfo to include the task's
+  // resources when launching the executor, these resources need to be
+  // published before the containerizer prepares them. This should be
+  // revisited after MESOS-600.
+  publishResources(
+      taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+    .then(defer(self(), [=] {
+      return containerizer->launch(
+          executor->containerId,
+          containerConfig,
+          environment,
+          pidCheckpointPath);
+    }))
     .onAny(defer(self(),
                  &Self::executorLaunched,
                  frameworkId,
@@ -4154,9 +4165,12 @@ void Slave::subscribe(
         }
       }
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -4358,9 +4372,12 @@ void Slave::registerExecutor(
         }
       }
 
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
+      publishResources()
+        .then(defer(self(), [=] {
+          return containerizer->update(
+              executor->containerId,
+              executor->allocatedResources());
+        }))
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -7323,6 +7340,33 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
 }
 
 
+Future<Nothing> Slave::publishResources(
+    const Option<Resources>& additionalResources)
+{
+  Resources resources;
+
+  // NOTE: For resource providers that serve quantity-based resources
+  // without any identifiers (such as memory), it is very hard to keep
+  // track of published resources. So instead of implementing diff-based
+  // resource publishing, we implement an "ensure-all" semantics, and
+  // always calculate the total resources that need to remain published.
+  foreachvalue (const Framework* framework, frameworks) {
+    // NOTE: We do not call `framework->allocatedResources()` here
+    // because we do not want to publish resources for pending tasks that
+    // have not been authorized yet.
+    foreachvalue (const Executor* executor, framework->executors) {
+      resources += executor->allocatedResources();
+    }
+  }
+
+  if (additionalResources.isSome()) {
+    resources += additionalResources.get();
+  }
+
+  return resourceProviderManager.publish(resources);
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index bbf5b79..d9b0469 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -567,6 +567,14 @@ private:
 
   void apply(const std::vector<ResourceConversion>& conversions);
 
+  // Publish all resources that are needed to run the current set of
+  // tasks and executors on the agent.
+  // NOTE: The `additionalResources` parameter is for publishing
+  // additional task resources when launching executors. Consider
+  // removing this parameter once we have revisited MESOS-600.
+  process::Future<Nothing> publishResources(
+      const Option<Resources>& additionalResources = None());
+
   // Gauge methods.
   double _frameworks_active()
   {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 53890d8..657f925 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -440,6 +440,7 @@ using mesos::v1::MachineID;
 using mesos::v1::Metric;
 using mesos::v1::Offer;
 using mesos::v1::Resource;
+using mesos::v1::ResourceProviderInfo;
 using mesos::v1::Resources;
 using mesos::v1::TaskID;
 using mesos::v1::TaskInfo;
@@ -1345,7 +1346,7 @@ inline typename TOffer::Operation CREATE_VOLUME(
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::CREATE_VOLUME);
   operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
-  operation.set_target_type(type);
+  operation.mutable_create_volume()->set_target_type(type);
   return operation;
 }
 
@@ -2848,6 +2849,22 @@ public:
               Operation,
               Source>::operationDefault));
     EXPECT_CALL(*this, operation(_)).WillRepeatedly(DoDefault());
+
+    ON_CALL(*this, publish(_))
+      .WillByDefault(Invoke(
+          this,
+          &MockResourceProvider<
+              Event,
+              Call,
+              Driver,
+              ResourceProviderInfo,
+              Resource,
+              Resources,
+              ResourceProviderID,
+              OfferOperationState,
+              Operation,
+              Source>::publishDefault));
+    EXPECT_CALL(*this, publish(_)).WillRepeatedly(DoDefault());
   }
 
   MOCK_METHOD0_T(connected, void());
@@ -3027,7 +3044,7 @@ public:
           ->Mutable(0)
           ->mutable_disk()
           ->mutable_source()
-          ->set_type(Source::BLOCK);
+          ->set_type(Source::RAW);
         break;
       case Operation::CREATE_BLOCK:
         update->mutable_status()->add_converted_resources()->CopyFrom(
@@ -3058,6 +3075,22 @@ public:
     driver->send(call);
   }
 
+  void publishDefault(const typename Event::Publish& publish)
+  {
+    CHECK(info.has_id());
+
+    Call call;
+    call.set_type(Call::UPDATE_PUBLISH_STATUS);
+    call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+    typename Call::UpdatePublishStatus* update =
+      call.mutable_update_publish_status();
+    update->set_uuid(publish.uuid());
+    update->set_status(Call::UpdatePublishStatus::OK);
+
+    driver->send(call);
+  }
+
   ResourceProviderInfo info;
 
 private:

http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 25cfd47..6640620 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8775,6 +8775,164 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
 }
 
 
+// This test checks that before a workload (executor or task) is
+// launched, all resources from resource providers needed to run the
+// current set of workloads are properly published.
+TEST_F(SlaveTest, ResourceProviderPublishAll)
+{
+  // Start an agent and a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a mock local resource provider with the agent.
+  v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.local.mock");
+  resourceProviderInfo.set_name("test");
+
+  vector<v1::Resource> resources = {
+      v1::Resources::parse("disk", "4096", "role1").get(),
+      v1::Resources::parse("disk", "4096", "role2").get()
+  };
+
+  v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  // We want to register two frameworks to launch two concurrent tasks
+  // that use the provider resources, and verify that when the second
+  // task is launched, all provider resources are published.
+  // NOTE: The mock schedulers and drivers are stored outside the loop
+  // to avoid implicit destruction before the test ends.
+  vector<Owned<MockScheduler>> scheds;
+  vector<Owned<MesosSchedulerDriver>> drivers;
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  for (size_t i = 0; i < resources.size(); i++) {
+    FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+    framework.set_roles(0, resources.at(i).reservations(0).role());
+
+    Owned<MockScheduler> sched(new MockScheduler());
+    Owned<MesosSchedulerDriver> driver(new MesosSchedulerDriver(
+        sched.get(), framework, master.get()->pid, DEFAULT_CREDENTIAL));
+
+    EXPECT_CALL(*sched, registered(driver.get(), _, _));
+
+    Future<vector<Offer>> offers;
+
+    // Decline unmatched offers.
+    // NOTE: This ensures that this framework does not hold the agent's
+    // default resources. Otherwise, the other one will get no offer.
+    EXPECT_CALL(*sched, resourceOffers(driver.get(), _))
+      .WillRepeatedly(DeclineOffers());
+
+    EXPECT_CALL(*sched, resourceOffers(driver.get(), OffersHaveAnyResource(
+        std::bind(&Resources::isReserved, lambda::_1, framework.roles(0)))))
+      .WillOnce(FutureArg<1>(&offers));
+
+    driver->start();
+
+    AWAIT_READY(offers);
+    ASSERT_FALSE(offers->empty());
+
+    Future<mesos::v1::resource_provider::Event::Publish> publish;
+
+    // Two PUBLISH events will be received: one for launching the
+    // executor, and the other for launching the task.
+    EXPECT_CALL(resourceProvider, publish(_))
+      .WillOnce(
+          Invoke(&resourceProvider,
+                 &v1::MockResourceProvider::publishDefault))
+      .WillOnce(DoAll(
+          FutureArg<0>(&publish),
+          Invoke(&resourceProvider,
+                 &v1::MockResourceProvider::publishDefault)));
+
+    Future<TaskStatus> taskStarting;
+    Future<TaskStatus> taskRunning;
+
+    EXPECT_CALL(*sched, statusUpdate(driver.get(), _))
+      .WillOnce(FutureArg<1>(&taskStarting))
+      .WillOnce(FutureArg<1>(&taskRunning));
+
+    // Launch a task using a provider resource.
+    driver->acceptOffers(
+        {offers->at(0).id()},
+        {LAUNCH({createTask(
+            offers->at(0).slave_id(),
+            Resources(offers->at(0).resources()).reserved(framework.roles(0)),
+            createCommandInfo("sleep 1000"))})},
+        filters);
+
+    AWAIT_READY(publish);
+
+    // Test if the resources of all running executors are published.
+    // This is checked through counting how many reservations there are
+    // in the published resources: one (role1) when launching the first
+    // task, two (role1, role2) when the second task is launched.
+    EXPECT_EQ(i + 1, v1::Resources(publish->resources()).reservations().size());
+
+    AWAIT_READY(taskStarting);
+    EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+    AWAIT_READY(taskRunning);
+    EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+    // Store the mock scheduler and driver to prevent destruction.
+    scheds.emplace_back(std::move(sched));
+    drivers.emplace_back(std::move(driver));
+  }
+}
+
+
 // This test checks that the agent correctly updates and sends
 // resource version values when it registers or reregisters.
 TEST_F(SlaveTest, ResourceVersions)


[3/3] mesos git commit: Unit tests for storage local resource provider.

Posted by ji...@apache.org.
Unit tests for storage local resource provider.

This patch adds two unit tests. The first one will register a framework
to issue a `CREATE_VOLUME` then a `DESTROY_VOLUME` call to destroy
an unpublished volume. The second one will register a framework to issue
`CREATE_VOLUME`, `CREATE`, `LAUNCH`, `DESTROY_VOLUME` in sequence to
create a volume and launch a task to use a persistent volume on the CSI
volume, then destroy this published volume.

Review: https://reviews.apache.org/r/63390


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea98a344
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea98a344
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea98a344

Branch: refs/heads/master
Commit: ea98a3440d00a3014a79dc88a0cc122806577de3
Parents: 4e71539
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu Oct 26 15:52:03 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |   3 +-
 src/examples/test_csi_plugin.cpp                |  61 ++-
 src/master/validation.cpp                       |  10 +-
 src/tests/mesos.hpp                             |  38 ++
 .../storage_local_resource_provider_tests.cpp   | 530 +++++++++++++++++++
 5 files changed, 620 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index be105f4..f948960 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2613,7 +2613,8 @@ if ENABLE_GRPC
 mesos_tests_SOURCES +=						\
   tests/csi_client_tests.cpp					\
   tests/mock_csi_plugin.cpp					\
-  tests/mock_csi_plugin.hpp
+  tests/mock_csi_plugin.hpp					\
+  tests/storage_local_resource_provider_tests.cpp
 endif
 
 if ENABLE_SSL

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 8ab936b..6359db8 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -241,7 +241,7 @@ Status TestCSIPlugin::GetSupportedVersions(
     const csi::GetSupportedVersionsRequest* request,
     csi::GetSupportedVersionsResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   response->add_supported_versions()->CopyFrom(version);
 
@@ -254,7 +254,7 @@ Status TestCSIPlugin::GetPluginInfo(
     const csi::GetPluginInfoRequest* request,
     csi::GetPluginInfoResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -273,7 +273,7 @@ Status TestCSIPlugin::CreateVolume(
     const csi::CreateVolumeRequest* request,
     csi::CreateVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -337,6 +337,8 @@ Status TestCSIPlugin::CreateVolume(
 
   response->mutable_volume_info()->set_id(volume.id);
   response->mutable_volume_info()->set_capacity_bytes(volume.size.bytes());
+  (*response->mutable_volume_info()->mutable_attributes())["path"] =
+    getVolumePath(volume);
 
   if (alreadyExists) {
     return Status(
@@ -353,7 +355,7 @@ Status TestCSIPlugin::DeleteVolume(
     const csi::DeleteVolumeRequest* request,
     csi::DeleteVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -391,7 +393,7 @@ Status TestCSIPlugin::ControllerPublishVolume(
     const csi::ControllerPublishVolumeRequest* request,
     csi::ControllerPublishVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -406,6 +408,14 @@ Status TestCSIPlugin::ControllerPublishVolume(
         "Volume '" + request->volume_id() + "' is not found");
   }
 
+  const Volume& volume = volumes.at(request->volume_id());
+  const string path = getVolumePath(volume);
+
+  auto it = request->volume_attributes().find("path");
+  if (it == request->volume_attributes().end() || it->second != path) {
+    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  }
+
   if (request->node_id() != NODE_ID) {
     return Status(
         grpc::NOT_FOUND,
@@ -422,7 +432,7 @@ Status TestCSIPlugin::ControllerUnpublishVolume(
     const csi::ControllerUnpublishVolumeRequest* request,
     csi::ControllerUnpublishVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -453,7 +463,7 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
     const csi::ValidateVolumeCapabilitiesRequest* request,
     csi::ValidateVolumeCapabilitiesResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -468,6 +478,14 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
         "Volume '" + request->volume_id() + "' is not found");
   }
 
+  const Volume& volume = volumes.at(request->volume_id());
+  const string path = getVolumePath(volume);
+
+  auto it = request->volume_attributes().find("path");
+  if (it == request->volume_attributes().end() || it->second != path) {
+    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  }
+
   foreach (const csi::VolumeCapability& capability,
            request->volume_capabilities()) {
     if (!capability.has_mount()) {
@@ -497,7 +515,7 @@ Status TestCSIPlugin::ListVolumes(
     const csi::ListVolumesRequest* request,
     csi::ListVolumesResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -529,7 +547,7 @@ Status TestCSIPlugin::GetCapacity(
     const csi::GetCapacityRequest* request,
     csi::GetCapacityResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -563,7 +581,7 @@ Status TestCSIPlugin::ControllerProbe(
     const csi::ControllerProbeRequest* request,
     csi::ControllerProbeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -579,7 +597,7 @@ Status TestCSIPlugin::ControllerGetCapabilities(
     const csi::ControllerGetCapabilitiesRequest* request,
     csi::ControllerGetCapabilitiesResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -604,7 +622,7 @@ Status TestCSIPlugin::NodePublishVolume(
     const csi::NodePublishVolumeRequest* request,
     csi::NodePublishVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
@@ -619,6 +637,14 @@ Status TestCSIPlugin::NodePublishVolume(
         "Volume '" + request->volume_id() + "' is not found");
   }
 
+  const Volume& volume = volumes.at(request->volume_id());
+  const string path = getVolumePath(volume);
+
+  auto it = request->volume_attributes().find("path");
+  if (it == request->volume_attributes().end() || it->second != path) {
+    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  }
+
   if (!os::exists(request->target_path())) {
     return Status(
         grpc::INVALID_ARGUMENT,
@@ -638,9 +664,6 @@ Status TestCSIPlugin::NodePublishVolume(
     }
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
-
   Try<Nothing> mount = fs::mount(
       path,
       request->target_path(),
@@ -678,7 +701,7 @@ Status TestCSIPlugin::NodeUnpublishVolume(
     const csi::NodeUnpublishVolumeRequest* request,
     csi::NodeUnpublishVolumeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -730,7 +753,7 @@ Status TestCSIPlugin::GetNodeID(
     const csi::GetNodeIDRequest* request,
     csi::GetNodeIDResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -748,7 +771,7 @@ Status TestCSIPlugin::NodeProbe(
     const csi::NodeProbeRequest* request,
     csi::NodeProbeResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {
@@ -764,7 +787,7 @@ Status TestCSIPlugin::NodeGetCapabilities(
     const csi::NodeGetCapabilitiesRequest* request,
     csi::NodeGetCapabilitiesResponse* response)
 {
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   Option<Error> error = validateVersion(request->version());
   if (error.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index bf7ae65..38d9a3c 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2236,8 +2236,14 @@ Option<Error> validate(
     return Error("Not a persistent volume: " + error.get().message);
   }
 
-  if (!checkpointedResources.contains(volumes)) {
-    return Error("Persistent volumes not found");
+  foreach (const Resource volume, volumes) {
+    if (Resources::hasResourceProvider(volume)) {
+      continue;
+    }
+
+    if (!checkpointedResources.contains(volume)) {
+      return Error("Persistent volumes not found");
+    }
   }
 
   // Ensure the volumes being destroyed are not in use currently.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 657f925..76fbd90 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3449,6 +3449,44 @@ void ExpectNoFutureUnionHttpProtobufs(
 }
 
 
+// This matcher is used to match a vector of resource offers that
+// contains an offer having any resource that passes the filter.
+MATCHER_P(OffersHaveAnyResource, filter, "")
+{
+  foreach (const Offer& offer, arg) {
+    foreach (const Resource& resource, offer.resources()) {
+      if (filter(resource)) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
+// This matcher is used to match a vector of resource offers that
+// contains an offer having the specified resource.
+MATCHER_P(OffersHaveResource, resource, "")
+{
+  foreach (const Offer& offer, arg) {
+    Resources resources = offer.resources();
+
+    // If `resource` is not allocated, we are matching offers against
+    // resources constructed from scratch, so we strip off allocations.
+    if (!resource.has_allocation_info()) {
+      resources.unallocate();
+    }
+
+    if (resources.contains(resource)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
 // This matcher is used to match the task id of `TaskStatus` message.
 MATCHER_P(TaskStatusTaskIdEq, taskInfo, "")
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
new file mode 100644
index 0000000..0f0c627
--- /dev/null
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/gmock.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+using mesos::master::detector::MasterDetector;
+
+using process::Future;
+using process::Owned;
+
+using testing::Args;
+using testing::Sequence;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class StorageLocalResourceProviderTest : public MesosTest
+{
+public:
+  virtual void SetUp()
+  {
+    MesosTest::SetUp();
+
+    const string testCsiPluginWorkDir = path::join(sandbox.get(), "test");
+    ASSERT_SOME(os::mkdir(testCsiPluginWorkDir));
+
+    resourceProviderConfigDir =
+      path::join(sandbox.get(), "resource_provider_configs");
+
+    ASSERT_SOME(os::mkdir(resourceProviderConfigDir));
+
+    string testCsiPluginPath =
+      path::join(tests::flags.build_dir, "src", "test-csi-plugin");
+
+    Try<string> resourceProviderConfig = strings::format(
+        R"~(
+        {
+          "type": "org.apache.mesos.rp.local.storage",
+          "name": "test",
+          "default_reservations": [
+            {
+              "type": "STATIC",
+              "role": "storage"
+            }
+          ],
+          "storage": {
+            "plugin": {
+              "type": "org.apache.mesos.csi.test",
+              "name": "slrp_test",
+              "containers": [
+                {
+                  "services": [
+                    "CONTROLLER_SERVICE",
+                    "NODE_SERVICE"
+                  ],
+                  "command": {
+                    "shell": false,
+                    "value": "%s",
+                    "arguments": [
+                      "%s",
+                      "--total_capacity=4GB",
+                      "--work_dir=%s"
+                    ]
+                  }
+                }
+              ]
+            }
+          }
+        }
+        )~",
+        testCsiPluginPath,
+        testCsiPluginPath,
+        testCsiPluginWorkDir);
+
+    ASSERT_SOME(resourceProviderConfig);
+
+    ASSERT_SOME(os::write(
+        path::join(resourceProviderConfigDir, "test.json"),
+        resourceProviderConfig.get()));
+  }
+
+protected:
+  string resourceProviderConfigDir;
+};
+
+
+// This test verifies that a framework can create then destroy a volume
+// from the resources provided by a storage local resource provider that
+// uses the test CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateVolumeAndDestroyVolume)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise offer operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing a RAW disk resource after `DESTROY_VOLUME`.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  auto isSourceType = [](
+      const Resource& r, const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Destroy the created volume.
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      filters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  Option<Resource> destroyed;
+
+  foreach (const Resource& resource, volumeDestroyedOffers->at(0).resources()) {
+    if (resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      destroyed = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(destroyed);
+  ASSERT_FALSE(destroyed->disk().source().has_id());
+  ASSERT_FALSE(destroyed->disk().source().has_metadata());
+  ASSERT_FALSE(destroyed->disk().source().has_mount());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
+// This test verifies that a framework can launch a task using a created
+// volume provided by a storage local resource provider that uses the
+// test CSI plugin, then destroy the volume while it is published.
+TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise offer operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing a persistent volume after `CREATE`.
+  //   4. One containing the same persistent volume after `LAUNCH`.
+  //   5. One containing the same MOUNT disk resource after `DESTROY`.
+  //   6. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> persistenceCreatedOffers;
+  Future<vector<Offer>> taskFinishedOffers;
+  Future<vector<Offer>> persistenceDestroyedOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  auto isSourceType = [](
+      const Resource& r, const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Put a file into the volume.
+  ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+  // Create a persistent volume on the CSI volume.
+  Resource persistentVolume = volume.get();
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&persistenceCreatedOffers));
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume)},
+      filters);
+
+  AWAIT_READY(persistenceCreatedOffers);
+  ASSERT_FALSE(persistenceCreatedOffers->empty());
+
+  // Launch a task to use the persistent volume.
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&taskFinishedOffers));
+
+  driver.acceptOffers(
+      {persistenceCreatedOffers->at(0).id()},
+      {LAUNCH({createTask(
+           persistenceCreatedOffers->at(0).slave_id(),
+           persistentVolume,
+           createCommandInfo("test -f " + path::join("volume", "file")))})},
+      filters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+  AWAIT_READY(taskFinishedOffers);
+
+  // Destroy the persistent volume on the CSI volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&persistenceDestroyedOffers));
+
+  driver.acceptOffers(
+      {taskFinishedOffers->at(0).id()},
+      {DESTROY(persistentVolume)},
+      filters);
+
+  AWAIT_READY(persistenceDestroyedOffers);
+
+  // Destroy the created volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.acceptOffers(
+      {persistenceDestroyedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      filters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {