You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:15:14 UTC

[mesos] 04/06: Exposed `StorageLocalResourceProviderProcess` for testing purpose.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 2877a0070b108033644817ec1b8f99085f73ab4b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Jan 23 21:44:10 2019 -0800

    Exposed `StorageLocalResourceProviderProcess` for testing purpose.
    
    This patch moves the declaration of the SLRP process into an internal
    header file and adds a `__call` function, so that a follow-up test can
    use `FUTURE_DISPATCH` to capture a dispatch on an RPC retry.
    
    To simplify the declarations, it also internalizes `RPCTraits` and
    introduces new type aliases, and moves `DEFAULT_CSI_RETRY_BACKOFF_FACTOR`
    and `DEFAULT_CSI_RETRY_INTERVAL_MAX` to the new header for testing.
    
    Review: https://reviews.apache.org/r/69827
---
 src/Makefile.am                                    |   3 +-
 src/csi/client.hpp                                 |   5 +-
 src/csi/rpc.hpp                                    |  11 +
 src/resource_provider/storage/provider.cpp         | 602 ++++++---------------
 src/resource_provider/storage/provider.hpp         |  19 +-
 src/resource_provider/storage/provider_process.hpp | 420 ++++++++++++++
 src/tests/csi_client_tests.cpp                     |   2 +-
 7 files changed, 608 insertions(+), 454 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index eb50fc7..8238c4d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1469,7 +1469,8 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/storage/disk_profile_utils.cpp			\
   resource_provider/storage/disk_profile_utils.hpp			\
   resource_provider/storage/provider.cpp				\
-  resource_provider/storage/provider.hpp
+  resource_provider/storage/provider.hpp				\
+  resource_provider/storage/provider_process.hpp
 
 libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index a0dfedf..c2583cf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,9 +35,8 @@ public:
     : connection(_connection), runtime(_runtime) {}
 
   template <RPC rpc>
-  process::Future<
-      Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
-  call(typename RPCTraits<rpc>::request_type request);
+  process::Future<Try<Response<rpc>, process::grpc::StatusError>> call(
+      Request<rpc> request);
 
 private:
   process::grpc::client::Connection connection;
diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp
index c30a509..b2502ce 100644
--- a/src/csi/rpc.hpp
+++ b/src/csi/rpc.hpp
@@ -52,6 +52,8 @@ enum RPC
 };
 
 
+namespace internal {
+
 template <RPC>
 struct RPCTraits;
 
@@ -191,6 +193,15 @@ struct RPCTraits<NODE_GET_CAPABILITIES>
   typedef NodeGetCapabilitiesResponse response_type;
 };
 
+} // namespace internal {
+
+
+template <RPC rpc>
+using Request = typename internal::RPCTraits<rpc>::request_type;
+
+template <RPC rpc>
+using Response = typename internal::RPCTraits<rpc>::response_type;
+
 
 std::ostream& operator<<(std::ostream& stream, const RPC& rpc);
 
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 1abb9c8..c4068a5 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -19,8 +19,12 @@
 #include <algorithm>
 #include <cctype>
 #include <cstdlib>
+#include <functional>
+#include <list>
 #include <memory>
 #include <numeric>
+#include <queue>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -31,6 +35,8 @@
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/grpc.hpp>
 #include <process/id.hpp>
 #include <process/loop.hpp>
 #include <process/process.hpp>
@@ -41,23 +47,28 @@
 #include <process/metrics/metrics.hpp>
 #include <process/metrics/push_gauge.hpp>
 
+#include <mesos/http.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
 #include <mesos/resource_provider/resource_provider.hpp>
+
 #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/bytes.hpp>
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
 #include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -77,6 +88,8 @@
 #include "resource_provider/detector.hpp"
 #include "resource_provider/state.hpp"
 
+#include "resource_provider/storage/provider_process.hpp"
+
 #include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
@@ -106,7 +119,7 @@ using process::Failure;
 using process::Future;
 using process::loop;
 using process::Owned;
-using process::Process;
+using process::ProcessBase;
 using process::Promise;
 using process::Sequence;
 using process::spawn;
@@ -140,17 +153,6 @@ namespace internal {
 // TODO(chhsiao): Make the timeout configurable.
 constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
 
-// Storage local resource provider initially picks a random amount of time
-// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
-// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
-// retries are exponentially backed off based on this interval (e.g., 2nd retry
-// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
-// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
-//
-// TODO(chhsiao): Make the retry parameters configurable.
-constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
-constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
-
 
 // Returns true if the string is a valid Java identifier.
 static bool isValidName(const string& s)
@@ -304,273 +306,32 @@ static inline Resource createRawDiskResource(
 }
 
 
-class StorageLocalResourceProviderProcess
-  : public Process<StorageLocalResourceProviderProcess>
-{
-public:
-  explicit StorageLocalResourceProviderProcess(
-      const http::URL& _url,
-      const string& _workDir,
-      const ResourceProviderInfo& _info,
-      const SlaveID& _slaveId,
-      const Option<string>& _authToken,
-      bool _strict)
-    : ProcessBase(process::ID::generate("storage-local-resource-provider")),
-      state(RECOVERING),
-      url(_url),
-      workDir(_workDir),
-      metaDir(slave::paths::getMetaRootDir(_workDir)),
-      contentType(ContentType::PROTOBUF),
-      info(_info),
-      vendor(
-          info.storage().plugin().type() + "." +
-          info.storage().plugin().name()),
-      slaveId(_slaveId),
-      authToken(_authToken),
-      strict(_strict),
-      resourceVersion(id::UUID::random()),
-      sequence("storage-local-resource-provider-sequence"),
-      metrics("resource_providers/" + info.type() + "." + info.name() + "/")
-  {
-    diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
-    CHECK_NOTNULL(diskProfileAdaptor.get());
-  }
-
-  StorageLocalResourceProviderProcess(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  StorageLocalResourceProviderProcess& operator=(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  void connected();
-  void disconnected();
-  void received(const Event& event);
-
-private:
-  struct VolumeData
-  {
-    VolumeData(VolumeState&& _state)
-      : state(_state), sequence(new Sequence("volume-sequence")) {}
-
-    VolumeState state;
-
-    // We run all CSI operations for the same volume on a sequence to
-    // ensure that they are processed in a sequential order.
-    Owned<Sequence> sequence;
-  };
-
-  void initialize() override;
-  void fatal();
-
-  // The recover functions are responsible to recover the state of the
-  // resource provider and CSI volumes from checkpointed data.
-  Future<Nothing> recover();
-  Future<Nothing> recoverServices();
-  Future<Nothing> recoverVolumes();
-  Future<Nothing> recoverResourceProviderState();
-
-  void doReliableRegistration();
-
-  // The reconcile functions are responsible to reconcile the state of
-  // the resource provider from the recovered state and other sources of
-  // truth, such as CSI plugin responses or the status update manager.
-  Future<Nothing> reconcileResourceProviderState();
-  Future<Nothing> reconcileOperationStatuses();
-  ResourceConversion reconcileResources(
-      const Resources& checkpointed,
-      const Resources& discovered);
-
-  // Spawns a loop to watch for changes in the set of known profiles and update
-  // the profile mapping and storage pools accordingly.
-  void watchProfiles();
-
-  // Update the profile mapping when the set of known profiles changes.
-  // NOTE: This function never fails. If it fails to translate a new
-  // profile, the resource provider will continue to operate with the
-  // set of profiles it knows about.
-  Future<Nothing> updateProfiles(const hashset<string>& profiles);
-
-  // Reconcile the storage pools when the set of known profiles changes,
-  // or a volume with an unknown profile is destroyed.
-  Future<Nothing> reconcileStoragePools();
-
-  // Returns true if the storage pools are allowed to be reconciled when
-  // the operation is being applied.
-  static bool allowsReconciliation(const Offer::Operation& operation);
-
-  // Functions for received events.
-  void subscribed(const Event::Subscribed& subscribed);
-  void applyOperation(const Event::ApplyOperation& operation);
-  void publishResources(const Event::PublishResources& publish);
-  void acknowledgeOperationStatus(
-      const Event::AcknowledgeOperationStatus& acknowledge);
-  void reconcileOperations(
-      const Event::ReconcileOperations& reconcile);
-
-  // Wrapper functions to make CSI calls and update RPC metrics.
-  //
-  // The call is made asynchronously and thus no guarantee is provided on the
-  // order in which calls are sent. Callers need to either ensure to not have
-  // multiple conflicting calls in flight, or treat results idempotently.
-  //
-  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
-  // calls on the same volume, and 2) no profile update while there are ongoing
-  // `CREATE_DISK` or `DESTROY_DISK` operations.
-  //
-  // NOTE: Since this function uses `getService` to obtain the latest service
-  // future, which depends on probe results, it is disabled for making probe
-  // calls; `_call` should be used directly instead.
-  template <
-      csi::v0::RPC rpc,
-      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
-  Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
-      const ContainerID& containerId,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request,
-      bool retry = false);
-
-  template <csi::v0::RPC rpc>
-  Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
-  _call(
-      csi::v0::Client client,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request);
-
-  Future<csi::v0::Client> waitService(const string& endpoint);
-  Future<csi::v0::Client> getService(const ContainerID& containerId);
-  Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
-  Future<Nothing> waitContainer(const ContainerID& containerId);
-  Future<Nothing> killContainer(const ContainerID& containerId);
-
-  Future<Nothing> prepareIdentityService();
-  Future<Nothing> prepareControllerService();
-  Future<Nothing> prepareNodeService();
-  Future<Nothing> controllerPublish(const string& volumeId);
-  Future<Nothing> controllerUnpublish(const string& volumeId);
-  Future<Nothing> nodeStage(const string& volumeId);
-  Future<Nothing> nodeUnstage(const string& volumeId);
-  Future<Nothing> nodePublish(const string& volumeId);
-  Future<Nothing> nodeUnpublish(const string& volumeId);
-  Future<string> createVolume(
-      const string& name,
-      const Bytes& capacity,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<bool> deleteVolume(const string& volumeId);
-  Future<Nothing> validateVolume(
-      const string& volumeId,
-      const Option<Labels>& metadata,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<Resources> listVolumes();
-  Future<Resources> getCapacities();
-
-  Future<Nothing> _applyOperation(const id::UUID& operationUuid);
-  void dropOperation(
-      const id::UUID& operationUuid,
-      const Option<FrameworkID>& frameworkId,
-      const Option<Offer::Operation>& operation,
-      const string& message);
-
-  Future<vector<ResourceConversion>> applyCreateDisk(
-      const Resource& resource,
-      const id::UUID& operationUuid,
-      const Resource::DiskInfo::Source::Type& targetType,
-      const Option<string>& targetProfile);
-  Future<vector<ResourceConversion>> applyDestroyDisk(
-      const Resource& resource);
-
-  Try<Nothing> updateOperationStatus(
-      const id::UUID& operationUuid,
-      const Try<vector<ResourceConversion>>& conversions);
-
-  void garbageCollectOperationPath(const id::UUID& operationUuid);
-
-  void checkpointResourceProviderState();
-  void checkpointVolumeState(const string& volumeId);
-
-  void sendResourceProviderStateUpdate();
-
-  // NOTE: This is a callback for the status update manager and should
-  // not be called directly.
-  void sendOperationStatusUpdate(
-      const UpdateOperationStatusMessage& update);
-
-  enum State
-  {
-    RECOVERING,
-    DISCONNECTED,
-    CONNECTED,
-    SUBSCRIBED,
-    READY
-  } state;
-
-  const http::URL url;
-  const string workDir;
-  const string metaDir;
-  const ContentType contentType;
-  ResourceProviderInfo info;
-  const string vendor;
-  const SlaveID slaveId;
-  const Option<string> authToken;
-  const bool strict;
-
-  shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
-
-  string bootId;
-  process::grpc::client::Runtime runtime;
-  Owned<v1::resource_provider::Driver> driver;
-  OperationStatusUpdateManager statusUpdateManager;
-
-  // The mapping of known profiles fetched from the DiskProfileAdaptor.
-  hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
-
-  hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
-  hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
-
-  Option<ContainerID> nodeContainerId;
-  Option<ContainerID> controllerContainerId;
-  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
-  csi::v0::PluginCapabilities pluginCapabilities;
-  csi::v0::ControllerCapabilities controllerCapabilities;
-  csi::v0::NodeCapabilities nodeCapabilities;
-  Option<string> nodeId;
-
-  // We maintain the following invariant: if one operation depends on
-  // another, they cannot be in PENDING state at the same time, i.e.,
-  // the result of the preceding operation must have been reflected in
-  // the total resources.
-  // NOTE: We store the list of operations in a `LinkedHashMap` to
-  // preserve the order we receive the operations in case we need it.
-  LinkedHashMap<id::UUID, Operation> operations;
-  Resources totalResources;
-  id::UUID resourceVersion;
-  hashmap<string, VolumeData> volumes;
-
-  // If pending, it means that the storage pools are being reconciled, and all
-  // incoming operations that disallow reconciliation will be dropped.
-  Future<Nothing> reconciled;
-
-  // We maintain a sequence to coordinate reconciliations of storage pools. It
-  // keeps track of pending operations that disallow reconciliation, and ensures
-  // that any reconciliation waits for these operations to finish.
-  Sequence sequence;
-
-  struct Metrics
-  {
-    explicit Metrics(const string& prefix);
-    ~Metrics();
-
-    // CSI plugin metrics.
-    Counter csi_plugin_container_terminations;
-    hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled;
-
-    // Operation state metrics.
-    hashmap<Offer::Operation::Type, PushGauge> operations_pending;
-    hashmap<Offer::Operation::Type, Counter> operations_finished;
-    hashmap<Offer::Operation::Type, Counter> operations_failed;
-    hashmap<Offer::Operation::Type, Counter> operations_dropped;
-  } metrics;
-};
+StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
+    const http::URL& _url,
+    const string& _workDir,
+    const ResourceProviderInfo& _info,
+    const SlaveID& _slaveId,
+    const Option<string>& _authToken,
+    bool _strict)
+  : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+    state(RECOVERING),
+    url(_url),
+    workDir(_workDir),
+    metaDir(slave::paths::getMetaRootDir(_workDir)),
+    contentType(ContentType::PROTOBUF),
+    info(_info),
+    vendor(
+        info.storage().plugin().type() + "." + info.storage().plugin().name()),
+    slaveId(_slaveId),
+    authToken(_authToken),
+    strict(_strict),
+    resourceVersion(id::UUID::random()),
+    sequence("storage-local-resource-provider-sequence"),
+    metrics("resource_providers/" + info.type() + "." + info.name() + "/")
+{
+  diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
+  CHECK_NOTNULL(diskProfileAdaptor.get());
+}
 
 
 void StorageLocalResourceProviderProcess::connected()
@@ -635,6 +396,121 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 }
 
 
+template <
+    csi::v0::RPC rpc,
+    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
+Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
+    const ContainerID& containerId,
+    const csi::v0::Request<rpc>& request,
+    const bool retry)
+{
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return loop(
+      self(),
+      [=] {
+        // Perform the call with the latest service future.
+        return getService(containerId)
+          .then(defer(
+              self(),
+              &StorageLocalResourceProviderProcess::_call<rpc>,
+              lambda::_1,
+              request));
+      },
+      [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
+          -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+        Option<Duration> backoff = retry
+          ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+          : Option<Duration>::none();
+
+        maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+        // We dispatch `__call` for testing purpose.
+        return dispatch(
+            self(),
+            &StorageLocalResourceProviderProcess::__call<rpc>,
+            result,
+            backoff);
+      });
+}
+
+
+template <csi::v0::RPC rpc>
+Future<Try<csi::v0::Response<rpc>, StatusError>>
+StorageLocalResourceProviderProcess::_call(
+    csi::v0::Client client, const csi::v0::Request<rpc>& request)
+{
+  ++metrics.csi_plugin_rpcs_pending.at(rpc);
+
+  return client.call<rpc>(request)
+    .onAny(defer(self(), [=](
+        const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
+      --metrics.csi_plugin_rpcs_pending.at(rpc);
+      if (future.isReady() && future->isSome()) {
+        ++metrics.csi_plugin_rpcs_successes.at(rpc);
+      } else if (future.isDiscarded()) {
+        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+      } else {
+        ++metrics.csi_plugin_rpcs_errors.at(rpc);
+      }
+    }));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<ControlFlow<csi::v0::Response<rpc>>>
+StorageLocalResourceProviderProcess::__call(
+    const Try<csi::v0::Response<rpc>, StatusError>& result,
+    const Option<Duration>& backoff)
+{
+  if (result.isSome()) {
+    return Break(result.get());
+  }
+
+  if (backoff.isNone()) {
+    return Failure(result.error());
+  }
+
+  // See the link below for retryable status codes:
+  // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+  switch (result.error().status.error_code()) {
+    case grpc::DEADLINE_EXCEEDED:
+    case grpc::UNAVAILABLE: {
+      LOG(ERROR)
+        << "Received '" << result.error() << "' while calling " << rpc
+        << ". Retrying in " << backoff.get();
+
+      return after(backoff.get())
+        .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+          return Continue();
+        });
+    }
+    case grpc::CANCELLED:
+    case grpc::UNKNOWN:
+    case grpc::INVALID_ARGUMENT:
+    case grpc::NOT_FOUND:
+    case grpc::ALREADY_EXISTS:
+    case grpc::PERMISSION_DENIED:
+    case grpc::UNAUTHENTICATED:
+    case grpc::RESOURCE_EXHAUSTED:
+    case grpc::FAILED_PRECONDITION:
+    case grpc::ABORTED:
+    case grpc::OUT_OF_RANGE:
+    case grpc::UNIMPLEMENTED:
+    case grpc::INTERNAL:
+    case grpc::DATA_LOSS: {
+      return Failure(result.error());
+    }
+    case grpc::OK:
+    case grpc::DO_NOT_USE: {
+      UNREACHABLE();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
 void StorageLocalResourceProviderProcess::initialize()
 {
   Try<string> _bootId = os::bootId();
@@ -1886,112 +1762,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-template <
-    csi::v0::RPC rpc,
-    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
-StorageLocalResourceProviderProcess::call(
-    const ContainerID& containerId,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request,
-    bool retry)
-{
-  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
-  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
-
-  return loop(
-      self(),
-      [=] {
-        // Perform the call with the latest service future.
-        return getService(containerId)
-          .then(defer(
-              self(),
-              &StorageLocalResourceProviderProcess::_call<rpc>,
-              lambda::_1,
-              request));
-      },
-      [=](const Try<Response, StatusError>& result) mutable
-          -> Future<ControlFlow<Response>> {
-        if (result.isSome()) {
-          return Break(result.get());
-        }
-
-        if (retry) {
-          // See the link below for retryable status codes:
-          // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
-          switch (result.error().status.error_code()) {
-            case grpc::DEADLINE_EXCEEDED:
-            case grpc::UNAVAILABLE: {
-              Duration delay =
-                maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
-
-              maxBackoff =
-                std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
-              LOG(ERROR)
-                << "Received '" << result.error() << "' while calling " << rpc
-                << ". Retrying in " << delay;
-
-              return after(delay)
-                .then([]() -> Future<ControlFlow<Response>> {
-                  return Continue();
-                });
-            }
-            case grpc::CANCELLED:
-            case grpc::UNKNOWN:
-            case grpc::INVALID_ARGUMENT:
-            case grpc::NOT_FOUND:
-            case grpc::ALREADY_EXISTS:
-            case grpc::PERMISSION_DENIED:
-            case grpc::UNAUTHENTICATED:
-            case grpc::RESOURCE_EXHAUSTED:
-            case grpc::FAILED_PRECONDITION:
-            case grpc::ABORTED:
-            case grpc::OUT_OF_RANGE:
-            case grpc::UNIMPLEMENTED:
-            case grpc::INTERNAL:
-            case grpc::DATA_LOSS: {
-              break;
-            }
-            case grpc::OK:
-            case grpc::DO_NOT_USE: {
-              UNREACHABLE();
-            }
-          }
-        }
-
-        return Failure(result.error());
-      });
-}
-
-
-template <csi::v0::RPC rpc>
-Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
-StorageLocalResourceProviderProcess::_call(
-    csi::v0::Client client,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request)
-{
-  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
-  ++metrics.csi_plugin_rpcs_pending.at(rpc);
-
-  return client.call<rpc>(request)
-    .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
-      --metrics.csi_plugin_rpcs_pending.at(rpc);
-      if (future.isReady() && future->isSome()) {
-        ++metrics.csi_plugin_rpcs_successes.at(rpc);
-      } else if (future.isDiscarded()) {
-        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
-      } else {
-        ++metrics.csi_plugin_rpcs_errors.at(rpc);
-      }
-    }));
-}
-
-
-// Returns a future of a CSI client that waits for the endpoint socket
-// to appear if necessary, then connects to the socket and check its
-// readiness.
 Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
     const string& endpoint)
 {
@@ -2036,9 +1806,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
 }
 
 
-// Returns a future of the latest CSI client for the specified plugin
-// container. If the container is not already running, this method will
-// start a new a new container daemon.
 Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
     const ContainerID& containerId)
 {
@@ -2189,9 +1956,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 }
 
 
-// Lists all running plugin containers for this resource provider.
-// NOTE: This might return containers that are not actually running,
-// e.g., if they are being destroyed.
 Future<hashmap<ContainerID, Option<ContainerStatus>>>
 StorageLocalResourceProviderProcess::getContainers()
 {
@@ -2244,7 +2008,6 @@ StorageLocalResourceProviderProcess::getContainers()
 }
 
 
-// Waits for the specified plugin container to be terminated.
 Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
     const ContainerID& containerId)
 {
@@ -2271,7 +2034,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
 }
 
 
-// Kills the specified plugin container.
 Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
     const ContainerID& containerId)
 {
@@ -2323,7 +2085,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 }
 
 
-// NOTE: This can only be called after `prepareIdentityService`.
 Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 {
   CHECK_SOME(pluginInfo);
@@ -2364,8 +2125,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 }
 
 
-// NOTE: This can only be called after `prepareIdentityService` and
-// `prepareControllerService`.
 Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 {
   CHECK_SOME(nodeContainerId);
@@ -2394,11 +2153,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
-// Transitions the state of the specified volume from `CREATED` or
-// `CONTROLLER_PUBLISH` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
@@ -2447,11 +2201,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
-// Transitions the state of the specified volume from `NODE_READY`,
-// `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     const string& volumeId)
 {
@@ -2500,10 +2249,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 }
 
 
-// Transitions the state of the specified volume from `NODE_READY` or
-// `NODE_STAGE` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     const string& volumeId)
 {
@@ -2564,10 +2309,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
 }
 
 
-// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
-// or `NODE_UNSTAGE` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     const string& volumeId)
 {
@@ -2624,10 +2365,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
 }
 
 
-// Transitions the state of the specified volume from `VOL_READY` or
-// `NODE_PUBLISH` to `PUBLISHED`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
@@ -2691,10 +2428,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 }
 
 
-// Transitions the state of the specified volume from `PUBLISHED`,
-// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
@@ -2747,9 +2480,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 }
 
 
-// Returns a CSI volume ID.
-//
-// NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
@@ -2795,10 +2525,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 }
 
 
-// Returns true if the volume has been deprovisioned.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
@@ -2898,12 +2624,6 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume supports the capability of the specified profile.
-//
-// NOTE: This can only be called after `prepareIdentityService`.
-//
-// TODO(chhsiao): Validate the volume against the parameters of the profile once
-// we get CSI v1.
 Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     const string& volumeId,
     const Option<Labels>& metadata,
@@ -2937,6 +2657,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
   }
 
+  // TODO(chhsiao): Validate the volume against the parameters of the profile
+  // once we get CSI v1.
   csi::v0::ValidateVolumeCapabilitiesRequest request;
   request.set_volume_id(volumeId);
   *request.add_volume_capabilities() = profileInfo.capability;
@@ -2969,8 +2691,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
 }
 
 
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 {
   CHECK(info.has_id());
@@ -3019,8 +2739,6 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 }
 
 
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
   CHECK(info.has_id());
@@ -3062,8 +2780,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 }
 
 
-// Applies the operation. Speculative operations will be synchronously
-// applied. Do nothing if the operation is already in a terminal state.
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     const id::UUID& operationUuid)
 {
@@ -3154,8 +2870,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
 }
 
 
-// Sends `OPERATION_DROPPED` without checkpointing the status of
-// the operation.
 void StorageLocalResourceProviderProcess::dropOperation(
     const id::UUID& operationUuid,
     const Option<FrameworkID>& frameworkId,
@@ -3382,8 +3096,6 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
 }
 
 
-// Synchronously updates `totalResources` and the operation status and
-// then asks the status update manager to send status updates.
 Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
     const id::UUID& operationUuid,
     const Try<vector<ResourceConversion>>& conversions)
diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp
index 331f7b7..ccd09df 100644
--- a/src/resource_provider/storage/provider.hpp
+++ b/src/resource_provider/storage/provider.hpp
@@ -17,6 +17,17 @@
 #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
 #include "resource_provider/local.hpp"
 
 namespace mesos {
@@ -32,15 +43,15 @@ public:
   static Try<process::Owned<LocalResourceProvider>> create(
       const process::http::URL& url,
       const std::string& workDir,
-      const mesos::ResourceProviderInfo& info,
+      const ResourceProviderInfo& info,
       const SlaveID& slaveId,
       const Option<std::string>& authToken,
       bool strict);
 
   static Try<process::http::authentication::Principal> principal(
-      const mesos::ResourceProviderInfo& info);
+      const ResourceProviderInfo& info);
 
-  static Option<Error> validate(const mesos::ResourceProviderInfo& info);
+  static Option<Error> validate(const ResourceProviderInfo& info);
 
   ~StorageLocalResourceProvider() override;
 
@@ -54,7 +65,7 @@ private:
   explicit StorageLocalResourceProvider(
       const process::http::URL& url,
       const std::string& workDir,
-      const mesos::ResourceProviderInfo& info,
+      const ResourceProviderInfo& info,
       const SlaveID& slaveId,
       const Option<std::string>& authToken,
       bool strict);
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
new file mode 100644
index 0000000..36187fb
--- /dev/null
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <mesos/http.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
+
+#include <mesos/v1/resource_provider.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/loop.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/sequence.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/push_gauge.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashset.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "csi/client.hpp"
+#include "csi/rpc.hpp"
+#include "csi/state.hpp"
+#include "csi/utils.hpp"
+
+#include "slave/container_daemon.hpp"
+
+#include "status_update_manager/operation.hpp"
+
+namespace mesos {
+namespace internal {
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
+class StorageLocalResourceProviderProcess
+  : public process::Process<StorageLocalResourceProviderProcess>
+{
+public:
+  explicit StorageLocalResourceProviderProcess(
+      const process::http::URL& _url,
+      const std::string& _workDir,
+      const ResourceProviderInfo& _info,
+      const SlaveID& _slaveId,
+      const Option<std::string>& _authToken,
+      bool _strict);
+
+  StorageLocalResourceProviderProcess(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  StorageLocalResourceProviderProcess& operator=(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  void connected();
+  void disconnected();
+  void received(const resource_provider::Event& event);
+
+  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
+  // testing purposes.
+  //
+  // The call is made asynchronously and thus no guarantee is provided on the
+  // order in which calls are sent. Callers need to either ensure to not have
+  // multiple conflicting calls in flight, or treat results idempotently.
+  //
+  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+  // calls on the same volume, and 2) no profile update while there are ongoing
+  // `CREATE_DISK` or `DESTROY_DISK` operations.
+  //
+  // NOTE: Since this function uses `getService` to obtain the latest service
+  // future, which depends on probe results, it is disabled for making probe
+  // calls; `_call` should be used directly instead.
+  template <
+      csi::v0::RPC rpc,
+      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
+  process::Future<csi::v0::Response<rpc>> call(
+      const ContainerID& containerId,
+      const csi::v0::Request<rpc>& request,
+      const bool retry = false); // remains const in a mutable lambda.
+
+  template <csi::v0::RPC rpc>
+  process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
+  _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);
+
+  template <csi::v0::RPC rpc>
+  process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
+      const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
+      const Option<Duration>& backoff);
+
+private:
+  struct VolumeData
+  {
+    VolumeData(csi::state::VolumeState&& _state)
+      : state(_state), sequence(new process::Sequence("volume-sequence")) {}
+
+    csi::state::VolumeState state;
+
+    // We run all CSI operations for the same volume on a sequence to
+    // ensure that they are processed in a sequential order.
+    process::Owned<process::Sequence> sequence;
+  };
+
+  void initialize() override;
+  void fatal();
+
+  // The recover functions are responsible for recovering the state of the
+  // resource provider and CSI volumes from checkpointed data.
+  process::Future<Nothing> recover();
+  process::Future<Nothing> recoverServices();
+  process::Future<Nothing> recoverVolumes();
+  process::Future<Nothing> recoverResourceProviderState();
+
+  void doReliableRegistration();
+
+  // The reconcile functions are responsible for reconciling the state of
+  // the resource provider from the recovered state and other sources of
+  // truth, such as CSI plugin responses or the status update manager.
+  process::Future<Nothing> reconcileResourceProviderState();
+  process::Future<Nothing> reconcileOperationStatuses();
+  ResourceConversion reconcileResources(
+      const Resources& checkpointed,
+      const Resources& discovered);
+
+  // Spawns a loop to watch for changes in the set of known profiles and update
+  // the profile mapping and storage pools accordingly.
+  void watchProfiles();
+
+  // Update the profile mapping when the set of known profiles changes.
+  // NOTE: This function never fails. If it fails to translate a new
+  // profile, the resource provider will continue to operate with the
+  // set of profiles it knows about.
+  process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);
+
+  // Reconcile the storage pools when the set of known profiles changes,
+  // or a volume with an unknown profile is destroyed.
+  process::Future<Nothing> reconcileStoragePools();
+
+  // Returns true if the storage pools are allowed to be reconciled when
+  // the operation is being applied.
+  static bool allowsReconciliation(const Offer::Operation& operation);
+
+  // Functions for received events.
+  void subscribed(const resource_provider::Event::Subscribed& subscribed);
+  void applyOperation(
+      const resource_provider::Event::ApplyOperation& operation);
+  void publishResources(
+      const resource_provider::Event::PublishResources& publish);
+  void acknowledgeOperationStatus(
+      const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
+  void reconcileOperations(
+      const resource_provider::Event::ReconcileOperations& reconcile);
+
+  // Returns a future of a CSI client that waits for the endpoint socket to
+  // appear if necessary, then connects to the socket and checks its readiness.
+  process::Future<csi::v0::Client> waitService(const std::string& endpoint);
+
+  // Returns a future of the latest CSI client for the specified plugin
+  // container. If the container is not already running, this method will start
+  // a new container daemon.
+  process::Future<csi::v0::Client> getService(const ContainerID& containerId);
+
+  // Lists all running plugin containers for this resource provider.
+  // NOTE: This might return containers that are not actually running, e.g., if
+  // they are being destroyed.
+  process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
+  getContainers();
+
+  // Waits for the specified plugin container to be terminated.
+  process::Future<Nothing> waitContainer(const ContainerID& containerId);
+
+  // Kills the specified plugin container.
+  process::Future<Nothing> killContainer(const ContainerID& containerId);
+
+  process::Future<Nothing> prepareIdentityService();
+
+  // NOTE: This can only be called after `prepareIdentityService`.
+  process::Future<Nothing> prepareControllerService();
+
+  // NOTE: This can only be called after `prepareIdentityService` and
+  // `prepareControllerService`.
+  process::Future<Nothing> prepareNodeService();
+
+  // Transitions the state of the specified volume from `CREATED` or
+  // `CONTROLLER_PUBLISH` to `NODE_READY`.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService`.
+  process::Future<Nothing> controllerPublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `NODE_READY`,
+  // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService`.
+  process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `NODE_READY` or
+  // `NODE_STAGE` to `VOL_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeStage(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `VOL_READY`,
+  // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeUnstage(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `VOL_READY` or
+  // `NODE_PUBLISH` to `PUBLISHED`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodePublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `PUBLISHED`,
+  // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
+
+  // Returns a CSI volume ID.
+  //
+  // NOTE: This can only be called after `prepareControllerService`.
+  process::Future<std::string> createVolume(
+      const std::string& name,
+      const Bytes& capacity,
+      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+  // Returns true if the volume has been deprovisioned.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+  process::Future<bool> deleteVolume(const std::string& volumeId);
+
+  // Validates if a volume supports the capability of the specified profile.
+  //
+  // NOTE: This can only be called after `prepareIdentityService`.
+  process::Future<Nothing> validateVolume(
+      const std::string& volumeId,
+      const Option<Labels>& metadata,
+      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+  // NOTE: This can only be called after `prepareControllerService` and the
+  // resource provider ID has been obtained.
+  process::Future<Resources> listVolumes();
+
+  // NOTE: This can only be called after `prepareControllerService` and the
+  // resource provider ID has been obtained.
+  process::Future<Resources> getCapacities();
+
+  // Applies the operation. Speculative operations will be synchronously
+  // applied. Do nothing if the operation is already in a terminal state.
+  process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
+
+  // Sends `OPERATION_DROPPED` without checkpointing the operation status.
+  void dropOperation(
+      const id::UUID& operationUuid,
+      const Option<FrameworkID>& frameworkId,
+      const Option<Offer::Operation>& operation,
+      const std::string& message);
+
+  process::Future<std::vector<ResourceConversion>> applyCreateDisk(
+      const Resource& resource,
+      const id::UUID& operationUuid,
+      const Resource::DiskInfo::Source::Type& targetType,
+      const Option<std::string>& targetProfile);
+  process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
+      const Resource& resource);
+
+  // Synchronously updates `totalResources` and the operation status and
+  // then asks the status update manager to send status updates.
+  Try<Nothing> updateOperationStatus(
+      const id::UUID& operationUuid,
+      const Try<std::vector<ResourceConversion>>& conversions);
+
+  void garbageCollectOperationPath(const id::UUID& operationUuid);
+
+  void checkpointResourceProviderState();
+  void checkpointVolumeState(const std::string& volumeId);
+
+  void sendResourceProviderStateUpdate();
+
+  // NOTE: This is a callback for the status update manager and should
+  // not be called directly.
+  void sendOperationStatusUpdate(
+      const UpdateOperationStatusMessage& update);
+
+  enum State
+  {
+    RECOVERING,
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED,
+    READY
+  } state;
+
+  const process::http::URL url;
+  const std::string workDir;
+  const std::string metaDir;
+  const ContentType contentType;
+  ResourceProviderInfo info;
+  const std::string vendor;
+  const SlaveID slaveId;
+  const Option<std::string> authToken;
+  const bool strict;
+
+  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
+
+  std::string bootId;
+  process::grpc::client::Runtime runtime;
+  process::Owned<v1::resource_provider::Driver> driver;
+  OperationStatusUpdateManager statusUpdateManager;
+
+  // The mapping of known profiles fetched from the DiskProfileAdaptor.
+  hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
+
+  hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
+  hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
+    services;
+
+  Option<ContainerID> nodeContainerId;
+  Option<ContainerID> controllerContainerId;
+  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+  csi::v0::PluginCapabilities pluginCapabilities;
+  csi::v0::ControllerCapabilities controllerCapabilities;
+  csi::v0::NodeCapabilities nodeCapabilities;
+  Option<std::string> nodeId;
+
+  // We maintain the following invariant: if one operation depends on
+  // another, they cannot be in PENDING state at the same time, i.e.,
+  // the result of the preceding operation must have been reflected in
+  // the total resources.
+  //
+  // NOTE: We store the list of operations in a `LinkedHashMap` to
+  // preserve the order we receive the operations in case we need it.
+  LinkedHashMap<id::UUID, Operation> operations;
+  Resources totalResources;
+  id::UUID resourceVersion;
+  hashmap<std::string, VolumeData> volumes;
+
+  // If pending, it means that the storage pools are being reconciled, and all
+  // incoming operations that disallow reconciliation will be dropped.
+  process::Future<Nothing> reconciled;
+
+  // We maintain a sequence to coordinate reconciliations of storage pools. It
+  // keeps track of pending operations that disallow reconciliation, and ensures
+  // that any reconciliation waits for these operations to finish.
+  process::Sequence sequence;
+
+  struct Metrics
+  {
+    explicit Metrics(const std::string& prefix);
+    ~Metrics();
+
+    // CSI plugin metrics.
+    process::metrics::Counter csi_plugin_container_terminations;
+    hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled;
+
+    // Operation state metrics.
+    hashmap<Offer::Operation::Type, process::metrics::PushGauge>
+      operations_pending;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_finished;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_failed;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_dropped;
+  } metrics;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index 7751636..c8f3f04 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -63,7 +63,7 @@ struct RPCParam
       rpc,
       [](csi::v0::Client client) {
         return client
-          .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type())
+          .call<rpc>(csi::v0::Request<rpc>())
           .then([] { return Nothing(); });
       }
     };