You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:15:12 UTC

[mesos] 02/06: Preliminary SLRP refactoring for RPC retry.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 994f2089da451981f2e5a236b2a69965d0e745fc
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 20:18:06 2019 -0800

    Preliminary SLRP refactoring for RPC retry.
    
    This patch refactors the `StorageLocalResourceProvider::call` function
    to obtain the latest service future through `getService` before making
    the actual RPC call. A subsequent patch will use this to support RPC
    retry across plugin restarts.
    
    Review: https://reviews.apache.org/r/69811
---
 src/resource_provider/storage/provider.cpp | 821 ++++++++++++++---------------
 1 file changed, 396 insertions(+), 425 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f7a8634..f9f9312 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -390,12 +390,32 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
-  template <csi::v0::RPC rpc>
+  // Wrapper functions to make CSI calls and update RPC metrics.
+  //
+  // The call is made asynchronously and thus no guarantee is provided on the
+  // order in which calls are sent. Callers need to either ensure that there
+  // are no conflicting calls in flight, or treat results idempotently.
+  //
+  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+  // calls on the same volume, and 2) no profile update while there are ongoing
+  // `CREATE_DISK` or `DESTROY_DISK` operations.
+  //
+  // NOTE: Since this function uses `getService` to obtain the latest service
+  // future, which depends on probe results, it is disabled for making probe
+  // calls; `_call` should be used directly instead.
+  template <
+      csi::v0::RPC rpc,
+      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
   Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
+      const ContainerID& containerId,
+      const typename csi::v0::RPCTraits<rpc>::request_type& request);
+
+  template <csi::v0::RPC rpc>
+  Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
       csi::v0::Client client,
-      typename csi::v0::RPCTraits<rpc>::request_type&& request);
+      const typename csi::v0::RPCTraits<rpc>::request_type& request);
 
-  Future<csi::v0::Client> connect(const string& endpoint);
+  Future<csi::v0::Client> waitService(const string& endpoint);
   Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
   Future<Nothing> waitContainer(const ContainerID& containerId);
@@ -1847,15 +1867,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-template <csi::v0::RPC rpc>
+template <
+    csi::v0::RPC rpc,
+    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
 Future<typename csi::v0::RPCTraits<rpc>::response_type>
 StorageLocalResourceProviderProcess::call(
+    const ContainerID& containerId,
+    const typename csi::v0::RPCTraits<rpc>::request_type& request)
+{
+  // Get the latest service future before making the call.
+  return getService(containerId)
+    .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<typename csi::v0::RPCTraits<rpc>::response_type>
+StorageLocalResourceProviderProcess::_call(
     csi::v0::Client client,
-    typename csi::v0::RPCTraits<rpc>::request_type&& request)
+    const typename csi::v0::RPCTraits<rpc>::request_type& request)
 {
   ++metrics.csi_plugin_rpcs_pending.at(rpc);
 
-  return client.call<rpc>(std::move(request))
+  return client.call<rpc>(request)
     .onAny(defer(self(), [=](
         const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
       --metrics.csi_plugin_rpcs_pending.at(rpc);
@@ -1873,18 +1907,18 @@ StorageLocalResourceProviderProcess::call(
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
 // readiness.
-Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
     const string& endpoint)
 {
-  Future<csi::v0::Client> future;
+  Future<csi::v0::Client> service;
 
   if (os::exists(endpoint)) {
-    future = csi::v0::Client("unix://" + endpoint, runtime);
+    service = csi::v0::Client("unix://" + endpoint, runtime);
   } else {
     // Wait for the endpoint socket to appear until the timeout expires.
     Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-    future = loop(
+    service = loop(
         self(),
         [=]() -> Future<Nothing> {
           if (timeout.expired()) {
@@ -1902,13 +1936,10 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
         });
   }
 
-  return future
+  return service
     .then(defer(self(), [=](csi::v0::Client client) {
-      return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
-        .then(defer(self(), [=](
-            const csi::v0::ProbeResponse& response) -> csi::v0::Client {
-          return client;
-        }));
+      return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
+        .then([=]() -> csi::v0::Client { return client; });
     }));
 }
 
@@ -2018,7 +2049,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
           << " type '" << info.storage().plugin().type() << "' and "
           << " name '" << info.storage().plugin().name() << "'";
 
-        CHECK(services.at(containerId)->associate(connect(endpointPath)));
+        CHECK(services.at(containerId)->associate(waitService(endpointPath)));
         return services.at(containerId)->future()
           .then([] { return Nothing(); });
       })),
@@ -2179,31 +2210,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
   CHECK_SOME(nodeContainerId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the plugin info.
-      return call<csi::v0::GET_PLUGIN_INFO>(
-          client, csi::v0::GetPluginInfoRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginInfoResponse& response) {
-          pluginInfo = response;
+  // Get the plugin info.
+  return call<csi::v0::GET_PLUGIN_INFO>(
+      nodeContainerId.get(), csi::v0::GetPluginInfoRequest())
+    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+      pluginInfo = response;
 
-          LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
+      LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
 
-          // Get the latest service future before proceeding to the next step.
-          return getService(nodeContainerId.get());
-        }));
-    }))
-    .then(defer(self(), [=](csi::v0::Client client) {
       // Get the plugin capabilities.
       return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
-          client, csi::v0::GetPluginCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginCapabilitiesResponse& response) {
-          pluginCapabilities = response.capabilities();
+          nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest());
+    }))
+    .then(defer(self(), [=](
+        const csi::v0::GetPluginCapabilitiesResponse& response) {
+      pluginCapabilities = response.capabilities();
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
@@ -2222,36 +2245,29 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
         stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
   }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the controller plugin info and check for consistency.
-      return call<csi::v0::GET_PLUGIN_INFO>(
-          client, csi::v0::GetPluginInfoRequest())
-        .then(defer(self(), [=](
-            const csi::v0::GetPluginInfoResponse& response) {
-          LOG(INFO) << "Controller plugin loaded: " << stringify(response);
-
-          if (pluginInfo->name() != response.name() ||
-              pluginInfo->vendor_version() != response.vendor_version()) {
-            LOG(WARNING)
-              << "Inconsistent controller and node plugin components. Please "
-                 "check with the plugin vendor to ensure compatibility.";
-          }
+  // Get the controller plugin info and check for consistency.
+  return call<csi::v0::GET_PLUGIN_INFO>(
+      controllerContainerId.get(), csi::v0::GetPluginInfoRequest())
+    .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+      LOG(INFO) << "Controller plugin loaded: " << stringify(response);
+
+      if (pluginInfo->name() != response.name() ||
+          pluginInfo->vendor_version() != response.vendor_version()) {
+        LOG(WARNING)
+          << "Inconsistent controller and node plugin components. Please check "
+             "with the plugin vendor to ensure compatibility.";
+      }
 
-          // Get the latest service future before proceeding to the next step.
-          return getService(controllerContainerId.get());
-        }));
-    }))
-    .then(defer(self(), [=](csi::v0::Client client) {
       // Get the controller capabilities.
       return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
-          client, csi::v0::ControllerGetCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
+          controllerContainerId.get(),
+          csi::v0::ControllerGetCapabilitiesRequest());
+    }))
+    .then(defer(self(), [=](
+        const csi::v0::ControllerGetCapabilitiesResponse& response) {
+      controllerCapabilities = response.capabilities();
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
@@ -2262,32 +2278,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 {
   CHECK_SOME(nodeContainerId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // Get the node capabilities.
-      return call<csi::v0::NODE_GET_CAPABILITIES>(
-          client, csi::v0::NodeGetCapabilitiesRequest())
-        .then(defer(self(), [=](
-            const csi::v0::NodeGetCapabilitiesResponse& response)
-            -> Future<csi::v0::Client> {
-          nodeCapabilities = response.capabilities();
-
-          // Get the latest service future before proceeding to the next step.
-          return getService(nodeContainerId.get());
-        }))
-        .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
-          if (!controllerCapabilities.publishUnpublishVolume) {
-            return Nothing();
-          }
+  // Get the node capabilities.
+  return call<csi::v0::NODE_GET_CAPABILITIES>(
+      nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest())
+    .then(defer(self(), [=](
+        const csi::v0::NodeGetCapabilitiesResponse& response)
+        -> Future<Nothing> {
+      nodeCapabilities = response.capabilities();
 
-          // Get the node ID.
-          return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest())
-            .then(defer(self(), [=](
-                const csi::v0::NodeGetIdResponse& response) {
-              nodeId = response.node_id();
+      if (!controllerCapabilities.publishUnpublishVolume) {
+        return Nothing();
+      }
 
-              return Nothing();
-            }));
+      // Get the node ID.
+      return call<csi::v0::NODE_GET_ID>(
+          nodeContainerId.get(), csi::v0::NodeGetIdRequest())
+        .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
+          nodeId = response.node_id();
+
+          return Nothing();
         }));
     }));
 }
@@ -2295,6 +2304,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 
 // Transitions the state of the specified volume from `CREATED` or
 // `CONTROLLER_PUBLISH` to `NODE_READY`.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
@@ -2312,47 +2322,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     return Nothing();
   }
 
-  CHECK_SOME(controllerContainerId);
-  CHECK_SOME(nodeId);
+  if (volume.state.state() == VolumeState::CREATED) {
+    volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
 
-      if (volume.state.state() == VolumeState::CREATED) {
-        volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_SOME(nodeId);
 
-      CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
+  csi::v0::ControllerPublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(nodeId.get());
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::ControllerPublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_node_id(nodeId.get());
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      request.set_readonly(false);
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  CHECK_SOME(controllerContainerId);
 
-      return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
-          client, std::move(request))
-        .then(defer(self(), [this, volumeId](
-            const csi::v0::ControllerPublishVolumeResponse& response) {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId](
+        const csi::v0::ControllerPublishVolumeResponse& response) {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::NODE_READY);
-          *volume.state.mutable_publish_info() = response.publish_info();
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::NODE_READY);
+      *volume.state.mutable_publish_info() = response.publish_info();
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `NODE_READY`,
 // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+//
 // NOTE: This can only be called after `prepareControllerService` and
 // `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
@@ -2370,45 +2375,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     return Nothing();
   }
 
-  CHECK_SOME(controllerContainerId);
-  CHECK_SOME(nodeId);
+  // A previously failed `ControllerPublishVolume` call can be recovered through
+  // the current `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+  if (volume.state.state() == VolumeState::NODE_READY ||
+      volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
+    volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
 
-      // A previously failed `ControllerPublishVolume` call can be recovered
-      // through the current `ControllerUnpublishVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
-      if (volume.state.state() == VolumeState::NODE_READY ||
-          volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
-        volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_SOME(nodeId);
 
-      CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
+  csi::v0::ControllerUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(nodeId.get());
 
-      csi::v0::ControllerUnpublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_node_id(nodeId.get());
+  CHECK_SOME(controllerContainerId);
 
-      return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
-          client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::CREATED);
-          volume.state.mutable_publish_info()->clear();
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::CREATED);
+      volume.state.mutable_publish_info()->clear();
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `NODE_READY` or
 // `NODE_STAGE` to `VOL_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     const string& volumeId)
@@ -2426,58 +2428,53 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
     return Nothing();
   }
 
-  CHECK_SOME(nodeContainerId);
-
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  const string stagingPath = csi::paths::getMountStagingPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string stagingPath = csi::paths::getMountStagingPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
+  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount staging path '" + stagingPath +
+        "': " + mkdir.error());
+  }
 
-      Try<Nothing> mkdir = os::mkdir(stagingPath);
-      if (mkdir.isError()) {
-        return Failure(
-            "Failed to create mount staging path '" + stagingPath + "': " +
-            mkdir.error());
-      }
+  if (volume.state.state() == VolumeState::NODE_READY) {
+    volume.state.set_state(VolumeState::NODE_STAGE);
+    checkpointVolumeState(volumeId);
+  }
 
-      if (volume.state.state() == VolumeState::NODE_READY) {
-        volume.state.set_state(VolumeState::NODE_STAGE);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
 
-      CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+  csi::v0::NodeStageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volume.state.publish_info();
+  request.set_staging_target_path(stagingPath);
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::NodeStageVolumeRequest request;
-      request.set_volume_id(volumeId);
-      *request.mutable_publish_info() = volume.state.publish_info();
-      request.set_staging_target_path(stagingPath);
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  CHECK_SOME(nodeContainerId);
 
-      return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::NODE_STAGE_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::VOL_READY);
-          volume.state.set_boot_id(bootId);
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::VOL_READY);
+      volume.state.set_boot_id(bootId);
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
 // or `NODE_UNSTAGE` to `NODE_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     const string& volumeId)
@@ -2495,173 +2492,165 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
     return Nothing();
   }
 
-  CHECK_SOME(nodeContainerId);
+  const string stagingPath = csi::paths::getMountStagingPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  CHECK(os::exists(stagingPath));
 
-      const string stagingPath = csi::paths::getMountStagingPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
-
-      CHECK(os::exists(stagingPath));
-
-      // A previously failed `NodeStageVolume` call can be recovered through the
-      // current `NodeUnstageVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
-      if (volume.state.state() == VolumeState::VOL_READY ||
-          volume.state.state() == VolumeState::NODE_STAGE) {
-        volume.state.set_state(VolumeState::NODE_UNSTAGE);
-        checkpointVolumeState(volumeId);
-      }
+  // A previously failed `NodeStageVolume` call can be recovered through the
+  // current `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+  if (volume.state.state() == VolumeState::VOL_READY ||
+      volume.state.state() == VolumeState::NODE_STAGE) {
+    volume.state.set_state(VolumeState::NODE_UNSTAGE);
+    checkpointVolumeState(volumeId);
+  }
 
-      CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+  CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
 
-      csi::v0::NodeUnstageVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_staging_target_path(stagingPath);
+  csi::v0::NodeUnstageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_staging_target_path(stagingPath);
 
-      return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  CHECK_SOME(nodeContainerId);
 
-          volume.state.set_state(VolumeState::NODE_READY);
-          volume.state.clear_boot_id();
-          checkpointVolumeState(volumeId);
+  return call<csi::v0::NODE_UNSTAGE_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          return Nothing();
-        }));
+      volume.state.set_state(VolumeState::NODE_READY);
+      volume.state.clear_boot_id();
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `VOL_READY` or
 // `NODE_PUBLISH` to `PUBLISHED`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  CHECK_SOME(nodeContainerId);
+  VolumeData& volume = volumes.at(volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](
-        csi::v0::Client client) -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string targetPath = csi::paths::getMountTargetPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
+  Try<Nothing> mkdir = os::mkdir(targetPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount target path '" + targetPath +
+        "': " + mkdir.error());
+  }
 
-      Try<Nothing> mkdir = os::mkdir(targetPath);
-      if (mkdir.isError()) {
-        return Failure(
-            "Failed to create mount target path '" + targetPath + "': " +
-            mkdir.error());
-      }
+  if (volume.state.state() == VolumeState::VOL_READY) {
+    volume.state.set_state(VolumeState::NODE_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-      if (volume.state.state() == VolumeState::VOL_READY) {
-        volume.state.set_state(VolumeState::NODE_PUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
 
-      CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
+  csi::v0::NodePublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volume.state.publish_info();
+  request.set_target_path(targetPath);
+  *request.mutable_volume_capability() = volume.state.volume_capability();
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      csi::v0::NodePublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      *request.mutable_publish_info() = volume.state.publish_info();
-      request.set_target_path(targetPath);
-      request.mutable_volume_capability()
-        ->CopyFrom(volume.state.volume_capability());
-      request.set_readonly(false);
-      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  if (nodeCapabilities.stageUnstageVolume) {
+    const string stagingPath = csi::paths::getMountStagingPath(
+        csi::paths::getMountRootDir(
+            slave::paths::getCsiRootDir(workDir),
+            info.storage().plugin().type(),
+            info.storage().plugin().name()),
+        volumeId);
 
-      if (nodeCapabilities.stageUnstageVolume) {
-        const string stagingPath = csi::paths::getMountStagingPath(
-            csi::paths::getMountRootDir(
-                slave::paths::getCsiRootDir(workDir),
-                info.storage().plugin().type(),
-                info.storage().plugin().name()),
-            volumeId);
+    CHECK(os::exists(stagingPath));
 
-        CHECK(os::exists(stagingPath));
+    request.set_staging_target_path(stagingPath);
+  }
 
-        request.set_staging_target_path(stagingPath);
-      }
+  CHECK_SOME(nodeContainerId);
 
-      return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId] {
-          VolumeData& volume = volumes.at(volumeId);
+  return call<csi::v0::NODE_PUBLISH_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId] {
+      VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::PUBLISHED);
-          checkpointVolumeState(volumeId);
+      volume.state.set_state(VolumeState::PUBLISHED);
+      checkpointVolumeState(volumeId);
 
-          return Nothing();
-        }));
+      return Nothing();
     }));
 }
 
 
 // Transitions the state of the specified volume from `PUBLISHED`,
 // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+//
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  CHECK_SOME(nodeContainerId);
+  VolumeData& volume = volumes.at(volumeId);
 
-  return getService(nodeContainerId.get())
-    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
-      VolumeData& volume = volumes.at(volumeId);
+  const string targetPath = csi::paths::getMountTargetPath(
+      csi::paths::getMountRootDir(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin().type(),
+          info.storage().plugin().name()),
+      volumeId);
 
-      const string targetPath = csi::paths::getMountTargetPath(
-          csi::paths::getMountRootDir(
-              slave::paths::getCsiRootDir(workDir),
-              info.storage().plugin().type(),
-              info.storage().plugin().name()),
-          volumeId);
-
-      CHECK(os::exists(targetPath));
-
-      // A previously failed `NodePublishVolume` call can be recovered through
-      // the current `NodeUnpublishVolume` call. See:
-      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
-      if (volume.state.state() == VolumeState::PUBLISHED ||
-          volume.state.state() == VolumeState::NODE_PUBLISH) {
-        volume.state.set_state(VolumeState::NODE_UNPUBLISH);
-        checkpointVolumeState(volumeId);
-      }
+  CHECK(os::exists(targetPath));
 
-      CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
+  // A previously failed `NodePublishVolume` call can be recovered through the
+  // current `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+  if (volume.state.state() == VolumeState::PUBLISHED ||
+      volume.state.state() == VolumeState::NODE_PUBLISH) {
+    volume.state.set_state(VolumeState::NODE_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-      csi::v0::NodeUnpublishVolumeRequest request;
-      request.set_volume_id(volumeId);
-      request.set_target_path(targetPath);
+  CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
 
-      return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request))
-        .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
-          VolumeData& volume = volumes.at(volumeId);
+  csi::v0::NodeUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_target_path(targetPath);
 
-          volume.state.set_state(VolumeState::VOL_READY);
-          checkpointVolumeState(volumeId);
+  CHECK_SOME(nodeContainerId);
 
-          Try<Nothing> rmdir = os::rmdir(targetPath);
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove mount point '" + targetPath + "': " +
-                rmdir.error());
-          }
+  return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
+      nodeContainerId.get(), std::move(request))
+    .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
+      VolumeData& volume = volumes.at(volumeId);
 
-          return Nothing();
-        }));
+      volume.state.set_state(VolumeState::VOL_READY);
+      checkpointVolumeState(volumeId);
+
+      Try<Nothing> rmdir = os::rmdir(targetPath);
+      if (rmdir.isError()) {
+        return Failure(
+            "Failed to remove mount point '" + targetPath + "': " +
+            rmdir.error());
+      }
+
+      return Nothing();
     }));
 }
 
@@ -2679,43 +2668,37 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
         "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  csi::v0::CreateVolumeRequest request;
+  request.set_name(name);
+  request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+  request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+  *request.add_volume_capabilities() = profileInfo.capability;
+  *request.mutable_parameters() = profileInfo.parameters;
+
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      csi::v0::CreateVolumeRequest request;
-      request.set_name(name);
-      request.mutable_capacity_range()
-        ->set_required_bytes(capacity.bytes());
-      request.mutable_capacity_range()
-        ->set_limit_bytes(capacity.bytes());
-      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-      *request.mutable_parameters() = profileInfo.parameters;
-
-      return call<csi::v0::CREATE_VOLUME>(client, std::move(request))
-        .then(defer(self(), [=](
-            const csi::v0::CreateVolumeResponse& response) -> string {
-          const csi::v0::Volume& volume = response.volume();
-
-          if (volumes.contains(volume.id())) {
-            // The resource provider failed over after the last `createVolume`
-            // call, but before the operation status was checkpointed.
-            CHECK_EQ(VolumeState::CREATED,
-                     volumes.at(volume.id()).state.state());
-          } else {
-            VolumeState volumeState;
-            volumeState.set_state(VolumeState::CREATED);
-            volumeState.mutable_volume_capability()
-              ->CopyFrom(profileInfo.capability);
-            *volumeState.mutable_parameters() = profileInfo.parameters;
-            *volumeState.mutable_volume_attributes() = volume.attributes();
-
-            volumes.put(volume.id(), std::move(volumeState));
-            checkpointVolumeState(volume.id());
-          }
+  return call<csi::v0::CREATE_VOLUME>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [=](
+        const csi::v0::CreateVolumeResponse& response) -> string {
+      const csi::v0::Volume& volume = response.volume();
 
-          return volume.id();
-        }));
+      if (volumes.contains(volume.id())) {
+        // The resource provider failed over after the last `createVolume` call,
+        // but before the operation status was checkpointed.
+        CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
+      } else {
+        VolumeState volumeState;
+        volumeState.set_state(VolumeState::CREATED);
+        *volumeState.mutable_volume_capability() = profileInfo.capability;
+        *volumeState.mutable_parameters() = profileInfo.parameters;
+        *volumeState.mutable_volume_attributes() = volume.attributes();
+
+        volumes.put(volume.id(), std::move(volumeState));
+        checkpointVolumeState(volume.id());
+      }
+
+      return volume.id();
     }));
 }
 
@@ -2727,8 +2710,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId)
 {
-  CHECK_SOME(controllerContainerId);
-
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2745,10 +2726,10 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
 
   const VolumeData& volume = volumes.at(volumeId);
 
-  Future<Nothing> deleted = Nothing();
-
   CHECK(VolumeState::State_IsValid(volume.state.state()));
 
+  Future<Nothing> deleted = Nothing();
+
   switch (volume.state.state()) {
     case VolumeState::PUBLISHED:
     case VolumeState::NODE_PUBLISH:
@@ -2782,12 +2763,14 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
       // supported. Otherwise, we simply leave it as a preprovisioned volume.
       if (controllerCapabilities.createDeleteVolume) {
         deleted = deleted
-          .then(defer(self(), &Self::getService, controllerContainerId.get()))
-          .then(defer(self(), [this, volumeId](csi::v0::Client client) {
+          .then(defer(self(), [this, volumeId] {
             csi::v0::DeleteVolumeRequest request;
             request.set_volume_id(volumeId);
 
-            return call<csi::v0::DELETE_VOLUME>(client, std::move(request))
+            CHECK_SOME(controllerContainerId);
+
+            return call<csi::v0::DELETE_VOLUME>(
+                controllerContainerId.get(), std::move(request))
               .then([] { return Nothing(); });
           }));
       }
@@ -2856,45 +2839,40 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
         "Plugin capability 'CONTROLLER_SERVICE' is not supported");
   }
 
-  CHECK_SOME(controllerContainerId);
+  google::protobuf::Map<string, string> volumeAttributes;
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      google::protobuf::Map<string, string> volumeAttributes;
+  if (metadata.isSome()) {
+    volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
+  }
 
-      if (metadata.isSome()) {
-        volumeAttributes =
-          CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
-      }
+  csi::v0::ValidateVolumeCapabilitiesRequest request;
+  request.set_volume_id(volumeId);
+  *request.add_volume_capabilities() = profileInfo.capability;
+  *request.mutable_volume_attributes() = volumeAttributes;
 
-      csi::v0::ValidateVolumeCapabilitiesRequest request;
-      request.set_volume_id(volumeId);
-      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-      *request.mutable_volume_attributes() = volumeAttributes;
-
-      return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
-          client, std::move(request))
-        .then(defer(self(), [=](
-            const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-            -> Future<Nothing> {
-          if (!response.supported()) {
-            return Failure(
-                "Unsupported volume capability for volume '" + volumeId +
-                "': " + response.message());
-          }
+  CHECK_SOME(controllerContainerId);
 
-          VolumeState volumeState;
-          volumeState.set_state(VolumeState::CREATED);
-          volumeState.mutable_volume_capability()
-            ->CopyFrom(profileInfo.capability);
-          *volumeState.mutable_parameters() = profileInfo.parameters;
-          *volumeState.mutable_volume_attributes() = volumeAttributes;
+  return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
+      controllerContainerId.get(), std::move(request))
+    .then(defer(self(), [=](
+        const csi::v0::ValidateVolumeCapabilitiesResponse& response)
+        -> Future<Nothing> {
+      if (!response.supported()) {
+        return Failure(
+            "Unsupported volume capability for volume '" + volumeId + "': " +
+            response.message());
+      }
 
-          volumes.put(volumeId, std::move(volumeState));
-          checkpointVolumeState(volumeId);
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = profileInfo.capability;
+      *volumeState.mutable_parameters() = profileInfo.parameters;
+      *volumeState.mutable_volume_attributes() = volumeAttributes;
 
-          return Nothing();
-        }));
+      volumes.put(volumeId, std::move(volumeState));
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
     }));
 }
 
@@ -2912,41 +2890,39 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      // TODO(chhsiao): Set the max entries and use a loop to do
-      // multiple `ListVolumes` calls.
-      return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest())
-        .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
-          Resources resources;
-
-          // Recover disk profiles from the checkpointed state.
-          hashmap<string, string> volumesToProfiles;
-          foreach (const Resource& resource, totalResources) {
-            if (resource.disk().source().has_id() &&
-                resource.disk().source().has_profile()) {
-              volumesToProfiles.put(
-                  resource.disk().source().id(),
-                  resource.disk().source().profile());
-            }
-          }
+  // TODO(chhsiao): Set the max entries and use a loop to do
+  // multiple `ListVolumes` calls.
+  return call<csi::v0::LIST_VOLUMES>(
+      controllerContainerId.get(), csi::v0::ListVolumesRequest())
+    .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
+      Resources resources;
 
-          foreach (const auto& entry, response.entries()) {
-            resources += createRawDiskResource(
-                info,
-                Bytes(entry.volume().capacity_bytes()),
-                volumesToProfiles.contains(entry.volume().id())
-                  ? volumesToProfiles.at(entry.volume().id())
-                  : Option<string>::none(),
-                vendor,
-                entry.volume().id(),
-                entry.volume().attributes().empty()
-                  ? Option<Labels>::none()
-                  : convertStringMapToLabels(entry.volume().attributes()));
-          }
+      // Recover disk profiles from the checkpointed state.
+      hashmap<string, string> volumesToProfiles;
+      foreach (const Resource& resource, totalResources) {
+        if (resource.disk().source().has_id() &&
+            resource.disk().source().has_profile()) {
+          volumesToProfiles.put(
+              resource.disk().source().id(),
+              resource.disk().source().profile());
+        }
+      }
 
-          return resources;
-        }));
+      foreach (const auto& entry, response.entries()) {
+        resources += createRawDiskResource(
+            info,
+            Bytes(entry.volume().capacity_bytes()),
+            volumesToProfiles.contains(entry.volume().id())
+              ? volumesToProfiles.at(entry.volume().id())
+              : Option<string>::none(),
+            vendor,
+            entry.volume().id(),
+            entry.volume().attributes().empty()
+              ? Option<Labels>::none()
+              : convertStringMapToLabels(entry.volume().attributes()));
+      }
+
+      return resources;
     }));
 }
 
@@ -2964,19 +2940,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 
   CHECK_SOME(controllerContainerId);
 
-  return getService(controllerContainerId.get())
-    .then(defer(self(), [=](csi::v0::Client client) {
-      vector<Future<Resources>> futures;
+  vector<Future<Resources>> futures;
 
-      foreachpair (const string& profile,
-                   const DiskProfileAdaptor::ProfileInfo& profileInfo,
-                   profileInfos) {
-        csi::v0::GetCapacityRequest request;
-        request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
-        *request.mutable_parameters() = profileInfo.parameters;
+  foreachpair (const string& profile,
+               const DiskProfileAdaptor::ProfileInfo& profileInfo,
+               profileInfos) {
+    csi::v0::GetCapacityRequest request;
+    *request.add_volume_capabilities() = profileInfo.capability;
+    *request.mutable_parameters() = profileInfo.parameters;
 
-        futures.push_back(call<csi::v0::GET_CAPACITY>(
-            client, std::move(request))
+    futures.push_back(
+        call<csi::v0::GET_CAPACITY>(
+            controllerContainerId.get(), std::move(request))
           .then(defer(self(), [=](
               const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
@@ -2984,18 +2959,14 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
             }
 
             return createRawDiskResource(
-                info,
-                Bytes(response.available_capacity()),
-                profile,
-                vendor);
+                info, Bytes(response.available_capacity()), profile, vendor);
           })));
-      }
+  }
 
-      return collect(futures)
-        .then([](const vector<Resources>& resources) {
-          return accumulate(resources.begin(), resources.end(), Resources());
-        });
-    }));
+  return collect(futures)
+    .then([](const vector<Resources>& resources) {
+      return accumulate(resources.begin(), resources.end(), Resources());
+    });
 }