You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:14:34 UTC
[mesos] 02/08: Implemented the RPC retry logic for SLRP.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 9fb42b0fa8c79e432089ef56099a03765e65fb38
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 23:00:43 2019 -0800
Implemented the RPC retry logic for SLRP.
For CSI calls triggered by offer operations, i.e., `CreateVolume` and
`DeleteVolume`, if the plugin returns retryable errors (`UNAVAILABLE` or
`DEADLINE_EXCEEDED`), SLRP will now retry indefinitely with a random
exponential backoff. With this change, when frameworks receive
`OPERATION_FAILED`, they can rely on the operation having terminated
with a deterministic volume state.
Review: https://reviews.apache.org/r/69812
---
src/csi/client.cpp | 292 ++++++++++-------------------
src/csi/client.hpp | 85 ++++-----
src/resource_provider/storage/provider.cpp | 134 ++++++++++---
3 files changed, 254 insertions(+), 257 deletions(-)
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 61ed410..9e17f5b 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -30,308 +30,222 @@ namespace csi {
namespace v0 {
template <>
-Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
- GetPluginInfoRequest request)
+Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
- std::move(request),
- CallOptions())
- .then([](const Try<GetPluginInfoResponse, StatusError>& result)
- -> Future<GetPluginInfoResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<GetPluginCapabilitiesResponse>
+Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
Client::call<GET_PLUGIN_CAPABILITIES>(
GetPluginCapabilitiesRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
- std::move(request),
- CallOptions())
- .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
- -> Future<GetPluginCapabilitiesResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ProbeResponse>
+Future<Try<ProbeResponse, process::grpc::StatusError>>
Client::call<PROBE>(
ProbeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Identity, Probe),
- std::move(request),
- CallOptions())
- .then([](const Try<ProbeResponse, StatusError>& result)
- -> Future<ProbeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Identity, Probe),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<CreateVolumeResponse>
+Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
Client::call<CREATE_VOLUME>(
CreateVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, CreateVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<CreateVolumeResponse, StatusError>& result)
- -> Future<CreateVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, CreateVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<DeleteVolumeResponse>
+Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
Client::call<DELETE_VOLUME>(
DeleteVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, DeleteVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<DeleteVolumeResponse, StatusError>& result)
- -> Future<DeleteVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, DeleteVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ControllerPublishVolumeResponse>
+Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
Client::call<CONTROLLER_PUBLISH_VOLUME>(
ControllerPublishVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
- -> Future<ControllerPublishVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ControllerUnpublishVolumeResponse>
+Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
ControllerUnpublishVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
- -> Future<ControllerUnpublishVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ValidateVolumeCapabilitiesResponse>
+Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
Client::call<VALIDATE_VOLUME_CAPABILITIES>(
ValidateVolumeCapabilitiesRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
- std::move(request),
- CallOptions())
- .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
- -> Future<ValidateVolumeCapabilitiesResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ListVolumesResponse>
+Future<Try<ListVolumesResponse, process::grpc::StatusError>>
Client::call<LIST_VOLUMES>(
ListVolumesRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, ListVolumes),
- std::move(request),
- CallOptions())
- .then([](const Try<ListVolumesResponse, StatusError>& result)
- -> Future<ListVolumesResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ListVolumes),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<GetCapacityResponse>
+Future<Try<GetCapacityResponse, process::grpc::StatusError>>
Client::call<GET_CAPACITY>(
GetCapacityRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, GetCapacity),
- std::move(request),
- CallOptions())
- .then([](const Try<GetCapacityResponse, StatusError>& result)
- -> Future<GetCapacityResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, GetCapacity),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<ControllerGetCapabilitiesResponse>
+Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
Client::call<CONTROLLER_GET_CAPABILITIES>(
ControllerGetCapabilitiesRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
- std::move(request),
- CallOptions())
- .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
- -> Future<ControllerGetCapabilitiesResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodeStageVolumeResponse>
+Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
Client::call<NODE_STAGE_VOLUME>(
NodeStageVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodeStageVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
- -> Future<NodeStageVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodeStageVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodeUnstageVolumeResponse>
+Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
Client::call<NODE_UNSTAGE_VOLUME>(
NodeUnstageVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
- -> Future<NodeUnstageVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodePublishVolumeResponse>
+Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
Client::call<NODE_PUBLISH_VOLUME>(
NodePublishVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodePublishVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
- -> Future<NodePublishVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodePublishVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodeUnpublishVolumeResponse>
+Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
Client::call<NODE_UNPUBLISH_VOLUME>(
NodeUnpublishVolumeRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
- std::move(request),
- CallOptions())
- .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
- -> Future<NodeUnpublishVolumeResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodeGetIdResponse>
+Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
Client::call<NODE_GET_ID>(
NodeGetIdRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodeGetId),
- std::move(request),
- CallOptions())
- .then([](const Try<NodeGetIdResponse, StatusError>& result)
- -> Future<NodeGetIdResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodeGetId),
+ std::move(request),
+ CallOptions());
}
template <>
-Future<NodeGetCapabilitiesResponse>
+Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
Client::call<NODE_GET_CAPABILITIES>(
NodeGetCapabilitiesRequest request)
{
- return runtime
- .call(
- connection,
- GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
- std::move(request),
- CallOptions())
- .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
- -> Future<NodeGetCapabilitiesResponse> {
- return result;
- });
+ return runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
+ std::move(request),
+ CallOptions());
}
} // namespace v0 {
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d40d54..a0dfedf 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -35,8 +35,9 @@ public:
: connection(_connection), runtime(_runtime) {}
template <RPC rpc>
- process::Future<typename RPCTraits<rpc>::response_type> call(
- typename RPCTraits<rpc>::request_type request);
+ process::Future<
+ Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>>
+ call(typename RPCTraits<rpc>::request_type request);
private:
process::grpc::client::Connection connection;
@@ -45,105 +46,95 @@ private:
template <>
-process::Future<GetPluginInfoResponse>
-Client::call<GET_PLUGIN_INFO>(
- GetPluginInfoRequest request);
+process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request);
template <>
-process::Future<GetPluginCapabilitiesResponse>
-Client::call<GET_PLUGIN_CAPABILITIES>(
- GetPluginCapabilitiesRequest request);
+process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request);
template <>
-process::Future<ProbeResponse>
-Client::call<PROBE>(
- ProbeRequest request);
+process::Future<Try<ProbeResponse, process::grpc::StatusError>>
+Client::call<PROBE>(ProbeRequest request);
template <>
-process::Future<CreateVolumeResponse>
-Client::call<CREATE_VOLUME>(
- CreateVolumeRequest request);
+process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
+Client::call<CREATE_VOLUME>(CreateVolumeRequest request);
template <>
-process::Future<DeleteVolumeResponse>
-Client::call<DELETE_VOLUME>(
- DeleteVolumeRequest request);
+process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
+Client::call<DELETE_VOLUME>(DeleteVolumeRequest request);
template <>
-process::Future<ControllerPublishVolumeResponse>
-Client::call<CONTROLLER_PUBLISH_VOLUME>(
- ControllerPublishVolumeRequest request);
+process::Future<
+ Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest request);
template <>
-process::Future<ControllerUnpublishVolumeResponse>
+process::Future<
+ Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
ControllerUnpublishVolumeRequest request);
template <>
-process::Future<ValidateVolumeCapabilitiesResponse>
+process::Future<
+ Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
Client::call<VALIDATE_VOLUME_CAPABILITIES>(
ValidateVolumeCapabilitiesRequest request);
template <>
-process::Future<ListVolumesResponse>
-Client::call<LIST_VOLUMES>(
- ListVolumesRequest request);
+process::Future<Try<ListVolumesResponse, process::grpc::StatusError>>
+Client::call<LIST_VOLUMES>(ListVolumesRequest request);
template <>
-process::Future<GetCapacityResponse>
-Client::call<GET_CAPACITY>(
- GetCapacityRequest request);
+process::Future<Try<GetCapacityResponse, process::grpc::StatusError>>
+Client::call<GET_CAPACITY>(GetCapacityRequest request);
template <>
-process::Future<ControllerGetCapabilitiesResponse>
+process::Future<
+ Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
Client::call<CONTROLLER_GET_CAPABILITIES>(
ControllerGetCapabilitiesRequest request);
template <>
-process::Future<NodeStageVolumeResponse>
-Client::call<NODE_STAGE_VOLUME>(
- NodeStageVolumeRequest request);
+process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request);
template <>
-process::Future<NodeUnstageVolumeResponse>
-Client::call<NODE_UNSTAGE_VOLUME>(
- NodeUnstageVolumeRequest request);
+process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request);
template <>
-process::Future<NodePublishVolumeResponse>
-Client::call<NODE_PUBLISH_VOLUME>(
- NodePublishVolumeRequest request);
+process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request);
template <>
-process::Future<NodeUnpublishVolumeResponse>
-Client::call<NODE_UNPUBLISH_VOLUME>(
- NodeUnpublishVolumeRequest request);
+process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
+Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request);
template <>
-process::Future<NodeGetIdResponse>
-Client::call<NODE_GET_ID>(
- NodeGetIdRequest request);
+process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_ID>(NodeGetIdRequest request);
template <>
-process::Future<NodeGetCapabilitiesResponse>
-Client::call<NODE_GET_CAPABILITIES>(
- NodeGetCapabilitiesRequest request);
+process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
+Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request);
} // namespace v0 {
} // namespace csi {
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 811b87e..22ad0c8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,6 +18,7 @@
#include <algorithm>
#include <cctype>
+#include <cstdlib>
#include <memory>
#include <numeric>
#include <utility>
@@ -48,6 +49,7 @@
#include <mesos/v1/resource_provider.hpp>
+#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
@@ -55,6 +57,7 @@
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
#include <stout/os/realpath.hpp>
@@ -109,6 +112,8 @@ using process::Sequence;
using process::spawn;
using process::Timeout;
+using process::grpc::StatusError;
+
using process::http::authentication::Principal;
using process::metrics::Counter;
@@ -130,6 +135,23 @@ using mesos::v1::resource_provider::Driver;
namespace mesos {
namespace internal {
+// Timeout for a CSI plugin component to create its endpoint socket.
+//
+// TODO(chhsiao): Make the timeout configurable.
+constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
+
+// Storage local resource provider initially picks a random amount of time
+// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
+// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
+// retries are exponentially backed off based on this interval (e.g., 2nd retry
+// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
+// etc.), up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
+
// Returns true if the string is a valid Java identifier.
static bool isValidName(const string& s)
{
@@ -164,11 +186,6 @@ static bool isValidType(const string& s)
}
-// Timeout for a CSI plugin component to create its endpoint socket.
-// TODO(chhsiao): Make the timeout configurable.
-static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
-
-
// Returns the container ID of the standalone container to run a CSI plugin
// component. The container ID is of the following format:
// <cid_prefix><csi_type>-<csi_name>--<list_of_services>
@@ -408,10 +425,12 @@ private:
typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
const ContainerID& containerId,
- const typename csi::v0::RPCTraits<rpc>::request_type& request);
+ const typename csi::v0::RPCTraits<rpc>::request_type& request,
+ bool retry = false);
template <csi::v0::RPC rpc>
- Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
+ Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
+ _call(
csi::v0::Client client,
const typename csi::v0::RPCTraits<rpc>::request_type& request);
@@ -1877,32 +1896,98 @@ template <
Future<typename csi::v0::RPCTraits<rpc>::response_type>
StorageLocalResourceProviderProcess::call(
const ContainerID& containerId,
- const typename csi::v0::RPCTraits<rpc>::request_type& request)
+ const typename csi::v0::RPCTraits<rpc>::request_type& request,
+ bool retry)
{
- // Get the latest service future before making the call.
- return getService(containerId)
- .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+ using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
+ Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+ return loop(
+ self(),
+ [=] {
+ // Perform the call with the latest service future.
+ return getService(containerId)
+ .then(defer(
+ self(),
+ &StorageLocalResourceProviderProcess::_call<rpc>,
+ lambda::_1,
+ request));
+ },
+ [=](const Try<Response, StatusError>& result) mutable
+ -> Future<ControlFlow<Response>> {
+ if (result.isSome()) {
+ return Break(result.get());
+ }
+
+ if (retry) {
+ // See the link below for retryable status codes:
+ // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+ switch (result.error().status.error_code()) {
+ case grpc::DEADLINE_EXCEEDED:
+ case grpc::UNAVAILABLE: {
+ Duration delay =
+ maxBackoff * (static_cast<double>(os::random()) / RAND_MAX);
+
+ maxBackoff =
+ std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+ LOG(ERROR)
+ << "Received '" << result.error() << "' while calling " << rpc
+ << ". Retrying in " << delay;
+
+ return after(delay)
+ .then([]() -> Future<ControlFlow<Response>> {
+ return Continue();
+ });
+ }
+ case grpc::CANCELLED:
+ case grpc::UNKNOWN:
+ case grpc::INVALID_ARGUMENT:
+ case grpc::NOT_FOUND:
+ case grpc::ALREADY_EXISTS:
+ case grpc::PERMISSION_DENIED:
+ case grpc::UNAUTHENTICATED:
+ case grpc::RESOURCE_EXHAUSTED:
+ case grpc::FAILED_PRECONDITION:
+ case grpc::ABORTED:
+ case grpc::OUT_OF_RANGE:
+ case grpc::UNIMPLEMENTED:
+ case grpc::INTERNAL:
+ case grpc::DATA_LOSS: {
+ break;
+ }
+ case grpc::OK:
+ case grpc::DO_NOT_USE: {
+ UNREACHABLE();
+ }
+ }
+ }
+
+ return Failure(result.error());
+ });
}
template <csi::v0::RPC rpc>
-Future<typename csi::v0::RPCTraits<rpc>::response_type>
+Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>>
StorageLocalResourceProviderProcess::_call(
csi::v0::Client client,
const typename csi::v0::RPCTraits<rpc>::request_type& request)
{
+ using Response = typename csi::v0::RPCTraits<rpc>::response_type;
+
++metrics.csi_plugin_rpcs_pending.at(rpc);
return client.call<rpc>(request)
- .onAny(defer(self(), [=](
- const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
+ .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) {
--metrics.csi_plugin_rpcs_pending.at(rpc);
- if (future.isReady()) {
+ if (future.isReady() && future->isSome()) {
++metrics.csi_plugin_rpcs_successes.at(rpc);
- } else if (future.isFailed()) {
- ++metrics.csi_plugin_rpcs_errors.at(rpc);
- } else {
+ } else if (future.isDiscarded()) {
++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+ } else {
+ ++metrics.csi_plugin_rpcs_errors.at(rpc);
}
}));
}
@@ -1943,7 +2028,14 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
return service
.then(defer(self(), [=](csi::v0::Client client) {
return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
- .then([=]() -> csi::v0::Client { return client; });
+ .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result)
+ -> Future<csi::v0::Client> {
+ if (result.isError()) {
+ return Failure(result.error());
+ }
+
+ return client;
+ });
}));
}
@@ -2682,7 +2774,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
CHECK_SOME(controllerContainerId);
return call<csi::v0::CREATE_VOLUME>(
- controllerContainerId.get(), std::move(request))
+ controllerContainerId.get(), std::move(request), true) // Retry.
.then(defer(self(), [=](
const csi::v0::CreateVolumeResponse& response) -> string {
const csi::v0::Volume& volume = response.volume();
@@ -2774,7 +2866,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
CHECK_SOME(controllerContainerId);
return call<csi::v0::DELETE_VOLUME>(
- controllerContainerId.get(), std::move(request))
+ controllerContainerId.get(), std::move(request), true) // Retry.
.then([] { return Nothing(); });
}));
}