You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:15:13 UTC

[mesos] 03/06: Implemented the RPC retry logic for SLRP.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7c7a874121413453294a4f0b4f06a08115b1b202
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 23:00:43 2019 -0800

    Implemented the RPC retry logic for SLRP.
    
    For CSI calls triggered by offer operations, i.e., `CreateVolume` and
    `DeleteVolume`, if the plugin returns a retryable error (`UNAVAILABLE`
    or `DEADLINE_EXCEEDED`), the storage local resource provider (SLRP)
    now retries the call indefinitely with a randomized exponential
    backoff. As a result, when a framework receives `OPERATION_FAILED`, it
    knows that the operation has terminated and left the volume in a
    deterministic state.
    
    Review: https://reviews.apache.org/r/69812
---
 src/csi/client.cpp                         | 292 ++++++++++-------------------
 src/csi/client.hpp                         |  85 ++++-----
 src/resource_provider/storage/provider.cpp | 134 ++++++++++---
 3 files changed, 254 insertions(+), 257 deletions(-)

diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 61ed410..9e17f5b 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -30,308 +30,222 @@ namespace csi {
 namespace v0 {
 
 template <>
-Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
-    GetPluginInfoRequest request)
+Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetPluginInfoResponse, StatusError>& result)
-        -> Future<GetPluginInfoResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<GetPluginCapabilitiesResponse>
+Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<GET_PLUGIN_CAPABILITIES>(
     GetPluginCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
-        -> Future<GetPluginCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ProbeResponse>
+Future<Try<ProbeResponse, process::grpc::StatusError>>
 Client::call<PROBE>(
     ProbeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Identity, Probe),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ProbeResponse, StatusError>& result)
-        -> Future<ProbeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Identity, Probe),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<CreateVolumeResponse>
+Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
 Client::call<CREATE_VOLUME>(
     CreateVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, CreateVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<CreateVolumeResponse, StatusError>& result)
-        -> Future<CreateVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, CreateVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<DeleteVolumeResponse>
+Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
 Client::call<DELETE_VOLUME>(
     DeleteVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, DeleteVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<DeleteVolumeResponse, StatusError>& result)
-        -> Future<DeleteVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, DeleteVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerPublishVolumeResponse>
+Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_PUBLISH_VOLUME>(
     ControllerPublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
-        -> Future<ControllerPublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerUnpublishVolumeResponse>
+Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     ControllerUnpublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
-        -> Future<ControllerUnpublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ValidateVolumeCapabilitiesResponse>
+Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     ValidateVolumeCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
-        -> Future<ValidateVolumeCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ListVolumesResponse>
+Future<Try<ListVolumesResponse, process::grpc::StatusError>>
 Client::call<LIST_VOLUMES>(
     ListVolumesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ListVolumes),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ListVolumesResponse, StatusError>& result)
-        -> Future<ListVolumesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ListVolumes),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<GetCapacityResponse>
+Future<Try<GetCapacityResponse, process::grpc::StatusError>>
 Client::call<GET_CAPACITY>(
     GetCapacityRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, GetCapacity),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<GetCapacityResponse, StatusError>& result)
-        -> Future<GetCapacityResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, GetCapacity),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<ControllerGetCapabilitiesResponse>
+Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_GET_CAPABILITIES>(
     ControllerGetCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
-        -> Future<ControllerGetCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeStageVolumeResponse>
+Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_STAGE_VOLUME>(
     NodeStageVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeStageVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
-        -> Future<NodeStageVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeStageVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeUnstageVolumeResponse>
+Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_UNSTAGE_VOLUME>(
     NodeUnstageVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
-        -> Future<NodeUnstageVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodePublishVolumeResponse>
+Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_PUBLISH_VOLUME>(
     NodePublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodePublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
-        -> Future<NodePublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodePublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeUnpublishVolumeResponse>
+Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<NODE_UNPUBLISH_VOLUME>(
     NodeUnpublishVolumeRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
-        -> Future<NodeUnpublishVolumeResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeGetIdResponse>
+Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
 Client::call<NODE_GET_ID>(
     NodeGetIdRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeGetId),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeGetIdResponse, StatusError>& result)
-        -> Future<NodeGetIdResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeGetId),
+      std::move(request),
+      CallOptions());
 }
 
 
 template <>
-Future<NodeGetCapabilitiesResponse>
+Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<NODE_GET_CAPABILITIES>(
     NodeGetCapabilitiesRequest request)
 {
-  return runtime
-    .call(
-        connection,
-        GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
-        std::move(request),
-        CallOptions())
-    .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
-        -> Future<NodeGetCapabilitiesResponse> {
-      return result;
-    });
+  return runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
+      std::move(request),
+      CallOptions());
 }
 
 } // namespace v0 {
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d40d54..a0dfedf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,8 +35,9 @@ public:
     : connection(_connection), runtime(_runtime) {}
 
   template <RPC rpc>
-  process::Future<typename RPCTraits<rpc>::response_type> call(
-      typename RPCTraits<rpc>::request_type request);
+  process::Future<
+      Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
+  call(typename RPCTraits<rpc>::request_type request);
 
 private:
   process::grpc::client::Connection connection;
@@ -45,105 +46,95 @@ private:
 
 
 template <>
-process::Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
-    GetPluginInfoRequest request);
+process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request);
 
 
 template <>
-process::Future<GetPluginCapabilitiesResponse>
-Client::call<GET_PLUGIN_CAPABILITIES>(
-    GetPluginCapabilitiesRequest request);
+process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request);
 
 
 template <>
-process::Future<ProbeResponse>
-Client::call<PROBE>(
-    ProbeRequest request);
+process::Future<Try<ProbeResponse, process::grpc::StatusError>>
+Client::call<PROBE>(ProbeRequest request);
 
 
 template <>
-process::Future<CreateVolumeResponse>
-Client::call<CREATE_VOLUME>(
-    CreateVolumeRequest request);
+process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
+Client::call<CREATE_VOLUME>(CreateVolumeRequest request);
 
 
 template <>
-process::Future<DeleteVolumeResponse>
-Client::call<DELETE_VOLUME>(
-    DeleteVolumeRequest request);
+process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
+Client::call<DELETE_VOLUME>(DeleteVolumeRequest request);
 
 
 template <>
-process::Future<ControllerPublishVolumeResponse>
-Client::call<CONTROLLER_PUBLISH_VOLUME>(
-    ControllerPublishVolumeRequest request);
+process::Future<
+    Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest request);
 
 
 template <>
-process::Future<ControllerUnpublishVolumeResponse>
+process::Future<
+    Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     ControllerUnpublishVolumeRequest request);
 
 
 template <>
-process::Future<ValidateVolumeCapabilitiesResponse>
+process::Future<
+    Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     ValidateVolumeCapabilitiesRequest request);
 
 
 template <>
-process::Future<ListVolumesResponse>
-Client::call<LIST_VOLUMES>(
-    ListVolumesRequest request);
+process::Future<Try<ListVolumesResponse, process::grpc::StatusError>>
+Client::call<LIST_VOLUMES>(ListVolumesRequest request);
 
 
 template <>
-process::Future<GetCapacityResponse>
-Client::call<GET_CAPACITY>(
-    GetCapacityRequest request);
+process::Future<Try<GetCapacityResponse, process::grpc::StatusError>>
+Client::call<GET_CAPACITY>(GetCapacityRequest request);
 
 
 template <>
-process::Future<ControllerGetCapabilitiesResponse>
+process::Future<
+    Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
 Client::call<CONTROLLER_GET_CAPABILITIES>(
     ControllerGetCapabilitiesRequest request);
 
 
 template <>
-process::Future<NodeStageVolumeResponse>
-Client::call<NODE_STAGE_VOLUME>(
-    NodeStageVolumeRequest request);
+process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request);
 
 
 template <>
-process::Future<NodeUnstageVolumeResponse>
-Client::call<NODE_UNSTAGE_VOLUME>(
-    NodeUnstageVolumeRequest request);
+process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request);
 
 
 template <>
-process::Future<NodePublishVolumeResponse>
-Client::call<NODE_PUBLISH_VOLUME>(
-    NodePublishVolumeRequest request);
+process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request);
 
 
 template <>
-process::Future<NodeUnpublishVolumeResponse>
-Client::call<NODE_UNPUBLISH_VOLUME>(
-    NodeUnpublishVolumeRequest request);
+process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request);
 
 
 template <>
-process::Future<NodeGetIdResponse>
-Client::call<NODE_GET_ID>(
-    NodeGetIdRequest request);
+process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_ID>(NodeGetIdRequest request);
 
 
 template <>
-process::Future<NodeGetCapabilitiesResponse>
-Client::call<NODE_GET_CAPABILITIES>(
-    NodeGetCapabilitiesRequest request);
+process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request);
 
 } // namespace v0 {
 } // namespace csi {
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f9f9312..1abb9c8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,6 +18,7 @@
 
 #include <algorithm>
 #include <cctype>
+#include <cstdlib>
 #include <memory>
 #include <numeric>
 #include <utility>
@@ -48,6 +49,7 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -55,6 +57,7 @@
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -109,6 +112,8 @@ using process::Sequence;
 using process::spawn;
 using process::Timeout;
 
+using process::grpc::StatusError;
+
 using process::http::authentication::Principal;
 
 using process::metrics::Counter;
@@ -130,6 +135,23 @@ using mesos::v1::resource_provider::Driver;
 namespace mesos {
 namespace internal {
 
+// Timeout for a CSI plugin component to create its endpoint socket.
+//
+// TODO(chhsiao): Make the timeout configurable.
+constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc.), with the upper bound capped at `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
 // Returns true if the string is a valid Java identifier.
 static bool isValidName(const string& s)
 {
@@ -164,11 +186,6 @@ static bool isValidType(const string& s)
 }
 
 
-// Timeout for a CSI plugin component to create its endpoint socket.
-// TODO(chhsiao): Make the timeout configurable.
-static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
-
-
 // Returns the container ID of the standalone container to run a CSI plugin
 // component. The container ID is of the following format:
 //     <cid_prefix><csi_type>-<csi_name>--<list_of_services>
@@ -408,10 +425,12 @@ private:
       typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
   Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
       const ContainerID& containerId,
-      const typename csi::v0::RPCTraits<rpc>::request_type& request);
+      const typename csi::v0::RPCTraits<rpc>::request_type& request,
+      bool retry = false);
 
   template <csi::v0::RPC rpc>
-  Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
+  Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
+  _call(
       csi::v0::Client client,
       const typename csi::v0::RPCTraits<rpc>::request_type& request);
 
@@ -1873,32 +1892,98 @@ template <
 Future<typename csi::v0::RPCTraits<rpc>::response_type>
 StorageLocalResourceProviderProcess::call(
     const ContainerID& containerId,
-    const typename csi::v0::RPCTraits<rpc>::request_type& request)
+    const typename csi::v0::RPCTraits<rpc>::request_type& request,
+    bool retry)
 {
-  // Get the latest service future before making the call.
-  return getService(containerId)
-    .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return loop(
+      self(),
+      [=] {
+        // Perform the call with the latest service future.
+        return getService(containerId)
+          .then(defer(
+              self(),
+              &StorageLocalResourceProviderProcess::_call<rpc>,
+              lambda::_1,
+              request));
+      },
+      [=](const Try<Response, StatusError>& result) mutable
+          -> Future<ControlFlow<Response>> {
+        if (result.isSome()) {
+          return Break(result.get());
+        }
+
+        if (retry) {
+          // See the link below for retryable status codes:
+          // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+          switch (result.error().status.error_code()) {
+            case grpc::DEADLINE_EXCEEDED:
+            case grpc::UNAVAILABLE: {
+              Duration delay =
+                maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
+
+              maxBackoff =
+                std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+              LOG(ERROR)
+                << "Received '" << result.error() << "' while calling " << rpc
+                << ". Retrying in " << delay;
+
+              return after(delay)
+                .then([]() -> Future<ControlFlow<Response>> {
+                  return Continue();
+                });
+            }
+            case grpc::CANCELLED:
+            case grpc::UNKNOWN:
+            case grpc::INVALID_ARGUMENT:
+            case grpc::NOT_FOUND:
+            case grpc::ALREADY_EXISTS:
+            case grpc::PERMISSION_DENIED:
+            case grpc::UNAUTHENTICATED:
+            case grpc::RESOURCE_EXHAUSTED:
+            case grpc::FAILED_PRECONDITION:
+            case grpc::ABORTED:
+            case grpc::OUT_OF_RANGE:
+            case grpc::UNIMPLEMENTED:
+            case grpc::INTERNAL:
+            case grpc::DATA_LOSS: {
+              break;
+            }
+            case grpc::OK:
+            case grpc::DO_NOT_USE: {
+              UNREACHABLE();
+            }
+          }
+        }
+
+        return Failure(result.error());
+      });
 }
 
 
 template <csi::v0::RPC rpc>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
+Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
 StorageLocalResourceProviderProcess::_call(
     csi::v0::Client client,
     const typename csi::v0::RPCTraits<rpc>::request_type& request)
 {
+  using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
   ++metrics.csi_plugin_rpcs_pending.at(rpc);
 
   return client.call<rpc>(request)
-    .onAny(defer(self(), [=](
-        const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
+    .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
       --metrics.csi_plugin_rpcs_pending.at(rpc);
-      if (future.isReady()) {
+      if (future.isReady() && future->isSome()) {
         ++metrics.csi_plugin_rpcs_successes.at(rpc);
-      } else if (future.isFailed()) {
-        ++metrics.csi_plugin_rpcs_errors.at(rpc);
-      } else {
+      } else if (future.isDiscarded()) {
         ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+      } else {
+        ++metrics.csi_plugin_rpcs_errors.at(rpc);
       }
     }));
 }
@@ -1939,7 +2024,14 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
   return service
     .then(defer(self(), [=](csi::v0::Client client) {
       return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
-        .then([=]() -> csi::v0::Client { return client; });
+        .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result)
+            -> Future<csi::v0::Client> {
+          if (result.isError()) {
+            return Failure(result.error());
+          }
+
+          return client;
+        });
     }));
 }
 
@@ -2678,7 +2770,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
   CHECK_SOME(controllerContainerId);
 
   return call<csi::v0::CREATE_VOLUME>(
-      controllerContainerId.get(), std::move(request))
+      controllerContainerId.get(), std::move(request), true) // Retry.
     .then(defer(self(), [=](
         const csi::v0::CreateVolumeResponse& response) -> string {
       const csi::v0::Volume& volume = response.volume();
@@ -2770,7 +2862,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
             CHECK_SOME(controllerContainerId);
 
             return call<csi::v0::DELETE_VOLUME>(
-                controllerContainerId.get(), std::move(request))
+                controllerContainerId.get(), std::move(request), true) // Retry.
               .then([] { return Nothing(); });
           }));
       }