Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:06 UTC
[mesos] 09/15: Refactored SLRP to use v0 `VolumeManager`.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b2f1d3bbf0fdae99c5945df15323bbd28a067b79
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:55 2019 -0700
Refactored SLRP to use v0 `VolumeManager`.
This patch moves volume management code from SLRP to the v0
`VolumeManager`, and makes SLRP use the `VolumeManager` interface
polymorphically.
However, since SLRP no longer keeps track of CSI volume states, it
will not be able to verify that a persistent volume is published before
being destroyed (although this should be guaranteed by volume manager
recovery).
Review: https://reviews.apache.org/r/70222/
---
src/csi/v0_volume_manager.cpp | 1015 ++++++++++++++++-
src/csi/v0_volume_manager_process.hpp | 107 ++
src/resource_provider/storage/provider.cpp | 1190 +-------------------
src/resource_provider/storage/provider_process.hpp | 151 +--
.../storage_local_resource_provider_tests.cpp | 25 +-
5 files changed, 1183 insertions(+), 1305 deletions(-)
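For orientation, a minimal sketch of the call pattern this refactoring
introduces, condensed from the provider.cpp hunks below (variable names
abbreviated; this is not additional patch content):

    // SLRP now holds an opaque `VolumeManager` handle and programs against
    // the interface; the v0-specific logic lives behind it.
    Try<process::Owned<mesos::csi::VolumeManager>> manager =
      mesos::csi::VolumeManager::create(
          extractParentEndpoint(url),                   // agent endpoint
          slave::paths::getCsiRootDir(workDir),         // CSI work directory
          info.storage().plugin(),                      // `CSIPluginInfo`
          {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
          containerPrefix,
          authToken,
          &metrics);

    // Former SLRP-internal helpers become one-line delegations, e.g.:
    //   recoverVolumes()        -> volumeManager->recover()
    //   listVolumes()           -> volumeManager->listVolumes()
    //   getCapacity(...)        -> volumeManager->getCapacity(capability, parameters)
    //   publishVolume(volumeId) -> volumeManager->publishVolume(volumeId)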
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 2a4d3eb..c6112be 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -16,26 +16,57 @@
#include "csi/v0_volume_manager.hpp"
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <list>
+
+#include <process/after.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
+#include <process/loop.hpp>
#include <process/process.hpp>
#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/some.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
#include "csi/v0_volume_manager_process.hpp"
+#include "slave/state.hpp"
+
namespace http = process::http;
+namespace slave = mesos::internal::slave;
+using std::list;
using std::string;
using std::vector;
using google::protobuf::Map;
+using mesos::csi::state::VolumeState;
+
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
using process::Failure;
using process::Future;
using process::ProcessBase;
+using process::grpc::StatusError;
+
using process::grpc::client::Runtime;
namespace mesos {
@@ -76,13 +107,163 @@ VolumeManagerProcess::VolumeManagerProcess(
Future<Nothing> VolumeManagerProcess::recover()
{
- return Failure("Unimplemented");
+ Try<string> bootId_ = os::bootId();
+ if (bootId_.isError()) {
+ return Failure("Failed to get boot ID: " + bootId_.error());
+ }
+
+ bootId = bootId_.get();
+
+ return serviceManager->recover()
+ .then(process::defer(self(), &Self::prepareServices))
+ .then(process::defer(self(), [this]() -> Future<Nothing> {
+ // Recover the states of CSI volumes.
+ Try<list<string>> volumePaths =
+ paths::getVolumePaths(rootDir, info.type(), info.name());
+
+ if (volumePaths.isError()) {
+ return Failure(
+ "Failed to find volumes for CSI plugin type '" + info.type() +
+ "' and name '" + info.name() + "': " + volumePaths.error());
+ }
+
+ vector<Future<Nothing>> futures;
+
+ foreach (const string& path, volumePaths.get()) {
+ Try<paths::VolumePath> volumePath =
+ paths::parseVolumePath(rootDir, path);
+
+ if (volumePath.isError()) {
+ return Failure(
+ "Failed to parse volume path '" + path +
+ "': " + volumePath.error());
+ }
+
+ CHECK_EQ(info.type(), volumePath->type);
+ CHECK_EQ(info.name(), volumePath->name);
+
+ const string& volumeId = volumePath->volumeId;
+ const string statePath = paths::getVolumeStatePath(
+ rootDir, info.type(), info.name(), volumeId);
+
+ if (!os::exists(statePath)) {
+ continue;
+ }
+
+ Result<VolumeState> volumeState =
+ slave::state::read<VolumeState>(statePath);
+
+ if (volumeState.isError()) {
+ return Failure(
+ "Failed to read volume state from '" + statePath +
+ "': " + volumeState.error());
+ }
+
+ if (volumeState.isNone()) {
+ continue;
+ }
+
+ volumes.put(volumeId, std::move(volumeState.get()));
+ VolumeData& volume = volumes.at(volumeId);
+
+ if (!VolumeState::State_IsValid(volume.state.state())) {
+ return Failure("Volume '" + volumeId + "' is in INVALID state");
+ }
+
+ // First, if there is a node reboot after the volume is made
+ // publishable, it should be reset to `NODE_READY`.
+ switch (volume.state.state()) {
+ case VolumeState::CREATED:
+ case VolumeState::NODE_READY:
+ case VolumeState::CONTROLLER_PUBLISH:
+ case VolumeState::CONTROLLER_UNPUBLISH:
+ case VolumeState::NODE_STAGE: {
+ break;
+ }
+ case VolumeState::VOL_READY:
+ case VolumeState::PUBLISHED:
+ case VolumeState::NODE_UNSTAGE:
+ case VolumeState::NODE_PUBLISH:
+ case VolumeState::NODE_UNPUBLISH: {
+ if (bootId != volume.state.boot_id()) {
+ // Since this is a no-op, no need to checkpoint here.
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ }
+
+ break;
+ }
+ case VolumeState::UNKNOWN: {
+ return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
+ }
+
+ // NOTE: We avoid using a default clause for the following values in
+ // proto3's open enum to enable the compiler to detect missing enum
+ // cases for us. See: https://github.com/google/protobuf/issues/3917
+ case google::protobuf::kint32min:
+ case google::protobuf::kint32max: {
+ UNREACHABLE();
+ }
+ }
+
+ // Second, if the volume has been used by a container before recovery,
+ // we have to bring the volume back to `PUBLISHED` so data can be
+ // cleaned up synchronously when needed.
+ if (volume.state.node_publish_required()) {
+ futures.push_back(publishVolume(volumeId));
+ }
+ }
+
+ // Garbage collect leftover mount paths that were failed to remove before.
+ const string mountRootDir =
+ paths::getMountRootDir(rootDir, info.type(), info.name());
+
+ Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
+ if (mountPaths.isError()) {
+ // TODO(chhsiao): This could indicate that something is seriously wrong.
+ // To help debugging the problem, we should surface the error via
+ // MESOS-8745.
+ return Failure(
+ "Failed to find mount paths for CSI plugin type '" + info.type() +
+ "' and name '" + info.name() + "': " + mountPaths.error());
+ }
+
+ foreach (const string& path, mountPaths.get()) {
+ Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
+ if (volumeId.isError()) {
+ return Failure(
+ "Failed to parse mount path '" + path + "': " + volumeId.error());
+ }
+
+ if (!volumes.contains(volumeId.get())) {
+ garbageCollectMountPath(volumeId.get());
+ }
+ }
+
+ return process::collect(futures).then([] { return Nothing(); });
+ }));
}
Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes()
{
- return Failure("Unimplemented");
+ if (!controllerCapabilities->listVolumes) {
+ return vector<VolumeInfo>();
+ }
+
+ // TODO(chhsiao): Set the max entries and use a loop to do multiple
+ // `ListVolumes` calls.
+ return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+ .then(process::defer(self(), [](const ListVolumesResponse& response) {
+ vector<VolumeInfo> result;
+ foreach (const auto& entry, response.entries()) {
+ result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
+ entry.volume().id(),
+ entry.volume().attributes()});
+ }
+
+ return result;
+ }));
}
@@ -90,7 +271,18 @@ Future<Bytes> VolumeManagerProcess::getCapacity(
const types::VolumeCapability& capability,
const Map<string, string>& parameters)
{
- return Failure("Unimplemented");
+ if (!controllerCapabilities->getCapacity) {
+ return Bytes(0);
+ }
+
+ GetCapacityRequest request;
+ *request.add_volume_capabilities() = evolve(capability);
+ *request.mutable_parameters() = parameters;
+
+ return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+ .then([](const GetCapacityResponse& response) {
+ return Bytes(response.available_capacity());
+ });
}
@@ -100,7 +292,46 @@ Future<VolumeInfo> VolumeManagerProcess::createVolume(
const types::VolumeCapability& capability,
const Map<string, string>& parameters)
{
- return Failure("Unimplemented");
+ if (!controllerCapabilities->createDeleteVolume) {
+ return Failure(
+ "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
+ "plugin type '" + info.type() + "' and name '" + info.name());
+ }
+
+ LOG(INFO) << "Creating volume with name '" << name << "'";
+
+ CreateVolumeRequest request;
+ request.set_name(name);
+ request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+ request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+ *request.add_volume_capabilities() = evolve(capability);
+ *request.mutable_parameters() = parameters;
+
+ // We retry the `CreateVolume` call for MESOS-9517.
+ return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+ .then(process::defer(self(), [=](
+ const CreateVolumeResponse& response) -> Future<VolumeInfo> {
+ const string& volumeId = response.volume().id();
+
+ // NOTE: If the volume is already tracked, there might already be
+ // operations running in its sequence. Since this continuation runs
+ // outside the sequence, we fail the call here to avoid any race issue.
+ // This also means that this call is not idempotent.
+ if (volumes.contains(volumeId)) {
+ return Failure("Volume with name '" + name + "' already exists");
+ }
+
+ VolumeState volumeState;
+ volumeState.set_state(VolumeState::CREATED);
+ *volumeState.mutable_volume_capability() = capability;
+ *volumeState.mutable_parameters() = parameters;
+ *volumeState.mutable_volume_attributes() = response.volume().attributes();
+
+ volumes.put(volumeId, std::move(volumeState));
+ checkpointVolumeState(volumeId);
+
+ return VolumeInfo{capacity, volumeId, response.volume().attributes()};
+ }));
}
@@ -109,13 +340,85 @@ Future<Option<Error>> VolumeManagerProcess::validateVolume(
const types::VolumeCapability& capability,
const Map<string, string>& parameters)
{
- return Failure("Unimplemented");
+ // If the volume has been checkpointed, the validation succeeds only if the
+ // capability and parameters of the specified profile are the same as those in
+ // the checkpoint.
+ if (volumes.contains(volumeInfo.id)) {
+ const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
+
+ if (volumeState.volume_capability() != capability) {
+ return Some(
+ Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
+ }
+
+ if (volumeState.parameters() != parameters) {
+ return Some(
+ Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
+ }
+
+ return None();
+ }
+
+ if (!parameters.empty()) {
+ LOG(WARNING)
+ << "Validating volumes against parameters is not supported in CSI v0";
+ }
+
+ LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
+
+ ValidateVolumeCapabilitiesRequest request;
+ request.set_volume_id(volumeInfo.id);
+ *request.add_volume_capabilities() = evolve(capability);
+ *request.mutable_volume_attributes() = volumeInfo.context;
+
+ return call<VALIDATE_VOLUME_CAPABILITIES>(
+ CONTROLLER_SERVICE, std::move(request))
+ .then(process::defer(self(), [=](
+ const ValidateVolumeCapabilitiesResponse& response)
+ -> Future<Option<Error>> {
+ if (!response.supported()) {
+ return Error(
+ "Unsupported volume capability for volume '" + volumeInfo.id +
+ "': " + response.message());
+ }
+
+ // NOTE: If the volume is already tracked, there might already be
+ // operations running in its sequence. Since this continuation runs
+ // outside the sequence, we fail the call here to avoid any race issue.
+ // This also means that this call is not idempotent.
+ if (volumes.contains(volumeInfo.id)) {
+ return Failure("Volume '" + volumeInfo.id + "' already validated");
+ }
+
+ VolumeState volumeState;
+ volumeState.set_state(VolumeState::CREATED);
+ *volumeState.mutable_volume_capability() = capability;
+ *volumeState.mutable_parameters() = parameters;
+ *volumeState.mutable_volume_attributes() = volumeInfo.context;
+
+ volumes.put(volumeInfo.id, std::move(volumeState));
+ checkpointVolumeState(volumeInfo.id);
+
+ return None();
+ }));
}
Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId)
{
- return Failure("Unimplemented");
+ if (!volumes.contains(volumeId)) {
+ return __deleteVolume(volumeId);
+ }
+
+ VolumeData& volume = volumes.at(volumeId);
+
+ LOG(INFO) << "Deleting volume '" << volumeId << "' in "
+ << volume.state.state() << " state";
+
+ // Volume deletion is sequentialized with other operations on the same volume
+ // to avoid races.
+ return volume.sequence->add(std::function<Future<bool>()>(
+ process::defer(self(), &Self::_deleteVolume, volumeId)));
}
@@ -133,7 +436,19 @@ Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
{
- return Failure("Unimplemented");
+ if (!volumes.contains(volumeId)) {
+ return Failure("Cannot publish unknown volume '" + volumeId + "'");
+ }
+
+ VolumeData& volume = volumes.at(volumeId);
+
+ LOG(INFO) << "Publishing volume '" << volumeId << "' in "
+ << volume.state.state() << " state";
+
+ // Volume publishing is serialized with other operations on the same volume to
+ // avoid races.
+ return volume.sequence->add(std::function<Future<Nothing>()>(
+ process::defer(self(), &Self::_publishVolume, volumeId)));
}
@@ -143,6 +458,692 @@ Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId)
}
+template <RPC Rpc>
+Future<Response<Rpc>> VolumeManagerProcess::call(
+ const Service& service,
+ const Request<Rpc>& request,
+ const bool retry) // Made immutable in the following mutable lambda.
+{
+ Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+ return process::loop(
+ self(),
+ [=] {
+ // Make the call to the latest service endpoint.
+ return serviceManager->getServiceEndpoint(service)
+ .then(process::defer(
+ self(), &VolumeManagerProcess::_call<Rpc>, lambda::_1, request));
+ },
+ [=](const Try<Response<Rpc>, StatusError>& result) mutable
+ -> Future<ControlFlow<Response<Rpc>>> {
+ Option<Duration> backoff = retry
+ ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+ : Option<Duration>::none();
+
+ maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+ // We dispatch `__call` for testing purpose.
+ return process::dispatch(
+ self(), &VolumeManagerProcess::__call<Rpc>, result, backoff);
+ });
+}
+
+
+template <RPC Rpc>
+Future<Try<Response<Rpc>, StatusError>> VolumeManagerProcess::_call(
+ const string& endpoint, const Request<Rpc>& request)
+{
+ ++metrics->csi_plugin_rpcs_pending.at(Rpc);
+
+ return Client(endpoint, runtime).call<Rpc>(request)
+ .onAny(defer(self(), [=](
+ const Future<Try<Response<Rpc>, StatusError>>& future) {
+ --metrics->csi_plugin_rpcs_pending.at(Rpc);
+ if (future.isReady() && future->isSome()) {
+ ++metrics->csi_plugin_rpcs_successes.at(Rpc);
+ } else if (future.isDiscarded()) {
+ ++metrics->csi_plugin_rpcs_cancelled.at(Rpc);
+ } else {
+ ++metrics->csi_plugin_rpcs_errors.at(Rpc);
+ }
+ }));
+}
+
+
+template <RPC Rpc>
+Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call(
+ const Try<Response<Rpc>, StatusError>& result,
+ const Option<Duration>& backoff)
+{
+ if (result.isSome()) {
+ return Break(result.get());
+ }
+
+ if (backoff.isNone()) {
+ return Failure(result.error());
+ }
+
+ // See the link below for retryable status codes:
+ // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+ switch (result.error().status.error_code()) {
+ case grpc::DEADLINE_EXCEEDED:
+ case grpc::UNAVAILABLE: {
+ LOG(ERROR) << "Received '" << result.error() << "' while calling " << Rpc
+ << ". Retrying in " << backoff.get();
+
+ return process::after(backoff.get())
+ .then([]() -> Future<ControlFlow<Response<Rpc>>> {
+ return Continue();
+ });
+ }
+ case grpc::CANCELLED:
+ case grpc::UNKNOWN:
+ case grpc::INVALID_ARGUMENT:
+ case grpc::NOT_FOUND:
+ case grpc::ALREADY_EXISTS:
+ case grpc::PERMISSION_DENIED:
+ case grpc::UNAUTHENTICATED:
+ case grpc::RESOURCE_EXHAUSTED:
+ case grpc::FAILED_PRECONDITION:
+ case grpc::ABORTED:
+ case grpc::OUT_OF_RANGE:
+ case grpc::UNIMPLEMENTED:
+ case grpc::INTERNAL:
+ case grpc::DATA_LOSS: {
+ return Failure(result.error());
+ }
+ case grpc::OK:
+ case grpc::DO_NOT_USE: {
+ UNREACHABLE();
+ }
+ }
+
+ UNREACHABLE();
+}
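For orientation, not part of the diff: a minimal usage sketch of the wrapper
above, modeled on the `CREATE_VOLUME`/`DELETE_VOLUME` call sites later in this
file. With `retry` set to true, only `DEADLINE_EXCEEDED` and `UNAVAILABLE`
trigger another attempt, each delayed by a random backoff bounded by the
doubling `maxBackoff`; any other error fails the returned future.

    DeleteVolumeRequest request;
    request.set_volume_id(volumeId);

    // Retried on DEADLINE_EXCEEDED/UNAVAILABLE only.
    Future<DeleteVolumeResponse> deleted =
        call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true);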
+
+
+Future<Nothing> VolumeManagerProcess::prepareServices()
+{
+ CHECK(!services.empty());
+
+ // Get the plugin capabilities.
+ return call<GET_PLUGIN_CAPABILITIES>(
+ *services.begin(), GetPluginCapabilitiesRequest())
+ .then(process::defer(self(), [=](
+ const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
+ pluginCapabilities = response.capabilities();
+
+ if (services.contains(CONTROLLER_SERVICE) &&
+ !pluginCapabilities->controllerService) {
+ return Failure(
+ "CONTROLLER_SERVICE plugin capability is not supported for CSI "
+ "plugin type '" + info.type() + "' and name '" + info.name() + "'");
+ }
+
+ return Nothing();
+ }))
+ // Check if all services have consistent plugin infos.
+ .then(process::defer(self(), [this] {
+ vector<Future<GetPluginInfoResponse>> futures;
+ foreach (const Service& service, services) {
+ futures.push_back(
+ call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest())
+ .onReady([service](const GetPluginInfoResponse& response) {
+ LOG(INFO) << service << " loaded: " << stringify(response);
+ }));
+ }
+
+ return process::collect(futures)
+ .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
+ for (size_t i = 1; i < pluginInfos.size(); ++i) {
+ if (pluginInfos[i].name() != pluginInfos[0].name() ||
+ pluginInfos[i].vendor_version() !=
+ pluginInfos[0].vendor_version()) {
+ LOG(WARNING) << "Inconsistent plugin services. Please check with "
+ "the plugin vendor to ensure compatibility.";
+ }
+ }
+
+ return Nothing();
+ });
+ }))
+ // Get the controller capabilities.
+ .then(process::defer(self(), [this]() -> Future<Nothing> {
+ if (!services.contains(CONTROLLER_SERVICE)) {
+ controllerCapabilities = ControllerCapabilities();
+ return Nothing();
+ }
+
+ return call<CONTROLLER_GET_CAPABILITIES>(
+ CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+ .then(process::defer(self(), [this](
+ const ControllerGetCapabilitiesResponse& response) {
+ controllerCapabilities = response.capabilities();
+ return Nothing();
+ }));
+ }))
+ // Get the node capabilities and ID.
+ .then(process::defer(self(), [this]() -> Future<Nothing> {
+ if (!services.contains(NODE_SERVICE)) {
+ nodeCapabilities = NodeCapabilities();
+ return Nothing();
+ }
+
+ return call<NODE_GET_CAPABILITIES>(
+ NODE_SERVICE, NodeGetCapabilitiesRequest())
+ .then(process::defer(self(), [this](
+ const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
+ nodeCapabilities = response.capabilities();
+
+ if (controllerCapabilities->publishUnpublishVolume) {
+ return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+ .then(process::defer(self(), [this](
+ const NodeGetIdResponse& response) {
+ nodeId = response.node_id();
+ return Nothing();
+ }));
+ }
+
+ return Nothing();
+ }));
+ }));
+}
+
+
+Future<bool> VolumeManagerProcess::_deleteVolume(const std::string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.node_publish_required()) {
+ CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+
+ const string targetPath = paths::getMountTargetPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ // NOTE: Normally the volume should have been cleaned up. However this may
+ // not be true for preprovisioned volumes (e.g., leftover from a previous
+ // resource provider instance). To prevent data leakage in such cases, we
+ // clean up the data (but not the target path) here.
+ Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+ if (rmdir.isError()) {
+ return Failure(
+ "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
+ }
+
+ volumeState.set_node_publish_required(false);
+ checkpointVolumeState(volumeId);
+ }
+
+ if (volumeState.state() != VolumeState::CREATED) {
+ // Retry after transitioning the volume to `CREATED` state.
+ return _detachVolume(volumeId)
+ .then(process::defer(self(), &Self::_deleteVolume, volumeId));
+ }
+
+ // NOTE: The last asynchronous continuation, which is supposed to be run in
+ // the volume's sequence, would cause the sequence to be destructed, which
+ // would in turn discard the returned future. However, since the continuation
+ // would have already been run, the returned future will become ready, making
+ // the future returned by the sequence ready as well.
+ return __deleteVolume(volumeId)
+ .then(process::defer(self(), [this, volumeId](bool deleted) {
+ volumes.erase(volumeId);
+
+ const string volumePath =
+ paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
+
+ Try<Nothing> rmdir = os::rmdir(volumePath);
+ CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+ << volumePath << "': " << rmdir.error();
+
+ garbageCollectMountPath(volumeId);
+
+ return deleted;
+ }));
+}
+
+
+Future<bool> VolumeManagerProcess::__deleteVolume(
+ const string& volumeId)
+{
+ if (!controllerCapabilities->createDeleteVolume) {
+ return false;
+ }
+
+ LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
+ << volumeId << "'";
+
+ DeleteVolumeRequest request;
+ request.set_volume_id(volumeId);
+
+ // We retry the `DeleteVolume` call for MESOS-9517.
+ return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+ .then([] { return true; });
+}
+
+
+Future<Nothing> VolumeManagerProcess::_attachVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::NODE_READY) {
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::CREATED &&
+ volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+ volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+ return Failure(
+ "Cannot attach volume '" + volumeId + "' in " +
+ stringify(volumeState.state()) + " state");
+ }
+
+ if (!controllerCapabilities->publishUnpublishVolume) {
+ // Since this is a no-op, no need to checkpoint here.
+ volumeState.set_state(VolumeState::NODE_READY);
+ return Nothing();
+ }
+
+ // A previously failed `ControllerUnpublishVolume` call can be recovered
+ // through an extra `ControllerUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
+ if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
+ // Retry after recovering the volume to `CREATED` state.
+ return _detachVolume(volumeId)
+ .then(process::defer(self(), &Self::_attachVolume, volumeId));
+ }
+
+ if (volumeState.state() == VolumeState::CREATED) {
+ volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
+ checkpointVolumeState(volumeId);
+ }
+
+ LOG(INFO)
+ << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
+ << volumeId << "'";
+
+ ControllerPublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_node_id(CHECK_NOTNONE(nodeId));
+ *request.mutable_volume_capability() =
+ evolve(volumeState.volume_capability());
+ request.set_readonly(false);
+ *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+ return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
+ .then(process::defer(self(), [this, volumeId](
+ const ControllerPublishVolumeResponse& response) {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+ volumeState.set_state(VolumeState::NODE_READY);
+ *volumeState.mutable_publish_info() = response.publish_info();
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_detachVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::CREATED) {
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::NODE_READY &&
+ volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+ volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+ // Retry after transitioning the volume to `CREATED` state.
+ return _unpublishVolume(volumeId)
+ .then(process::defer(self(), &Self::_detachVolume, volumeId));
+ }
+
+ if (!controllerCapabilities->publishUnpublishVolume) {
+ // Since this is a no-op, no need to checkpoint here.
+ volumeState.set_state(VolumeState::CREATED);
+ return Nothing();
+ }
+
+ // A previously failed `ControllerPublishVolume` call can be recovered through
+ // the current `ControllerUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+ if (volumeState.state() == VolumeState::NODE_READY ||
+ volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
+ volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+ checkpointVolumeState(volumeId);
+ }
+
+ LOG(INFO)
+ << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
+ << volumeId << "'";
+
+ ControllerUnpublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_node_id(CHECK_NOTNONE(nodeId));
+
+ return call<CONTROLLER_UNPUBLISH_VOLUME>(
+ CONTROLLER_SERVICE, std::move(request))
+ .then(process::defer(self(), [this, volumeId] {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+ volumeState.set_state(VolumeState::CREATED);
+ volumeState.mutable_publish_info()->clear();
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::PUBLISHED) {
+ CHECK(volumeState.node_publish_required());
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::VOL_READY &&
+ volumeState.state() != VolumeState::NODE_PUBLISH &&
+ volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+ // Retry after transitioning the volume to `VOL_READY` state.
+ return __publishVolume(volumeId)
+ .then(process::defer(self(), &Self::_publishVolume, volumeId));
+ }
+
+ // A previously failed `NodeUnpublishVolume` call can be recovered through an
+ // extra `NodeUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
+ if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
+ // Retry after recovering the volume to `VOL_READY` state.
+ return __unpublishVolume(volumeId)
+ .then(process::defer(self(), &Self::_publishVolume, volumeId));
+ }
+
+ const string targetPath = paths::getMountTargetPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ // NOTE: The target path will be cleaned up during volume removal.
+ Try<Nothing> mkdir = os::mkdir(targetPath);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create mount target path '" + targetPath +
+ "': " + mkdir.error());
+ }
+
+ if (volumeState.state() == VolumeState::VOL_READY) {
+ volumeState.set_state(VolumeState::NODE_PUBLISH);
+ checkpointVolumeState(volumeId);
+ }
+
+ LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
+ << volumeId << "'";
+
+ NodePublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ *request.mutable_publish_info() = volumeState.publish_info();
+ request.set_target_path(targetPath);
+ *request.mutable_volume_capability() =
+ evolve(volumeState.volume_capability());
+ request.set_readonly(false);
+ *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+ if (nodeCapabilities->stageUnstageVolume) {
+ const string stagingPath = paths::getMountStagingPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ CHECK(os::exists(stagingPath));
+ request.set_staging_target_path(stagingPath);
+ }
+
+ return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+ .then(defer(self(), [this, volumeId, targetPath] {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ volumeState.set_state(VolumeState::PUBLISHED);
+
+ // NOTE: This is the first time a container is going to consume the
+ // persistent volume, so the `node_publish_required` field is set to
+ // indicate that this volume must remain published so it can be
+ // synchronously cleaned up when the persistent volume is destroyed.
+ volumeState.set_node_publish_required(true);
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::VOL_READY) {
+ CHECK(!volumeState.boot_id().empty());
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::NODE_READY &&
+ volumeState.state() != VolumeState::NODE_STAGE &&
+ volumeState.state() != VolumeState::NODE_UNSTAGE) {
+ // Retry after transitioning the volume to `NODE_READY` state.
+ return _attachVolume(volumeId)
+ .then(process::defer(self(), &Self::__publishVolume, volumeId));
+ }
+
+ if (!nodeCapabilities->stageUnstageVolume) {
+ // Since this is a no-op, no need to checkpoint here.
+ volumeState.set_state(VolumeState::VOL_READY);
+ volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+ return Nothing();
+ }
+
+ // A previously failed `NodeUnstageVolume` call can be recovered through an
+ // extra `NodeUnstageVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
+ if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
+ // Retry after recovering the volume to `NODE_READY` state.
+ return _unpublishVolume(volumeId)
+ .then(process::defer(self(), &Self::__publishVolume, volumeId));
+ }
+
+ const string stagingPath = paths::getMountStagingPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ // NOTE: The staging path will be cleaned up during volume removal.
+ Try<Nothing> mkdir = os::mkdir(stagingPath);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create mount staging path '" + stagingPath +
+ "': " + mkdir.error());
+ }
+
+ if (volumeState.state() == VolumeState::NODE_READY) {
+ volumeState.set_state(VolumeState::NODE_STAGE);
+ checkpointVolumeState(volumeId);
+ }
+
+ LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
+ << "'";
+
+ NodeStageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ *request.mutable_publish_info() = volumeState.publish_info();
+ request.set_staging_target_path(stagingPath);
+ *request.mutable_volume_capability() =
+ evolve(volumeState.volume_capability());
+ *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+ return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
+ .then(process::defer(self(), [this, volumeId] {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+ volumeState.set_state(VolumeState::VOL_READY);
+ volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::NODE_READY) {
+ CHECK(volumeState.boot_id().empty());
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::VOL_READY &&
+ volumeState.state() != VolumeState::NODE_STAGE &&
+ volumeState.state() != VolumeState::NODE_UNSTAGE) {
+ // Retry after transitioning the volume to `VOL_READY` state.
+ return __unpublishVolume(volumeId)
+ .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
+ }
+
+ if (!nodeCapabilities->stageUnstageVolume) {
+ // Since this is a no-op, no need to checkpoint here.
+ volumeState.set_state(VolumeState::NODE_READY);
+ volumeState.clear_boot_id();
+ return Nothing();
+ }
+
+ // A previously failed `NodeStageVolume` call can be recovered through the
+ // current `NodeUnstageVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+ if (volumeState.state() == VolumeState::VOL_READY ||
+ volumeState.state() == VolumeState::NODE_STAGE) {
+ volumeState.set_state(VolumeState::NODE_UNSTAGE);
+ checkpointVolumeState(volumeId);
+ }
+
+ const string stagingPath = paths::getMountStagingPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ CHECK(os::exists(stagingPath));
+
+ LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
+ << volumeId << "'";
+
+ NodeUnstageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_staging_target_path(stagingPath);
+
+ return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
+ .then(process::defer(self(), [this, volumeId] {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+ volumeState.set_state(VolumeState::NODE_READY);
+ volumeState.clear_boot_id();
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__unpublishVolume(const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+
+ if (volumeState.state() == VolumeState::VOL_READY) {
+ return Nothing();
+ }
+
+ if (volumeState.state() != VolumeState::PUBLISHED &&
+ volumeState.state() != VolumeState::NODE_PUBLISH &&
+ volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+ return Failure(
+ "Cannot unpublish volume '" + volumeId + "' in " +
+ stringify(volumeState.state()) + " state");
+ }
+
+ // A previously failed `NodePublishVolume` call can be recovered through the
+ // current `NodeUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+ if (volumeState.state() == VolumeState::PUBLISHED ||
+ volumeState.state() == VolumeState::NODE_PUBLISH) {
+ volumeState.set_state(VolumeState::NODE_UNPUBLISH);
+ checkpointVolumeState(volumeId);
+ }
+
+ const string targetPath = paths::getMountTargetPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ CHECK(os::exists(targetPath));
+
+ LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
+ << volumeId << "'";
+
+ NodeUnpublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_target_path(targetPath);
+
+ return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+ .then(process::defer(self(), [this, volumeId] {
+ CHECK(volumes.contains(volumeId));
+ VolumeState& volumeState = volumes.at(volumeId).state;
+ volumeState.set_state(VolumeState::VOL_READY);
+
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+}
+
+
+void VolumeManagerProcess::checkpointVolumeState(const string& volumeId)
+{
+ const string statePath =
+ paths::getVolumeStatePath(rootDir, info.type(), info.name(), volumeId);
+
+ // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
+ // resulting in a stale or empty checkpoint when a system crash happens.
+ Try<Nothing> checkpoint =
+ slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
+
+ CHECK_SOME(checkpoint)
+ << "Failed to checkpoint volume state to '" << statePath << "':"
+ << checkpoint.error();
+}
+
+
+void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
+{
+ CHECK(!volumes.contains(volumeId));
+
+ const string path = paths::getMountPath(
+ paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+ if (os::exists(path)) {
+ Try<Nothing> rmdir = os::rmdir(path);
+ if (rmdir.isError()) {
+ LOG(ERROR) << "Failed to remove directory '" << path
+ << "': " << rmdir.error();
+ }
+ }
+}
+
+
VolumeManager::VolumeManager(
const http::URL& agentUrl,
const string& rootDir,
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
index 9db99de..214fc1f 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -29,17 +29,25 @@
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/http.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
+#include <process/sequence.hpp>
#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
+#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
+#include <stout/try.hpp>
#include "csi/metrics.hpp"
+#include "csi/rpc.hpp"
#include "csi/service_manager.hpp"
+#include "csi/state.hpp"
+#include "csi/utils.hpp"
#include "csi/v0_volume_manager.hpp"
#include "csi/volume_manager.hpp"
@@ -47,6 +55,16 @@ namespace mesos {
namespace csi {
namespace v0 {
+// The CSI volume manager initially picks a random amount of time between
+// `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI calls.
+// Subsequent retries are exponentially backed off based on this interval (e.g.,
+// 2nd retry uses a random value between `[0, b * 2^1]`, 3rd retry between
+// `[0, b * 2^2]`, etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
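(For orientation, not part of the diff: with the defaults above, the n-th retry
is delayed by a random duration in `[0, min(10s * 2^(n-1), 10min)]`, so the
bounds grow as 10s, 20s, 40s, ... and stay capped at 10 minutes from the
seventh retry onward.)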
class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
{
@@ -90,7 +108,76 @@ public:
process::Future<Nothing> unpublishVolume(const std::string& volumeId);
+ // Wrapper functions to make CSI calls and update RPC metrics. Made public for
+ // testing purposes.
+ //
+ // The call is made asynchronously and thus no guarantee is provided on the
+ // order in which calls are sent. Callers need to either ensure to not have
+ // multiple conflicting calls in flight, or treat results idempotently.
+ //
+ // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+ // calls on the same volume, and 2) no profile update while there are ongoing
+ // `CREATE_DISK` or `DESTROY_DISK` operations.
+ template <RPC Rpc>
+ process::Future<Response<Rpc>> call(
+ const Service& service, const Request<Rpc>& request, bool retry = false);
+
+ template <RPC Rpc>
+ process::Future<Try<Response<Rpc>, process::grpc::StatusError>>
+ _call(const std::string& endpoint, const Request<Rpc>& request);
+
+ template <RPC Rpc>
+ process::Future<process::ControlFlow<Response<Rpc>>> __call(
+ const Try<Response<Rpc>, process::grpc::StatusError>& result,
+ const Option<Duration>& backoff);
+
private:
+ process::Future<Nothing> prepareServices();
+
+ process::Future<bool> _deleteVolume(const std::string& volumeId);
+ process::Future<bool> __deleteVolume(const std::string& volumeId);
+
+ // The following methods are used to manage volume lifecycles. Transient
+ // states are omitted.
+ //
+ //                          +------------+
+ //                 +  +  +  |  CREATED   |  ^
+ //   _attachVolume |  |  |  +---+----^---+  |
+ //                 |  |  |      |    |      | _detachVolume
+ //                 |  |  |  +---v----+---+  |
+ //                 v  +  +  | NODE_READY |  +  ^
+ //                    |  |  +---+----^---+  |  |
+ //    __publishVolume |  |      |    |      |  | _unpublishVolume
+ //                    |  |  +---v----+---+  |  |
+ //                    v  +  | VOL_READY  |  +  +  ^
+ //                       |  +---+----^---+  |  |  |
+ //        _publishVolume |      |    |      |  |  | __unpublishVolume
+ //                       |  +---v----+---+  |  |  |
+ //                       V  | PUBLISHED  |  +  +  +
+ //                          +------------+
+
+ // Transition a volume to `NODE_READY` state from any state above.
+ process::Future<Nothing> _attachVolume(const std::string& volumeId);
+
+ // Transition a volume to `CREATED` state from any state below.
+ process::Future<Nothing> _detachVolume(const std::string& volumeId);
+
+ // Transition a volume to `PUBLISHED` state from any state above.
+ process::Future<Nothing> _publishVolume(const std::string& volumeId);
+
+ // Transition a volume to `VOL_READY` state from any state above.
+ process::Future<Nothing> __publishVolume(const std::string& volumeId);
+
+ // Transition a volume to `NODE_READY` state from any state below.
+ process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
+
+ // Transition a volume to `VOL_READY` state from any state below.
+ process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
+
+ void checkpointVolumeState(const std::string& volumeId);
+
+ void garbageCollectMountPath(const std::string& volumeId);
+
const std::string rootDir;
const CSIPluginInfo info;
const hashset<Service> services;
@@ -98,6 +185,26 @@ private:
process::grpc::client::Runtime runtime;
Metrics* metrics;
process::Owned<ServiceManager> serviceManager;
+
+ Option<std::string> bootId;
+ Option<PluginCapabilities> pluginCapabilities;
+ Option<ControllerCapabilities> controllerCapabilities;
+ Option<NodeCapabilities> nodeCapabilities;
+ Option<std::string> nodeId;
+
+ struct VolumeData
+ {
+ VolumeData(state::VolumeState&& _state)
+ : state(_state), sequence(new process::Sequence("csi-volume-sequence")) {}
+
+ state::VolumeState state;
+
+ // We call all CSI operations on the same volume in a sequence to ensure
+ // that they are processed in a sequential order.
+ process::Owned<process::Sequence> sequence;
+ };
+
+ hashmap<std::string, VolumeData> volumes;
};
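For orientation, not part of the diff: the per-volume `sequence` is what
serializes CSI operations in the implementation above. Operations on the same
volume are queued one after another, while different volumes proceed
independently; condensed from `publishVolume` in v0_volume_manager.cpp:

    VolumeData& volume = volumes.at(volumeId);

    // `_publishVolume` runs only after earlier operations queued on this
    // volume's sequence have completed.
    return volume.sequence->add(std::function<process::Future<Nothing>()>(
        process::defer(self(), &Self::_publishVolume, volumeId)));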
} // namespace v0 {
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index c5a5213..fba5b18 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,29 +18,24 @@
#include <algorithm>
#include <cctype>
-#include <cstdlib>
#include <functional>
#include <list>
#include <memory>
#include <numeric>
#include <queue>
-#include <type_traits>
#include <utility>
#include <vector>
#include <glog/logging.h>
-#include <process/after.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
-#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/grpc.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
-#include <process/sequence.hpp>
#include <process/metrics/counter.hpp>
#include <process/metrics/metrics.hpp>
@@ -74,13 +69,8 @@
#include "common/protobuf_utils.hpp"
#include "common/resources_utils.hpp"
-#include "csi/client.hpp"
#include "csi/metrics.hpp"
#include "csi/paths.hpp"
-#include "csi/rpc.hpp"
-#include "csi/service_manager.hpp"
-#include "csi/state.hpp"
-#include "csi/utils.hpp"
#include "csi/volume_manager.hpp"
#include "internal/devolve.hpp"
@@ -98,10 +88,6 @@
namespace http = process::http;
-// TODO(chhsiao): Remove `using namespace` statements after refactoring.
-using namespace mesos::csi;
-using namespace mesos::csi::v0;
-
using std::accumulate;
using std::find;
using std::list;
@@ -110,24 +96,18 @@ using std::shared_ptr;
using std::string;
using std::vector;
-using google::protobuf::Map;
-
-using process::after;
using process::await;
-using process::Break;
using process::collect;
using process::Continue;
using process::ControlFlow;
using process::defer;
using process::delay;
-using process::dispatch;
using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
using process::ProcessBase;
using process::Promise;
-using process::Sequence;
using process::spawn;
using process::grpc::StatusError;
@@ -137,9 +117,8 @@ using process::http::authentication::Principal;
using process::metrics::Counter;
using process::metrics::PushGauge;
-using mesos::csi::ServiceManager;
-
-using mesos::csi::state::VolumeState;
+using mesos::csi::VolumeInfo;
+using mesos::csi::VolumeManager;
using mesos::internal::protobuf::convertLabelsToStringMap;
using mesos::internal::protobuf::convertStringMapToLabels;
@@ -339,140 +318,31 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
}
-template <
- csi::v0::RPC rpc,
- typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
-Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
- const csi::Service& service,
- const csi::v0::Request<rpc>& request,
- const bool retry) // Make immutable in the following mutable lambda.
-{
- Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
-
- return loop(
- self(),
- [=] {
- // Make the call to the latest service endpoint.
- return serviceManager->getServiceEndpoint(service)
- .then(defer(
- self(),
- &StorageLocalResourceProviderProcess::_call<rpc>,
- lambda::_1,
- request));
- },
- [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
- -> Future<ControlFlow<csi::v0::Response<rpc>>> {
- Option<Duration> backoff = retry
- ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
- : Option<Duration>::none();
-
- maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
- // We dispatch `__call` for testing purpose.
- return dispatch(
- self(),
- &StorageLocalResourceProviderProcess::__call<rpc>,
- result,
- backoff);
- });
-}
-
-
-template <csi::v0::RPC rpc>
-Future<Try<csi::v0::Response<rpc>, StatusError>>
-StorageLocalResourceProviderProcess::_call(
- const string& endpoint, const csi::v0::Request<rpc>& request)
-{
- ++metrics.csi_plugin_rpcs_pending.at(rpc);
-
- return csi::v0::Client(endpoint, runtime).call<rpc>(request)
- .onAny(defer(self(), [=](
- const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
- --metrics.csi_plugin_rpcs_pending.at(rpc);
- if (future.isReady() && future->isSome()) {
- ++metrics.csi_plugin_rpcs_successes.at(rpc);
- } else if (future.isDiscarded()) {
- ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
- } else {
- ++metrics.csi_plugin_rpcs_errors.at(rpc);
- }
- }));
-}
-
-
-template <csi::v0::RPC rpc>
-Future<ControlFlow<csi::v0::Response<rpc>>>
-StorageLocalResourceProviderProcess::__call(
- const Try<csi::v0::Response<rpc>, StatusError>& result,
- const Option<Duration>& backoff)
-{
- if (result.isSome()) {
- return Break(result.get());
- }
-
- if (backoff.isNone()) {
- return Failure(result.error());
- }
-
- // See the link below for retryable status codes:
- // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
- switch (result.error().status.error_code()) {
- case grpc::DEADLINE_EXCEEDED:
- case grpc::UNAVAILABLE: {
- LOG(ERROR)
- << "Received '" << result.error() << "' while calling " << rpc
- << ". Retrying in " << backoff.get();
-
- return after(backoff.get())
- .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
- return Continue();
- });
- }
- case grpc::CANCELLED:
- case grpc::UNKNOWN:
- case grpc::INVALID_ARGUMENT:
- case grpc::NOT_FOUND:
- case grpc::ALREADY_EXISTS:
- case grpc::PERMISSION_DENIED:
- case grpc::UNAUTHENTICATED:
- case grpc::RESOURCE_EXHAUSTED:
- case grpc::FAILED_PRECONDITION:
- case grpc::ABORTED:
- case grpc::OUT_OF_RANGE:
- case grpc::UNIMPLEMENTED:
- case grpc::INTERNAL:
- case grpc::DATA_LOSS: {
- return Failure(result.error());
- }
- case grpc::OK:
- case grpc::DO_NOT_USE: {
- UNREACHABLE();
- }
- }
-
- UNREACHABLE();
-}
-
-
void StorageLocalResourceProviderProcess::initialize()
{
const Principal principal = LocalResourceProvider::principal(info);
CHECK(principal.claims.contains("cid_prefix"));
const string& containerPrefix = principal.claims.at("cid_prefix");
- rootDir = slave::paths::getCsiRootDir(workDir);
- pluginInfo = info.storage().plugin();
- services = {CONTROLLER_SERVICE, NODE_SERVICE};
-
- serviceManager.reset(new ServiceManager(
+ Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
extractParentEndpoint(url),
- rootDir,
- pluginInfo,
- services,
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin(),
+ {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
containerPrefix,
authToken,
- runtime,
- &metrics));
+ &metrics);
+
+ if (volumeManager_.isError()) {
+ LOG(ERROR)
+ << "Failed to create CSI volume manager for resource provider with type '"
+ << info.type() << "' and name '" << info.name()
+ << "': " << volumeManager_.error();
+
+ fatal();
+ }
+
+ volumeManager = std::move(volumeManager_).get();
auto die = [=](const string& message) {
LOG(ERROR)
@@ -503,7 +373,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
{
CHECK_EQ(RECOVERING, state);
- return recoverVolumes()
+ return volumeManager->recover()
.then(defer(self(), [=]() -> Future<Nothing> {
// Recover the resource provider ID and state from the latest symlink. If
// the symlink does not exist, this is a new resource provider, and the
@@ -604,147 +474,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
}
-Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
-{
- Try<string> bootId_ = os::bootId();
- if (bootId_.isError()) {
- return Failure("Failed to get boot ID: " + bootId_.error());
- }
-
- bootId = bootId_.get();
-
- return serviceManager->recover()
- .then(process::defer(self(), &Self::prepareServices))
- .then(process::defer(self(), [this]() -> Future<Nothing> {
- // Recover the states of CSI volumes.
- Try<list<string>> volumePaths =
- paths::getVolumePaths(rootDir, pluginInfo.type(), pluginInfo.name());
-
- if (volumePaths.isError()) {
- return Failure(
- "Failed to find volumes for CSI plugin type '" + pluginInfo.type() +
- "' and name '" + pluginInfo.name() + "': " + volumePaths.error());
- }
-
- vector<Future<Nothing>> futures;
-
- foreach (const string& path, volumePaths.get()) {
- Try<paths::VolumePath> volumePath =
- paths::parseVolumePath(rootDir, path);
-
- if (volumePath.isError()) {
- return Failure(
- "Failed to parse volume path '" + path +
- "': " + volumePath.error());
- }
-
- CHECK_EQ(pluginInfo.type(), volumePath->type);
- CHECK_EQ(pluginInfo.name(), volumePath->name);
-
- const string& volumeId = volumePath->volumeId;
- const string statePath = paths::getVolumeStatePath(
- rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
-
- if (!os::exists(statePath)) {
- continue;
- }
-
- Result<VolumeState> volumeState =
- slave::state::read<VolumeState>(statePath);
-
- if (volumeState.isError()) {
- return Failure(
- "Failed to read volume state from '" + statePath +
- "': " + volumeState.error());
- }
-
- if (volumeState.isNone()) {
- continue;
- }
-
- volumes.put(volumeId, std::move(volumeState.get()));
- VolumeData& volume = volumes.at(volumeId);
-
- if (!VolumeState::State_IsValid(volume.state.state())) {
- return Failure("Volume '" + volumeId + "' is in INVALID state");
- }
-
- // First, if there is a node reboot after the volume is made
- // publishable, it should be reset to `NODE_READY`.
- switch (volume.state.state()) {
- case VolumeState::CREATED:
- case VolumeState::NODE_READY:
- case VolumeState::CONTROLLER_PUBLISH:
- case VolumeState::CONTROLLER_UNPUBLISH:
- case VolumeState::NODE_STAGE: {
- break;
- }
- case VolumeState::VOL_READY:
- case VolumeState::PUBLISHED:
- case VolumeState::NODE_UNSTAGE:
- case VolumeState::NODE_PUBLISH:
- case VolumeState::NODE_UNPUBLISH: {
- if (bootId != volume.state.boot_id()) {
- // Since this is a no-op, no need to checkpoint here.
- volume.state.set_state(VolumeState::NODE_READY);
- volume.state.clear_boot_id();
- }
-
- break;
- }
- case VolumeState::UNKNOWN: {
- return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
- }
-
- // NOTE: We avoid using a default clause for the following values in
- // proto3's open enum to enable the compiler to detect missing enum
- // cases for us. See: https://github.com/google/protobuf/issues/3917
- case google::protobuf::kint32min:
- case google::protobuf::kint32max: {
- UNREACHABLE();
- }
- }
-
- // Second, if the volume has been used by a container before recovery,
- // we have to bring the volume back to `PUBLISHED` so data can be
- // cleaned up synchronously when needed.
- if (volume.state.node_publish_required()) {
- futures.push_back(publishVolume(volumeId));
- }
- }
-
- // Garbage collect leftover mount paths that were failed to remove before.
- const string mountRootDir =
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name());
-
- Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
- if (mountPaths.isError()) {
- // TODO(chhsiao): This could indicate that something is seriously wrong.
- // To help debugging the problem, we should surface the error via
- // MESOS-8745.
- return Failure(
- "Failed to find mount paths for CSI plugin type '" +
- pluginInfo.type() + "' and name '" + pluginInfo.name() +
- "': " + mountPaths.error());
- }
-
- foreach (const string& path, mountPaths.get()) {
- Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
- if (volumeId.isError()) {
- return Failure(
- "Failed to parse mount path '" + path + "': " + volumeId.error());
- }
-
- if (!volumes.contains(volumeId.get())) {
- garbageCollectMountPath(volumeId.get());
- }
- }
-
- return process::collect(futures).then([] { return Nothing(); });
- }));
-}
-
-
void StorageLocalResourceProviderProcess::doReliableRegistration()
{
if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -1113,7 +842,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes()
{
CHECK(info.has_id());
- return listVolumes()
+ return volumeManager->listVolumes()
.then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
Resources resources;
@@ -1156,15 +885,15 @@ Future<Resources> StorageLocalResourceProviderProcess::getStoragePools()
foreachpair (const string& profile,
const DiskProfileAdaptor::ProfileInfo& profileInfo,
profileInfos) {
- futures.push_back(
- getCapacity(profileInfo.capability, profileInfo.parameters)
- .then(defer(self(), [=](const Bytes& capacity) -> Resources {
- if (capacity == 0) {
- return Resources();
- }
+ futures.push_back(volumeManager->getCapacity(
+ profileInfo.capability, profileInfo.parameters)
+ .then(defer(self(), [=](const Bytes& capacity) -> Resources {
+ if (capacity == 0) {
+ return Resources();
+ }
- return createRawDiskResource(info, capacity, profile, vendor);
- })));
+ return createRawDiskResource(info, capacity, profile, vendor);
+ })));
}
return collect(futures)
@@ -1395,7 +1124,6 @@ void StorageLocalResourceProviderProcess::publishResources(
case Resource::DiskInfo::Source::MOUNT:
case Resource::DiskInfo::Source::BLOCK: {
CHECK(resource.disk().source().has_id());
- CHECK(volumes.contains(resource.disk().source().id()));
volumeIds.insert(resource.disk().source().id());
break;
}
@@ -1418,7 +1146,7 @@ void StorageLocalResourceProviderProcess::publishResources(
vector<Future<Nothing>> futures;
foreach (const string& volumeId, volumeIds) {
- futures.push_back(publishVolume(volumeId));
+ futures.push_back(volumeManager->publishVolume(volumeId));
}
allPublished = collect(futures);
@@ -1526,766 +1254,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
}
-Future<Nothing> StorageLocalResourceProviderProcess::prepareServices()
-{
- CHECK(!services.empty());
-
- // Get the plugin capabilities.
- return call<GET_PLUGIN_CAPABILITIES>(
- *services.begin(), GetPluginCapabilitiesRequest())
- .then(process::defer(self(), [=](
- const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
- pluginCapabilities = response.capabilities();
-
- if (services.contains(CONTROLLER_SERVICE) &&
- !pluginCapabilities->controllerService) {
- return Failure(
- "CONTROLLER_SERVICE plugin capability is not supported for CSI "
- "plugin type '" +
- pluginInfo.type() + "' and name '" + pluginInfo.name() + "'");
- }
-
- return Nothing();
- }))
- // Check if all services have consistent plugin infos.
- .then(process::defer(self(), [this] {
- vector<Future<GetPluginInfoResponse>> futures;
- foreach (const Service& service, services) {
- futures.push_back(
- call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest())
- .onReady([service](const GetPluginInfoResponse& response) {
- LOG(INFO) << service << " loaded: " << stringify(response);
- }));
- }
-
- return process::collect(futures)
- .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
- for (size_t i = 1; i < pluginInfos.size(); ++i) {
- if (pluginInfos[i].name() != pluginInfos[0].name() ||
- pluginInfos[i].vendor_version() !=
- pluginInfos[0].vendor_version()) {
- LOG(WARNING) << "Inconsistent plugin services. Please check with "
- "the plugin vendor to ensure compatibility.";
- }
- }
-
- return Nothing();
- });
- }))
- // Get the controller capabilities.
- .then(process::defer(self(), [this]() -> Future<Nothing> {
- if (!services.contains(CONTROLLER_SERVICE)) {
- controllerCapabilities = ControllerCapabilities();
- return Nothing();
- }
-
- return call<CONTROLLER_GET_CAPABILITIES>(
- CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
- .then(process::defer(self(), [this](
- const ControllerGetCapabilitiesResponse& response) {
- controllerCapabilities = response.capabilities();
- return Nothing();
- }));
- }))
- // Get the node capabilities and ID.
- .then(process::defer(self(), [this]() -> Future<Nothing> {
- if (!services.contains(NODE_SERVICE)) {
- nodeCapabilities = NodeCapabilities();
- return Nothing();
- }
-
- return call<NODE_GET_CAPABILITIES>(
- NODE_SERVICE, NodeGetCapabilitiesRequest())
- .then(process::defer(self(), [this](
- const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
- nodeCapabilities = response.capabilities();
-
- if (controllerCapabilities->publishUnpublishVolume) {
- return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
- .then(process::defer(self(), [this](
- const NodeGetIdResponse& response) {
- nodeId = response.node_id();
- return Nothing();
- }));
- }
-
- return Nothing();
- }));
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::publishVolume(
- const string& volumeId)
-{
- if (!volumes.contains(volumeId)) {
- return Failure("Cannot publish unknown volume '" + volumeId + "'");
- }
-
- VolumeData& volume = volumes.at(volumeId);
-
- LOG(INFO) << "Publishing volume '" << volumeId << "' in "
- << volume.state.state() << " state";
-
- // Volume publishing is serialized with other operations on the same volume to
- // avoid races.
- return volume.sequence->add(std::function<Future<Nothing>()>(
- process::defer(self(), &Self::_publishVolume, volumeId)));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_attachVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::NODE_READY) {
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::CREATED &&
- volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
- volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
- return Failure(
- "Cannot attach volume '" + volumeId + "' in " +
- stringify(volumeState.state()) + " state");
- }
-
- if (!controllerCapabilities->publishUnpublishVolume) {
- // Since this is a no-op, no need to checkpoint here.
- volumeState.set_state(VolumeState::NODE_READY);
- return Nothing();
- }
-
- // A previously failed `ControllerUnpublishVolume` call can be recovered
- // through an extra `ControllerUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
- if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
- // Retry after recovering the volume to `CREATED` state.
- return _detachVolume(volumeId)
- .then(process::defer(self(), &Self::_attachVolume, volumeId));
- }
-
- if (volumeState.state() == VolumeState::CREATED) {
- volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
- checkpointVolumeState(volumeId);
- }
-
- LOG(INFO)
- << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
- << volumeId << "'";
-
- ControllerPublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_node_id(CHECK_NOTNONE(nodeId));
- *request.mutable_volume_capability() =
- csi::v0::evolve(volumeState.volume_capability());
- request.set_readonly(false);
- *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
- return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
- .then(process::defer(self(), [this, volumeId](
- const ControllerPublishVolumeResponse& response) {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
- volumeState.set_state(VolumeState::NODE_READY);
- *volumeState.mutable_publish_info() = response.publish_info();
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_detachVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::CREATED) {
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::NODE_READY &&
- volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
- volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
- // Retry after transitioning the volume to `CREATED` state.
- return _unpublishVolume(volumeId)
- .then(process::defer(self(), &Self::_detachVolume, volumeId));
- }
-
- if (!controllerCapabilities->publishUnpublishVolume) {
- // Since this is a no-op, no need to checkpoint here.
- volumeState.set_state(VolumeState::CREATED);
- return Nothing();
- }
-
- // A previously failed `ControllerPublishVolume` call can be recovered through
- // the current `ControllerUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
- if (volumeState.state() == VolumeState::NODE_READY ||
- volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
- volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
- checkpointVolumeState(volumeId);
- }
-
- LOG(INFO)
- << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
- << volumeId << "'";
-
- ControllerUnpublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_node_id(CHECK_NOTNONE(nodeId));
-
- return call<CONTROLLER_UNPUBLISH_VOLUME>(
- CONTROLLER_SERVICE, std::move(request))
- .then(process::defer(self(), [this, volumeId] {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
- volumeState.set_state(VolumeState::CREATED);
- volumeState.mutable_publish_info()->clear();
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_publishVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::PUBLISHED) {
- CHECK(volumeState.node_publish_required());
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::VOL_READY &&
- volumeState.state() != VolumeState::NODE_PUBLISH &&
- volumeState.state() != VolumeState::NODE_UNPUBLISH) {
- // Retry after transitioning the volume to `VOL_READY` state.
- return __publishVolume(volumeId)
- .then(process::defer(self(), &Self::_publishVolume, volumeId));
- }
-
- // A previously failed `NodeUnpublishVolume` call can be recovered through an
- // extra `NodeUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
- if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
- // Retry after recovering the volume to `VOL_READY` state.
- return __unpublishVolume(volumeId)
- .then(process::defer(self(), &Self::_publishVolume, volumeId));
- }
-
- const string targetPath = paths::getMountTargetPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- // NOTE: The target path will be cleaned up during volume removal.
- Try<Nothing> mkdir = os::mkdir(targetPath);
- if (mkdir.isError()) {
- return Failure(
- "Failed to create mount target path '" + targetPath +
- "': " + mkdir.error());
- }
-
- if (volumeState.state() == VolumeState::VOL_READY) {
- volumeState.set_state(VolumeState::NODE_PUBLISH);
- checkpointVolumeState(volumeId);
- }
-
- LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
- << volumeId << "'";
-
- NodePublishVolumeRequest request;
- request.set_volume_id(volumeId);
- *request.mutable_publish_info() = volumeState.publish_info();
- request.set_target_path(targetPath);
- *request.mutable_volume_capability() =
- csi::v0::evolve(volumeState.volume_capability());
- request.set_readonly(false);
- *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
- if (nodeCapabilities->stageUnstageVolume) {
- const string stagingPath = paths::getMountStagingPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- CHECK(os::exists(stagingPath));
- request.set_staging_target_path(stagingPath);
- }
-
- return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
- .then(defer(self(), [this, volumeId, targetPath] {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- volumeState.set_state(VolumeState::PUBLISHED);
-
- // NOTE: This is the first time a container is going to consume the
- // persistent volume, so the `node_publish_required` field is set to
- // indicate that this volume must remain published so it can be
- // synchronously cleaned up when the persistent volume is destroyed.
- volumeState.set_node_publish_required(true);
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::__publishVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::VOL_READY) {
- CHECK(!volumeState.boot_id().empty());
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::NODE_READY &&
- volumeState.state() != VolumeState::NODE_STAGE &&
- volumeState.state() != VolumeState::NODE_UNSTAGE) {
- // Retry after transitioning the volume to `NODE_READY` state.
- return _attachVolume(volumeId)
- .then(process::defer(self(), &Self::__publishVolume, volumeId));
- }
-
- if (!nodeCapabilities->stageUnstageVolume) {
- // Since this is a no-op, no need to checkpoint here.
- volumeState.set_state(VolumeState::VOL_READY);
- volumeState.set_boot_id(CHECK_NOTNONE(bootId));
- return Nothing();
- }
-
- // A previously failed `NodeUnstageVolume` call can be recovered through an
- // extra `NodeUnstageVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
- if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
- // Retry after recovering the volume to `NODE_READY` state.
- return _unpublishVolume(volumeId)
- .then(process::defer(self(), &Self::__publishVolume, volumeId));
- }
-
- const string stagingPath = paths::getMountStagingPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- // NOTE: The staging path will be cleaned up during volume removal.
- Try<Nothing> mkdir = os::mkdir(stagingPath);
- if (mkdir.isError()) {
- return Failure(
- "Failed to create mount staging path '" + stagingPath +
- "': " + mkdir.error());
- }
-
- if (volumeState.state() == VolumeState::NODE_READY) {
- volumeState.set_state(VolumeState::NODE_STAGE);
- checkpointVolumeState(volumeId);
- }
-
- LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
- << "'";
-
- NodeStageVolumeRequest request;
- request.set_volume_id(volumeId);
- *request.mutable_publish_info() = volumeState.publish_info();
- request.set_staging_target_path(stagingPath);
- *request.mutable_volume_capability() =
- csi::v0::evolve(volumeState.volume_capability());
- *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
- return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
- .then(process::defer(self(), [this, volumeId] {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
- volumeState.set_state(VolumeState::VOL_READY);
- volumeState.set_boot_id(CHECK_NOTNONE(bootId));
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_unpublishVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::NODE_READY) {
- CHECK(volumeState.boot_id().empty());
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::VOL_READY &&
- volumeState.state() != VolumeState::NODE_STAGE &&
- volumeState.state() != VolumeState::NODE_UNSTAGE) {
- // Retry after transitioning the volume to `VOL_READY` state.
- return __unpublishVolume(volumeId)
- .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
- }
-
- if (!nodeCapabilities->stageUnstageVolume) {
- // Since this is a no-op, no need to checkpoint here.
- volumeState.set_state(VolumeState::NODE_READY);
- volumeState.clear_boot_id();
- return Nothing();
- }
-
- // A previously failed `NodeStageVolume` call can be recovered through the
- // current `NodeUnstageVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
- if (volumeState.state() == VolumeState::VOL_READY ||
- volumeState.state() == VolumeState::NODE_STAGE) {
- volumeState.set_state(VolumeState::NODE_UNSTAGE);
- checkpointVolumeState(volumeId);
- }
-
- const string stagingPath = paths::getMountStagingPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- CHECK(os::exists(stagingPath));
-
- LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
- << volumeId << "'";
-
- NodeUnstageVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_staging_target_path(stagingPath);
-
- return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
- .then(process::defer(self(), [this, volumeId] {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
- volumeState.set_state(VolumeState::NODE_READY);
- volumeState.clear_boot_id();
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume(
- const string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.state() == VolumeState::VOL_READY) {
- return Nothing();
- }
-
- if (volumeState.state() != VolumeState::PUBLISHED &&
- volumeState.state() != VolumeState::NODE_PUBLISH &&
- volumeState.state() != VolumeState::NODE_UNPUBLISH) {
- return Failure(
- "Cannot unpublish volume '" + volumeId + "' in " +
- stringify(volumeState.state()) + " state");
- }
-
- // A previously failed `NodePublishVolume` call can be recovered through the
- // current `NodeUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
- if (volumeState.state() == VolumeState::PUBLISHED ||
- volumeState.state() == VolumeState::NODE_PUBLISH) {
- volumeState.set_state(VolumeState::NODE_UNPUBLISH);
- checkpointVolumeState(volumeId);
- }
-
- const string targetPath = paths::getMountTargetPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- CHECK(os::exists(targetPath));
-
- LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
- << volumeId << "'";
-
- NodeUnpublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_target_path(targetPath);
-
- return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
- .then(process::defer(self(), [this, volumeId] {
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
- volumeState.set_state(VolumeState::VOL_READY);
-
- checkpointVolumeState(volumeId);
-
- return Nothing();
- }));
-}
-
-
-Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume(
- const string& name,
- const Bytes& capacity,
- const types::VolumeCapability& capability,
- const Map<string, string>& parameters)
-{
- if (!controllerCapabilities->createDeleteVolume) {
- return Failure(
- "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
- "plugin type '" + info.type() + "' and name '" + info.name());
- }
-
- LOG(INFO) << "Creating volume with name '" << name << "'";
-
- CreateVolumeRequest request;
- request.set_name(name);
- request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
- request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
- *request.add_volume_capabilities() = csi::v0::evolve(capability);
- *request.mutable_parameters() = parameters;
-
- // We retry the `CreateVolume` call for MESOS-9517.
- return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
- .then(process::defer(self(), [=](
- const CreateVolumeResponse& response) -> Future<VolumeInfo> {
- const string& volumeId = response.volume().id();
-
- // NOTE: If the volume is already tracked, there might already be
- // operations running in its sequence. Since this continuation runs
- // outside the sequence, we fail the call here to avoid any race issue.
- // This also means that this call is not idempotent.
- if (volumes.contains(volumeId)) {
- return Failure("Volume with name '" + name + "' already exists");
- }
-
- VolumeState volumeState;
- volumeState.set_state(VolumeState::CREATED);
- *volumeState.mutable_volume_capability() = capability;
- *volumeState.mutable_parameters() = parameters;
- *volumeState.mutable_volume_attributes() = response.volume().attributes();
-
- volumes.put(volumeId, std::move(volumeState));
- checkpointVolumeState(volumeId);
-
- return VolumeInfo{capacity, volumeId, response.volume().attributes()};
- }));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
- const string& volumeId)
-{
- if (!volumes.contains(volumeId)) {
- return __deleteVolume(volumeId);
- }
-
- VolumeData& volume = volumes.at(volumeId);
-
- LOG(INFO) << "Deleting volume '" << volumeId << "' in "
- << volume.state.state() << " state";
-
- // Volume deletion is serialized with other operations on the same volume
- // to avoid races.
- return volume.sequence->add(std::function<Future<bool>()>(
- process::defer(self(), &Self::_deleteVolume, volumeId)));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::_deleteVolume(
- const std::string& volumeId)
-{
- CHECK(volumes.contains(volumeId));
- VolumeState& volumeState = volumes.at(volumeId).state;
-
- if (volumeState.node_publish_required()) {
- CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
-
- const string targetPath = paths::getMountTargetPath(
- paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
- volumeId);
-
- // NOTE: Normally the volume should have been cleaned up. However, this may
- // not be true for preprovisioned volumes (e.g., leftover from a previous
- // resource provider instance). To prevent data leakage in such cases, we
- // clean up the data (but not the target path) here.
- Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
- if (rmdir.isError()) {
- return Failure(
- "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
- }
-
- volumeState.set_node_publish_required(false);
- checkpointVolumeState(volumeId);
- }
-
- if (volumeState.state() != VolumeState::CREATED) {
- // Retry after transitioning the volume to `CREATED` state.
- return _detachVolume(volumeId)
- .then(process::defer(self(), &Self::_deleteVolume, volumeId));
- }
-
- // NOTE: The last asynchronous continuation, which is supposed to be run in
- // the volume's sequence, would cause the sequence to be destructed, which
- // would in turn discard the returned future. However, since the continuation
- // would have already been run, the returned future will become ready, making
- // the future returned by the sequence ready as well.
- return __deleteVolume(volumeId)
- .then(process::defer(self(), [this, volumeId](bool deleted) {
- volumes.erase(volumeId);
-
- const string volumePath = paths::getVolumePath(
- rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
-
- Try<Nothing> rmdir = os::rmdir(volumePath);
- CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
- << volumePath << "': " << rmdir.error();
-
- garbageCollectMountPath(volumeId);
-
- return deleted;
- }));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::__deleteVolume(
- const string& volumeId)
-{
- if (!controllerCapabilities->createDeleteVolume) {
- return false;
- }
-
- LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
- << volumeId << "'";
-
- DeleteVolumeRequest request;
- request.set_volume_id(volumeId);
-
- // We retry the `DeleteVolume` call for MESOS-9517.
- return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
- .then([] { return true; });
-}
-
-
-Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
- const VolumeInfo& volumeInfo,
- const types::VolumeCapability& capability,
- const Map<string, string>& parameters)
-{
- // If the volume has been checkpointed, the validation succeeds only if the
- // capability and parameters of the specified profile are the same as those in
- // the checkpoint.
- if (volumes.contains(volumeInfo.id)) {
- const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
-
- if (volumeState.volume_capability() != capability) {
- return Some(
- Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
- }
-
- if (volumeState.parameters() != parameters) {
- return Some(
- Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
- }
-
- return None();
- }
-
- if (!parameters.empty()) {
- LOG(WARNING)
- << "Validating volumes against parameters is not supported in CSI v0";
- }
-
- LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
-
- ValidateVolumeCapabilitiesRequest request;
- request.set_volume_id(volumeInfo.id);
- *request.add_volume_capabilities() = csi::v0::evolve(capability);
- *request.mutable_volume_attributes() = volumeInfo.context;
-
- return call<VALIDATE_VOLUME_CAPABILITIES>(
- CONTROLLER_SERVICE, std::move(request))
- .then(process::defer(self(), [=](
- const ValidateVolumeCapabilitiesResponse& response)
- -> Future<Option<Error>> {
- if (!response.supported()) {
- return Error(
- "Unsupported volume capability for volume '" + volumeInfo.id +
- "': " + response.message());
- }
-
- // NOTE: If the volume is already tracked, there might already be
- // operations running in its sequence. Since this continuation runs
- // outside the sequence, we fail the call here to avoid any race issue.
- // This also means that this call is not idempotent.
- if (volumes.contains(volumeInfo.id)) {
- return Failure("Volume '" + volumeInfo.id + "' already validated");
- }
-
- VolumeState volumeState;
- volumeState.set_state(VolumeState::CREATED);
- *volumeState.mutable_volume_capability() = capability;
- *volumeState.mutable_parameters() = parameters;
- *volumeState.mutable_volume_attributes() = volumeInfo.context;
-
- volumes.put(volumeInfo.id, std::move(volumeState));
- checkpointVolumeState(volumeInfo.id);
-
- return None();
- }));
-}
-
-
-Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes()
-{
- if (!controllerCapabilities->listVolumes) {
- return vector<VolumeInfo>();
- }
-
- // TODO(chhsiao): Set the max entries and use a loop to do multiple
- // `ListVolumes` calls.
- return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
- .then(process::defer(self(), [](const ListVolumesResponse& response) {
- vector<VolumeInfo> result;
- foreach (const auto& entry, response.entries()) {
- result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
- entry.volume().id(),
- entry.volume().attributes()});
- }
-
- return result;
- }));
-}
-
-
-Future<Bytes> StorageLocalResourceProviderProcess::getCapacity(
- const types::VolumeCapability& capability,
- const Map<string, string>& parameters)
-{
- if (!controllerCapabilities->getCapacity) {
- return Bytes(0);
- }
-
- GetCapacityRequest request;
- *request.add_volume_capabilities() = csi::v0::evolve(capability);
- *request.mutable_parameters() = parameters;
-
- return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
- .then([](const GetCapacityResponse& response) {
- return Bytes(response.available_capacity());
- });
-}
-
-
Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
const id::UUID& operationUuid)
{
@@ -2519,7 +1487,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
// afterward. See MESOS-9254.
Future<VolumeInfo> created;
if (resource.disk().source().has_profile()) {
- created = createVolume(
+ created = volumeManager->createVolume(
operationUuid.toString(),
resource.scalar().value() * Bytes::MEGABYTES,
profileInfo.capability,
@@ -2532,7 +1500,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
resource.disk().source().metadata()))
};
- created = validateVolume(
+ created = volumeManager->validateVolume(
volumeInfo, profileInfo.capability, profileInfo.parameters)
.then([resource, profile, volumeInfo](
const Option<Error>& error) -> Future<VolumeInfo> {
@@ -2596,7 +1564,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
CHECK(!Resources::isPersistentVolume(resource));
CHECK(resource.disk().source().has_id());
- return deleteVolume(resource.disk().source().id())
+ return volumeManager->deleteVolume(resource.disk().source().id())
.then(defer(self(), [=](bool deprovisioned) {
Resource converted = resource;
converted.mutable_disk()->mutable_source()->set_type(
@@ -2669,6 +1637,7 @@ StorageLocalResourceProviderProcess::applyCreate(
foreach (const Resource& resource, operation.create().volumes()) {
CHECK(Resources::isPersistentVolume(resource));
+ CHECK(resource.disk().source().has_id());
// TODO(chhsiao): Support persistent BLOCK volumes.
if (resource.disk().source().type() != Resource::DiskInfo::Source::MOUNT) {
@@ -2677,6 +1646,12 @@ StorageLocalResourceProviderProcess::applyCreate(
stringify(resource.disk().persistence().id()) + "' on a " +
stringify(resource.disk().source().type()) + " disk");
}
+
+ // TODO(chhsiao): Ideally, we could perform a sanity check to verify that
+ // the target path is empty before creating a new persistent volume.
+ // However, right now we cannot distinguish the case where a framework is
+ // recreating its own persistent volume after the agent ID changes from the
+ // case where existing data is being leaked to another framework.
}
return getResourceConversions(operation);
@@ -2690,43 +1665,32 @@ StorageLocalResourceProviderProcess::applyDestroy(
CHECK(operation.has_destroy());
foreach (const Resource& resource, operation.destroy().volumes()) {
- // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
- // with `dd` or any other utility to zero out the block device.
CHECK(Resources::isPersistentVolume(resource));
- CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT);
CHECK(resource.disk().source().has_id());
- const string& volumeId = resource.disk().source().id();
- CHECK(volumes.contains(volumeId));
-
- const VolumeState& volumeState = volumes.at(volumeId).state;
-
- // NOTE: Data can only be written to the persistent volume when it is
- // in `PUBLISHED` state (i.e., mounted). Once a volume has been transitioned
- // to `PUBLISHED`, we will set the `node_publish_required` field and always
- // recover it back to `PUBLISHED` after a failover, until a `DESTROY_DISK`
- // is applied, which only comes after `DESTROY`. So we only need to clean up
- // the volume if it has the field set.
- if (!volumeState.node_publish_required()) {
- continue;
- }
-
- CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+ // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
+ // with `dd` or any other utility to zero out the block device.
+ CHECK_EQ(Resource::DiskInfo::Source::MOUNT,
+ resource.disk().source().type());
const string targetPath = csi::paths::getMountTargetPath(
csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name()),
- volumeId);
-
- // Only the data in the target path, but not itself, should be removed.
- Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
- if (rmdir.isError()) {
- return Error(
- "Failed to remove persistent volume '" +
- stringify(resource.disk().persistence().id()) + "' at '" +
- targetPath + "': " + rmdir.error());
+ resource.disk().source().id());
+
+ if (os::exists(targetPath)) {
+ // NOTE: We always clean up the data in the target path (but not the
+ // directory itself) even if the volume is not published, in which case
+ // this should be a no-op.
+ Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+ if (rmdir.isError()) {
+ return Error(
+ "Failed to remove persistent volume '" +
+ stringify(resource.disk().persistence().id()) + "' at '" +
+ targetPath + "': " + rmdir.error());
+ }
}
}
@@ -2854,28 +1818,6 @@ void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
}
-void StorageLocalResourceProviderProcess::garbageCollectMountPath(
- const string& volumeId)
-{
- CHECK(!volumes.contains(volumeId));
-
- const string path = csi::paths::getMountPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
-
- if (os::exists(path)) {
- Try<Nothing> rmdir = os::rmdir(path);
- if (rmdir.isError()) {
- LOG(ERROR)
- << "Failed to remove directory '" << path << "': " << rmdir.error();
- }
- }
-}
-
-
void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
{
ResourceProviderState state;
@@ -2925,26 +1867,6 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
}
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
- const string& volumeId)
-{
- const string statePath = csi::paths::getVolumeStatePath(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name(),
- volumeId);
-
- // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
- // resulting in a stale or empty checkpoint when a system crash happens.
- Try<Nothing> checkpoint =
- slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
-
- CHECK_SOME(checkpoint)
- << "Failed to checkpoint volume state to '" << statePath << "':"
- << checkpoint.error();
-}
-
-
void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
{
Call call;
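Note on the provider.cpp hunks above: after this refactoring the SLRP no longer issues CSI RPCs or tracks `VolumeState` itself; every remaining call site goes through the `volumeManager` member polymorphically. As a reading aid, here is a minimal sketch of the interface subset the provider now relies on. This is not the actual `csi/volume_manager.hpp` header; the signatures are inferred from the call sites above and from the declarations removed from provider_process.hpp below.

class VolumeManager
{
public:
  virtual ~VolumeManager() = default;

  // Enumerates volumes already known to the plugin.
  virtual process::Future<std::vector<csi::VolumeInfo>> listVolumes() = 0;

  // Reports the capacity available for a given profile.
  virtual process::Future<Bytes> getCapacity(
      const csi::types::VolumeCapability& capability,
      const google::protobuf::Map<std::string, std::string>& parameters) = 0;

  // Provisions a new volume; used by `CREATE_DISK` when a profile is given.
  virtual process::Future<csi::VolumeInfo> createVolume(
      const std::string& name,
      const Bytes& capacity,
      const csi::types::VolumeCapability& capability,
      const google::protobuf::Map<std::string, std::string>& parameters) = 0;

  // Validates a pre-provisioned volume against a profile.
  virtual process::Future<Option<Error>> validateVolume(
      const csi::VolumeInfo& volumeInfo,
      const csi::types::VolumeCapability& capability,
      const google::protobuf::Map<std::string, std::string>& parameters) = 0;

  // Deprovisions a volume; the result says whether it was actually deleted.
  virtual process::Future<bool> deleteVolume(const std::string& volumeId) = 0;

  // Makes a volume available on the node (controller and node publish).
  virtual process::Future<Nothing> publishVolume(
      const std::string& volumeId) = 0;
};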
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 56d3682..6e14d90 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -19,7 +19,6 @@
#include <memory>
#include <string>
-#include <type_traits>
#include <vector>
#include <mesos/mesos.hpp>
@@ -32,9 +31,7 @@
#include <mesos/v1/resource_provider.hpp>
#include <process/future.hpp>
-#include <process/grpc.hpp>
#include <process/http.hpp>
-#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/sequence.hpp>
@@ -42,8 +39,6 @@
#include <process/metrics/counter.hpp>
#include <process/metrics/push_gauge.hpp>
-#include <stout/bytes.hpp>
-#include <stout/duration.hpp>
#include <stout/hashset.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/nothing.hpp>
@@ -52,10 +47,6 @@
#include <stout/uuid.hpp>
#include "csi/metrics.hpp"
-#include "csi/rpc.hpp"
-#include "csi/service_manager.hpp"
-#include "csi/state.hpp"
-#include "csi/utils.hpp"
#include "csi/volume_manager.hpp"
#include "status_update_manager/operation.hpp"
@@ -63,18 +54,6 @@
namespace mesos {
namespace internal {
-// Storage local resource provider initially picks a random amount of time
-// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
-// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
-// retries are exponentially backed off based on this interval (e.g., 2nd retry
-// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
-// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
-//
-// TODO(chhsiao): Make the retry parameters configurable.
-constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
-constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
-
-
class StorageLocalResourceProviderProcess
: public process::Process<StorageLocalResourceProviderProcess>
{
@@ -97,57 +76,11 @@ public:
void disconnected();
void received(const resource_provider::Event& event);
- // Wrapper functions to make CSI calls and update RPC metrics. Made public for
- // testing purposes.
- //
- // The call is made asynchronously and thus no guarantee is provided on the
- // order in which calls are sent. Callers need to either avoid having
- // multiple conflicting calls in flight, or treat results idempotently.
- //
- // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
- // calls on the same volume, and 2) no profile update while there are ongoing
- // `CREATE_DISK` or `DESTROY_DISK` operations.
- //
- // NOTE: Since this function uses `getService` to obtain the latest service
- // future, which depends on probe results, it is disabled for making probe
- // calls; `_call` should be used directly instead.
- template <
- csi::v0::RPC rpc,
- typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
- process::Future<csi::v0::Response<rpc>> call(
- const csi::Service& service,
- const csi::v0::Request<rpc>& request,
- bool retry = false);
-
- template <csi::v0::RPC rpc>
- process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
- _call(const std::string& endpoint, const csi::v0::Request<rpc>& request);
-
- template <csi::v0::RPC rpc>
- process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
- const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
- const Option<Duration>& backoff);
-
private:
- struct VolumeData
- {
- VolumeData(csi::state::VolumeState&& _state)
- : state(_state), sequence(new process::Sequence("volume-sequence")) {}
-
- csi::state::VolumeState state;
-
- // We run all CSI operations for the same volume on a sequence to
- // ensure that they are processed in a sequential order.
- process::Owned<process::Sequence> sequence;
- };
-
void initialize() override;
void fatal();
- // The recover functions are responsible for recovering the state of the
- // resource provider and CSI volumes from checkpointed data.
process::Future<Nothing> recover();
- process::Future<Nothing> recoverVolumes();
void doReliableRegistration();
@@ -192,73 +125,6 @@ private:
void reconcileOperations(
const resource_provider::Event::ReconcileOperations& reconcile);
- process::Future<Nothing> prepareServices();
-
- process::Future<Nothing> publishVolume(const std::string& volumeId);
-
- // The following methods are used to manage volume lifecycles. Transient
- // states are omitted.
- //
- //                          +------------+
- //                 +  +  +  |  CREATED   |  ^
- //   _attachVolume |  |  |  +---+----^---+  |
- //                 |  |  |      |    |      | _detachVolume
- //                 |  |  |  +---v----+---+  |
- //                 v  +  +  | NODE_READY |  +  ^
- //                    |  |  +---+----^---+  |  |
- //    __publishVolume |  |      |    |      |  | _unpublishVolume
- //                    |  |  +---v----+---+  |  |
- //                    v  +  | VOL_READY  |  +  +  ^
- //                       |  +---+----^---+  |  |  |
- //        _publishVolume |      |    |      |  |  | __unpublishVolume
- //                       |  +---v----+---+  |  |  |
- //                       v  | PUBLISHED  |  +  +  +
- //                          +------------+
-
- // Transition a volume to `NODE_READY` state from any state above.
- process::Future<Nothing> _attachVolume(const std::string& volumeId);
-
- // Transition a volume to `CREATED` state from any state below.
- process::Future<Nothing> _detachVolume(const std::string& volumeId);
-
- // Transition a volume to `PUBLISHED` state from any state above.
- process::Future<Nothing> _publishVolume(const std::string& volumeId);
-
- // Transition a volume to `VOL_READY` state from any state above.
- process::Future<Nothing> __publishVolume(const std::string& volumeId);
-
- // Transition a volume to `NODE_READY` state from any state below.
- process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
-
- // Transition a volume to `VOL_READY` state from any state below.
- process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
-
- // NOTE: This can only be called after `prepareServices`.
- process::Future<csi::VolumeInfo> createVolume(
- const std::string& name,
- const Bytes& capacity,
- const csi::types::VolumeCapability& capability,
- const google::protobuf::Map<std::string, std::string>& parameters);
-
- // NOTE: This can only be called after `prepareServices`.
- process::Future<bool> deleteVolume(const std::string& volumeId);
- process::Future<bool> _deleteVolume(const std::string& volumeId);
- process::Future<bool> __deleteVolume(const std::string& volumeId);
-
- // NOTE: This can only be called after `prepareServices`.
- process::Future<Option<Error>> validateVolume(
- const csi::VolumeInfo& volumeInfo,
- const csi::types::VolumeCapability& capability,
- const google::protobuf::Map<std::string, std::string>& parameters);
-
- // NOTE: This can only be called after `prepareServices`.
- process::Future<std::vector<csi::VolumeInfo>> listVolumes();
-
- // NOTE: This can only be called after `prepareServices`.
- process::Future<Bytes> getCapacity(
- const csi::types::VolumeCapability& capability,
- const google::protobuf::Map<std::string, std::string>& parameters);
-
// Applies the operation. Speculative operations will be synchronously
// applied. Do nothing if the operation is already in a terminal state.
process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
@@ -295,10 +161,8 @@ private:
const Try<std::vector<ResourceConversion>>& conversions);
void garbageCollectOperationPath(const id::UUID& operationUuid);
- void garbageCollectMountPath(const std::string& volumeId);
void checkpointResourceProviderState();
- void checkpointVolumeState(const std::string& volumeId);
void sendResourceProviderStateUpdate();
@@ -326,25 +190,13 @@ private:
std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
- process::grpc::client::Runtime runtime;
process::Owned<v1::resource_provider::Driver> driver;
OperationStatusUpdateManager statusUpdateManager;
// The mapping of known profiles fetched from the DiskProfileAdaptor.
hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
- process::Owned<csi::ServiceManager> serviceManager;
-
- // TODO(chhsiao): Remove the following variables after refactoring.
- std::string rootDir;
- CSIPluginInfo pluginInfo;
- hashset<csi::Service> services;
-
- Option<std::string> bootId;
- Option<csi::v0::PluginCapabilities> pluginCapabilities;
- Option<csi::v0::ControllerCapabilities> controllerCapabilities;
- Option<csi::v0::NodeCapabilities> nodeCapabilities;
- Option<std::string> nodeId;
+ process::Owned<csi::VolumeManager> volumeManager;
// We maintain the following invariant: if one operation depends on
// another, they cannot be in PENDING state at the same time, i.e.,
@@ -356,7 +208,6 @@ private:
LinkedHashMap<id::UUID, Operation> operations;
Resources totalResources;
id::UUID resourceVersion;
- hashmap<std::string, VolumeData> volumes;
// If pending, it means that the storage pools are being reconciled, and all
// incoming operations that disallow reconciliation will be dropped.
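The retry constants and the exponential backoff described in the comment removed above now belong to the v0 volume manager, which is why the tests below reference `csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR` and `csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX`. Here is a minimal sketch of that policy; the helper name `nextRetryDelay` is made up for illustration, while the constants and the doubling-with-cap behavior come from the patch.

#include <algorithm>
#include <cstdlib>

#include <stout/duration.hpp>

constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);

// Returns a random delay in [0, backoff] for the next retry, then doubles
// the backoff window for the retry after that, capped at the maximum.
Duration nextRetryDelay(Duration& backoff)
{
  const Duration delay =
    backoff * (static_cast<double>(std::rand()) / RAND_MAX);

  backoff = std::min(backoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);

  return delay;
}

With the defaults, the first retry waits somewhere in [0, 10s], the second in [0, 20s], and so on until the window is capped at 10 minutes. This is the behavior the `RetryRpcWithExponentialBackoff` test below exercises by settling the clock and then advancing it by the maximum backoff to trigger each retry.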
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 0fbd602..bb71935 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -43,6 +43,7 @@
#include "csi/paths.hpp"
#include "csi/rpc.hpp"
#include "csi/state.hpp"
+#include "csi/v0_volume_manager_process.hpp"
#include "linux/fs.hpp"
@@ -50,8 +51,6 @@
#include "module/manager.hpp"
-#include "resource_provider/storage/provider_process.hpp"
-
#include "slave/container_daemon_process.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
@@ -5054,7 +5053,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
ASSERT_EQ(0u, createVolumeRequests.size());
Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
- _, &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+ _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
// Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
createVolumeResults.put(
@@ -5062,7 +5061,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
AWAIT_READY(createVolumeCall);
- Duration createVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+ Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
// Settle the clock to ensure that the retry timer has been set, then advance
// the clock by the maximum backoff to trigger a retry.
@@ -5080,15 +5079,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
ASSERT_EQ(0u, createVolumeRequests.size());
createVolumeCall = FUTURE_DISPATCH(
- _,
- &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+ _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
AWAIT_READY(createVolumeCall);
- createVolumeBackoff =
- std::min(createVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+ createVolumeBackoff = std::min(
+ createVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
// Settle the clock to ensure that the retry timer has been set, then
// advance the clock by the maximum backoff to trigger a retry.
@@ -5142,7 +5140,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
ASSERT_EQ(0u, deleteVolumeRequests.size());
Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
- _, &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+ _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
// Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
deleteVolumeResults.put(
@@ -5150,7 +5148,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
AWAIT_READY(deleteVolumeCall);
- Duration deleteVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+ Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
// Settle the clock to ensure that the retry timer has been set, then advance
// the clock by the maximum backoff to trigger a retry.
@@ -5168,15 +5166,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
ASSERT_EQ(0u, deleteVolumeRequests.size());
deleteVolumeCall = FUTURE_DISPATCH(
- _,
- &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+ _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
AWAIT_READY(deleteVolumeCall);
- deleteVolumeBackoff =
- std::min(deleteVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+ deleteVolumeBackoff = std::min(
+ deleteVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
// Settle the clock to ensure that the retry timer has been set, then
// advance the clock by the maximum backoff to trigger a retry.