You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/07 23:46:00 UTC
[1/3] mesos git commit: Handling offer operations in storage local
resource provider.
Repository: mesos
Updated Branches:
refs/heads/master 31d718e39 -> ea98a3440
Handling offer operations in storage local resource provider.
For legacy operations, we just call Resources::apply(). The new operations
CREATE_VOLUME, DESTROY_VOLUME, CREATE_BLOCK and DESTROY_BLOCK are
implemented through CSI. In particular, DESTROY_* requires unpublishing
the resources first.
Review: https://reviews.apache.org/r/63388
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31197766
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31197766
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31197766
Branch: refs/heads/master
Commit: 31197766768b3122807ea6dcff4cd313a0bbd5dc
Parents: 31d718e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu Nov 30 16:37:05 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:53 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 580 +++++++++++++++++++++++-
1 file changed, 569 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/31197766/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a2794ac..a029421 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -41,6 +41,7 @@
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
+#include <stout/linkedhashmap.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
@@ -48,6 +49,7 @@
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
#include "csi/client.hpp"
#include "csi/paths.hpp"
@@ -70,7 +72,9 @@ using std::accumulate;
using std::find;
using std::list;
using std::queue;
+using std::shared_ptr;
using std::string;
+using std::vector;
using process::Break;
using process::Continue;
@@ -88,6 +92,7 @@ using process::collect;
using process::defer;
using process::loop;
using process::spawn;
+using process::undiscardable;
using process::http::authentication::Principal;
@@ -343,6 +348,29 @@ private:
Future<Nothing> nodePublish(const string& volumeId);
Future<Nothing> nodeUnpublish(const string& volumeId);
+ // Creates a CSI volume with exactly `capacity` bytes using the given
+ // profile, and returns its CSI volume ID.
+ Future<string> createVolume(
+ const string& name,
+ const Bytes& capacity,
+ const ProfileData& profile);
+ // Deletes a CSI volume, unpublishing it first if it is still published.
+ Future<Nothing> deleteVolume(const string& volumeId);
+
+ // Applies the offer operation. Conventional operations will be
+ // synchronously applied; storage operations are applied through CSI.
+ Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+
+ // Converts a RAW disk resource into a disk of the given `type`
+ // (PATH, MOUNT or BLOCK) backed by a CSI volume.
+ Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
+ const Resource& resource,
+ const UUID& operationUuid,
+ const Resource::DiskInfo::Source::Type& type);
+ // Deletes the CSI volume backing `resource` and converts it back to RAW.
+ Future<vector<ResourceConversion>> applyDestroyVolumeOrBlock(
+ const Resource& resource);
+
+ // Synchronously update `totalResources` and the offer operation status,
+ // then drop the operation from the list of pending operations.
+ Try<Nothing> applyResourceConversions(
+ const UUID& operationUuid,
+ const Try<vector<ResourceConversion>>& conversions);
+
void checkpointResourceProviderState();
void sendResourceProviderStateUpdate();
void checkpointVolumeState(const string& volumeId);
@@ -380,7 +408,10 @@ private:
Option<csi::ControllerCapabilities> controllerCapabilities;
Option<string> nodeId;
- list<Event::Operation> pendingOperations;
+ // NOTE: We store the pending operations, keyed by operation UUID, in a
+ // `LinkedHashMap` to preserve the order in which we receive them. This
+ // is useful when we replay dependent operations during recovery.
+ LinkedHashMap<UUID, Event::Operation> pendingOperations;
Resources totalResources;
Option<UUID> resourceVersion;
hashmap<string, VolumeData> volumes;
@@ -558,7 +589,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
if (resourceProviderState.isSome()) {
foreach (const Event::Operation& operation,
resourceProviderState->operations()) {
- pendingOperations.push_back(operation);
+ Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(uuid);
+
+ pendingOperations[uuid.get()] = operation;
}
totalResources = resourceProviderState->resources();
@@ -571,6 +605,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
}
}
+ // We replay all pending operations here, so that if a volume is
+ // actually created before the last failover, it will be reflected
+ // in the updated total resources before we do the reconciliation.
+ // NOTE: `applyOfferOperation` will remove the applied operation
+ // from the list of pending operations, so we make a copy of keys
+ // here.
+ foreach (const UUID& uuid, pendingOperations.keys()) {
+ applyOfferOperation(uuid)
+ .onAny(defer(self(), [=](const Future<Nothing>& future) {
+ if (!future.isReady()) {
+ LOG(ERROR)
+ << "Failed to apply operation " << uuid << ": "
+ << (future.isFailed() ? future.failure() : "future discarded");
+ }
+ }));
+ }
+
state = DISCONNECTED;
driver.reset(new Driver(
@@ -917,6 +968,8 @@ void StorageLocalResourceProviderProcess::subscribed(
void StorageLocalResourceProviderProcess::operation(
const Event::Operation& operation)
{
+ Future<Resources> converted;
+
if (state == SUBSCRIBED) {
// TODO(chhsiao): Reject this operation.
return;
@@ -924,8 +977,23 @@ void StorageLocalResourceProviderProcess::operation(
CHECK_EQ(READY, state);
- pendingOperations.push_back(operation);
+ LOG(INFO) << "Received " << operation.info().type() << " operation";
+
+ Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(uuid);
+
+ CHECK(!pendingOperations.contains(uuid.get()));
+ pendingOperations[uuid.get()] = operation;
checkpointResourceProviderState();
+
+ applyOfferOperation(uuid.get())
+ .onAny(defer(self(), [=](const Future<Nothing>& future) {
+ if (!future.isReady()) {
+ LOG(ERROR)
+ << "Failed to apply " << operation.info().type() << " operation: "
+ << (future.isFailed() ? future.failure() : "future discarded");
+ }
+ }));
}
@@ -1065,14 +1133,14 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
const Event::AcknowledgeOfferOperation& acknowledge)
{
- CHECK_EQ(SUBSCRIBED, state);
+ // NOTE(review): `acknowledge` is not used yet; this handler currently
+ // only asserts that the provider is READY.
+ CHECK_EQ(READY, state);
}
void StorageLocalResourceProviderProcess::reconcileOfferOperations(
const Event::ReconcileOfferOperations& reconcile)
{
- CHECK_EQ(SUBSCRIBED, state);
+ // NOTE(review): `reconcile` is not used yet; this handler currently
+ // only asserts that the provider is READY.
+ CHECK_EQ(READY, state);
}
@@ -1822,11 +1890,502 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
}
+// Asks the CSI controller plugin to create a volume named `name` whose
+// required and limit capacities are both `capacity`, using the profile's
+// volume capability and parameters. If the returned volume is not yet
+// tracked in `volumes`, it is recorded in the CREATED state and
+// checkpointed. The returned future holds the CSI volume ID.
+Future<string> StorageLocalResourceProviderProcess::createVolume(
+ const string& name,
+ const Bytes& capacity,
+ const ProfileData& profile)
+{
+ // NOTE: This can only be called after `prepareControllerService`.
+ CHECK_SOME(controllerCapabilities);
+
+ if (!controllerCapabilities->createDeleteVolume) {
+ return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+ }
+
+ return getService(controllerContainerId)
+ .then(defer(self(), [=](csi::Client client) {
+ csi::CreateVolumeRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.set_name(name);
+ // Setting equal required and limit bytes requests exactly `capacity`.
+ request.mutable_capacity_range()
+ ->set_required_bytes(capacity.bytes());
+ request.mutable_capacity_range()
+ ->set_limit_bytes(capacity.bytes());
+ request.add_volume_capabilities()->CopyFrom(profile.capability);
+ *request.mutable_parameters() = profile.parameters;
+
+ return client.CreateVolume(request)
+ .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
+ const csi::VolumeInfo& volumeInfo = response.volume_info();
+
+ if (volumes.contains(volumeInfo.id())) {
+ // The resource provider failed over after the last
+ // `CreateVolume` call, but before the operation status
+ // was checkpointed.
+ CHECK_EQ(csi::state::VolumeState::CREATED,
+ volumes.at(volumeInfo.id()).state.state());
+ } else {
+ csi::state::VolumeState volumeState;
+ volumeState.set_state(csi::state::VolumeState::CREATED);
+ volumeState.mutable_volume_capability()
+ ->CopyFrom(profile.capability);
+ *volumeState.mutable_volume_attributes() =
+ volumeInfo.attributes();
+
+ volumes.put(volumeInfo.id(), std::move(volumeState));
+ checkpointVolumeState(volumeInfo.id());
+ }
+
+ return volumeInfo.id();
+ }));
+ }));
+}
+
+
+// Deletes the CSI volume `volumeId`. If the volume is tracked in `volumes`,
+// it is first node- and/or controller-unpublished as its state requires,
+// then deleted via the CSI `DeleteVolume` call, removed from `volumes`, and
+// its directory under the CSI root is removed. If the volume is not
+// tracked, this only asserts that its directory no longer exists.
+Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
+ const string& volumeId)
+{
+ // NOTE: This can only be called after `prepareControllerService` and
+ // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+ CHECK_SOME(controllerCapabilities);
+ CHECK_SOME(nodeId);
+
+ if (!controllerCapabilities->createDeleteVolume) {
+ return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+ }
+
+ const string volumePath = csi::paths::getVolumePath(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name(),
+ volumeId);
+
+ Future<Nothing> deleted = Nothing();
+
+ if (volumes.contains(volumeId)) {
+ // NOTE: The `PUBLISHED` and `NODE_READY` cases intentionally fall
+ // through: a volume in these states must be node-unpublished and/or
+ // controller-unpublished before `DeleteVolume` can be called.
+ switch (volumes.at(volumeId).state.state()) {
+ case csi::state::VolumeState::PUBLISHED: {
+ deleted = deleted
+ .then(defer(self(), &Self::nodeUnpublish, volumeId));
+ }
+ case csi::state::VolumeState::NODE_READY: {
+ deleted = deleted
+ .then(defer(self(), &Self::controllerUnpublish, volumeId));
+ }
+ case csi::state::VolumeState::CREATED: {
+ deleted = deleted
+ .then(defer(self(), &Self::getService, controllerContainerId))
+ .then(defer(self(), [=](csi::Client client) {
+ csi::DeleteVolumeRequest request;
+ request.mutable_version()->CopyFrom(csiVersion);
+ request.set_volume_id(volumeId);
+
+ return client.DeleteVolume(request)
+ .then(defer(self(), [=] {
+ // NOTE: This will destruct the volume's sequence!
+ volumes.erase(volumeId);
+ CHECK_SOME(os::rmdir(volumePath));
+
+ return Nothing();
+ }));
+ }));
+ break;
+ }
+ // Transitional states are not expected here. The `kint32min` and
+ // `kint32max` cases presumably match protobuf's generated sentinel
+ // enumerators, so that the switch names every enumerator.
+ case csi::state::VolumeState::UNKNOWN:
+ case csi::state::VolumeState::CONTROLLER_PUBLISH:
+ case csi::state::VolumeState::CONTROLLER_UNPUBLISH:
+ case csi::state::VolumeState::NODE_PUBLISH:
+ case csi::state::VolumeState::NODE_UNPUBLISH:
+ case google::protobuf::kint32min:
+ case google::protobuf::kint32max: {
+ UNREACHABLE();
+ }
+ }
+ } else {
+ // The resource provider failed over after the last `DeleteVolume`
+ // call, but before the operation status was checkpointed.
+ CHECK(!os::exists(volumePath));
+ }
+
+ // NOTE: We make the returned future undiscardable because the
+ // deletion may cause the volume's sequence to be destructed, which
+ // will in turn discard all futures in the sequence.
+ return undiscardable(deleted);
+}
+
+
+// Applies the pending operation identified by `operationUuid`. If the
+// operation's resource version differs from `resourceVersion`, the
+// operation is failed. RESERVE/UNRESERVE/CREATE/DESTROY are converted
+// synchronously via `getResourceConversions`; CREATE_VOLUME/DESTROY_VOLUME/
+// CREATE_BLOCK/DESTROY_BLOCK are carried out asynchronously through CSI.
+// Either way, the outcome is handed to `applyResourceConversions`, whose
+// result the returned future reflects.
+Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+ const UUID& operationUuid)
+{
+ Future<vector<ResourceConversion>> conversions;
+ Option<Error> error;
+
+ CHECK(pendingOperations.contains(operationUuid));
+ const Event::Operation& operation = pendingOperations.at(operationUuid);
+
+ Try<UUID> operationVersion =
+ UUID::fromBytes(operation.resource_version_uuid());
+ CHECK_SOME(operationVersion);
+
+ // NOTE(review): `resourceVersion` is an `Option`; this assumes it has
+ // been set (e.g. during recovery) before any operation is applied.
+ if (resourceVersion.get() != operationVersion.get()) {
+ error = Error(
+ "Mismatched resource version " + stringify(operationVersion.get()) +
+ " (expected: " + stringify(resourceVersion.get()) + ")");
+ }
+
+ switch (operation.info().type()) {
+ case Offer::Operation::RESERVE:
+ case Offer::Operation::UNRESERVE:
+ case Offer::Operation::CREATE:
+ case Offer::Operation::DESTROY: {
+ // Synchronously apply the conventional operations.
+ return applyResourceConversions(
+ operationUuid,
+ error.isNone()
+ ? getResourceConversions(operation.info())
+ : Try<vector<ResourceConversion>>::error(error.get()));
+ }
+ case Offer::Operation::CREATE_VOLUME: {
+ CHECK(operation.info().has_create_volume());
+
+ if (error.isNone()) {
+ conversions = applyCreateVolumeOrBlock(
+ operation.info().create_volume().source(),
+ operationUuid,
+ operation.info().create_volume().target_type());
+ } else {
+ conversions = Failure(error.get());
+ }
+ break;
+ }
+ case Offer::Operation::DESTROY_VOLUME: {
+ CHECK(operation.info().has_destroy_volume());
+
+ if (error.isNone()) {
+ conversions = applyDestroyVolumeOrBlock(
+ operation.info().destroy_volume().volume());
+ } else {
+ conversions = Failure(error.get());
+ }
+ break;
+ }
+ case Offer::Operation::CREATE_BLOCK: {
+ CHECK(operation.info().has_create_block());
+
+ if (error.isNone()) {
+ conversions = applyCreateVolumeOrBlock(
+ operation.info().create_block().source(),
+ operationUuid,
+ Resource::DiskInfo::Source::BLOCK);
+ } else {
+ conversions = Failure(error.get());
+ }
+ break;
+ }
+ case Offer::Operation::DESTROY_BLOCK: {
+ CHECK(operation.info().has_destroy_block());
+
+ if (error.isNone()) {
+ conversions = applyDestroyVolumeOrBlock(
+ operation.info().destroy_block().block());
+ } else {
+ conversions = Failure(error.get());
+ }
+ break;
+ }
+ case Offer::Operation::UNKNOWN:
+ case Offer::Operation::LAUNCH:
+ case Offer::Operation::LAUNCH_GROUP: {
+ UNREACHABLE();
+ }
+ }
+
+ // NOTE: The code below is executed only when applying a storage operation.
+ shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
+
+ // The lambda captures a copy of `operation` (a reference into
+ // `pendingOperations`), since `[=]` copies the referenced object.
+ conversions
+ .onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& future) {
+ Option<Error> error;
+ if (!future.isReady()) {
+ error =
+ Error(future.isFailed() ? future.failure() : "future discarded");
+ }
+
+ // Only the first conversion is logged; the storage operations in
+ // this file each produce exactly one conversion.
+ if (future.isReady()) {
+ LOG(INFO)
+ << "Applying conversion from '" << future->at(0).consumed << "' to '"
+ << future->at(0).converted << "'";
+ } else {
+ LOG(ERROR)
+ << "Failed to apply " << operation.info().type() << " operation: "
+ << error->message;
+ }
+
+ promise->associate(applyResourceConversions(
+ operationUuid,
+ error.isNone()
+ ? future.get()
+ : Try<vector<ResourceConversion>>::error(error.get())));
+ }));
+
+ return promise->future();
+}
+
+
+// Converts the RAW disk `resource` into a disk of the given `type` (PATH,
+// MOUNT or BLOCK). If the source already carries an ID (a preprovisioned
+// volume), that volume is used as-is; otherwise a new CSI volume named
+// after `operationUuid` is created. Fails if the source is not RAW or if
+// the source's profile capability does not support `type`.
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
+ const Resource& resource,
+ const UUID& operationUuid,
+ const Resource::DiskInfo::Source::Type& type)
+{
+ if (resource.disk().source().type() != Resource::DiskInfo::Source::RAW) {
+ return Failure(
+ "Cannot create volume from source of " +
+ stringify(resource.disk().source().type()) + " type");
+ }
+
+ // Check that the profile's volume capability matches the target type.
+ switch (type) {
+ case Resource::DiskInfo::Source::PATH:
+ case Resource::DiskInfo::Source::MOUNT: {
+ if (!profiles.at(resource.disk().source().profile())
+ .capability.has_mount()) {
+ return Failure(
+ "Profile '" + resource.disk().source().profile() +
+ "' cannot be used for CREATE_VOLUME operation");
+ }
+ break;
+ }
+ case Resource::DiskInfo::Source::BLOCK: {
+ if (!profiles.at(resource.disk().source().profile())
+ .capability.has_block()) {
+ return Failure(
+ "Profile '" + resource.disk().source().profile() +
+ "' cannot be used for CREATE_BLOCK operation");
+ }
+ break;
+ }
+ case Resource::DiskInfo::Source::UNKNOWN:
+ case Resource::DiskInfo::Source::RAW: {
+ UNREACHABLE();
+ }
+ }
+
+ Future<string> created;
+
+ if (resource.disk().source().has_id()) {
+ // Preprovisioned volumes with RAW type.
+ // TODO(chhsiao): Call `ValidateVolumeCapabilities` sequentially
+ // once we use the profile module and make profile optional.
+ CHECK(volumes.contains(resource.disk().source().id()));
+ created = resource.disk().source().id();
+ } else {
+ // We use the operation UUID as the name of the volume, so the same
+ // operation will create the same volume after recovery.
+ // TODO(chhsiao): Call `CreateVolume` sequentially with other create
+ // or delete operations.
+ // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
+ created = createVolume(
+ operationUuid.toString(),
+ resource.scalar().value(),
+ profiles.at(resource.disk().source().profile()));
+ }
+
+ // Build the converted resource: same resource with the volume ID, the
+ // target type, the volume attributes as metadata and, for PATH/MOUNT,
+ // a root path relative to the agent work dir.
+ return created
+ .then(defer(self(), [=](const string& volumeId) {
+ CHECK(volumes.contains(volumeId));
+ const csi::state::VolumeState& volumeState = volumes.at(volumeId).state;
+
+ Resource converted = resource;
+ converted.mutable_disk()->mutable_source()->set_id(volumeId);
+ converted.mutable_disk()->mutable_source()->set_type(type);
+
+ if (!volumeState.volume_attributes().empty()) {
+ converted.mutable_disk()->mutable_source()->mutable_metadata()
+ ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
+ }
+
+ const string mountPath = csi::paths::getMountPath(
+ slave::paths::getCsiRootDir("."),
+ info.storage().plugin().type(),
+ info.storage().plugin().name(),
+ volumeId);
+
+ switch (type) {
+ case Resource::DiskInfo::Source::PATH: {
+ // Set the root path relative to agent work dir.
+ converted.mutable_disk()->mutable_source()->mutable_path()
+ ->set_root(mountPath);
+ break;
+ }
+ case Resource::DiskInfo::Source::MOUNT: {
+ // Set the root path relative to agent work dir.
+ converted.mutable_disk()->mutable_source()->mutable_mount()
+ ->set_root(mountPath);
+ break;
+ }
+ case Resource::DiskInfo::Source::BLOCK: {
+ break;
+ }
+ case Resource::DiskInfo::Source::UNKNOWN:
+ case Resource::DiskInfo::Source::RAW: {
+ UNREACHABLE();
+ }
+ }
+
+ vector<ResourceConversion> conversions;
+ conversions.emplace_back(resource, std::move(converted));
+
+ return conversions;
+ }));
+}
+
+
+// Deletes the CSI volume backing the PATH, MOUNT or BLOCK disk `resource`
+// and converts the resource back to a RAW disk with no ID, metadata, path
+// or mount. The deletion is queued on the volume's sequence, so it runs
+// after earlier operations on the same volume. Fails for RAW or UNKNOWN
+// sources.
+Future<vector<ResourceConversion>>
+StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
+ const Resource& resource)
+{
+ switch (resource.disk().source().type()) {
+ case Resource::DiskInfo::Source::PATH:
+ case Resource::DiskInfo::Source::MOUNT:
+ case Resource::DiskInfo::Source::BLOCK: {
+ break;
+ }
+ case Resource::DiskInfo::Source::UNKNOWN:
+ case Resource::DiskInfo::Source::RAW: {
+ return Failure(
+ "Cannot delete volume of " +
+ stringify(resource.disk().source().type()) + " type");
+ }
+ }
+
+ CHECK(resource.disk().source().has_id());
+ CHECK(volumes.contains(resource.disk().source().id()));
+
+ // Sequentialize the deletion with other operations on the same volume.
+ return volumes.at(resource.disk().source().id()).sequence->add(
+ std::function<Future<Nothing>()>(
+ defer(self(), &Self::deleteVolume, resource.disk().source().id())))
+ .then(defer(self(), [=]() {
+ Resource converted = resource;
+ converted.mutable_disk()->mutable_source()->clear_id();
+ converted.mutable_disk()->mutable_source()->clear_metadata();
+ converted.mutable_disk()->mutable_source()->set_type(
+ Resource::DiskInfo::Source::RAW);
+ converted.mutable_disk()->mutable_source()->clear_path();
+ converted.mutable_disk()->mutable_source()->clear_mount();
+
+ vector<ResourceConversion> conversions;
+ conversions.emplace_back(resource, std::move(converted));
+
+ return conversions;
+ }));
+}
+
+
+// Applies `conversions` (with allocation info stripped) to
+// `totalResources`, sends an UPDATE_OFFER_OPERATION_STATUS call reporting
+// FINISHED or FAILED for the operation, then removes the operation from
+// `pendingOperations` and checkpoints the resource provider state.
+// Returns the error if `conversions` is an error or cannot be applied.
+Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
+ const UUID& operationUuid,
+ const Try<vector<ResourceConversion>>& conversions)
+{
+ Option<Error> error;
+
+ CHECK(pendingOperations.contains(operationUuid));
+ const Event::Operation& operation = pendingOperations.at(operationUuid);
+
+ if (conversions.isSome()) {
+ // Strip away the allocation info when applying the conversions to
+ // the total resources.
+ vector<ResourceConversion> _conversions;
+ foreach (ResourceConversion conversion, conversions.get()) {
+ conversion.consumed.unallocate();
+ conversion.converted.unallocate();
+ _conversions.push_back(std::move(conversion));
+ }
+
+ Try<Resources> result = totalResources.apply(_conversions);
+ if (result.isSome()) {
+ totalResources = result.get();
+ } else {
+ error = result.error();
+ }
+ } else {
+ error = conversions.error();
+ }
+
+ // We first ask the status update manager to checkpoint the operation
+ // status, then checkpoint the resource provider state.
+ // TODO(chhsiao): Use the status update manager.
+ Call call;
+ call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+ Call::UpdateOfferOperationStatus* update =
+ call.mutable_update_offer_operation_status();
+ update->mutable_framework_id()->CopyFrom(operation.framework_id());
+ update->set_operation_uuid(operation.operation_uuid());
+
+ OfferOperationStatus* status = update->mutable_status();
+ status->set_status_uuid(UUID::random().toBytes());
+
+ if (operation.info().has_id()) {
+ status->mutable_operation_id()->CopyFrom(operation.info().id());
+ }
+
+ if (error.isSome()) {
+ // We only update the resource version for failed conventional
+ // operations, which are speculatively executed on the master.
+ if (operation.info().type() == Offer::Operation::RESERVE ||
+ operation.info().type() == Offer::Operation::UNRESERVE ||
+ operation.info().type() == Offer::Operation::CREATE ||
+ operation.info().type() == Offer::Operation::DESTROY) {
+ resourceVersion = UUID::random();
+
+ // Send an `UPDATE_STATE` after we finish the current operation.
+ dispatch(self(), &Self::sendResourceProviderStateUpdate);
+ }
+
+ status->set_state(OFFER_OPERATION_FAILED);
+ status->set_message(error->message);
+ } else {
+ status->set_state(OFFER_OPERATION_FINISHED);
+
+ foreach (const ResourceConversion& conversion, conversions.get()) {
+ foreach (const Resource& resource, conversion.converted) {
+ status->add_converted_resources()->CopyFrom(resource);
+ }
+ }
+ }
+
+ update->mutable_latest_status()->CopyFrom(*status);
+
+ auto err = [](const UUID& operationUuid, const string& message) {
+ LOG(ERROR)
+ << "Failed to send status update for offer operation " << operationUuid
+ << ": " << message;
+ };
+
+ driver->send(evolve(call))
+ .onFailed(std::bind(err, operationUuid, lambda::_1))
+ .onDiscarded(std::bind(err, operationUuid, "future discarded"));
+
+ // NOTE: `operation` refers into `pendingOperations` and must not be
+ // used after this erase.
+ pendingOperations.erase(operationUuid);
+ checkpointResourceProviderState();
+
+ if (error.isSome()) {
+ return error.get();
+ }
+
+ return Nothing();
+}
+
+
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
- foreach (const Event::Operation& operation, pendingOperations) {
+ foreachvalue (const Event::Operation& operation, pendingOperations) {
state.add_operations()->CopyFrom(operation);
}
@@ -1851,10 +2410,9 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
Call::UpdateState* update = call.mutable_update_state();
- foreach (const Event::Operation& operation, pendingOperations) {
- Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
- CHECK_SOME(operationUuid);
-
+ foreachpair (const UUID& uuid,
+ const Event::Operation& operation,
+ pendingOperations) {
// TODO(chhsiao): Maintain a list of terminated but unacknowledged
// offer operations in memory and reconstruc that during recovery
// by querying status update manager.
@@ -1868,7 +2426,7 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
: None()),
operation.framework_id(),
slaveId,
- operationUuid.get()));
+ uuid));
}
update->mutable_resources()->CopyFrom(totalResources);
[2/3] mesos git commit: Publish resource provider resources before
container launch or update.
Posted by ji...@apache.org.
Publish resource provider resources before container launch or update.
`Slave::publishResources()` computes the total allocated resources of
all currently running executor containers, adds the optional
`additionalResources` argument (resources that will be used by an
executor that is about to launch), and asks the resource provider
manager to publish the sum.
Review: https://reviews.apache.org/r/63555
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4e715399
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4e715399
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4e715399
Branch: refs/heads/master
Commit: 4e71539976013b76e85ce79d2a123a02adfb832a
Parents: 3119776
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Nov 3 18:43:30 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800
----------------------------------------------------------------------
src/internal/devolve.cpp | 13 +--
src/internal/devolve.hpp | 2 +-
src/internal/evolve.cpp | 10 +++
src/internal/evolve.hpp | 1 +
src/resource_provider/manager.cpp | 9 ++
src/slave/slave.cpp | 72 ++++++++++++---
src/slave/slave.hpp | 8 ++
src/tests/mesos.hpp | 37 +++++++-
src/tests/slave_tests.cpp | 158 +++++++++++++++++++++++++++++++++
9 files changed, 288 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 289c6e3..60a768d 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -116,16 +116,19 @@ Resource devolve(const v1::Resource& resource)
}
-Resources devolve(const v1::Resources& resources)
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
{
- return devolve<Resource>(
- static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
+ // NOTE: We do not use the common 'devolve' call for performance; the
+ // single `value` field is copied directly instead.
+ ResourceProviderID id;
+ id.set_value(resourceProviderId.value());
+ return id;
}
-ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
+Resources devolve(const v1::Resources& resources)
{
- return devolve<ResourceProviderID>(resourceProviderId);
+ // Devolves the underlying repeated field of v1 resources element-wise.
+ return devolve<Resource>(
+ static_cast<const RepeatedPtrField<v1::Resource>&>(resources));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 17ab76e..9e56fd9 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -62,8 +62,8 @@ InverseOffer devolve(const v1::InverseOffer& inverseOffer);
Offer devolve(const v1::Offer& offer);
OfferOperationStatus devolve(const v1::OfferOperationStatus& status);
Resource devolve(const v1::Resource& resource);
-Resources devolve(const v1::Resources& resources);
ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId);
+Resources devolve(const v1::Resources& resources);
SlaveID devolve(const v1::AgentID& agentId);
SlaveInfo devolve(const v1::AgentInfo& agentInfo);
TaskID devolve(const v1::TaskID& taskId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index f46f864..6ce6150 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -164,6 +164,16 @@ v1::Resource evolve(const Resource& resource)
}
+v1::ResourceProviderID evolve(
+ const ResourceProviderID& resourceProviderId)
+{
+ // NOTE: We do not use the common 'evolve' call for performance; the
+ // single `value` field is copied directly instead.
+ v1::ResourceProviderID id;
+ id.set_value(resourceProviderId.value());
+ return id;
+}
+
+
v1::Resources evolve(const Resources& resources)
{
return evolve<v1::Resource>(
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index d796f32..77b7172 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -75,6 +75,7 @@ v1::MasterInfo evolve(const MasterInfo& masterInfo);
v1::Offer evolve(const Offer& offer);
v1::OfferID evolve(const OfferID& offerId);
v1::Resource evolve(const Resource& resource);
+v1::ResourceProviderID evolve(const ResourceProviderID& resourceProviderId);
v1::Resources evolve(const Resources& resources);
v1::Task evolve(const Task& task);
v1::TaskID evolve(const TaskID& taskId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2aee46e..e75d528 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -538,6 +538,10 @@ Future<Nothing> ResourceProviderManagerProcess::publish(
ResourceProvider* resourceProvider =
resourceProviders.subscribed.at(resourceProviderId).get();
+ LOG(INFO)
+ << "Sending PUBLISH event " << uuid << " with resources '" << resources
+ << "' to resource provider " << resourceProviderId;
+
if (!resourceProvider->http.send(event)) {
return Failure(
"Failed to send PUBLISH event to resource provider " +
@@ -677,6 +681,11 @@ void ResourceProviderManagerProcess::updatePublishStatus(
return;
}
+ LOG(INFO)
+ << "Received UPDATE_PUBLISH_STATUS call for PUBLISH event " << uuid.get()
+ << " with " << update.status() << " status from resource provider "
+ << resourceProvider->info.id();
+
if (update.status() == Call::UpdatePublishStatus::OK) {
resourceProvider->publishes.at(uuid.get())->set(Nothing());
} else {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fb077b7..1bdc9d8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2554,9 +2554,12 @@ void Slave::__run(
LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
<< " for executor " << *executor;
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -2998,11 +3001,19 @@ void Slave::launchExecutor(
<< "' of framework " << framework->id();
// Launch the container.
- containerizer->launch(
- executor->containerId,
- containerConfig,
- environment,
- pidCheckpointPath)
+ // NOTE: Since we modify the ExecutorInfo to include the task's
+ // resources when launching the executor, these resources need to be
+ // published before the containerizer preparing them. This should be
+ // revisited after MESOS-600.
+ publishResources(
+ taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+ .then(defer(self(), [=] {
+ return containerizer->launch(
+ executor->containerId,
+ containerConfig,
+ environment,
+ pidCheckpointPath);
+ }))
.onAny(defer(self(),
&Self::executorLaunched,
frameworkId,
@@ -4154,9 +4165,12 @@ void Slave::subscribe(
}
}
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -4358,9 +4372,12 @@ void Slave::registerExecutor(
}
}
- containerizer->update(
- executor->containerId,
- executor->allocatedResources())
+ publishResources()
+ .then(defer(self(), [=] {
+ return containerizer->update(
+ executor->containerId,
+ executor->allocatedResources());
+ }))
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -7323,6 +7340,33 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
}
+Future<Nothing> Slave::publishResources(
+ const Option<Resources>& additionalResources)
+{
+ Resources resources;
+
+ // NOTE: For resource providers that serve quantity-based resources
+ // without any identifiers (such as memory), it is very hard to keep
+ // track of published resources. So instead of implementing diff-based
+ // resource publishing, we implement an "ensure-all" semantics, and
+ // always calculate the total resources that need to remain published.
+ foreachvalue (const Framework* framework, frameworks) {
+ // NOTE: We do not call `framework->allocatedResources()` here
+ // because we do not want to publish resources for pending tasks that
+ // have not been authorized yet.
+ foreachvalue (const Executor* executor, framework->executors) {
+ resources += executor->allocatedResources();
+ }
+ }
+
+ if (additionalResources.isSome()) {
+ resources += additionalResources.get();
+ }
+
+ return resourceProviderManager.publish(resources);
+}
+
+
void Slave::qosCorrections()
{
qosController->corrections()
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index bbf5b79..d9b0469 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -567,6 +567,14 @@ private:
void apply(const std::vector<ResourceConversion>& conversions);
+ // Publish all resources that are needed to run the current set of
+ // tasks and executors on the agent.
+ // NOTE: The `additionalResources` parameter is for publishing
+ // additional task resources when launching executors. Consider
+ // removing this parameter once we revisit MESOS-600.
+ process::Future<Nothing> publishResources(
+ const Option<Resources>& additionalResources = None());
+
// Gauge methods.
double _frameworks_active()
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 53890d8..657f925 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -440,6 +440,7 @@ using mesos::v1::MachineID;
using mesos::v1::Metric;
using mesos::v1::Offer;
using mesos::v1::Resource;
+using mesos::v1::ResourceProviderInfo;
using mesos::v1::Resources;
using mesos::v1::TaskID;
using mesos::v1::TaskInfo;
@@ -1345,7 +1346,7 @@ inline typename TOffer::Operation CREATE_VOLUME(
typename TOffer::Operation operation;
operation.set_type(TOffer::Operation::CREATE_VOLUME);
operation.mutable_create_volume()->mutable_source()->CopyFrom(source);
- operation.set_target_type(type);
+ operation.mutable_create_volume()->set_target_type(type);
return operation;
}
@@ -2848,6 +2849,22 @@ public:
Operation,
Source>::operationDefault));
EXPECT_CALL(*this, operation(_)).WillRepeatedly(DoDefault());
+
+ ON_CALL(*this, publish(_))
+ .WillByDefault(Invoke(
+ this,
+ &MockResourceProvider<
+ Event,
+ Call,
+ Driver,
+ ResourceProviderInfo,
+ Resource,
+ Resources,
+ ResourceProviderID,
+ OfferOperationState,
+ Operation,
+ Source>::publishDefault));
+ EXPECT_CALL(*this, publish(_)).WillRepeatedly(DoDefault());
}
MOCK_METHOD0_T(connected, void());
@@ -3027,7 +3044,7 @@ public:
->Mutable(0)
->mutable_disk()
->mutable_source()
- ->set_type(Source::BLOCK);
+ ->set_type(Source::RAW);
break;
case Operation::CREATE_BLOCK:
update->mutable_status()->add_converted_resources()->CopyFrom(
@@ -3058,6 +3075,22 @@ public:
driver->send(call);
}
+ void publishDefault(const typename Event::Publish& publish)
+ {
+ CHECK(info.has_id());
+
+ Call call;
+ call.set_type(Call::UPDATE_PUBLISH_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
+
+ typename Call::UpdatePublishStatus* update =
+ call.mutable_update_publish_status();
+ update->set_uuid(publish.uuid());
+ update->set_status(Call::UpdatePublishStatus::OK);
+
+ driver->send(call);
+ }
+
ResourceProviderInfo info;
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/4e715399/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 25cfd47..6640620 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8775,6 +8775,164 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
}
+// This test checks that before a workload (executor or task) is
+// launched, all resources from resource providers needed to run the
+// current set of workloads are properly published.
+TEST_F(SlaveTest, ResourceProviderPublishAll)
+{
+ // Start an agent and a master.
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability and other required capabilities.
+ constexpr SlaveInfo::Capability::Type capabilities[] = {
+ SlaveInfo::Capability::MULTI_ROLE,
+ SlaveInfo::Capability::HIERARCHICAL_ROLE,
+ SlaveInfo::Capability::RESERVATION_REFINEMENT,
+ SlaveInfo::Capability::RESOURCE_PROVIDER
+ };
+
+ flags.agent_features = SlaveCapabilities();
+ foreach (SlaveInfo::Capability::Type type, capabilities) {
+ flags.agent_features->add_capabilities()->set_type(type);
+ }
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a mock local resource provider with the agent.
+ v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.local.mock");
+ resourceProviderInfo.set_name("test");
+
+ vector<v1::Resource> resources = {
+ v1::Resources::parse("disk", "4096", "role1").get(),
+ v1::Resources::parse("disk", "4096", "role2").get()
+ };
+
+ v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
+
+ string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ process::http::URL url(
+ scheme,
+ slave.get()->pid.address.ip,
+ slave.get()->pid.address.port,
+ slave.get()->pid.id + "/api/v1/resource_provider");
+
+ Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+ resourceProvider.start(
+ endpointDetector,
+ ContentType::PROTOBUF,
+ v1::DEFAULT_CREDENTIAL);
+
+ // We want to register two frameworks to launch two concurrent tasks
+ // that use the provider resources, and verify that when the second
+ // task is launched, all provider resources are published.
+ // NOTE: The mock schedulers and drivers are stored outside the loop
+ // to avoid implicit destruction before the test ends.
+ vector<Owned<MockScheduler>> scheds;
+ vector<Owned<MesosSchedulerDriver>> drivers;
+
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (the default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ for (size_t i = 0; i < resources.size(); i++) {
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, resources.at(i).reservations(0).role());
+
+ Owned<MockScheduler> sched(new MockScheduler());
+ Owned<MesosSchedulerDriver> driver(new MesosSchedulerDriver(
+ sched.get(), framework, master.get()->pid, DEFAULT_CREDENTIAL));
+
+ EXPECT_CALL(*sched, registered(driver.get(), _, _));
+
+ Future<vector<Offer>> offers;
+
+ // Decline unmatched offers.
+ // NOTE: This ensures that this framework does not hold the agent's
+ // default resources. Otherwise, the other one will get no offer.
+ EXPECT_CALL(*sched, resourceOffers(driver.get(), _))
+ .WillRepeatedly(DeclineOffers());
+
+ EXPECT_CALL(*sched, resourceOffers(driver.get(), OffersHaveAnyResource(
+ std::bind(&Resources::isReserved, lambda::_1, framework.roles(0)))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver->start();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->empty());
+
+ Future<mesos::v1::resource_provider::Event::Publish> publish;
+
+ // Two PUBLISH events will be received: one for launching the
+ // executor, and the other for launching the task.
+ EXPECT_CALL(resourceProvider, publish(_))
+ .WillOnce(
+ Invoke(&resourceProvider,
+ &v1::MockResourceProvider::publishDefault))
+ .WillOnce(DoAll(
+ FutureArg<0>(&publish),
+ Invoke(&resourceProvider,
+ &v1::MockResourceProvider::publishDefault)));
+
+ Future<TaskStatus> taskStarting;
+ Future<TaskStatus> taskRunning;
+
+ EXPECT_CALL(*sched, statusUpdate(driver.get(), _))
+ .WillOnce(FutureArg<1>(&taskStarting))
+ .WillOnce(FutureArg<1>(&taskRunning));
+
+ // Launch a task using a provider resource.
+ driver->acceptOffers(
+ {offers->at(0).id()},
+ {LAUNCH({createTask(
+ offers->at(0).slave_id(),
+ Resources(offers->at(0).resources()).reserved(framework.roles(0)),
+ createCommandInfo("sleep 1000"))})},
+ filters);
+
+ AWAIT_READY(publish);
+
+ // Test if the resources of all running executors are published.
+ // This is checked by counting how many reservations there are
+ // in the published resources: one (role1) when launching the first
+ // task, two (role1, role2) when the second task is launched.
+ EXPECT_EQ(i + 1, v1::Resources(publish->resources()).reservations().size());
+
+ AWAIT_READY(taskStarting);
+ EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+ AWAIT_READY(taskRunning);
+ EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+ // Store the mock scheduler and driver to prevent destruction.
+ scheds.emplace_back(std::move(sched));
+ drivers.emplace_back(std::move(driver));
+ }
+}
+
+
// This test checks that the agent correctly updates and sends
// resource version values when it registers or reregisters.
TEST_F(SlaveTest, ResourceVersions)
[3/3] mesos git commit: Unit tests for storage local resource
provider.
Posted by ji...@apache.org.
Unit tests for storage local resource provider.
This patch adds two unit tests. The first one will register a framework
to issue a `CREATE_VOLUME` then a `DESTROY_VOLUME` call to destroy
an unpublished volume. The second one will register a framework to issue
`CREATE_VOLUME`, `CREATE`, `LAUNCH`, `DESTROY_VOLUME` in sequence to
create a volume and launch a task to use a persistent volume on the CSI
volume, then destroy this published volume.
Review: https://reviews.apache.org/r/63390
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea98a344
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea98a344
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea98a344
Branch: refs/heads/master
Commit: ea98a3440d00a3014a79dc88a0cc122806577de3
Parents: 4e71539
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu Oct 26 15:52:03 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 7 15:07:54 2017 -0800
----------------------------------------------------------------------
src/Makefile.am | 3 +-
src/examples/test_csi_plugin.cpp | 61 ++-
src/master/validation.cpp | 10 +-
src/tests/mesos.hpp | 38 ++
.../storage_local_resource_provider_tests.cpp | 530 +++++++++++++++++++
5 files changed, 620 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index be105f4..f948960 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2613,7 +2613,8 @@ if ENABLE_GRPC
mesos_tests_SOURCES += \
tests/csi_client_tests.cpp \
tests/mock_csi_plugin.cpp \
- tests/mock_csi_plugin.hpp
+ tests/mock_csi_plugin.hpp \
+ tests/storage_local_resource_provider_tests.cpp
endif
if ENABLE_SSL
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 8ab936b..6359db8 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -241,7 +241,7 @@ Status TestCSIPlugin::GetSupportedVersions(
const csi::GetSupportedVersionsRequest* request,
csi::GetSupportedVersionsResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
response->add_supported_versions()->CopyFrom(version);
@@ -254,7 +254,7 @@ Status TestCSIPlugin::GetPluginInfo(
const csi::GetPluginInfoRequest* request,
csi::GetPluginInfoResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -273,7 +273,7 @@ Status TestCSIPlugin::CreateVolume(
const csi::CreateVolumeRequest* request,
csi::CreateVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -337,6 +337,8 @@ Status TestCSIPlugin::CreateVolume(
response->mutable_volume_info()->set_id(volume.id);
response->mutable_volume_info()->set_capacity_bytes(volume.size.bytes());
+ (*response->mutable_volume_info()->mutable_attributes())["path"] =
+ getVolumePath(volume);
if (alreadyExists) {
return Status(
@@ -353,7 +355,7 @@ Status TestCSIPlugin::DeleteVolume(
const csi::DeleteVolumeRequest* request,
csi::DeleteVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -391,7 +393,7 @@ Status TestCSIPlugin::ControllerPublishVolume(
const csi::ControllerPublishVolumeRequest* request,
csi::ControllerPublishVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -406,6 +408,14 @@ Status TestCSIPlugin::ControllerPublishVolume(
"Volume '" + request->volume_id() + "' is not found");
}
+ const Volume& volume = volumes.at(request->volume_id());
+ const string path = getVolumePath(volume);
+
+ auto it = request->volume_attributes().find("path");
+ if (it == request->volume_attributes().end() || it->second != path) {
+ return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ }
+
if (request->node_id() != NODE_ID) {
return Status(
grpc::NOT_FOUND,
@@ -422,7 +432,7 @@ Status TestCSIPlugin::ControllerUnpublishVolume(
const csi::ControllerUnpublishVolumeRequest* request,
csi::ControllerUnpublishVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -453,7 +463,7 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
const csi::ValidateVolumeCapabilitiesRequest* request,
csi::ValidateVolumeCapabilitiesResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -468,6 +478,14 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
"Volume '" + request->volume_id() + "' is not found");
}
+ const Volume& volume = volumes.at(request->volume_id());
+ const string path = getVolumePath(volume);
+
+ auto it = request->volume_attributes().find("path");
+ if (it == request->volume_attributes().end() || it->second != path) {
+ return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ }
+
foreach (const csi::VolumeCapability& capability,
request->volume_capabilities()) {
if (!capability.has_mount()) {
@@ -497,7 +515,7 @@ Status TestCSIPlugin::ListVolumes(
const csi::ListVolumesRequest* request,
csi::ListVolumesResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -529,7 +547,7 @@ Status TestCSIPlugin::GetCapacity(
const csi::GetCapacityRequest* request,
csi::GetCapacityResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -563,7 +581,7 @@ Status TestCSIPlugin::ControllerProbe(
const csi::ControllerProbeRequest* request,
csi::ControllerProbeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -579,7 +597,7 @@ Status TestCSIPlugin::ControllerGetCapabilities(
const csi::ControllerGetCapabilitiesRequest* request,
csi::ControllerGetCapabilitiesResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -604,7 +622,7 @@ Status TestCSIPlugin::NodePublishVolume(
const csi::NodePublishVolumeRequest* request,
csi::NodePublishVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
// TODO(chhsiao): Validate required fields.
@@ -619,6 +637,14 @@ Status TestCSIPlugin::NodePublishVolume(
"Volume '" + request->volume_id() + "' is not found");
}
+ const Volume& volume = volumes.at(request->volume_id());
+ const string path = getVolumePath(volume);
+
+ auto it = request->volume_attributes().find("path");
+ if (it == request->volume_attributes().end() || it->second != path) {
+ return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ }
+
if (!os::exists(request->target_path())) {
return Status(
grpc::INVALID_ARGUMENT,
@@ -638,9 +664,6 @@ Status TestCSIPlugin::NodePublishVolume(
}
}
- const Volume& volume = volumes.at(request->volume_id());
- const string path = getVolumePath(volume);
-
Try<Nothing> mount = fs::mount(
path,
request->target_path(),
@@ -678,7 +701,7 @@ Status TestCSIPlugin::NodeUnpublishVolume(
const csi::NodeUnpublishVolumeRequest* request,
csi::NodeUnpublishVolumeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -730,7 +753,7 @@ Status TestCSIPlugin::GetNodeID(
const csi::GetNodeIDRequest* request,
csi::GetNodeIDResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -748,7 +771,7 @@ Status TestCSIPlugin::NodeProbe(
const csi::NodeProbeRequest* request,
csi::NodeProbeResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
@@ -764,7 +787,7 @@ Status TestCSIPlugin::NodeGetCapabilities(
const csi::NodeGetCapabilitiesRequest* request,
csi::NodeGetCapabilitiesResponse* response)
{
- LOG(INFO) << request->GetDescriptor()->name() << " '" << request << "'";
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
Option<Error> error = validateVersion(request->version());
if (error.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index bf7ae65..38d9a3c 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2236,8 +2236,14 @@ Option<Error> validate(
return Error("Not a persistent volume: " + error.get().message);
}
- if (!checkpointedResources.contains(volumes)) {
- return Error("Persistent volumes not found");
+ foreach (const Resource volume, volumes) {
+ if (Resources::hasResourceProvider(volume)) {
+ continue;
+ }
+
+ if (!checkpointedResources.contains(volume)) {
+ return Error("Persistent volumes not found");
+ }
}
// Ensure the volumes being destroyed are not in use currently.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 657f925..76fbd90 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3449,6 +3449,44 @@ void ExpectNoFutureUnionHttpProtobufs(
}
+// This matcher is used to match a vector of resource offers that
+// contains an offer having any resource that passes the filter.
+MATCHER_P(OffersHaveAnyResource, filter, "")
+{
+ foreach (const Offer& offer, arg) {
+ foreach (const Resource& resource, offer.resources()) {
+ if (filter(resource)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+
+// This matcher is used to match a vector of resource offers that
+// contains an offer having the specified resource.
+MATCHER_P(OffersHaveResource, resource, "")
+{
+ foreach (const Offer& offer, arg) {
+ Resources resources = offer.resources();
+
+ // If `resource` is not allocated, we are matching offers against
+ // resources constructed from scratch, so we strip off allocations.
+ if (!resource.has_allocation_info()) {
+ resources.unallocate();
+ }
+
+ if (resources.contains(resource)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
// This matcher is used to match the task id of `TaskStatus` message.
MATCHER_P(TaskStatusTaskIdEq, taskInfo, "")
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea98a344/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
new file mode 100644
index 0000000..0f0c627
--- /dev/null
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/gmock.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+using mesos::master::detector::MasterDetector;
+
+using process::Future;
+using process::Owned;
+
+using testing::Args;
+using testing::Sequence;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class StorageLocalResourceProviderTest : public MesosTest
+{
+public:
+ virtual void SetUp()
+ {
+ MesosTest::SetUp();
+
+ const string testCsiPluginWorkDir = path::join(sandbox.get(), "test");
+ ASSERT_SOME(os::mkdir(testCsiPluginWorkDir));
+
+ resourceProviderConfigDir =
+ path::join(sandbox.get(), "resource_provider_configs");
+
+ ASSERT_SOME(os::mkdir(resourceProviderConfigDir));
+
+ string testCsiPluginPath =
+ path::join(tests::flags.build_dir, "src", "test-csi-plugin");
+
+ Try<string> resourceProviderConfig = strings::format(
+ R"~(
+ {
+ "type": "org.apache.mesos.rp.local.storage",
+ "name": "test",
+ "default_reservations": [
+ {
+ "type": "STATIC",
+ "role": "storage"
+ }
+ ],
+ "storage": {
+ "plugin": {
+ "type": "org.apache.mesos.csi.test",
+ "name": "slrp_test",
+ "containers": [
+ {
+ "services": [
+ "CONTROLLER_SERVICE",
+ "NODE_SERVICE"
+ ],
+ "command": {
+ "shell": false,
+ "value": "%s",
+ "arguments": [
+ "%s",
+ "--total_capacity=4GB",
+ "--work_dir=%s"
+ ]
+ }
+ }
+ ]
+ }
+ }
+ }
+ )~",
+ testCsiPluginPath,
+ testCsiPluginPath,
+ testCsiPluginWorkDir);
+
+ ASSERT_SOME(resourceProviderConfig);
+
+ ASSERT_SOME(os::write(
+ path::join(resourceProviderConfigDir, "test.json"),
+ resourceProviderConfig.get()));
+ }
+
+protected:
+ string resourceProviderConfigDir;
+};
+
+
+// This test verifies that a framework can create then destroy a volume
+// from the resources provided by a storage local resource provider that
+// uses the test CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateVolumeAndDestroyVolume)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ flags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability and other required capabilities.
+ constexpr SlaveInfo::Capability::Type capabilities[] = {
+ SlaveInfo::Capability::MULTI_ROLE,
+ SlaveInfo::Capability::HIERARCHICAL_ROLE,
+ SlaveInfo::Capability::RESERVATION_REFINEMENT,
+ SlaveInfo::Capability::RESOURCE_PROVIDER
+ };
+
+ flags.agent_features = SlaveCapabilities();
+ foreach (SlaveInfo::Capability::Type type, capabilities) {
+ flags.agent_features->add_capabilities()->set_type(type);
+ }
+
+ flags.resource_provider_config_dir = resourceProviderConfigDir;
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a framework to exercise offer operations.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (the default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // The framework is expected to see the following offers in sequence:
+ // 1. One containing a RAW disk resource before `CREATE_VOLUME`.
+ // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+ // 3. One containing a RAW disk resource after `DESTROY_VOLUME`.
+ Future<vector<Offer>> rawDiskOffers;
+ Future<vector<Offer>> volumeCreatedOffers;
+ Future<vector<Offer>> volumeDestroyedOffers;
+
+ Sequence offers;
+
+ auto isSourceType = [](
+ const Resource& r, const Resource::DiskInfo::Source::Type& type) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().type() == type;
+ };
+
+ // Decline offers that contain only the agent's default resources.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers());
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+ driver.start();
+
+ AWAIT_READY(rawDiskOffers);
+ ASSERT_FALSE(rawDiskOffers->empty());
+
+ Option<Resource> source;
+
+ foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+ if (resource.has_disk() &&
+ resource.disk().has_source() &&
+ resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+ source = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(source);
+
+ // Create a volume.
+ driver.acceptOffers(
+ {rawDiskOffers->at(0).id()},
+ {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+ filters);
+
+ AWAIT_READY(volumeCreatedOffers);
+ ASSERT_FALSE(volumeCreatedOffers->empty());
+
+ Option<Resource> volume;
+
+ foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+ if (resource.has_disk() &&
+ resource.disk().has_source() &&
+ resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
+ volume = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(volume);
+ ASSERT_TRUE(volume->disk().source().has_id());
+ ASSERT_TRUE(volume->disk().source().has_metadata());
+ ASSERT_TRUE(volume->disk().source().has_mount());
+ ASSERT_TRUE(volume->disk().source().mount().has_root());
+ EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+ // Check if the volume is actually created by the test CSI plugin.
+ Option<string> volumePath;
+
+ foreach (const Label& label, volume->disk().source().metadata().labels()) {
+ if (label.key() == "path") {
+ volumePath = label.value();
+ break;
+ }
+ }
+
+ ASSERT_SOME(volumePath);
+ EXPECT_TRUE(os::exists(volumePath.get()));
+
+ // Destroy the created volume.
+ driver.acceptOffers(
+ {volumeCreatedOffers->at(0).id()},
+ {DESTROY_VOLUME(volume.get())},
+ filters);
+
+ AWAIT_READY(volumeDestroyedOffers);
+ ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+ Option<Resource> destroyed;
+
+ foreach (const Resource& resource, volumeDestroyedOffers->at(0).resources()) {
+ if (resource.has_disk() &&
+ resource.disk().has_source() &&
+ resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+ destroyed = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(destroyed);
+ ASSERT_FALSE(destroyed->disk().source().has_id());
+ ASSERT_FALSE(destroyed->disk().source().has_metadata());
+ ASSERT_FALSE(destroyed->disk().source().has_mount());
+
+ // Check if the volume is actually deleted by the test CSI plugin.
+ EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
+// This test verifies that a framework can launch a task using a created
+// volume provided by a storage local resource provider that uses the
+// test CSI plugin, then destroy the volume while it is published.
+//
+// The resource goes through the following transitions:
+//   RAW disk --CREATE_VOLUME--> MOUNT disk --CREATE--> persistent volume
+//   --LAUNCH (task finishes)--> persistent volume --DESTROY--> MOUNT disk
+//   --DESTROY_VOLUME--> RAW disk.
+// NOTE(review): per the commit description, DESTROY_VOLUME on a volume
+// that has been used by a task requires the provider to unpublish it
+// first; this test is what exercises that path.
+TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise offer operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing a persistent volume after `CREATE`.
+  //   4. One containing the same persistent volume after `LAUNCH`.
+  //   5. One containing the same MOUNT disk resource after `DESTROY`.
+  //   6. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> persistenceCreatedOffers;
+  Future<vector<Offer>> taskFinishedOffers;
+  Future<vector<Offer>> persistenceDestroyedOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  // Returns true if `r` is a disk resource whose source is of `type`.
+  auto isSourceType = [](
+      const Resource& r, const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources. This
+  // catch-all is declared first so that the more specific expectations
+  // below (which gMock searches newest-first) take precedence over it.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  // Pick the RAW disk resource out of the first offer.
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (isSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  // Pick the newly created MOUNT disk resource out of the offer.
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (isSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin. The
+  // plugin reports the backing directory through the "path" metadata label.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Put a file into the volume. The task launched below checks for it
+  // through the persistent volume's container path.
+  ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+  // Create a persistent volume on the CSI volume.
+  Resource persistentVolume = volume.get();
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&persistenceCreatedOffers));
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume)},
+      filters);
+
+  AWAIT_READY(persistenceCreatedOffers);
+  ASSERT_FALSE(persistenceCreatedOffers->empty());
+
+  // Launch a task to use the persistent volume.
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&taskFinishedOffers));
+
+  // The command only succeeds if the file written into the CSI volume
+  // above is visible inside the container at `volume/file`.
+  driver.acceptOffers(
+      {persistenceCreatedOffers->at(0).id()},
+      {LAUNCH({createTask(
+          persistenceCreatedOffers->at(0).slave_id(),
+          persistentVolume,
+          createCommandInfo("test -f " + path::join("volume", "file")))})},
+      filters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+  AWAIT_READY(taskFinishedOffers);
+  ASSERT_FALSE(taskFinishedOffers->empty());
+
+  // Destroy the persistent volume on the CSI volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&persistenceDestroyedOffers));
+
+  driver.acceptOffers(
+      {taskFinishedOffers->at(0).id()},
+      {DESTROY(persistentVolume)},
+      filters);
+
+  AWAIT_READY(persistenceDestroyedOffers);
+  ASSERT_FALSE(persistenceDestroyedOffers->empty());
+
+  // Destroy the created volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.acceptOffers(
+      {persistenceDestroyedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      filters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+
+  driver.stop();
+  driver.join();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {