You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:15:10 UTC

[mesos] branch 1.7.x updated (30549eb -> e3623a4)

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a change to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 30549eb  Updated Mesos version to 1.7.2.
     new 91cf91c  Made starting and stopping on CSI plugin containers more verbose.
     new 994f208  Preliminary SLRP refactoring for RPC retry.
     new 7c7a874  Implemented the RPC retry logic for SLRP.
     new 2877a00  Exposed `StorageLocalResourceProviderProcess` for testing purpose.
     new 80c72a1  Improved error printing for gRPC statuses.
     new e3623a4  Added MESOS-9517 to the 1.7.2 CHANGELOG.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 3rdparty/libprocess/include/process/grpc.hpp       |   14 +-
 3rdparty/libprocess/src/grpc.cpp                   |   53 +
 CHANGELOG                                          |    1 +
 src/Makefile.am                                    |    3 +-
 src/csi/client.cpp                                 |  292 ++---
 src/csi/client.hpp                                 |   84 +-
 src/csi/rpc.hpp                                    |   11 +
 src/resource_provider/storage/provider.cpp         | 1281 ++++++++------------
 src/resource_provider/storage/provider.hpp         |   19 +-
 src/resource_provider/storage/provider_process.hpp |  420 +++++++
 src/tests/csi_client_tests.cpp                     |    2 +-
 11 files changed, 1189 insertions(+), 991 deletions(-)
 create mode 100644 src/resource_provider/storage/provider_process.hpp


[mesos] 06/06: Added MESOS-9517 to the 1.7.2 CHANGELOG.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit e3623a47c85f0c48abaf2929de0647b541498e09
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 29 11:22:24 2019 -0800

    Added MESOS-9517 to the 1.7.2 CHANGELOG.
---
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG b/CHANGELOG
index bbdbe22..3e40356 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -3,6 +3,7 @@ Release Notes - Mesos - Version 1.7.2 (WIP)
 * This is a bug fix release.
 
 ** Bug
+  * [MESOS-9517] - SLRP should treat gRPC timeouts as non-terminal errors, instead of reporting OPERATION_FAILED.
   * [MESOS-9531] - chown error handling is incorrect in createSandboxDirectory.
   * [MESOS-9532] - ResourceOffersTest.ResourceOfferWithMultipleSlaves is flaky.
 


[mesos] 04/06: Exposed `StorageLocalResourceProviderProcess` for testing purpose.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 2877a0070b108033644817ec1b8f99085f73ab4b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Wed Jan 23 21:44:10 2019 -0800

    Exposed `StorageLocalResourceProviderProcess` for testing purpose.
    
    This patch moves the declaration of the SLRP process into an internal
    header file and adds a `__call` function, so that a follow-up test can
    use `FUTURE_DISPATCH` to capture a dispatch on an RPC retry.
    
    To simplify the declarations, it also internalizes `RPCTraits`,
    introduces new type aliases, and moves `DEFAULT_CSI_RETRY_BACKOFF_FACTOR`
    and `DEFAULT_CSI_RETRY_INTERVAL_MAX` to the new header for testing.
    
    Review: https://reviews.apache.org/r/69827
---
 src/Makefile.am                                    |   3 +-
 src/csi/client.hpp                                 |   5 +-
 src/csi/rpc.hpp                                    |  11 +
 src/resource_provider/storage/provider.cpp         | 602 ++++++---------------
 src/resource_provider/storage/provider.hpp         |  19 +-
 src/resource_provider/storage/provider_process.hpp | 420 ++++++++++++++
 src/tests/csi_client_tests.cpp                     |   2 +-
 7 files changed, 608 insertions(+), 454 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index eb50fc7..8238c4d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1469,7 +1469,8 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/storage/disk_profile_utils.cpp			\
   resource_provider/storage/disk_profile_utils.hpp			\
   resource_provider/storage/provider.cpp				\
-  resource_provider/storage/provider.hpp
+  resource_provider/storage/provider.hpp				\
+  resource_provider/storage/provider_process.hpp
 
 libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index a0dfedf..c2583cf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,9 +35,8 @@ public:
     : connection(_connection), runtime(_runtime) {}
 
   template <RPC rpc>
-  process::Future<
-      Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
-  call(typename RPCTraits<rpc>::request_type request);
+  process::Future<Try<Response<rpc>, process::grpc::StatusError>> call(
+      Request<rpc> request);
 
 private:
   process::grpc::client::Connection connection;
diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp
index c30a509..b2502ce 100644
--- a/src/csi/rpc.hpp
+++ b/src/csi/rpc.hpp
@@ -52,6 +52,8 @@ enum RPC
 };
 
 
+namespace internal {
+
 template <RPC>
 struct RPCTraits;
 
@@ -191,6 +193,15 @@ struct RPCTraits<NODE_GET_CAPABILITIES>
   typedef NodeGetCapabilitiesResponse response_type;
 };
 
+} // namespace internal {
+
+
+template <RPC rpc>
+using Request = typename internal::RPCTraits<rpc>::request_type;
+
+template <RPC rpc>
+using Response = typename internal::RPCTraits<rpc>::response_type;
+
 
 std::ostream& operator<<(std::ostream& stream, const RPC& rpc);
 
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 1abb9c8..c4068a5 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -19,8 +19,12 @@
 #include <algorithm>
 #include <cctype>
 #include <cstdlib>
+#include <functional>
+#include <list>
 #include <memory>
 #include <numeric>
+#include <queue>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -31,6 +35,8 @@
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/grpc.hpp>
 #include <process/id.hpp>
 #include <process/loop.hpp>
 #include <process/process.hpp>
@@ -41,23 +47,28 @@
 #include <process/metrics/metrics.hpp>
 #include <process/metrics/push_gauge.hpp>
 
+#include <mesos/http.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
 #include <mesos/resource_provider/resource_provider.hpp>
+
 #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/bytes.hpp>
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
 #include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -77,6 +88,8 @@
 #include "resource_provider/detector.hpp"
 #include "resource_provider/state.hpp"
 
+#include "resource_provider/storage/provider_process.hpp"
+
 #include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
@@ -106,7 +119,7 @@ using process::Failure;
 using process::Future;
 using process::loop;
 using process::Owned;
-using process::Process;
+using process::ProcessBase;
 using process::Promise;
 using process::Sequence;
 using process::spawn;
@@ -140,17 +153,6 @@ namespace internal {
 // TODO(chhsiao): Make the timeout configurable.
 constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
 
-// Storage local resource provider initially picks a random amount of time
-// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
-// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
-// retries are exponentially backed off based on this interval (e.g., 2nd retry
-// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
-// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
-//
-// TODO(chhsiao): Make the retry parameters configurable.
-constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
-constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
-
 
 // Returns true if the string is a valid Java identifier.
 static bool isValidName(const string& s)
@@ -304,273 +306,32 @@ static inline Resource createRawDiskResource(
 }
 
 
-class StorageLocalResourceProviderProcess
-  : public Process<StorageLocalResourceProviderProcess>
-{
-public:
-  explicit StorageLocalResourceProviderProcess(
-      const http::URL& _url,
-      const string& _workDir,
-      const ResourceProviderInfo& _info,
-      const SlaveID& _slaveId,
-      const Option<string>& _authToken,
-      bool _strict)
-    : ProcessBase(process::ID::generate("storage-local-resource-provider")),
-      state(RECOVERING),
-      url(_url),
-      workDir(_workDir),
-      metaDir(slave::paths::getMetaRootDir(_workDir)),
-      contentType(ContentType::PROTOBUF),
-      info(_info),
-      vendor(
-          info.storage().plugin().type() + "." +
-          info.storage().plugin().name()),
-      slaveId(_slaveId),
-      authToken(_authToken),
-      strict(_strict),
-      resourceVersion(id::UUID::random()),
-      sequence("storage-local-resource-provider-sequence"),
-      metrics("resource_providers/" + info.type() + "." + info.name() + "/")
-  {
-    diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
-    CHECK_NOTNULL(diskProfileAdaptor.get());
-  }
-
-  StorageLocalResourceProviderProcess(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  StorageLocalResourceProviderProcess& operator=(
-      const StorageLocalResourceProviderProcess& other) = delete;
-
-  void connected();
-  void disconnected();
-  void received(const Event& event);
-
-private:
-  struct VolumeData
-  {
-    VolumeData(VolumeState&& _state)
-      : state(_state), sequence(new Sequence("volume-sequence")) {}
-
-    VolumeState state;
-
-    // We run all CSI operations for the same volume on a sequence to
-    // ensure that they are processed in a sequential order.
-    Owned<Sequence> sequence;
-  };
-
-  void initialize() override;
-  void fatal();
-
-  // The recover functions are responsible to recover the state of the
-  // resource provider and CSI volumes from checkpointed data.
-  Future<Nothing> recover();
-  Future<Nothing> recoverServices();
-  Future<Nothing> recoverVolumes();
-  Future<Nothing> recoverResourceProviderState();
-
-  void doReliableRegistration();
-
-  // The reconcile functions are responsible to reconcile the state of
-  // the resource provider from the recovered state and other sources of
-  // truth, such as CSI plugin responses or the status update manager.
-  Future<Nothing> reconcileResourceProviderState();
-  Future<Nothing> reconcileOperationStatuses();
-  ResourceConversion reconcileResources(
-      const Resources& checkpointed,
-      const Resources& discovered);
-
-  // Spawns a loop to watch for changes in the set of known profiles and update
-  // the profile mapping and storage pools accordingly.
-  void watchProfiles();
-
-  // Update the profile mapping when the set of known profiles changes.
-  // NOTE: This function never fails. If it fails to translate a new
-  // profile, the resource provider will continue to operate with the
-  // set of profiles it knows about.
-  Future<Nothing> updateProfiles(const hashset<string>& profiles);
-
-  // Reconcile the storage pools when the set of known profiles changes,
-  // or a volume with an unknown profile is destroyed.
-  Future<Nothing> reconcileStoragePools();
-
-  // Returns true if the storage pools are allowed to be reconciled when
-  // the operation is being applied.
-  static bool allowsReconciliation(const Offer::Operation& operation);
-
-  // Functions for received events.
-  void subscribed(const Event::Subscribed& subscribed);
-  void applyOperation(const Event::ApplyOperation& operation);
-  void publishResources(const Event::PublishResources& publish);
-  void acknowledgeOperationStatus(
-      const Event::AcknowledgeOperationStatus& acknowledge);
-  void reconcileOperations(
-      const Event::ReconcileOperations& reconcile);
-
-  // Wrapper functions to make CSI calls and update RPC metrics.
-  //
-  // The call is made asynchronously and thus no guarantee is provided on the
-  // order in which calls are sent. Callers need to either ensure to not have
-  // multiple conflicting calls in flight, or treat results idempotently.
-  //
-  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
-  // calls on the same volume, and 2) no profile update while there are ongoing
-  // `CREATE_DISK` or `DESTROY_DISK` operations.
-  //
-  // NOTE: Since this function uses `getService` to obtain the latest service
-  // future, which depends on probe results, it is disabled for making probe
-  // calls; `_call` should be used directly instead.
-  template <
-      csi::v0::RPC rpc,
-      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
-  Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
-      const ContainerID& containerId,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request,
-      bool retry = false);
-
-  template <csi::v0::RPC rpc>
-  Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
-  _call(
-      csi::v0::Client client,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request);
-
-  Future<csi::v0::Client> waitService(const string& endpoint);
-  Future<csi::v0::Client> getService(const ContainerID& containerId);
-  Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
-  Future<Nothing> waitContainer(const ContainerID& containerId);
-  Future<Nothing> killContainer(const ContainerID& containerId);
-
-  Future<Nothing> prepareIdentityService();
-  Future<Nothing> prepareControllerService();
-  Future<Nothing> prepareNodeService();
-  Future<Nothing> controllerPublish(const string& volumeId);
-  Future<Nothing> controllerUnpublish(const string& volumeId);
-  Future<Nothing> nodeStage(const string& volumeId);
-  Future<Nothing> nodeUnstage(const string& volumeId);
-  Future<Nothing> nodePublish(const string& volumeId);
-  Future<Nothing> nodeUnpublish(const string& volumeId);
-  Future<string> createVolume(
-      const string& name,
-      const Bytes& capacity,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<bool> deleteVolume(const string& volumeId);
-  Future<Nothing> validateVolume(
-      const string& volumeId,
-      const Option<Labels>& metadata,
-      const DiskProfileAdaptor::ProfileInfo& profileInfo);
-  Future<Resources> listVolumes();
-  Future<Resources> getCapacities();
-
-  Future<Nothing> _applyOperation(const id::UUID& operationUuid);
-  void dropOperation(
-      const id::UUID& operationUuid,
-      const Option<FrameworkID>& frameworkId,
-      const Option<Offer::Operation>& operation,
-      const string& message);
-
-  Future<vector<ResourceConversion>> applyCreateDisk(
-      const Resource& resource,
-      const id::UUID& operationUuid,
-      const Resource::DiskInfo::Source::Type& targetType,
-      const Option<string>& targetProfile);
-  Future<vector<ResourceConversion>> applyDestroyDisk(
-      const Resource& resource);
-
-  Try<Nothing> updateOperationStatus(
-      const id::UUID& operationUuid,
-      const Try<vector<ResourceConversion>>& conversions);
-
-  void garbageCollectOperationPath(const id::UUID& operationUuid);
-
-  void checkpointResourceProviderState();
-  void checkpointVolumeState(const string& volumeId);
-
-  void sendResourceProviderStateUpdate();
-
-  // NOTE: This is a callback for the status update manager and should
-  // not be called directly.
-  void sendOperationStatusUpdate(
-      const UpdateOperationStatusMessage& update);
-
-  enum State
-  {
-    RECOVERING,
-    DISCONNECTED,
-    CONNECTED,
-    SUBSCRIBED,
-    READY
-  } state;
-
-  const http::URL url;
-  const string workDir;
-  const string metaDir;
-  const ContentType contentType;
-  ResourceProviderInfo info;
-  const string vendor;
-  const SlaveID slaveId;
-  const Option<string> authToken;
-  const bool strict;
-
-  shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
-
-  string bootId;
-  process::grpc::client::Runtime runtime;
-  Owned<v1::resource_provider::Driver> driver;
-  OperationStatusUpdateManager statusUpdateManager;
-
-  // The mapping of known profiles fetched from the DiskProfileAdaptor.
-  hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
-
-  hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
-  hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
-
-  Option<ContainerID> nodeContainerId;
-  Option<ContainerID> controllerContainerId;
-  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
-  csi::v0::PluginCapabilities pluginCapabilities;
-  csi::v0::ControllerCapabilities controllerCapabilities;
-  csi::v0::NodeCapabilities nodeCapabilities;
-  Option<string> nodeId;
-
-  // We maintain the following invariant: if one operation depends on
-  // another, they cannot be in PENDING state at the same time, i.e.,
-  // the result of the preceding operation must have been reflected in
-  // the total resources.
-  // NOTE: We store the list of operations in a `LinkedHashMap` to
-  // preserve the order we receive the operations in case we need it.
-  LinkedHashMap<id::UUID, Operation> operations;
-  Resources totalResources;
-  id::UUID resourceVersion;
-  hashmap<string, VolumeData> volumes;
-
-  // If pending, it means that the storage pools are being reconciled, and all
-  // incoming operations that disallow reconciliation will be dropped.
-  Future<Nothing> reconciled;
-
-  // We maintain a sequence to coordinate reconciliations of storage pools. It
-  // keeps track of pending operations that disallow reconciliation, and ensures
-  // that any reconciliation waits for these operations to finish.
-  Sequence sequence;
-
-  struct Metrics
-  {
-    explicit Metrics(const string& prefix);
-    ~Metrics();
-
-    // CSI plugin metrics.
-    Counter csi_plugin_container_terminations;
-    hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors;
-    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled;
-
-    // Operation state metrics.
-    hashmap<Offer::Operation::Type, PushGauge> operations_pending;
-    hashmap<Offer::Operation::Type, Counter> operations_finished;
-    hashmap<Offer::Operation::Type, Counter> operations_failed;
-    hashmap<Offer::Operation::Type, Counter> operations_dropped;
-  } metrics;
-};
+StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
+    const http::URL& _url,
+    const string& _workDir,
+    const ResourceProviderInfo& _info,
+    const SlaveID& _slaveId,
+    const Option<string>& _authToken,
+    bool _strict)
+  : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+    state(RECOVERING),
+    url(_url),
+    workDir(_workDir),
+    metaDir(slave::paths::getMetaRootDir(_workDir)),
+    contentType(ContentType::PROTOBUF),
+    info(_info),
+    vendor(
+        info.storage().plugin().type() + "." + info.storage().plugin().name()),
+    slaveId(_slaveId),
+    authToken(_authToken),
+    strict(_strict),
+    resourceVersion(id::UUID::random()),
+    sequence("storage-local-resource-provider-sequence"),
+    metrics("resource_providers/" + info.type() + "." + info.name() + "/")
+{
+  diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
+  CHECK_NOTNULL(diskProfileAdaptor.get());
+}
 
 
 void StorageLocalResourceProviderProcess::connected()
@@ -635,6 +396,121 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 }
 
 
+template <
+    csi::v0::RPC rpc,
+    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
+Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
+    const ContainerID& containerId,
+    const csi::v0::Request<rpc>& request,
+    const bool retry)
+{
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return loop(
+      self(),
+      [=] {
+        // Perform the call with the latest service future.
+        return getService(containerId)
+          .then(defer(
+              self(),
+              &StorageLocalResourceProviderProcess::_call<rpc>,
+              lambda::_1,
+              request));
+      },
+      [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
+          -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+        Option<Duration> backoff = retry
+          ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+          : Option<Duration>::none();
+
+        maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+        // We dispatch `__call` for testing purpose.
+        return dispatch(
+            self(),
+            &StorageLocalResourceProviderProcess::__call<rpc>,
+            result,
+            backoff);
+      });
+}
+
+
+template <csi::v0::RPC rpc>
+Future<Try<csi::v0::Response<rpc>, StatusError>>
+StorageLocalResourceProviderProcess::_call(
+    csi::v0::Client client, const csi::v0::Request<rpc>& request)
+{
+  ++metrics.csi_plugin_rpcs_pending.at(rpc);
+
+  return client.call<rpc>(request)
+    .onAny(defer(self(), [=](
+        const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
+      --metrics.csi_plugin_rpcs_pending.at(rpc);
+      if (future.isReady() && future->isSome()) {
+        ++metrics.csi_plugin_rpcs_successes.at(rpc);
+      } else if (future.isDiscarded()) {
+        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+      } else {
+        ++metrics.csi_plugin_rpcs_errors.at(rpc);
+      }
+    }));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<ControlFlow<csi::v0::Response<rpc>>>
+StorageLocalResourceProviderProcess::__call(
+    const Try<csi::v0::Response<rpc>, StatusError>& result,
+    const Option<Duration>& backoff)
+{
+  if (result.isSome()) {
+    return Break(result.get());
+  }
+
+  if (backoff.isNone()) {
+    return Failure(result.error());
+  }
+
+  // See the link below for retryable status codes:
+  // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+  switch (result.error().status.error_code()) {
+    case grpc::DEADLINE_EXCEEDED:
+    case grpc::UNAVAILABLE: {
+      LOG(ERROR)
+        << "Received '" << result.error() << "' while calling " << rpc
+        << ". Retrying in " << backoff.get();
+
+      return after(backoff.get())
+        .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
+          return Continue();
+        });
+    }
+    case grpc::CANCELLED:
+    case grpc::UNKNOWN:
+    case grpc::INVALID_ARGUMENT:
+    case grpc::NOT_FOUND:
+    case grpc::ALREADY_EXISTS:
+    case grpc::PERMISSION_DENIED:
+    case grpc::UNAUTHENTICATED:
+    case grpc::RESOURCE_EXHAUSTED:
+    case grpc::FAILED_PRECONDITION:
+    case grpc::ABORTED:
+    case grpc::OUT_OF_RANGE:
+    case grpc::UNIMPLEMENTED:
+    case grpc::INTERNAL:
+    case grpc::DATA_LOSS: {
+      return Failure(result.error());
+    }
+    case grpc::OK:
+    case grpc::DO_NOT_USE: {
+      UNREACHABLE();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
 void StorageLocalResourceProviderProcess::initialize()
 {
   Try<string> _bootId = os::bootId();
@@ -1886,112 +1762,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-template <
-    csi::v0::RPC rpc,
-    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
-StorageLocalResourceProviderProcess::call(
-    const ContainerID& containerId,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request,
-    bool retry)
-{
-  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
-  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
-
-  return loop(
-      self(),
-      [=] {
-        // Perform the call with the latest service future.
-        return getService(containerId)
-          .then(defer(
-              self(),
-              &StorageLocalResourceProviderProcess::_call<rpc>,
-              lambda::_1,
-              request));
-      },
-      [=](const Try<Response, StatusError>& result) mutable
-          -> Future<ControlFlow<Response>> {
-        if (result.isSome()) {
-          return Break(result.get());
-        }
-
-        if (retry) {
-          // See the link below for retryable status codes:
-          // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
-          switch (result.error().status.error_code()) {
-            case grpc::DEADLINE_EXCEEDED:
-            case grpc::UNAVAILABLE: {
-              Duration delay =
-                maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
-
-              maxBackoff =
-                std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
-              LOG(ERROR)
-                << "Received '" << result.error() << "' while calling " << rpc
-                << ". Retrying in " << delay;
-
-              return after(delay)
-                .then([]() -> Future<ControlFlow<Response>> {
-                  return Continue();
-                });
-            }
-            case grpc::CANCELLED:
-            case grpc::UNKNOWN:
-            case grpc::INVALID_ARGUMENT:
-            case grpc::NOT_FOUND:
-            case grpc::ALREADY_EXISTS:
-            case grpc::PERMISSION_DENIED:
-            case grpc::UNAUTHENTICATED:
-            case grpc::RESOURCE_EXHAUSTED:
-            case grpc::FAILED_PRECONDITION:
-            case grpc::ABORTED:
-            case grpc::OUT_OF_RANGE:
-            case grpc::UNIMPLEMENTED:
-            case grpc::INTERNAL:
-            case grpc::DATA_LOSS: {
-              break;
-            }
-            case grpc::OK:
-            case grpc::DO_NOT_USE: {
-              UNREACHABLE();
-            }
-          }
-        }
-
-        return Failure(result.error());
-      });
-}
-
-
-template <csi::v0::RPC rpc>
-Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
-StorageLocalResourceProviderProcess::_call(
-    csi::v0::Client client,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request)
-{
-  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
-
-  ++metrics.csi_plugin_rpcs_pending.at(rpc);
-
-  return client.call<rpc>(request)
-    .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
-      --metrics.csi_plugin_rpcs_pending.at(rpc);
-      if (future.isReady() && future->isSome()) {
-        ++metrics.csi_plugin_rpcs_successes.at(rpc);
-      } else if (future.isDiscarded()) {
-        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
-      } else {
-        ++metrics.csi_plugin_rpcs_errors.at(rpc);
-      }
-    }));
-}
-
-
-// Returns a future of a CSI client that waits for the endpoint socket
-// to appear if necessary, then connects to the socket and check its
-// readiness.
 Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
     const string& endpoint)
 {
@@ -2036,9 +1806,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
 }
 
 
-// Returns a future of the latest CSI client for the specified plugin
-// container. If the container is not already running, this method will
-// start a new a new container daemon.
 Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
     const ContainerID& containerId)
 {
@@ -2189,9 +1956,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 }
 
 
-// Lists all running plugin containers for this resource provider.
-// NOTE: This might return containers that are not actually running,
-// e.g., if they are being destroyed.
 Future<hashmap<ContainerID, Option<ContainerStatus>>>
 StorageLocalResourceProviderProcess::getContainers()
 {
@@ -2244,7 +2008,6 @@ StorageLocalResourceProviderProcess::getContainers()
 }
 
 
-// Waits for the specified plugin container to be terminated.
 Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
     const ContainerID& containerId)
 {
@@ -2271,7 +2034,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
 }
 
 
-// Kills the specified plugin container.
 Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
     const ContainerID& containerId)
 {
@@ -2323,7 +2085,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 }
 
 
-// NOTE: This can only be called after `prepareIdentityService`.
 Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 {
   CHECK_SOME(pluginInfo);
@@ -2364,8 +2125,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 }
 
 
-// NOTE: This can only be called after `prepareIdentityService` and
-// `prepareControllerService`.
 Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 {
   CHECK_SOME(nodeContainerId);
@@ -2394,11 +2153,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
-// Transitions the state of the specified volume from `CREATED` or
-// `CONTROLLER_PUBLISH` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
@@ -2447,11 +2201,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
-// Transitions the state of the specified volume from `NODE_READY`,
-// `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     const string& volumeId)
 {
@@ -2500,10 +2249,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 }
 
 
-// Transitions the state of the specified volume from `NODE_READY` or
-// `NODE_STAGE` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     const string& volumeId)
 {
@@ -2564,10 +2309,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
 }
 
 
-// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
-// or `NODE_UNSTAGE` to `NODE_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     const string& volumeId)
 {
@@ -2624,10 +2365,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
 }
 
 
-// Transitions the state of the specified volume from `VOL_READY` or
-// `NODE_PUBLISH` to `PUBLISHED`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
@@ -2691,10 +2428,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 }
 
 
-// Transitions the state of the specified volume from `PUBLISHED`,
-// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
-//
-// NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
@@ -2747,9 +2480,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 }
 
 
-// Returns a CSI volume ID.
-//
-// NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
@@ -2795,10 +2525,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 }
 
 
-// Returns true if the volume has been deprovisioned.
-//
-// NOTE: This can only be called after `prepareControllerService` and
-// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
@@ -2898,12 +2624,6 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume supports the capability of the specified profile.
-//
-// NOTE: This can only be called after `prepareIdentityService`.
-//
-// TODO(chhsiao): Validate the volume against the parameters of the profile once
-// we get CSI v1.
 Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     const string& volumeId,
     const Option<Labels>& metadata,
@@ -2937,6 +2657,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
     volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
   }
 
+  // TODO(chhsiao): Validate the volume against the parameters of the profile
+  // once we get CSI v1.
   csi::v0::ValidateVolumeCapabilitiesRequest request;
   request.set_volume_id(volumeId);
   *request.add_volume_capabilities() = profileInfo.capability;
@@ -2969,8 +2691,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
 }
 
 
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 {
   CHECK(info.has_id());
@@ -3019,8 +2739,6 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 }
 
 
-// NOTE: This can only be called after `prepareControllerService` and
-// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
   CHECK(info.has_id());
@@ -3062,8 +2780,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 }
 
 
-// Applies the operation. Speculative operations will be synchronously
-// applied. Do nothing if the operation is already in a terminal state.
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     const id::UUID& operationUuid)
 {
@@ -3154,8 +2870,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
 }
 
 
-// Sends `OPERATION_DROPPED` without checkpointing the status of
-// the operation.
 void StorageLocalResourceProviderProcess::dropOperation(
     const id::UUID& operationUuid,
     const Option<FrameworkID>& frameworkId,
@@ -3382,8 +3096,6 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
 }
 
 
-// Synchronously updates `totalResources` and the operation status and
-// then asks the status update manager to send status updates.
 Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
     const id::UUID& operationUuid,
     const Try<vector<ResourceConversion>>& conversions)
diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp
index 331f7b7..ccd09df 100644
--- a/src/resource_provider/storage/provider.hpp
+++ b/src/resource_provider/storage/provider.hpp
@@ -17,6 +17,17 @@
 #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
 #include "resource_provider/local.hpp"
 
 namespace mesos {
@@ -32,15 +43,15 @@ public:
   static Try<process::Owned<LocalResourceProvider>> create(
       const process::http::URL& url,
       const std::string& workDir,
-      const mesos::ResourceProviderInfo& info,
+      const ResourceProviderInfo& info,
       const SlaveID& slaveId,
       const Option<std::string>& authToken,
       bool strict);
 
   static Try<process::http::authentication::Principal> principal(
-      const mesos::ResourceProviderInfo& info);
+      const ResourceProviderInfo& info);
 
-  static Option<Error> validate(const mesos::ResourceProviderInfo& info);
+  static Option<Error> validate(const ResourceProviderInfo& info);
 
   ~StorageLocalResourceProvider() override;
 
@@ -54,7 +65,7 @@ private:
   explicit StorageLocalResourceProvider(
       const process::http::URL& url,
       const std::string& workDir,
-      const mesos::ResourceProviderInfo& info,
+      const ResourceProviderInfo& info,
       const SlaveID& slaveId,
       const Option<std::string>& authToken,
       bool strict);
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
new file mode 100644
index 0000000..36187fb
--- /dev/null
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <mesos/http.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <mesos/resource_provider/resource_provider.hpp>
+
+#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
+
+#include <mesos/v1/resource_provider.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/loop.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/sequence.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/push_gauge.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/duration.hpp>
+#include <stout/hashset.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "csi/client.hpp"
+#include "csi/rpc.hpp"
+#include "csi/state.hpp"
+#include "csi/utils.hpp"
+
+#include "slave/container_daemon.hpp"
+
+#include "status_update_manager/operation.hpp"
+
+namespace mesos {
+namespace internal {
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
+class StorageLocalResourceProviderProcess
+  : public process::Process<StorageLocalResourceProviderProcess>
+{
+public:
+  explicit StorageLocalResourceProviderProcess(
+      const process::http::URL& _url,
+      const std::string& _workDir,
+      const ResourceProviderInfo& _info,
+      const SlaveID& _slaveId,
+      const Option<std::string>& _authToken,
+      bool _strict);
+
+  StorageLocalResourceProviderProcess(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  StorageLocalResourceProviderProcess& operator=(
+      const StorageLocalResourceProviderProcess& other) = delete;
+
+  void connected();
+  void disconnected();
+  void received(const resource_provider::Event& event);
+
+  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
+  // testing purposes.
+  //
+  // The call is made asynchronously and thus no guarantee is provided on the
+  // order in which calls are sent. Callers need to either ensure to not have
+  // multiple conflicting calls in flight, or treat results idempotently.
+  //
+  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+  // calls on the same volume, and 2) no profile update while there are ongoing
+  // `CREATE_DISK` or `DESTROY_DISK` operations.
+  //
+  // NOTE: Since this function uses `getService` to obtain the latest service
+  // future, which depends on probe results, it is disabled for making probe
+  // calls; `_call` should be used directly instead.
+  template <
+      csi::v0::RPC rpc,
+      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
+  process::Future<csi::v0::Response<rpc>> call(
+      const ContainerID& containerId,
+      const csi::v0::Request<rpc>& request,
+      const bool retry = false); // remains const in a mutable lambda.
+
+  template <csi::v0::RPC rpc>
+  process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
+  _call(csi::v0::Client client, const csi::v0::Request<rpc>& request);
+
+  template <csi::v0::RPC rpc>
+  process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
+      const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
+      const Option<Duration>& backoff);
+
+private:
+  struct VolumeData
+  {
+    VolumeData(csi::state::VolumeState&& _state)
+      : state(_state), sequence(new process::Sequence("volume-sequence")) {}
+
+    csi::state::VolumeState state;
+
+    // We run all CSI operations for the same volume on a sequence to
+    // ensure that they are processed in a sequential order.
+    process::Owned<process::Sequence> sequence;
+  };
+
+  void initialize() override;
+  void fatal();
+
+  // The recover functions are responsible for recovering the state of the
+  // resource provider and CSI volumes from checkpointed data.
+  process::Future<Nothing> recover();
+  process::Future<Nothing> recoverServices();
+  process::Future<Nothing> recoverVolumes();
+  process::Future<Nothing> recoverResourceProviderState();
+
+  void doReliableRegistration();
+
+  // The reconcile functions are responsible for reconciling the state of
+  // the resource provider from the recovered state and other sources of
+  // truth, such as CSI plugin responses or the status update manager.
+  process::Future<Nothing> reconcileResourceProviderState();
+  process::Future<Nothing> reconcileOperationStatuses();
+  ResourceConversion reconcileResources(
+      const Resources& checkpointed,
+      const Resources& discovered);
+
+  // Spawns a loop to watch for changes in the set of known profiles and update
+  // the profile mapping and storage pools accordingly.
+  void watchProfiles();
+
+  // Update the profile mapping when the set of known profiles changes.
+  // NOTE: This function never fails. If it fails to translate a new
+  // profile, the resource provider will continue to operate with the
+  // set of profiles it knows about.
+  process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles);
+
+  // Reconcile the storage pools when the set of known profiles changes,
+  // or a volume with an unknown profile is destroyed.
+  process::Future<Nothing> reconcileStoragePools();
+
+  // Returns true if the storage pools are allowed to be reconciled when
+  // the operation is being applied.
+  static bool allowsReconciliation(const Offer::Operation& operation);
+
+  // Functions for received events.
+  void subscribed(const resource_provider::Event::Subscribed& subscribed);
+  void applyOperation(
+      const resource_provider::Event::ApplyOperation& operation);
+  void publishResources(
+      const resource_provider::Event::PublishResources& publish);
+  void acknowledgeOperationStatus(
+      const resource_provider::Event::AcknowledgeOperationStatus& acknowledge);
+  void reconcileOperations(
+      const resource_provider::Event::ReconcileOperations& reconcile);
+
+  // Returns a future of a CSI client that waits for the endpoint socket to
+  // appear if necessary, then connects to the socket and checks its readiness.
+  process::Future<csi::v0::Client> waitService(const std::string& endpoint);
+
+  // Returns a future of the latest CSI client for the specified plugin
+  // container. If the container is not already running, this method will start
+  // a new container daemon.
+  process::Future<csi::v0::Client> getService(const ContainerID& containerId);
+
+  // Lists all running plugin containers for this resource provider.
+  // NOTE: This might return containers that are not actually running, e.g., if
+  // they are being destroyed.
+  process::Future<hashmap<ContainerID, Option<ContainerStatus>>>
+  getContainers();
+
+  // Waits for the specified plugin container to be terminated.
+  process::Future<Nothing> waitContainer(const ContainerID& containerId);
+
+  // Kills the specified plugin container.
+  process::Future<Nothing> killContainer(const ContainerID& containerId);
+
+  process::Future<Nothing> prepareIdentityService();
+
+  // NOTE: This can only be called after `prepareIdentityService`.
+  process::Future<Nothing> prepareControllerService();
+
+  // NOTE: This can only be called after `prepareIdentityService` and
+  // `prepareControllerService`.
+  process::Future<Nothing> prepareNodeService();
+
+  // Transitions the state of the specified volume from `CREATED` or
+  // `CONTROLLER_PUBLISH` to `NODE_READY`.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService`.
+  process::Future<Nothing> controllerPublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `NODE_READY`,
+  // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService`.
+  process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `NODE_READY` or
+  // `NODE_STAGE` to `VOL_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeStage(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `VOL_READY`,
+  // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeUnstage(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `VOL_READY` or
+  // `NODE_PUBLISH` to `PUBLISHED`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodePublish(const std::string& volumeId);
+
+  // Transitions the state of the specified volume from `PUBLISHED`,
+  // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+  //
+  // NOTE: This can only be called after `prepareNodeService`.
+  process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
+
+  // Returns a CSI volume ID.
+  //
+  // NOTE: This can only be called after `prepareControllerService`.
+  process::Future<std::string> createVolume(
+      const std::string& name,
+      const Bytes& capacity,
+      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+  // Returns true if the volume has been deprovisioned.
+  //
+  // NOTE: This can only be called after `prepareControllerService` and
+  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
+  process::Future<bool> deleteVolume(const std::string& volumeId);
+
+  // Validates if a volume supports the capability of the specified profile.
+  //
+  // NOTE: This can only be called after `prepareIdentityService`.
+  process::Future<Nothing> validateVolume(
+      const std::string& volumeId,
+      const Option<Labels>& metadata,
+      const DiskProfileAdaptor::ProfileInfo& profileInfo);
+
+  // NOTE: This can only be called after `prepareControllerService` and the
+  // resource provider ID has been obtained.
+  process::Future<Resources> listVolumes();
+
+  // NOTE: This can only be called after `prepareControllerService` and the
+  // resource provider ID has been obtained.
+  process::Future<Resources> getCapacities();
+
+  // Applies the operation. Speculative operations will be synchronously
+  // applied. Do nothing if the operation is already in a terminal state.
+  process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
+
+  // Sends `OPERATION_DROPPED` without checkpointing the operation status.
+  void dropOperation(
+      const id::UUID& operationUuid,
+      const Option<FrameworkID>& frameworkId,
+      const Option<Offer::Operation>& operation,
+      const std::string& message);
+
+  process::Future<std::vector<ResourceConversion>> applyCreateDisk(
+      const Resource& resource,
+      const id::UUID& operationUuid,
+      const Resource::DiskInfo::Source::Type& targetType,
+      const Option<std::string>& targetProfile);
+  process::Future<std::vector<ResourceConversion>> applyDestroyDisk(
+      const Resource& resource);
+
+  // Synchronously updates `totalResources` and the operation status and
+  // then asks the status update manager to send status updates.
+  Try<Nothing> updateOperationStatus(
+      const id::UUID& operationUuid,
+      const Try<std::vector<ResourceConversion>>& conversions);
+
+  void garbageCollectOperationPath(const id::UUID& operationUuid);
+
+  void checkpointResourceProviderState();
+  void checkpointVolumeState(const std::string& volumeId);
+
+  void sendResourceProviderStateUpdate();
+
+  // NOTE: This is a callback for the status update manager and should
+  // not be called directly.
+  void sendOperationStatusUpdate(
+      const UpdateOperationStatusMessage& update);
+
+  enum State
+  {
+    RECOVERING,
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED,
+    READY
+  } state;
+
+  const process::http::URL url;
+  const std::string workDir;
+  const std::string metaDir;
+  const ContentType contentType;
+  ResourceProviderInfo info;
+  const std::string vendor;
+  const SlaveID slaveId;
+  const Option<std::string> authToken;
+  const bool strict;
+
+  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
+
+  std::string bootId;
+  process::grpc::client::Runtime runtime;
+  process::Owned<v1::resource_provider::Driver> driver;
+  OperationStatusUpdateManager statusUpdateManager;
+
+  // The mapping of known profiles fetched from the DiskProfileAdaptor.
+  hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
+
+  hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons;
+  hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>>
+    services;
+
+  Option<ContainerID> nodeContainerId;
+  Option<ContainerID> controllerContainerId;
+  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+  csi::v0::PluginCapabilities pluginCapabilities;
+  csi::v0::ControllerCapabilities controllerCapabilities;
+  csi::v0::NodeCapabilities nodeCapabilities;
+  Option<std::string> nodeId;
+
+  // We maintain the following invariant: if one operation depends on
+  // another, they cannot be in PENDING state at the same time, i.e.,
+  // the result of the preceding operation must have been reflected in
+  // the total resources.
+  //
+  // NOTE: We store the list of operations in a `LinkedHashMap` to
+  // preserve the order we receive the operations in case we need it.
+  LinkedHashMap<id::UUID, Operation> operations;
+  Resources totalResources;
+  id::UUID resourceVersion;
+  hashmap<std::string, VolumeData> volumes;
+
+  // If pending, it means that the storage pools are being reconciled, and all
+  // incoming operations that disallow reconciliation will be dropped.
+  process::Future<Nothing> reconciled;
+
+  // We maintain a sequence to coordinate reconciliations of storage pools. It
+  // keeps track of pending operations that disallow reconciliation, and ensures
+  // that any reconciliation waits for these operations to finish.
+  process::Sequence sequence;
+
+  struct Metrics
+  {
+    explicit Metrics(const std::string& prefix);
+    ~Metrics();
+
+    // CSI plugin metrics.
+    process::metrics::Counter csi_plugin_container_terminations;
+    hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors;
+    hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled;
+
+    // Operation state metrics.
+    hashmap<Offer::Operation::Type, process::metrics::PushGauge>
+      operations_pending;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_finished;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_failed;
+    hashmap<Offer::Operation::Type, process::metrics::Counter>
+      operations_dropped;
+  } metrics;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index 7751636..c8f3f04 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -63,7 +63,7 @@ struct RPCParam
       rpc,
       [](csi::v0::Client client) {
         return client
-          .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type())
+          .call<rpc>(csi::v0::Request<rpc>())
           .then([] { return Nothing(); });
       }
     };


[mesos] 05/06: Improved error printing for gRPC statuses.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 80c72a1b895ea1fbca6a977d0599033d27d80c75
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 22:55:14 2019 -0800

    Improved error printing for gRPC statuses.
    
    This patch prepends the error message of a `process::grpc::StatusError`
    with its status code.
    
    Review: https://reviews.apache.org/r/69813
---
 3rdparty/libprocess/include/process/grpc.hpp | 14 +++++++-
 3rdparty/libprocess/src/grpc.cpp             | 53 ++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 28c9cad..f5236c4 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -15,6 +15,7 @@
 
 #include <chrono>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <thread>
 #include <type_traits>
@@ -34,6 +35,7 @@
 #include <stout/error.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
+#include <stout/stringify.hpp>
 #include <stout/try.hpp>
 
 
@@ -47,6 +49,13 @@
 #define GRPC_CLIENT_METHOD(service, rpc) \
   (&service::Stub::PrepareAsync##rpc)
 
+namespace grpc {
+
+std::ostream& operator<<(std::ostream& stream, StatusCode statusCode);
+
+} // namespace grpc {
+
+
 namespace process {
 namespace grpc {
 
@@ -58,7 +67,10 @@ class StatusError : public Error
 {
 public:
   StatusError(::grpc::Status _status)
-    : Error(_status.error_message()), status(std::move(_status))
+    : Error(stringify(_status.error_code()) +
+            (_status.error_message().empty()
+               ? "" : ": " + _status.error_message())),
+      status(std::move(_status))
   {
     CHECK(!status.ok());
   }
diff --git a/3rdparty/libprocess/src/grpc.cpp b/3rdparty/libprocess/src/grpc.cpp
index 4e4f989..14017c5 100644
--- a/3rdparty/libprocess/src/grpc.cpp
+++ b/3rdparty/libprocess/src/grpc.cpp
@@ -16,6 +16,59 @@
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/unreachable.hpp>
+
+using std::ostream;
+
+namespace grpc {
+
+ostream& operator<<(ostream& stream, StatusCode statusCode)
+{
+  switch (statusCode) {
+    case OK:
+      return stream << "OK";
+    case CANCELLED:
+      return stream << "CANCELLED";
+    case UNKNOWN:
+      return stream << "UNKNOWN";
+    case INVALID_ARGUMENT:
+      return stream << "INVALID_ARGUMENT";
+    case DEADLINE_EXCEEDED:
+      return stream << "DEADLINE_EXCEEDED";
+    case NOT_FOUND:
+      return stream << "NOT_FOUND";
+    case ALREADY_EXISTS:
+      return stream << "ALREADY_EXISTS";
+    case PERMISSION_DENIED:
+      return stream << "PERMISSION_DENIED";
+    case UNAUTHENTICATED:
+      return stream << "UNAUTHENTICATED";
+    case RESOURCE_EXHAUSTED:
+      return stream << "RESOURCE_EXHAUSTED";
+    case FAILED_PRECONDITION:
+      return stream << "FAILED_PRECONDITION";
+    case ABORTED:
+      return stream << "ABORTED";
+    case OUT_OF_RANGE:
+      return stream << "OUT_OF_RANGE";
+    case UNIMPLEMENTED:
+      return stream << "UNIMPLEMENTED";
+    case INTERNAL:
+      return stream << "INTERNAL";
+    case UNAVAILABLE:
+      return stream << "UNAVAILABLE";
+    case DATA_LOSS:
+      return stream << "DATA_LOSS";
+    case DO_NOT_USE:
+      return stream << "DO_NOT_USE";
+  }
+
+  UNREACHABLE();
+}
+
+} // namespace grpc {
+
+
 namespace process {
 namespace grpc {
 namespace client {


[mesos] 03/06: Implemented the RPC retry logic for SLRP.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7c7a874121413453294a4f0b4f06a08115b1b202
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 23:00:43 2019 -0800

    Implemented the RPC retry logic for SLRP.
    
    For CSI calls triggered by offer operations, i.e., `CreateVolume` and
    `DeleteVolume`, if the plugin returns retryable errors (`UNAVAILABLE` or
    `DEADLINE_EXCEEDED`), SLRP will now retry indefinitely with a random
    exponential backoff. With this, frameworks will know that the operations
    are terminated with deterministic volume states when getting
    `OPERATION_FAILED`.
    
    Review: https://reviews.apache.org/r/69812
---
 src/csi/client.cpp                         | 292 ++++++++++-------------------
 src/csi/client.hpp                         |  85 ++++-----
 src/resource_provider/storage/provider.cpp | 134 ++++++++++---
 3 files changed, 254 insertions(+), 257 deletions(-)

diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 61ed410..9e17f5b 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -30,308 +30,222 @@ namespace csi {
 namespace v0 {
 
 template <>
-Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
-    GetPluginInfoRequest request)
+Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetPluginInfoResponse, StatusError>& result)
-        -> Future<GetPluginInfoResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<GetPluginCapabilitiesResponse>
+Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<GET_PLUGIN_CAPABILITIES>(
     GetPluginCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
-        -> Future<GetPluginCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ProbeResponse>
+Future<Try<ProbeResponse, process::grpc::StatusError>>
 Client::call<PROBE>(
     ProbeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, Probe),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ProbeResponse, StatusError>& result)
-        -> Future<ProbeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, Probe),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<CreateVolumeResponse>
+Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
 Client::call<CREATE_VOLUME>(
     CreateVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, CreateVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<CreateVolumeResponse, StatusError>& result)
-        -> Future<CreateVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, CreateVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<DeleteVolumeResponse>
+Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
 Client::call<DELETE_VOLUME>(
     DeleteVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, DeleteVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<DeleteVolumeResponse, StatusError>& result)
-        -> Future<DeleteVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, DeleteVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerPublishVolumeResponse>
+Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_PUBLISH_VOLUME>(
     ControllerPublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
-        -> Future<ControllerPublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerUnpublishVolumeResponse>
+Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     ControllerUnpublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
-        -> Future<ControllerUnpublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ValidateVolumeCapabilitiesResponse>
+Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     ValidateVolumeCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
-        -> Future<ValidateVolumeCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ListVolumesResponse>
+Future<Try<ListVolumesResponse, process::grpc::StatusError>>
 Client::call<LIST_VOLUMES>(
     ListVolumesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ListVolumes),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ListVolumesResponse, StatusError>& result)
-        -> Future<ListVolumesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ListVolumes),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<GetCapacityResponse>
+Future<Try<GetCapacityResponse, process::grpc::StatusError>>
 Client::call<GET_CAPACITY>(
     GetCapacityRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, GetCapacity),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetCapacityResponse, StatusError>& result)
-        -> Future<GetCapacityResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, GetCapacity),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerGetCapabilitiesResponse>
+Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_GET_CAPABILITIES>(
     ControllerGetCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
-        -> Future<ControllerGetCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeStageVolumeResponse>
+Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_STAGE_VOLUME>(
     NodeStageVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeStageVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
-        -> Future<NodeStageVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeStageVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeUnstageVolumeResponse>
+Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_UNSTAGE_VOLUME>(
     NodeUnstageVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
-        -> Future<NodeUnstageVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodePublishVolumeResponse>
+Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_PUBLISH_VOLUME>(
     NodePublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodePublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
-        -> Future<NodePublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodePublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeUnpublishVolumeResponse>
+Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_UNPUBLISH_VOLUME>(
     NodeUnpublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
-        -> Future<NodeUnpublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeGetIdResponse>
+Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
 Client::call<NODE_GET_ID>(
     NodeGetIdRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeGetId),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeGetIdResponse, StatusError>& result)
-        -> Future<NodeGetIdResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeGetId),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeGetCapabilitiesResponse>
+Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<NODE_GET_CAPABILITIES>(
     NodeGetCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
-        -> Future<NodeGetCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 } // namespace v0 {
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d40d54..a0dfedf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,8 +35,9 @@ public:
     : connection(_connection), runtime(_runtime) {}
 
   template <RPC rpc>
-  process::Future<typename RPCTraits<rpc>::response_type> call(
-      typename RPCTraits<rpc>::request_type request);
+  process::Future<
+      Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
+  call(typename RPCTraits<rpc>::request_type request);
 
 private:
   process::grpc::client::Connection connection;
@@ -45,105 +46,95 @@ private:
 
 
 template <>
-process::Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
-    GetPluginInfoRequest request);
+process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request);
 
 
 template <>
-process::Future<GetPluginCapabilitiesResponse>
-Client::call<GET_PLUGIN_CAPABILITIES>(
-    GetPluginCapabilitiesRequest request);
+process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request);
 
 
 template <>
-process::Future<ProbeResponse>
-Client::call<PROBE>(
-    ProbeRequest request);
+process::Future<Try<ProbeResponse, process::grpc::StatusError>>
+Client::call<PROBE>(ProbeRequest request);
 
 
 template <>
-process::Future<CreateVolumeResponse>
-Client::call<CREATE_VOLUME>(
-    CreateVolumeRequest request);
+process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
+Client::call<CREATE_VOLUME>(CreateVolumeRequest request);
 
 
 template <>
-process::Future<DeleteVolumeResponse>
-Client::call<DELETE_VOLUME>(
-    DeleteVolumeRequest request);
+process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
+Client::call<DELETE_VOLUME>(DeleteVolumeRequest request);
 
 
 template <>
-process::Future<ControllerPublishVolumeResponse>
-Client::call<CONTROLLER_PUBLISH_VOLUME>(
-    ControllerPublishVolumeRequest request);
+process::Future<
+    Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest request);
 
 
 template <>
-process::Future<ControllerUnpublishVolumeResponse>
+process::Future<
+    Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     ControllerUnpublishVolumeRequest request);
 
 
 template <>
-process::Future<ValidateVolumeCapabilitiesResponse>
+process::Future<
+    Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     ValidateVolumeCapabilitiesRequest request);
 
 
 template <>
-process::Future<ListVolumesResponse>
-Client::call<LIST_VOLUMES>(
-    ListVolumesRequest request);
+process::Future<Try<ListVolumesResponse, process::grpc::StatusError>>
+Client::call<LIST_VOLUMES>(ListVolumesRequest request);
 
 
 template <>
-process::Future<GetCapacityResponse>
-Client::call<GET_CAPACITY>(
-    GetCapacityRequest request);
+process::Future<Try<GetCapacityResponse, process::grpc::StatusError>>
+Client::call<GET_CAPACITY>(GetCapacityRequest request);
 
 
 template <>
-process::Future<ControllerGetCapabilitiesResponse>
+process::Future<
+    Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_GET_CAPABILITIES>(
     ControllerGetCapabilitiesRequest request);
 
 
 template <>
-process::Future<NodeStageVolumeResponse>
-Client::call<NODE_STAGE_VOLUME>(
-    NodeStageVolumeRequest request);
+process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request);
 
 
 template <>
-process::Future<NodeUnstageVolumeResponse>
-Client::call<NODE_UNSTAGE_VOLUME>(
-    NodeUnstageVolumeRequest request);
+process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request);
 
 
 template <>
-process::Future<NodePublishVolumeResponse>
-Client::call<NODE_PUBLISH_VOLUME>(
-    NodePublishVolumeRequest request);
+process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request);
 
 
 template <>
-process::Future<NodeUnpublishVolumeResponse>
-Client::call<NODE_UNPUBLISH_VOLUME>(
-    NodeUnpublishVolumeRequest request);
+process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request);
 
 
 template <>
-process::Future<NodeGetIdResponse>
-Client::call<NODE_GET_ID>(
-    NodeGetIdRequest request);
+process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_ID>(NodeGetIdRequest request);
 
 
 template <>
-process::Future<NodeGetCapabilitiesResponse>
-Client::call<NODE_GET_CAPABILITIES>(
-    NodeGetCapabilitiesRequest request);
+process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request);
 
 } // namespace v0 {
 } // namespace csi {
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f9f9312..1abb9c8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,6 +18,7 @@
 
 #include <algorithm>
 #include <cctype>
+#include <cstdlib>
 #include <memory>
 #include <numeric>
 #include <utility>
@@ -48,6 +49,7 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -55,6 +57,7 @@
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -109,6 +112,8 @@ using process::Sequence;
 using process::spawn;
 using process::Timeout;
 
+using process::grpc::StatusError;
+
 using process::http::authentication::Principal;
 
 using process::metrics::Counter;
@@ -130,6 +135,23 @@ using mesos::v1::resource_provider::Driver;
 namespace mesos {
 namespace internal {
 
+// Timeout for a CSI plugin component to create its endpoint socket.
+//
+// TODO(chhsiao): Make the timeout configurable.
+constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
 // Returns true if the string is a valid Java identifier.
 static bool isValidName(const string& s)
 {
@@ -164,11 +186,6 @@ static bool isValidType(const string& s)
 }
 
 
-// Timeout for a CSI plugin component to create its endpoint socket.
-// TODO(chhsiao): Make the timeout configurable.
-static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
-
-
 // Returns the container ID of the standalone container to run a CSI plugin
 // component. The container ID is of the following format:
 //     <cid_prefix><csi_type>-<csi_name>--<list_of_services>
@@ -408,10 +425,12 @@ private:
       typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
   Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
       const ContainerID& containerId,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request);
+      const typename csi::v0::RPCTraits<rpc>::request_type& request,
+      bool retry = false);
 
   template <csi::v0::RPC rpc>
-  Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
+  Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
+  _call(
       csi::v0::Client client,
       const typename csi::v0::RPCTraits<rpc>::request_type& request);
 
@@ -1873,32 +1892,98 @@ template <
 Future<typename csi::v0::RPCTraits<rpc>::response_type>
 StorageLocalResourceProviderProcess::call(
     const ContainerID& containerId,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request)
+    const typename csi::v0::RPCTraits<rpc>::request_type& request,
+    bool retry)
 {
-  // Get the latest service future before making the call.
-  return getService(containerId)
-    .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return loop(
+      self(),
+      [=] {
+        // Perform the call with the latest service future.
+        return getService(containerId)
+          .then(defer(
+              self(),
+              &StorageLocalResourceProviderProcess::_call<rpc>,
+              lambda::_1,
+              request));
+      },
+      [=](const Try<Response, StatusError>& result) mutable
+          -> Future<ControlFlow<Response>> {
+        if (result.isSome()) {
+          return Break(result.get());
+        }
+
+        if (retry) {
+          // See the link below for retryable status codes:
+          // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+          switch (result.error().status.error_code()) {
+            case grpc::DEADLINE_EXCEEDED:
+            case grpc::UNAVAILABLE: {
+              Duration delay =
+                maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
+
+              maxBackoff =
+                std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+              LOG(ERROR)
+                << "Received '" << result.error() << "' while calling " << rpc
+                << ". Retrying in " << delay;
+
+              return after(delay)
+                .then([]() -> Future<ControlFlow<Response>> {
+                  return Continue();
+                });
+            }
+            case grpc::CANCELLED:
+            case grpc::UNKNOWN:
+            case grpc::INVALID_ARGUMENT:
+            case grpc::NOT_FOUND:
+            case grpc::ALREADY_EXISTS:
+            case grpc::PERMISSION_DENIED:
+            case grpc::UNAUTHENTICATED:
+            case grpc::RESOURCE_EXHAUSTED:
+            case grpc::FAILED_PRECONDITION:
+            case grpc::ABORTED:
+            case grpc::OUT_OF_RANGE:
+            case grpc::UNIMPLEMENTED:
+            case grpc::INTERNAL:
+            case grpc::DATA_LOSS: {
+              break;
+            }
+            case grpc::OK:
+            case grpc::DO_NOT_USE: {
+              UNREACHABLE();
+            }
+          }
+        }
+
+        return Failure(result.error());
+      });
 }
 
 
 template <csi::v0::RPC rpc>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
+Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
 StorageLocalResourceProviderProcess::_call(
     csi::v0::Client client,
     const typename csi::v0::RPCTraits<rpc>::request_type& request)
 {
+  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
   ++metrics.csi_plugin_rpcs_pending.at(rpc);
 
   return client.call<rpc>(request)
-    .onAny(defer(self(), [=](
-        const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
+    .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
       --metrics.csi_plugin_rpcs_pending.at(rpc);
-      if (future.isReady()) {
+      if (future.isReady() && future->isSome()) {
         ++metrics.csi_plugin_rpcs_successes.at(rpc);
-      } else if (future.isFailed()) {
-        ++metrics.csi_plugin_rpcs_errors.at(rpc);
-      } else {
+      } else if (future.isDiscarded()) {
         ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+      } else {
+        ++metrics.csi_plugin_rpcs_errors.at(rpc);
       }
     }));
 }
@@ -1939,7 +2024,14 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
   return service
     .then(defer(self(), [=](csi::v0::Client client) {
       return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
-        .then([=]() -> csi::v0::Client { return client; });
+        .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result)
+            -> Future<csi::v0::Client> {
+          if (result.isError()) {
+            return Failure(result.error());
+          }
+
+          return client;
+        });
     }));
 }
 
@@ -2678,7 +2770,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
   CHECK_SOME(controllerContainerId);
 
   return call<csi::v0::CREATE_VOLUME>(
-      controllerContainerId.get(), std::move(request))
+      controllerContainerId.get(), std::move(request), true) // Retry.
     .then(defer(self(), [=](
         const csi::v0::CreateVolumeResponse& response) -> string {
       const csi::v0::Volume& volume = response.volume();
@@ -2770,7 +2862,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
             CHECK_SOME(controllerContainerId);
 
             return call<csi::v0::DELETE_VOLUME>(
-                controllerContainerId.get(), std::move(request))
+                controllerContainerId.get(), std::move(request), true) // Retry.
               .then([] { return Nothing(); });
           }));
       }


[mesos] 01/06: Made starting and stopping on CSI plugin containers more verbose.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 91cf91c1f26a3fb0d7c1bded9ce47f017bf3bab5
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Thu Jan 24 10:04:51 2019 +0100

    Made starting and stopping on CSI plugin containers more verbose.
    
    This patch adds some additional logging so it becomes easier to follow
    the lifecycle of CSI plugins. The container daemon already logged some
    related information, but didn't directly call out that it was working
    with CSI plugin containers.
    
    Review: https://reviews.apache.org/r/69606/
---
 src/resource_provider/storage/provider.cpp | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 111a7b9..f7a8634 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -2013,6 +2013,11 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
       config->resources(),
       containerInfo,
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
+        LOG(INFO)
+          << "CSI plugin container '" << containerId << "' started for plugin"
+          << " type '" << info.storage().plugin().type() << "' and "
+          << " name '" << info.storage().plugin().name() << "'";
+
         CHECK(services.at(containerId)->associate(connect(endpointPath)));
         return services.at(containerId)->future()
           .then([] { return Nothing(); });
@@ -2023,6 +2028,11 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
         services.at(containerId)->discard();
         services.at(containerId).reset(new Promise<csi::v0::Client>());
 
+        LOG(INFO)
+          << "CSI plugin container '" << containerId << "' stopped for plugin"
+          << " type '" << info.storage().plugin().type() << "' and "
+          << " name '" << info.storage().plugin().name() << "'";
+
         if (os::exists(endpointPath)) {
           Try<Nothing> rm = os::rm(endpointPath);
           if (rm.isError()) {


[mesos] 02/06: Preliminary SLRP refactoring for RPC retry.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 994f2089da451981f2e5a236b2a69965d0e745fc
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 20:18:06 2019 -0800

    Preliminary SLRP refactoring for RPC retry.
    
    This patch refactors the `StorageLocalResourceProviderProcess::call`
    function to obtain the latest service future through `getService` before
    making the actual RPC call. The subsequent patch will use this to support
    RPC retry across plugin restarts.
    
    Review: https://reviews.apache.org/r/69811
---
 src/resource_provider/storage/provider.cpp | 821 ++++++++++++++---------------
 1 file changed, 396 insertions(+), 425 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f7a8634..f9f9312 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -390,12 +390,32 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
-  template <csi::v0::RPC rpc>
+  // Wrapper functions to make CSI calls and update RPC metrics.
+  //
+  // The call is made asynchronously and thus no guarantee is provided on the
+  // order in which calls are sent. Callers need to either ensure to not have
+  // multiple conflicting calls in flight, or treat results idempotently.
+  //
+  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+  // calls on the same volume, and 2) no profile update while there are ongoing
+  // `CREATE_DISK` or `DESTROY_DISK` operations.
+  //
+  // NOTE: Since this function uses `getService` to obtain the latest service
+  // future, which depends on probe results, it is disabled for making probe
+  // calls; `_call` should be used directly instead.
+  template <
+      csi::v0::RPC rpc,
+      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
   Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
+      const ContainerID& containerId,
+      const typename csi::v0::RPCTraits<rpc>::request_type& request);
+
+  template <csi::v0::RPC rpc>
+  Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
       csi::v0::Client client,
-      typename csi::v0::RPCTraits<rpc>::request_type&& request);
+      const typename csi::v0::RPCTraits<rpc>::request_type& request);
 
-  Future<csi::v0::Client> connect(const string& endpoint);
+  Future<csi::v0::Client> waitService(const string& endpoint);
   Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
   Future<Nothing> waitContainer(const ContainerID& containerId);
@@ -1847,15 +1867,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-template <csi::v0::RPC rpc>
+template <
+    csi::v0::RPC rpc,
+    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
 Future<typename csi::v0::RPCTraits<rpc>::response_type>
 StorageLocalResourceProviderProcess::call(
+    const ContainerID& containerId,
+    const typename csi::v0::RPCTraits<rpc>::request_type& request)
+{
+  // Get the latest service future before making the call.
+  return getService(containerId)
+    .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<typename csi::v0::RPCTraits<rpc>::response_type>
+StorageLocalResourceProviderProcess::_call(
     csi::v0::Client client,
-    typename csi::v0::RPCTraits<rpc>::request_type&& request)
+    const typename csi::v0::RPCTraits<rpc>::request_type& request)
 {
   ++metrics.csi_plugin_rpcs_pending.at(rpc);
 
-  return client.call<rpc>(std::move(request))
+  return client.call<rpc>(request)
     .onAny(defer(self(), [=](
         const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
       --metrics.csi_plugin_rpcs_pending.at(rpc);
@@ -1873,18 +1907,18 @@ StorageLocalResourceProviderProcess::call(
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
 // readiness.
-Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
     const string& endpoint)
 {
-  Future<csi::v0::Client> future;
+  Future<csi::v0::Client> service;
 
   if (os::exists(endpoint)) {
-    future = csi::v0::Client("unix://" + endpoint, runtime);
+    service = csi::v0::Client("unix://" + endpoint, runtime);
   } else {
     // Wait for the endpoint socket to appear until the timeout expires.
     Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-    future = loop(
+    service = loop(
         self(),
         [=]() -> Future<Nothing> {
           if (timeout.expired()) {
@@ -1902,13 +1936,10 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
         });
   }
 
-  return future
+  return service
     .then(defer(self(), [=](csi::v0::Client client) {
-      return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
-        .then(defer(self(), [=](
-            const csi::v0::ProbeResponse& response) -> csi::v0::Client {
-          return client;
-        }));
+      return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
+        .then([=]() -> csi::v0::Client { return client; });
     }));
 }
 
@@ -2018,7 +2049,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
           << " type '" << info.storage().plugin().type() << "' and "
           << " name '" << info.storage().plugin().name() << "'";
 
-        CHECK(services.at(containerId)->associate(connect(endpointPath)));
+        CHECK(services.at(containerId)->associate(waitService(endpointPath)));
         return services.at(containerId)->future()
           .then([] { return Nothing(); });
       })),
@@ -2179,31 +2210,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
   CHECK_SOME(nodeContainerId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the plugin info.
-      return call<csi::v0::GET_PLUGIN_INFO>(
-          client, csi::v0::GetPluginInfoRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginInfoResponse& response) {
-          pluginInfo = response;
+  // Get the plugin info.
+  return call<csi::v0::GET_PLUGIN_INFO>(
+      nodeContainerId.get(), csi::v0::GetPluginInfoRequest())
+    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+      pluginInfo = response;
 
-          LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
+      LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
 
-          // Get the latest service future before proceeding to the next step.
-          return getService(nodeContainerId.get());
-        }));
-    }))
-    .then(defer(self(), [=](csi::v0::Client client) {
       // Get the plugin capabilities.
       return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
-          client, csi::v0::GetPluginCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginCapabilitiesResponse& response) {
-          pluginCapabilities = response.capabilities();
+          nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest());
+    }))
+    .then(defer(self(), [=](
+        const csi::v0::GetPluginCapabilitiesResponse& response) {
+      pluginCapabilities = response.capabilities();
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
@@ -2222,36 +2245,29 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
         stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
   }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the controller plugin info and check for consistency.
-      return call<csi::v0::GET_PLUGIN_INFO>(
-          client, csi::v0::GetPluginInfoRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginInfoResponse& response) {
-          LOG(INFO) << "Controller plugin loaded: " << stringify(response);
-
-          if (pluginInfo->name() != response.name() ||
-              pluginInfo->vendor_version() != response.vendor_version()) {
-            LOG(WARNING)
-              << "Inconsistent controller and node plugin components. Please "
-                 "check with the plugin vendor to ensure compatibility.";
-          }
+  // Get the controller plugin info and check for consistency.
+  return call<csi::v0::GET_PLUGIN_INFO>(
+      controllerContainerId.get(), csi::v0::GetPluginInfoRequest())
+    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+      LOG(INFO) << "Controller plugin loaded: " << stringify(response);
+
+      if (pluginInfo->name() != response.name() ||
+          pluginInfo->vendor_version() != response.vendor_version()) {
+        LOG(WARNING)
+          << "Inconsistent controller and node plugin components. Please check "
+             "with the plugin vendor to ensure compatibility.";
+      }
 
-          // Get the latest service future before proceeding to the next step.
-          return getService(controllerContainerId.get());
-        }));
-    }))
-    .then(defer(self(), [=](csi::v0::Client client) {
       // Get the controller capabilities.
       return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
-          client, csi::v0::ControllerGetCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
+          controllerContainerId.get(),
+          csi::v0::ControllerGetCapabilitiesRequest());
+    }))
+    .then(defer(self(), [=](
+        const csi::v0::ControllerGetCapabilitiesResponse& response) {
+      controllerCapabilities = response.capabilities();
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
@@ -2262,32 +2278,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 {
   CHECK_SOME(nodeContainerId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the node capabilities.
-      return call<csi::v0::NODE_GET_CAPABILITIES>(
-          client, csi::v0::NodeGetCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::NodeGetCapabilitiesResponse& response)
-            -> Future<csi::v0::Client> {
-          nodeCapabilities = response.capabilities();
-
-          // Get the latest service future before proceeding to the next step.
-          return getService(nodeContainerId.get());
-        }))
-        .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
-          if (!controllerCapabilities.publishUnpublishVolume) {
-            return Nothing();
-          }
+  // Get the node capabilities.
+  return call<csi::v0::NODE_GET_CAPABILITIES>(
+      nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest())
+    .then(defer(self(), [=](
+        const csi::v0::NodeGetCapabilitiesResponse& response)
+        -> Future<Nothing> {
+      nodeCapabilities = response.capabilities();
 
-          // Get the node ID.
-          return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest())
-            .then(defer(self(), [=](
-                const csi::v0::NodeGetIdResponse& response) {
-              nodeId = response.node_id();
+      if (!controllerCapabilities.publishUnpublishVolume) {
+        return Nothing();
+      }
 
-              return Nothing();
-            }));
+      // Get the node ID.
+      return call<csi::v0::NODE_GET_ID>(
+          nodeContainerId.get(), csi::v0::NodeGetIdRequest())
+        .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
+          nodeId = response.node_id();
+
+          return Nothing();
         }));
     }));
 }
@@ -2295,6 +2304,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 
 // Transitions the state of the specified volume from `CREATED` or
 // `CONTROLLER_PUBLISH` to `NODE_READY`.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
@@ -2312,47 +2322,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     return Nothing();
   }
 
-  CHECK_SOME(controllerContainerId);
-  CHECK_SOME(nodeId);
+  if (volume.state.state() == VolumeState::CREATED) {
+    volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
 
-      if (volume.state.state() == VolumeState::CREATED) {
-        volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_SOME(nodeId);
 
-      CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
+  csi::v0::ControllerPublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(nodeId.get());
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::ControllerPublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_node_id(nodeId.get());
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      request.set_readonly(false);
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  CHECK_SOME(controllerContainerId);
 
-      return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
-          client, std::move(request))
-        .then(defer(self(), [this, volumeId](
-            const csi::v0::ControllerPublishVolumeResponse& response) {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId](
+        const csi::v0::ControllerPublishVolumeResponse& response) {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::NODE_READY);
-          *volume.state.mutable_publish_info() = response.publish_info();
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::NODE_READY);
+      *volume.state.mutable_publish_info() = response.publish_info();
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `NODE_READY`,
 // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
@@ -2370,45 +2375,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     return Nothing();
   }
 
-  CHECK_SOME(controllerContainerId);
-  CHECK_SOME(nodeId);
+  // A previously failed `ControllerPublishVolume` call can be recovered through
+  // the current `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+  if (volume.state.state() == VolumeState::NODE_READY ||
+      volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
+    volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
 
-      // A previously failed `ControllerPublishVolume` call can be recovered
-      // through the current `ControllerUnpublishVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
-      if (volume.state.state() == VolumeState::NODE_READY ||
-          volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
-        volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_SOME(nodeId);
 
-      CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
+  csi::v0::ControllerUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(nodeId.get());
 
-      csi::v0::ControllerUnpublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_node_id(nodeId.get());
+  CHECK_SOME(controllerContainerId);
 
-      return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
-          client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::CREATED);
-          volume.state.mutable_publish_info()->clear();
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::CREATED);
+      volume.state.mutable_publish_info()->clear();
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `NODE_READY` or
 // `NODE_STAGE` to `VOL_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     const string& volumeId)
@@ -2426,58 +2428,53 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     return Nothing();
   }
 
-  CHECK_SOME(nodeContainerId);
-
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  const string stagingPath = csi::paths::getMountStagingPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string stagingPath = csi::paths::getMountStagingPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
+  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount staging path '" + stagingPath +
+        "': " + mkdir.error());
+  }
 
-      Try<Nothing> mkdir = os::mkdir(stagingPath);
-      if (mkdir.isError()) {
-        return Failure(
-            "Failed to create mount staging path '" + stagingPath + "': " +
-            mkdir.error());
-      }
+  if (volume.state.state() == VolumeState::NODE_READY) {
+    volume.state.set_state(VolumeState::NODE_STAGE);
+    checkpointVolumeState(volumeId);
+  }
 
-      if (volume.state.state() == VolumeState::NODE_READY) {
-        volume.state.set_state(VolumeState::NODE_STAGE);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
 
-      CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+  csi::v0::NodeStageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volume.state.publish_info();
+  request.set_staging_target_path(stagingPath);
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::NodeStageVolumeRequest request;
-      request.set_volume_id(volumeId);
-      *request.mutable_publish_info() = volume.state.publish_info();
-      request.set_staging_target_path(stagingPath);
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  CHECK_SOME(nodeContainerId);
 
-      return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::NODE_STAGE_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::VOL_READY);
-          volume.state.set_boot_id(bootId);
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::VOL_READY);
+      volume.state.set_boot_id(bootId);
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
 // or `NODE_UNSTAGE` to `NODE_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     const string& volumeId)
@@ -2495,173 +2492,165 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     return Nothing();
   }
 
-  CHECK_SOME(nodeContainerId);
+  const string stagingPath = csi::paths::getMountStagingPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK(os::exists(stagingPath));
 
-      const string stagingPath = csi::paths::getMountStagingPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
-
-      CHECK(os::exists(stagingPath));
-
-      // A previously failed `NodeStageVolume` call can be recovered through the
-      // current `NodeUnstageVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
-      if (volume.state.state() == VolumeState::VOL_READY ||
-          volume.state.state() == VolumeState::NODE_STAGE) {
-        volume.state.set_state(VolumeState::NODE_UNSTAGE);
-        checkpointVolumeState(volumeId);
-      }
+  // A previously failed `NodeStageVolume` call can be recovered through the
+  // current `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+  if (volume.state.state() == VolumeState::VOL_READY ||
+      volume.state.state() == VolumeState::NODE_STAGE) {
+    volume.state.set_state(VolumeState::NODE_UNSTAGE);
+    checkpointVolumeState(volumeId);
+  }
 
-      CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+  CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
 
-      csi::v0::NodeUnstageVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_staging_target_path(stagingPath);
+  csi::v0::NodeUnstageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_staging_target_path(stagingPath);
 
-      return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  CHECK_SOME(nodeContainerId);
 
-          volume.state.set_state(VolumeState::NODE_READY);
-          volume.state.clear_boot_id();
-          checkpointVolumeState(volumeId);
+  return call<csi::v0::NODE_UNSTAGE_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          return Nothing();
-        }));
+      volume.state.set_state(VolumeState::NODE_READY);
+      volume.state.clear_boot_id();
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `VOL_READY` or
 // `NODE_PUBLISH` to `PUBLISHED`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  CHECK_SOME(nodeContainerId);
+  VolumeData& volume = volumes.at(volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string targetPath = csi::paths::getMountTargetPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
+  Try<Nothing> mkdir = os::mkdir(targetPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount target path '" + targetPath +
+        "': " + mkdir.error());
+  }
 
-      Try<Nothing> mkdir = os::mkdir(targetPath);
-      if (mkdir.isError()) {
-        return Failure(
-            "Failed to create mount target path '" + targetPath + "': " +
-            mkdir.error());
-      }
+  if (volume.state.state() == VolumeState::VOL_READY) {
+    volume.state.set_state(VolumeState::NODE_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-      if (volume.state.state() == VolumeState::VOL_READY) {
-        volume.state.set_state(VolumeState::NODE_PUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
 
-      CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
+  csi::v0::NodePublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volume.state.publish_info();
+  request.set_target_path(targetPath);
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::NodePublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      *request.mutable_publish_info() = volume.state.publish_info();
-      request.set_target_path(targetPath);
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      request.set_readonly(false);
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  if (nodeCapabilities.stageUnstageVolume) {
+    const string stagingPath = csi::paths::getMountStagingPath(
+        csi::paths::getMountRootDir(
+            slave::paths::getCsiRootDir(workDir),
+            info.storage().plugin().type(),
+            info.storage().plugin().name()),
+        volumeId);
 
-      if (nodeCapabilities.stageUnstageVolume) {
-        const string stagingPath = csi::paths::getMountStagingPath(
-            csi::paths::getMountRootDir(
-                slave::paths::getCsiRootDir(workDir),
-                info.storage().plugin().type(),
-                info.storage().plugin().name()),
-            volumeId);
+    CHECK(os::exists(stagingPath));
 
-        CHECK(os::exists(stagingPath));
+    request.set_staging_target_path(stagingPath);
+  }
 
-        request.set_staging_target_path(stagingPath);
-      }
+  CHECK_SOME(nodeContainerId);
 
-      return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::NODE_PUBLISH_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::PUBLISHED);
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::PUBLISHED);
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `PUBLISHED`,
 // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  CHECK_SOME(nodeContainerId);
+  VolumeData& volume = volumes.at(volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string targetPath = csi::paths::getMountTargetPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
-
-      CHECK(os::exists(targetPath));
-
-      // A previously failed `NodePublishVolume` call can be recovered through
-      // the current `NodeUnpublishVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
-      if (volume.state.state() == VolumeState::PUBLISHED ||
-          volume.state.state() == VolumeState::NODE_PUBLISH) {
-        volume.state.set_state(VolumeState::NODE_UNPUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK(os::exists(targetPath));
 
-      CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
+  // A previously failed `NodePublishVolume` call can be recovered through the
+  // current `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+  if (volume.state.state() == VolumeState::PUBLISHED ||
+      volume.state.state() == VolumeState::NODE_PUBLISH) {
+    volume.state.set_state(VolumeState::NODE_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-      csi::v0::NodeUnpublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_target_path(targetPath);
+  CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
 
-      return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
-          VolumeData& volume = volumes.at(volumeId);
+  csi::v0::NodeUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_target_path(targetPath);
 
-          volume.state.set_state(VolumeState::VOL_READY);
-          checkpointVolumeState(volumeId);
+  CHECK_SOME(nodeContainerId);
 
-          Try<Nothing> rmdir = os::rmdir(targetPath);
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove mount point '" + targetPath + "': " +
-                rmdir.error());
-          }
+  return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
+      VolumeData& volume = volumes.at(volumeId);
 
-          return Nothing();
-        }));
+      volume.state.set_state(VolumeState::VOL_READY);
+      checkpointVolumeState(volumeId);
+
+      Try<Nothing> rmdir = os::rmdir(targetPath);
+      if (rmdir.isError()) {
+        return Failure(
+            "Failed to remove mount point '" + targetPath + "': " +
+            rmdir.error());
+      }
+
+      return Nothing();
     }));
 }
 
@@ -2679,43 +2668,37 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
         "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  csi::v0::CreateVolumeRequest request;
+  request.set_name(name);
+  request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+  request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+  *request.add_volume_capabilities() = profileInfo.capability;
+  *request.mutable_parameters() = profileInfo.parameters;
+
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      csi::v0::CreateVolumeRequest request;
-      request.set_name(name);
-      request.mutable_capacity_range()
-        ->set_required_bytes(capacity.bytes());
-      request.mutable_capacity_range()
-        ->set_limit_bytes(capacity.bytes());
-      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-      *request.mutable_parameters() = profileInfo.parameters;
-
-      return call<csi::v0::CREATE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [=](
-            const csi::v0::CreateVolumeResponse& response) -> string {
-          const csi::v0::Volume& volume = response.volume();
-
-          if (volumes.contains(volume.id())) {
-            // The resource provider failed over after the last `createVolume`
-            // call, but before the operation status was checkpointed.
-            CHECK_EQ(VolumeState::CREATED,
-                     volumes.at(volume.id()).state.state());
-          } else {
-            VolumeState volumeState;
-            volumeState.set_state(VolumeState::CREATED);
-            volumeState.mutable_volume_capability()
-              ->CopyFrom(profileInfo.capability);
-            *volumeState.mutable_parameters() = profileInfo.parameters;
-            *volumeState.mutable_volume_attributes() = volume.attributes();
-
-            volumes.put(volume.id(), std::move(volumeState));
-            checkpointVolumeState(volume.id());
-          }
+  return call<csi::v0::CREATE_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [=](
+        const csi::v0::CreateVolumeResponse& response) -> string {
+      const csi::v0::Volume& volume = response.volume();
 
-          return volume.id();
-        }));
+      if (volumes.contains(volume.id())) {
+        // The resource provider failed over after the last `createVolume` call,
+        // but before the operation status was checkpointed.
+        CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
+      } else {
+        VolumeState volumeState;
+        volumeState.set_state(VolumeState::CREATED);
+        *volumeState.mutable_volume_capability() = profileInfo.capability;
+        *volumeState.mutable_parameters() = profileInfo.parameters;
+        *volumeState.mutable_volume_attributes() = volume.attributes();
+
+        volumes.put(volume.id(), std::move(volumeState));
+        checkpointVolumeState(volume.id());
+      }
+
+      return volume.id();
     }));
 }
 
@@ -2727,8 +2710,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
-  CHECK_SOME(controllerContainerId);
-
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2745,10 +2726,10 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 
   const VolumeData& volume = volumes.at(volumeId);
 
-  Future<Nothing> deleted = Nothing();
-
   CHECK(VolumeState::State_IsValid(volume.state.state()));
 
+  Future<Nothing> deleted = Nothing();
+
   switch (volume.state.state()) {
     case VolumeState::PUBLISHED:
     case VolumeState::NODE_PUBLISH:
@@ -2782,12 +2763,14 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
       // supported. Otherwise, we simply leave it as a preprovisioned volume.
       if (controllerCapabilities.createDeleteVolume) {
         deleted = deleted
-          .then(defer(self(), &Self::getService, controllerContainerId.get()))
-          .then(defer(self(), [this, volumeId](csi::v0::Client client) {
+          .then(defer(self(), [this, volumeId] {
             csi::v0::DeleteVolumeRequest request;
             request.set_volume_id(volumeId);
 
-            return call<csi::v0::DELETE_VOLUME>(client, std::move(request))
+            CHECK_SOME(controllerContainerId);
+
+            return call<csi::v0::DELETE_VOLUME>(
+                controllerContainerId.get(), std::move(request))
               .then([] { return Nothing(); });
           }));
       }
@@ -2856,45 +2839,40 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
         "Plugin capability 'CONTROLLER_SERVICE' is not supported");
   }
 
-  CHECK_SOME(controllerContainerId);
+  google::protobuf::Map<string, string> volumeAttributes;
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      google::protobuf::Map<string, string> volumeAttributes;
+  if (metadata.isSome()) {
+    volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
+  }
 
-      if (metadata.isSome()) {
-        volumeAttributes =
-          CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
-      }
+  csi::v0::ValidateVolumeCapabilitiesRequest request;
+  request.set_volume_id(volumeId);
+  *request.add_volume_capabilities() = profileInfo.capability;
+  *request.mutable_volume_attributes() = volumeAttributes;
 
-      csi::v0::ValidateVolumeCapabilitiesRequest request;
-      request.set_volume_id(volumeId);
-      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-      *request.mutable_volume_attributes() = volumeAttributes;
-
-      return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
-          client, std::move(request))
-        .then(defer(self(), [=](
-            const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-            -> Future<Nothing> {
-          if (!response.supported()) {
-            return Failure(
-                "Unsupported volume capability for volume '" + volumeId +
-                "': " + response.message());
-          }
+  CHECK_SOME(controllerContainerId);
 
-          VolumeState volumeState;
-          volumeState.set_state(VolumeState::CREATED);
-          volumeState.mutable_volume_capability()
-            ->CopyFrom(profileInfo.capability);
-          *volumeState.mutable_parameters() = profileInfo.parameters;
-          *volumeState.mutable_volume_attributes() = volumeAttributes;
+  return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [=](
+        const csi::v0::ValidateVolumeCapabilitiesResponse& response)
+        -> Future<Nothing> {
+      if (!response.supported()) {
+        return Failure(
+            "Unsupported volume capability for volume '" + volumeId + "': " +
+            response.message());
+      }
 
-          volumes.put(volumeId, std::move(volumeState));
-          checkpointVolumeState(volumeId);
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = profileInfo.capability;
+      *volumeState.mutable_parameters() = profileInfo.parameters;
+      *volumeState.mutable_volume_attributes() = volumeAttributes;
 
-          return Nothing();
-        }));
+      volumes.put(volumeId, std::move(volumeState));
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
     }));
 }
 
@@ -2912,41 +2890,39 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // TODO(chhsiao): Set the max entries and use a loop to do
-      // multiple `ListVolumes` calls.
-      return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest())
-        .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
-          Resources resources;
-
-          // Recover disk profiles from the checkpointed state.
-          hashmap<string, string> volumesToProfiles;
-          foreach (const Resource& resource, totalResources) {
-            if (resource.disk().source().has_id() &&
-                resource.disk().source().has_profile()) {
-              volumesToProfiles.put(
-                  resource.disk().source().id(),
-                  resource.disk().source().profile());
-            }
-          }
+  // TODO(chhsiao): Set the max entries and use a loop to do
+  // multiple `ListVolumes` calls.
+  return call<csi::v0::LIST_VOLUMES>(
+      controllerContainerId.get(), csi::v0::ListVolumesRequest())
+    .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
+      Resources resources;
 
-          foreach (const auto& entry, response.entries()) {
-            resources += createRawDiskResource(
-                info,
-                Bytes(entry.volume().capacity_bytes()),
-                volumesToProfiles.contains(entry.volume().id())
-                  ? volumesToProfiles.at(entry.volume().id())
-                  : Option<string>::none(),
-                vendor,
-                entry.volume().id(),
-                entry.volume().attributes().empty()
-                  ? Option<Labels>::none()
-                  : convertStringMapToLabels(entry.volume().attributes()));
-          }
+      // Recover disk profiles from the checkpointed state.
+      hashmap<string, string> volumesToProfiles;
+      foreach (const Resource& resource, totalResources) {
+        if (resource.disk().source().has_id() &&
+            resource.disk().source().has_profile()) {
+          volumesToProfiles.put(
+              resource.disk().source().id(),
+              resource.disk().source().profile());
+        }
+      }
 
-          return resources;
-        }));
+      foreach (const auto& entry, response.entries()) {
+        resources += createRawDiskResource(
+            info,
+            Bytes(entry.volume().capacity_bytes()),
+            volumesToProfiles.contains(entry.volume().id())
+              ? volumesToProfiles.at(entry.volume().id())
+              : Option<string>::none(),
+            vendor,
+            entry.volume().id(),
+            entry.volume().attributes().empty()
+              ? Option<Labels>::none()
+              : convertStringMapToLabels(entry.volume().attributes()));
+      }
+
+      return resources;
     }));
 }
 
@@ -2964,19 +2940,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      vector<Future<Resources>> futures;
+  vector<Future<Resources>> futures;
 
-      foreachpair (const string& profile,
-                   const DiskProfileAdaptor::ProfileInfo& profileInfo,
-                   profileInfos) {
-        csi::v0::GetCapacityRequest request;
-        request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-        *request.mutable_parameters() = profileInfo.parameters;
+  foreachpair (const string& profile,
+               const DiskProfileAdaptor::ProfileInfo& profileInfo,
+               profileInfos) {
+    csi::v0::GetCapacityRequest request;
+    *request.add_volume_capabilities() = profileInfo.capability;
+    *request.mutable_parameters() = profileInfo.parameters;
 
-        futures.push_back(call<csi::v0::GET_CAPACITY>(
-            client, std::move(request))
+    futures.push_back(
+        call<csi::v0::GET_CAPACITY>(
+            controllerContainerId.get(), std::move(request))
           .then(defer(self(), [=](
               const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
@@ -2984,18 +2959,14 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
             }
 
             return createRawDiskResource(
-                info,
-                Bytes(response.available_capacity()),
-                profile,
-                vendor);
+                info, Bytes(response.available_capacity()), profile, vendor);
           })));
-      }
+  }
 
-      return collect(futures)
-        .then([](const vector<Resources>& resources) {
-          return accumulate(resources.begin(), resources.end(), Resources());
-        });
-    }));
+  return collect(futures)
+    .then([](const vector<Resources>& resources) {
+      return accumulate(resources.begin(), resources.end(), Resources());
+    });
 }