You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:14:35 UTC
[mesos] 03/08: Exposed `StorageLocalResourceProviderProcess` for
testing purpose.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit e9d2a2960817476513ae3c47c71f9a0b2af698fd
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Jan 23 21:44:10 2019 -0800
Exposed `StorageLocalResourceProviderProcess` for testing purpose.
This patch moves the declaration of the SLRP process into an internal
header file and adds a `__call` function, so that a follow-up test can use
`FUTURE_DISPATCH` to capture a dispatch on an RPC retry.
To simplify the declarations, it also internalizes `RPCTraits`,
introduces the new type aliases `Request` and `Response`, and moves
`DEFAULT_CSI_RETRY_BACKOFF_FACTOR` and `DEFAULT_CSI_RETRY_INTERVAL_MAX`
to the new header for testing.
Review: https://reviews.apache.org/r/69827
---
src/Makefile.am | 3 +-
src/csi/client.hpp | 5 +-
src/csi/rpc.hpp | 11 +
src/resource_provider/storage/provider.cpp | 602 ++++++---------------
src/resource_provider/storage/provider.hpp | 19 +-
src/resource_provider/storage/provider_process.hpp | 420 ++++++++++++++
src/tests/csi_client_tests.cpp | 2 +-
7 files changed, 608 insertions(+), 454 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index b8105c9..c674163 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1499,7 +1499,8 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/storage/disk_profile_utils.cpp \
resource_provider/storage/disk_profile_utils.hpp \
resource_provider/storage/provider.cpp \
- resource_provider/storage/provider.hpp
+ resource_provider/storage/provider.hpp \
+ resource_provider/storage/provider_process.hpp
libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS)
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index a0dfedf..c2583cf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,9 +35,8 @@ public:
: connection(_connection), runtime(_runtime) {}
template <RPC rpc>
- process::Future<
- Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
- call(typename RPCTraits<rpc>::request_type request);
+ process::Future<Try<Response<rpc>, process::grpc::StatusError>> call(
+ Request<rpc> request);
private:
process::grpc::client::Connection connection;
diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp
index c30a509..b2502ce 100644
--- a/src/csi/rpc.hpp
+++ b/src/csi/rpc.hpp
@@ -52,6 +52,8 @@ enum RPC
};
+namespace internal {
+
template <RPC>
struct RPCTraits;
@@ -191,6 +193,15 @@ struct RPCTraits<NODE_GET_CAPABILITIES>
typedef NodeGetCapabilitiesResponse response_type;
};
+} // namespace internal {
+
+
+template <RPC rpc>
+using Request = typename internal::RPCTraits<rpc>::request_type;
+
+template <RPC rpc>
+using Response = typename internal::RPCTraits<rpc>::response_type;
+
std::ostream& operator<<(std::ostream& stream, const RPC& rpc);
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 22ad0c8..09a710d 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -19,8 +19,12 @@
#include <algorithm>
#include <cctype>
#include <cstdlib>
+#include <functional>
+#include <list>
#include <memory>
#include <numeric>
+#include <queue>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -31,6 +35,8 @@
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/grpc.hpp>
#include <process/id.hpp>
#include <process/loop.hpp>
#include <process/process.hpp>
@@ -41,23 +47,28 @@
#include <process/metrics/metrics.hpp>
#include <process/metrics/push_gauge.hpp>
+#include <mesos/http.hpp>
#include <mesos/resources.hpp>
#include <mesos/type_utils.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
+
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <mesos/v1/resource_provider.hpp>
+#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
#include <stout/os/realpath.hpp>
@@ -77,6 +88,8 @@
#include "resource_provider/detector.hpp"
#include "resource_provider/state.hpp"
+#include "resource_provider/storage/provider_process.hpp"
+
#include "slave/container_daemon.hpp"
#include "slave/paths.hpp"
#include "slave/state.hpp"
@@ -106,7 +119,7 @@ using process::Failure;
using process::Future;
using process::loop;
using process::Owned;
-using process::Process;
+using process::ProcessBase;
using process::Promise;
using process::Sequence;
using process::spawn;
@@ -140,17 +153,6 @@ namespace internal {
// TODO(chhsiao): Make the timeout configurable.
constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
-// Storage local resource provider initially picks a random amount of time
-// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
-// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
-// retries are exponentially backed off based on this interval (e.g., 2nd retry
-// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
-// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
-//
-// TODO(chhsiao): Make the retry parameters configurable.
-constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
-constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
-
// Returns true if the string is a valid Java identifier.
static bool isValidName(const string& s)
@@ -304,273 +306,32 @@ static inline Resource createRawDiskResource(
}
-class StorageLocalResourceProviderProcess
- : public Process<StorageLocalResourceProviderProcess>
-{
-public:
- explicit StorageLocalResourceProviderProcess(
- const http::URL& _url,
- const string& _workDir,
- const ResourceProviderInfo& _info,
- const SlaveID& _slaveId,
- const Option<string>& _authToken,
- bool _strict)
- : ProcessBase(process::ID::generate("storage-local-resource-provider")),
- state(RECOVERING),
- url(_url),
- workDir(_workDir),
- metaDir(slave::paths::getMetaRootDir(_workDir)),
- contentType(ContentType::PROTOBUF),
- info(_info),
- vendor(
- info.storage().plugin().type() + "." +
- info.storage().plugin().name()),
- slaveId(_slaveId),
- authToken(_authToken),
- strict(_strict),
- resourceVersion(id::UUID::random()),
- sequence("storage-local-resource-provider-sequence"),
- metrics("resource_providers/" + info.type() + "." + info.name() + "/")
- {
- diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
- CHECK_NOTNULL(diskProfileAdaptor.get());
- }
-
- StorageLocalResourceProviderProcess(
- const StorageLocalResourceProviderProcess& other) = delete;
-
- StorageLocalResourceProviderProcess& operator=(
- const StorageLocalResourceProviderProcess& other) = delete;
-
- void connected();
- void disconnected();
- void received(const Event& event);
-
-private:
- struct VolumeData
- {
- VolumeData(VolumeState&& _state)
- : state(_state), sequence(new Sequence("volume-sequence")) {}
-
- VolumeState state;
-
- // We run all CSI operations for the same volume on a sequence to
- // ensure that they are processed in a sequential order.
- Owned<Sequence> sequence;
- };
-
- void initialize() override;
- void fatal();
-
- // The recover functions are responsible to recover the state of the
- // resource provider and CSI volumes from checkpointed data.
- Future<Nothing> recover();
- Future<Nothing> recoverServices();
- Future<Nothing> recoverVolumes();
- Future<Nothing> recoverResourceProviderState();
-
- void doReliableRegistration();
-
- // The reconcile functions are responsible to reconcile the state of
- // the resource provider from the recovered state and other sources of
- // truth, such as CSI plugin responses or the status update manager.
- Future<Nothing> reconcileResourceProviderState();
- Future<Nothing> reconcileOperationStatuses();
- ResourceConversion reconcileResources(
- const Resources& checkpointed,
- const Resources& discovered);
-
- // Spawns a loop to watch for changes in the set of known profiles and update
- // the profile mapping and storage pools accordingly.
- void watchProfiles();
-
- // Update the profile mapping when the set of known profiles changes.
- // NOTE: This function never fails. If it fails to translate a new
- // profile, the resource provider will continue to operate with the
- // set of profiles it knows about.
- Future<Nothing> updateProfiles(const hashset<string>& profiles);
-
- // Reconcile the storage pools when the set of known profiles changes,
- // or a volume with an unknown profile is destroyed.
- Future<Nothing> reconcileStoragePools();
-
- // Returns true if the storage pools are allowed to be reconciled when
- // the operation is being applied.
- static bool allowsReconciliation(const Offer::Operation& operation);
-
- // Functions for received events.
- void subscribed(const Event::Subscribed& subscribed);
- void applyOperation(const Event::ApplyOperation& operation);
- void publishResources(const Event::PublishResources& publish);
- void acknowledgeOperationStatus(
- const Event::AcknowledgeOperationStatus& acknowledge);
- void reconcileOperations(
- const Event::ReconcileOperations& reconcile);
-
- // Wrapper functions to make CSI calls and update RPC metrics.
- //
- // The call is made asynchronously and thus no guarantee is provided on the
- // order in which calls are sent. Callers need to either ensure to not have
- // multiple conflicting calls in flight, or treat results idempotently.
- //
- // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
- // calls on the same volume, and 2) no profile update while there are ongoing
- // `CREATE_DISK` or `DESTROY_DISK` operations.
- //
- // NOTE: Since this function uses `getService` to obtain the latest service
- // future, which depends on probe results, it is disabled for making probe
- // calls; `_call` should be used directly instead.
- template <
- csi::v0::RPC rpc,
- typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
- Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
- const ContainerID& containerId,
- const typename csi::v0::RPCTraits<rpc>::request_type& request,
- bool retry = false);
-
- template <csi::v0::RPC rpc>
- Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
- _call(
- csi::v0::Client client,
- const typename csi::v0::RPCTraits<rpc>::request_type& request);
-
- Future<csi::v0::Client> waitService(const string& endpoint);
- Future<csi::v0::Client> getService(const ContainerID& containerId);
- Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
- Future<Nothing> waitContainer(const ContainerID& containerId);
- Future<Nothing> killContainer(const ContainerID& containerId);
-
- Future<Nothing> prepareIdentityService();
- Future<Nothing> prepareControllerService();
- Future<Nothing> prepareNodeService();
- Future<Nothing> controllerPublish(const string& volumeId);
- Future<Nothing> controllerUnpublish(const string& volumeId);
- Future<Nothing> nodeStage(const string& volumeId);
- Future<Nothing> nodeUnstage(const string& volumeId);
- Future<Nothing> nodePublish(const string& volumeId);
- Future<Nothing> nodeUnpublish(const string& volumeId);
- Future<string> createVolume(
- const string& name,
- const Bytes& capacity,
- const DiskProfileAdaptor::ProfileInfo& profileInfo);
- Future<bool> deleteVolume(const string& volumeId);
- Future<Nothing> validateVolume(
- const string& volumeId,
- const Option<Labels>& metadata,
- const DiskProfileAdaptor::ProfileInfo& profileInfo);
- Future<Resources> listVolumes();
- Future<Resources> getCapacities();
-
- Future<Nothing> _applyOperation(const id::UUID& operationUuid);
- void dropOperation(
- const id::UUID& operationUuid,
- const Option<FrameworkID>& frameworkId,
- const Option<Offer::Operation>& operation,
- const string& message);
-
- Future<vector<ResourceConversion>> applyCreateDisk(
- const Resource& resource,
- const id::UUID& operationUuid,
- const Resource::DiskInfo::Source::Type& targetType,
- const Option<string>& targetProfile);
- Future<vector<ResourceConversion>> applyDestroyDisk(
- const Resource& resource);
-
- Try<Nothing> updateOperationStatus(
- const id::UUID& operationUuid,
- const Try<vector<ResourceConversion>>& conversions);
-
- void garbageCollectOperationPath(const id::UUID& operationUuid);
-
- void checkpointResourceProviderState();
- void checkpointVolumeState(const string& volumeId);
-
- void sendResourceProviderStateUpdate();
-
- // NOTE: This is a callback for the status update manager and should
- // not be called directly.
- void sendOperationStatusUpdate(
- const UpdateOperationStatusMessage& update);
-
- enum State
- {
- RECOVERING,
- DISCONNECTED,
- CONNECTED,
- SUBSCRIBED,
- READY
- } state;
-
- const http::URL url;
- const string workDir;
- const string metaDir;
- const ContentType contentType;
- ResourceProviderInfo info;
- const string vendor;
- const SlaveID slaveId;
- const Option<string> authToken;
- const bool strict;
-
- shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
-
- string bootId;
- process::grpc::client::Runtime runtime;
- Owned<v1::resource_provider::Driver> driver;
- OperationStatusUpdateManager statusUpdateManager;
-
- // The mapping of known profiles fetched from the DiskProfileAdaptor.
- hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
-
- hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
- hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
-
- Option<ContainerID> nodeContainerId;
- Option<ContainerID> controllerContainerId;
- Option<csi::v0::GetPluginInfoResponse> pluginInfo;
- csi::v0::PluginCapabilities pluginCapabilities;
- csi::v0::ControllerCapabilities controllerCapabilities;
- csi::v0::NodeCapabilities nodeCapabilities;
- Option<string> nodeId;
-
- // We maintain the following invariant: if one operation depends on
- // another, they cannot be in PENDING state at the same time, i.e.,
- // the result of the preceding operation must have been reflected in
- // the total resources.
- // NOTE: We store the list of operations in a `LinkedHashMap` to
- // preserve the order we receive the operations in case we need it.
- LinkedHashMap<id::UUID, Operation> operations;
- Resources totalResources;
- id::UUID resourceVersion;
- hashmap<string, VolumeData> volumes;
-
- // If pending, it means that the storage pools are being reconciled, and all
- // incoming operations that disallow reconciliation will be dropped.
- Future<Nothing> reconciled;
-
- // We maintain a sequence to coordinate reconciliations of storage pools. It
- // keeps track of pending operations that disallow reconciliation, and ensures
- // that any reconciliation waits for these operations to finish.
- Sequence sequence;
-
- struct Metrics
- {
- explicit Metrics(const string& prefix);
- ~Metrics();
-
- // CSI plugin metrics.
- Counter csi_plugin_container_terminations;
- hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending;
- hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes;
- hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors;
- hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled;
-
- // Operation state metrics.
- hashmap<Offer::Operation::Type, PushGauge> operations_pending;
- hashmap<Offer::Operation::Type, Counter> operations_finished;
- hashmap<Offer::Operation::Type, Counter> operations_failed;
- hashmap<Offer::Operation::Type, Counter> operations_dropped;
- } metrics;
-};
+StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
+ const http::URL& _url,
+ const string& _workDir,
+ const ResourceProviderInfo& _info,
+ const SlaveID& _slaveId,
+ const Option<string>& _authToken,
+ bool _strict)
+ : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+ state(RECOVERING),
+ url(_url),
+ workDir(_workDir),
+ metaDir(slave::paths::getMetaRootDir(_workDir)),
+ contentType(ContentType::PROTOBUF),
+ info(_info),
+ vendor(
+ info.storage().plugin().type() + "." + info.storage().plugin().name()),
+ slaveId(_slaveId),
+ authToken(_authToken),
+ strict(_strict),
+ resourceVersion(id::UUID::random()),
+ sequence("storage-local-resource-provider-sequence"),
+ metrics("resource_providers/" + info.type() + "." + info.name() + "/")
+{
+ diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
+ CHECK_NOTNULL(diskProfileAdaptor.get());
+}
void StorageLocalResourceProviderProcess::connected()
@@ -639,6 +400,121 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
}
+template <
+ csi::v0::RPC rpc,
+ typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
+Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
+ const ContainerID& containerId,
+ const csi::v0::Request<rpc>& request,
+ const bool retry)
+{
+ Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+ return loop(
+ self(),
+ [=] {
+ // Perform the call with the latest service future.
+ return getService(containerId)
+ .then(defer(
+ self(),
+ &StorageLocalResourceProviderProcess::_call<rpc>,
+ lambda::_1,
+ request));
+ },
+ [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
+ -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+ Option<Duration> backoff = retry
+ ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+ : Option<Duration>::none();
+
+ maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+ // We dispatch `__call` for testing purpose.
+ return dispatch(
+ self(),
+ &StorageLocalResourceProviderProcess::__call<rpc>,
+ result,
+ backoff);
+ });
+}
+
+
+template <csi::v0::RPC rpc>
+Future<Try<csi::v0::Response<rpc>, StatusError>>
+StorageLocalResourceProviderProcess::_call(
+ csi::v0::Client client, const csi::v0::Request<rpc>& request)
+{
+ ++metrics.csi_plugin_rpcs_pending.at(rpc);
+
+ return client.call<rpc>(request)
+ .onAny(defer(self(), [=](
+ const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
+ --metrics.csi_plugin_rpcs_pending.at(rpc);
+ if (future.isReady() && future->isSome()) {
+ ++metrics.csi_plugin_rpcs_successes.at(rpc);
+ } else if (future.isDiscarded()) {
+ ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+ } else {
+ ++metrics.csi_plugin_rpcs_errors.at(rpc);
+ }
+ }));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<ControlFlow<csi::v0::Response<rpc>>>
+StorageLocalResourceProviderProcess::__call(
+ const Try<csi::v0::Response<rpc>, StatusError>& result,
+ const Option<Duration>& backoff)
+{
+ if (result.isSome()) {
+ return Break(result.get());
+ }
+
+ if (backoff.isNone()) {
+ return Failure(result.error());
+ }
+
+ // See the link below for retryable status codes:
+ // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+ switch (result.error().status.error_code()) {
+ case grpc::DEADLINE_EXCEEDED:
+ case grpc::UNAVAILABLE: {
+ LOG(ERROR)
+ << "Received '" << result.error() << "' while calling " << rpc
+ << ". Retrying in " << backoff.get();
+
+ return after(backoff.get())
+ .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+ return Continue();
+ });
+ }
+ case grpc::CANCELLED:
+ case grpc::UNKNOWN:
+ case grpc::INVALID_ARGUMENT:
+ case grpc::NOT_FOUND:
+ case grpc::ALREADY_EXISTS:
+ case grpc::PERMISSION_DENIED:
+ case grpc::UNAUTHENTICATED:
+ case grpc::RESOURCE_EXHAUSTED:
+ case grpc::FAILED_PRECONDITION:
+ case grpc::ABORTED:
+ case grpc::OUT_OF_RANGE:
+ case grpc::UNIMPLEMENTED:
+ case grpc::INTERNAL:
+ case grpc::DATA_LOSS: {
+ return Failure(result.error());
+ }
+ case grpc::OK:
+ case grpc::DO_NOT_USE: {
+ UNREACHABLE();
+ }
+ }
+
+ UNREACHABLE();
+}
+
+
void StorageLocalResourceProviderProcess::initialize()
{
Try<string> _bootId = os::bootId();
@@ -1890,112 +1766,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
}
-template <
- csi::v0::RPC rpc,
- typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
-StorageLocalResourceProviderProcess::call(
- const ContainerID& containerId,
- const typename csi::v0::RPCTraits<rpc>::request_type& request,
- bool retry)
-{
- using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
- Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
-
- return loop(
- self(),
- [=] {
- // Perform the call with the latest service future.
- return getService(containerId)
- .then(defer(
- self(),
- &StorageLocalResourceProviderProcess::_call<rpc>,
- lambda::_1,
- request));
- },
- [=](const Try<Response, StatusError>& result) mutable
- -> Future<ControlFlow<Response>> {
- if (result.isSome()) {
- return Break(result.get());
- }
-
- if (retry) {
- // See the link below for retryable status codes:
- // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
- switch (result.error().status.error_code()) {
- case grpc::DEADLINE_EXCEEDED:
- case grpc::UNAVAILABLE: {
- Duration delay =
- maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
-
- maxBackoff =
- std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
- LOG(ERROR)
- << "Received '" << result.error() << "' while calling " << rpc
- << ". Retrying in " << delay;
-
- return after(delay)
- .then([]() -> Future<ControlFlow<Response>> {
- return Continue();
- });
- }
- case grpc::CANCELLED:
- case grpc::UNKNOWN:
- case grpc::INVALID_ARGUMENT:
- case grpc::NOT_FOUND:
- case grpc::ALREADY_EXISTS:
- case grpc::PERMISSION_DENIED:
- case grpc::UNAUTHENTICATED:
- case grpc::RESOURCE_EXHAUSTED:
- case grpc::FAILED_PRECONDITION:
- case grpc::ABORTED:
- case grpc::OUT_OF_RANGE:
- case grpc::UNIMPLEMENTED:
- case grpc::INTERNAL:
- case grpc::DATA_LOSS: {
- break;
- }
- case grpc::OK:
- case grpc::DO_NOT_USE: {
- UNREACHABLE();
- }
- }
- }
-
- return Failure(result.error());
- });
-}
-
-
-template <csi::v0::RPC rpc>
-Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
-StorageLocalResourceProviderProcess::_call(
- csi::v0::Client client,
- const typename csi::v0::RPCTraits<rpc>::request_type& request)
-{
- using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
- ++metrics.csi_plugin_rpcs_pending.at(rpc);
-
- return client.call<rpc>(request)
- .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
- --metrics.csi_plugin_rpcs_pending.at(rpc);
- if (future.isReady() && future->isSome()) {
- ++metrics.csi_plugin_rpcs_successes.at(rpc);
- } else if (future.isDiscarded()) {
- ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
- } else {
- ++metrics.csi_plugin_rpcs_errors.at(rpc);
- }
- }));
-}
-
-
-// Returns a future of a CSI client that waits for the endpoint socket
-// to appear if necessary, then connects to the socket and check its
-// readiness.
Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
const string& endpoint)
{
@@ -2040,9 +1810,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
}
-// Returns a future of the latest CSI client for the specified plugin
-// container. If the container is not already running, this method will
-// start a new a new container daemon.
Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
const ContainerID& containerId)
{
@@ -2193,9 +1960,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
}
-// Lists all running plugin containers for this resource provider.
-// NOTE: This might return containers that are not actually running,
-// e.g., if they are being destroyed.
Future<hashmap<ContainerID, Option<ContainerStatus>>>
StorageLocalResourceProviderProcess::getContainers()
{
@@ -2248,7 +2012,6 @@ StorageLocalResourceProviderProcess::getContainers()
}
-// Waits for the specified plugin container to be terminated.
Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
const ContainerID& containerId)
{
@@ -2275,7 +2038,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
}
-// Kills the specified plugin container.
Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
const ContainerID& containerId)
{
@@ -2327,7 +2089,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
}
-// NOTE: This can only be called after `prepareIdentityService`.
Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
{
CHECK_SOME(pluginInfo);
@@ -2368,8 +2129,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
}
-// NOTE: This can only be called after `prepareIdentityService` and
-// `prepareControllerService`.
Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
{
CHECK_SOME(nodeContainerId);
@@ -2398,11 +2157,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
}
-// Transitions the state of the specified volume from `CREATED` or
-// `CONTROLLER_PUBLISH` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
const string& volumeId)
{
@@ -2451,11 +2205,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
}
-// Transitions the state of the specified volume from `NODE_READY`,
-// `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
const string& volumeId)
{
@@ -2504,10 +2253,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
}
-// Transitions the state of the specified volume from `NODE_READY` or
-// `NODE_STAGE` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
const string& volumeId)
{
@@ -2568,10 +2313,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
}
-// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
-// or `NODE_UNSTAGE` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
const string& volumeId)
{
@@ -2628,10 +2369,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
}
-// Transitions the state of the specified volume from `VOL_READY` or
-// `NODE_PUBLISH` to `PUBLISHED`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
const string& volumeId)
{
@@ -2695,10 +2432,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
}
-// Transitions the state of the specified volume from `PUBLISHED`,
-// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
const string& volumeId)
{
@@ -2751,9 +2484,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
}
-// Returns a CSI volume ID.
-//
-// NOTE: This can only be called after `prepareControllerService`.
Future<string> StorageLocalResourceProviderProcess::createVolume(
const string& name,
const Bytes& capacity,
@@ -2799,10 +2529,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
}
-// Returns true if the volume has been deprovisioned.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
const string& volumeId)
{
@@ -2902,12 +2628,6 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
}
-// Validates if a volume supports the capability of the specified profile.
-//
-// NOTE: This can only be called after `prepareIdentityService`.
-//
-// TODO(chhsiao): Validate the volume against the parameters of the profile once
-// we get CSI v1.
Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
const string& volumeId,
const Option<Labels>& metadata,
@@ -2941,6 +2661,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
}
+ // TODO(chhsiao): Validate the volume against the parameters of the profile
+ // once we get CSI v1.
csi::v0::ValidateVolumeCapabilitiesRequest request;
request.set_volume_id(volumeId);
*request.add_volume_capabilities() = profileInfo.capability;
@@ -2973,8 +2695,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
}
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
{
CHECK(info.has_id());
@@ -3023,8 +2743,6 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
}
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
{
CHECK(info.has_id());
@@ -3066,8 +2784,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
}
-// Applies the operation. Speculative operations will be synchronously
-// applied. Do nothing if the operation is already in a terminal state.
Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
const id::UUID& operationUuid)
{
@@ -3158,8 +2874,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
}
-// Sends `OPERATION_DROPPED` without checkpointing the status of
-// the operation.
void StorageLocalResourceProviderProcess::dropOperation(
const id::UUID& operationUuid,
const Option<FrameworkID>& frameworkId,
@@ -3386,8 +3100,6 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
}
-// Synchronously updates `totalResources` and the operation status and
-// then asks the status update manager to send status updates.
Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
const id::UUID& operationUuid,
const Try<vector<ResourceConversion>>& conversions)
diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp
index 331f7b7..ccd09df 100644
--- a/src/resource_provider/storage/provider.hpp
+++ b/src/resource_provider/storage/provider.hpp
@@ -17,6 +17,17 @@
#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
#include "resource_provider/local.hpp"
namespace mesos {
@@ -32,15 +43,15 @@ public:
static Try<process::Owned<LocalResourceProvider>> create(
const process::http::URL& url,
const std::string& workDir,
- const mesos::ResourceProviderInfo& info,
+ const ResourceProviderInfo& info,
const SlaveID& slaveId,
const Option<std::string>& authToken,
bool strict);
static Try<process::http::authentication::Principal> principal(
- const mesos::ResourceProviderInfo& info);
+ const ResourceProviderInfo& info);
- static Option<Error> validate(const mesos::ResourceProviderInfo& info);
+ static Option<Error> validate(const ResourceProviderInfo& info);
~StorageLocalResourceProvider() override;
@@ -54,7 +65,7 @@ private:
explicit StorageLocalResourceProvider(
const process::http::URL& url,
const std::string& workDir,
- const mesos::ResourceProviderInfo& info,
+ const ResourceProviderInfo& info,
const SlaveID& slaveId,
const Option<std::string>& authToken,
bool strict);
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
new file mode 100644
index 0000000..36187fb
--- /dev/null
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <mesos/http.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
+
+#include <mesos/v1/resource_provider.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/loop.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/sequence.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/push_gauge.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashset.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "csi/client.hpp"
+#include "csi/rpc.hpp"
+#include "csi/state.hpp"
+#include "csi/utils.hpp"
+
+#include "slave/container_daemon.hpp"
+
+#include "status_update_manager/operation.hpp"
+
+namespace mesos {
+namespace internal {
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc.) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
+class StorageLocalResourceProviderProcess
+ : public process::Process<StorageLocalResourceProviderProcess>
+{
+public:
+ explicit StorageLocalResourceProviderProcess(
+ const process::http::URL& _url,
+ const std::string& _workDir,
+ const ResourceProviderInfo& _info,
+ const SlaveID& _slaveId,
+ const Option<std::string>& _authToken,
+ bool _strict);
+
+ StorageLocalResourceProviderProcess(
+ const StorageLocalResourceProviderProcess& other) = delete;
+
+ StorageLocalResourceProviderProcess& operator=(
+ const StorageLocalResourceProviderProcess& other) = delete;
+
+ void connected();
+ void disconnected();
+ void received(const resource_provider::Event& event);
+
+ // Wrapper functions to make CSI calls and update RPC metrics. Made public for
+ // testing purposes.
+ //
+ // The call is made asynchronously and thus no guarantee is provided on the
+ // order in which calls are sent. Callers need to either ensure that no
+ // conflicting calls are in flight at once, or treat results idempotently.
+ //
+ // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+ // calls on the same volume, and 2) no profile update while there are ongoing
+ // `CREATE_DISK` or `DESTROY_DISK` operations.
+ //
+ // NOTE: Since this function uses `getService` to obtain the latest service
+ // future, which depends on probe results, it is disabled for making probe
+ // calls; `_call` should be used directly instead.
+ template <
+ csi::v0::RPC rpc,
+ typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
+ process::Future<csi::v0::Response<rpc>> call(
+ const ContainerID& containerId,
+ const csi::v0::Request<rpc>& request,
+ const bool retry = false); // remains const in a mutable lambda.
+
+ template <csi::v0::RPC rpc>
+ process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
+ _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);
+
+ template <csi::v0::RPC rpc>
+ process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
+ const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
+ const Option<Duration>& backoff);
+
+private:
+ struct VolumeData
+ {
+ VolumeData(csi::state::VolumeState&& _state)
+ : state(_state), sequence(new process::Sequence("volume-sequence")) {}
+
+ csi::state::VolumeState state;
+
+ // We run all CSI operations for the same volume on a sequence to
+ // ensure that they are processed in a sequential order.
+ process::Owned<process::Sequence> sequence;
+ };
+
+ void initialize() override;
+ void fatal();
+
+ // The recover functions are responsible for recovering the state of the
+ // resource provider and CSI volumes from checkpointed data.
+ process::Future<Nothing> recover();
+ process::Future<Nothing> recoverServices();
+ process::Future<Nothing> recoverVolumes();
+ process::Future<Nothing> recoverResourceProviderState();
+
+ void doReliableRegistration();
+
+ // The reconcile functions are responsible for reconciling the state of
+ // the resource provider from the recovered state and other sources of
+ // truth, such as CSI plugin responses or the status update manager.
+ process::Future<Nothing> reconcileResourceProviderState();
+ process::Future<Nothing> reconcileOperationStatuses();
+ ResourceConversion reconcileResources(
+ const Resources& checkpointed,
+ const Resources& discovered);
+
+ // Spawns a loop to watch for changes in the set of known profiles and update
+ // the profile mapping and storage pools accordingly.
+ void watchProfiles();
+
+ // Update the profile mapping when the set of known profiles changes.
+ // NOTE: This function never fails. If it fails to translate a new
+ // profile, the resource provider will continue to operate with the
+ // set of profiles it knows about.
+ process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);
+
+ // Reconcile the storage pools when the set of known profiles changes,
+ // or a volume with an unknown profile is destroyed.
+ process::Future<Nothing> reconcileStoragePools();
+
+ // Returns true if the storage pools are allowed to be reconciled when
+ // the operation is being applied.
+ static bool allowsReconciliation(const Offer::Operation& operation);
+
+ // Functions for received events.
+ void subscribed(const resource_provider::Event::Subscribed& subscribed);
+ void applyOperation(
+ const resource_provider::Event::ApplyOperation& operation);
+ void publishResources(
+ const resource_provider::Event::PublishResources& publish);
+ void acknowledgeOperationStatus(
+ const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
+ void reconcileOperations(
+ const resource_provider::Event::ReconcileOperations& reconcile);
+
+ // Returns a future of a CSI client that waits for the endpoint socket to
+ // appear if necessary, then connects to the socket and checks its readiness.
+ process::Future<csi::v0::Client> waitService(const std::string& endpoint);
+
+ // Returns a future of the latest CSI client for the specified plugin
+ // container. If the container is not already running, this method will start
+ // a new container daemon.
+ process::Future<csi::v0::Client> getService(const ContainerID& containerId);
+
+ // Lists all running plugin containers for this resource provider.
+ // NOTE: This might return containers that are not actually running, e.g., if
+ // they are being destroyed.
+ process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
+ getContainers();
+
+ // Waits for the specified plugin container to be terminated.
+ process::Future<Nothing> waitContainer(const ContainerID& containerId);
+
+ // Kills the specified plugin container.
+ process::Future<Nothing> killContainer(const ContainerID& containerId);
+
+ process::Future<Nothing> prepareIdentityService();
+
+ // NOTE: This can only be called after `prepareIdentityService`.
+ process::Future<Nothing> prepareControllerService();
+
+ // NOTE: This can only be called after `prepareIdentityService` and
+ // `prepareControllerService`.
+ process::Future<Nothing> prepareNodeService();
+
+ // Transitions the state of the specified volume from `CREATED` or
+ // `CONTROLLER_PUBLISH` to `NODE_READY`.
+ //
+ // NOTE: This can only be called after `prepareControllerService` and
+ // `prepareNodeService`.
+ process::Future<Nothing> controllerPublish(const std::string& volumeId);
+
+ // Transitions the state of the specified volume from `NODE_READY`,
+ // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+ //
+ // NOTE: This can only be called after `prepareControllerService` and
+ // `prepareNodeService`.
+ process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
+
+ // Transitions the state of the specified volume from `NODE_READY` or
+ // `NODE_STAGE` to `VOL_READY`.
+ //
+ // NOTE: This can only be called after `prepareNodeService`.
+ process::Future<Nothing> nodeStage(const std::string& volumeId);
+
+ // Transitions the state of the specified volume from `VOL_READY`,
+ // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
+ //
+ // NOTE: This can only be called after `prepareNodeService`.
+ process::Future<Nothing> nodeUnstage(const std::string& volumeId);
+
+ // Transitions the state of the specified volume from `VOL_READY` or
+ // `NODE_PUBLISH` to `PUBLISHED`.
+ //
+ // NOTE: This can only be called after `prepareNodeService`.
+ process::Future<Nothing> nodePublish(const std::string& volumeId);
+
+ // Transitions the state of the specified volume from `PUBLISHED`,
+ // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+ //
+ // NOTE: This can only be called after `prepareNodeService`.
+ process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
+
+ // Returns a CSI volume ID.
+ //
+ // NOTE: This can only be called after `prepareControllerService`.
+ process::Future<std::string> createVolume(
+ const std::string& name,
+ const Bytes& capacity,
+ const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+ // Returns true if the volume has been deprovisioned.
+ //
+ // NOTE: This can only be called after `prepareControllerService` and
+ // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+ process::Future<bool> deleteVolume(const std::string& volumeId);
+
+ // Validates if a volume supports the capability of the specified profile.
+ //
+ // NOTE: This can only be called after `prepareIdentityService`.
+ process::Future<Nothing> validateVolume(
+ const std::string& volumeId,
+ const Option<Labels>& metadata,
+ const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+ // NOTE: This can only be called after `prepareControllerService` and the
+ // resource provider ID has been obtained.
+ process::Future<Resources> listVolumes();
+
+ // NOTE: This can only be called after `prepareControllerService` and the
+ // resource provider ID has been obtained.
+ process::Future<Resources> getCapacities();
+
+ // Applies the operation. Speculative operations will be synchronously
+ // applied. Do nothing if the operation is already in a terminal state.
+ process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
+
+ // Sends `OPERATION_DROPPED` without checkpointing the operation status.
+ void dropOperation(
+ const id::UUID& operationUuid,
+ const Option<FrameworkID>& frameworkId,
+ const Option<Offer::Operation>& operation,
+ const std::string& message);
+
+ process::Future<std::vector<ResourceConversion>> applyCreateDisk(
+ const Resource& resource,
+ const id::UUID& operationUuid,
+ const Resource::DiskInfo::Source::Type& targetType,
+ const Option<std::string>& targetProfile);
+ process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
+ const Resource& resource);
+
+ // Synchronously updates `totalResources` and the operation status and
+ // then asks the status update manager to send status updates.
+ Try<Nothing> updateOperationStatus(
+ const id::UUID& operationUuid,
+ const Try<std::vector<ResourceConversion>>& conversions);
+
+ void garbageCollectOperationPath(const id::UUID& operationUuid);
+
+ void checkpointResourceProviderState();
+ void checkpointVolumeState(const std::string& volumeId);
+
+ void sendResourceProviderStateUpdate();
+
+ // NOTE: This is a callback for the status update manager and should
+ // not be called directly.
+ void sendOperationStatusUpdate(
+ const UpdateOperationStatusMessage& update);
+
+ enum State
+ {
+ RECOVERING,
+ DISCONNECTED,
+ CONNECTED,
+ SUBSCRIBED,
+ READY
+ } state;
+
+ const process::http::URL url;
+ const std::string workDir;
+ const std::string metaDir;
+ const ContentType contentType;
+ ResourceProviderInfo info;
+ const std::string vendor;
+ const SlaveID slaveId;
+ const Option<std::string> authToken;
+ const bool strict;
+
+ std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
+
+ std::string bootId;
+ process::grpc::client::Runtime runtime;
+ process::Owned<v1::resource_provider::Driver> driver;
+ OperationStatusUpdateManager statusUpdateManager;
+
+ // The mapping of known profiles fetched from the DiskProfileAdaptor.
+ hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
+
+ hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
+ hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
+ services;
+
+ Option<ContainerID> nodeContainerId;
+ Option<ContainerID> controllerContainerId;
+ Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+ csi::v0::PluginCapabilities pluginCapabilities;
+ csi::v0::ControllerCapabilities controllerCapabilities;
+ csi::v0::NodeCapabilities nodeCapabilities;
+ Option<std::string> nodeId;
+
+ // We maintain the following invariant: if one operation depends on
+ // another, they cannot be in PENDING state at the same time, i.e.,
+ // the result of the preceding operation must have been reflected in
+ // the total resources.
+ //
+ // NOTE: We store the list of operations in a `LinkedHashMap` to
+ // preserve the order we receive the operations in case we need it.
+ LinkedHashMap<id::UUID, Operation> operations;
+ Resources totalResources;
+ id::UUID resourceVersion;
+ hashmap<std::string, VolumeData> volumes;
+
+ // If pending, it means that the storage pools are being reconciled, and all
+ // incoming operations that disallow reconciliation will be dropped.
+ process::Future<Nothing> reconciled;
+
+ // We maintain a sequence to coordinate reconciliations of storage pools. It
+ // keeps track of pending operations that disallow reconciliation, and ensures
+ // that any reconciliation waits for these operations to finish.
+ process::Sequence sequence;
+
+ struct Metrics
+ {
+ explicit Metrics(const std::string& prefix);
+ ~Metrics();
+
+ // CSI plugin metrics.
+ process::metrics::Counter csi_plugin_container_terminations;
+ hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending;
+ hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes;
+ hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors;
+ hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled;
+
+ // Operation state metrics.
+ hashmap<Offer::Operation::Type, process::metrics::PushGauge>
+ operations_pending;
+ hashmap<Offer::Operation::Type, process::metrics::Counter>
+ operations_finished;
+ hashmap<Offer::Operation::Type, process::metrics::Counter>
+ operations_failed;
+ hashmap<Offer::Operation::Type, process::metrics::Counter>
+ operations_dropped;
+ } metrics;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index 7751636..c8f3f04 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -63,7 +63,7 @@ struct RPCParam
rpc,
[](csi::v0::Client client) {
return client
- .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type())
+ .call<rpc>(csi::v0::Request<rpc>())
.then([] { return Nothing(); });
}
};