You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/17 20:57:50 UTC
[1/2] mesos git commit: Passed the default options when making gRPC calls.
Repository: mesos
Updated Branches:
refs/heads/master abf11a951 -> 841f652b2
Passed the default options when making gRPC calls.
Review: https://reviews.apache.org/r/67947
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/841f652b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/841f652b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/841f652b
Branch: refs/heads/master
Commit: 841f652b2db90156c5cf1a9389af7b4ea862694b
Parents: a211b4c
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Jul 17 13:15:40 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700
----------------------------------------------------------------------
src/csi/client.cpp | 53 +++++++++++++++++++++++++++++++++----------------
1 file changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/841f652b/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index a36e622..61ed410 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -23,6 +23,8 @@ using process::Future;
using process::grpc::StatusError;
+using process::grpc::client::CallOptions;
+
namespace mesos {
namespace csi {
namespace v0 {
@@ -36,7 +38,8 @@ Client::call<GET_PLUGIN_INFO>(
.call(
connection,
GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<GetPluginInfoResponse, StatusError>& result)
-> Future<GetPluginInfoResponse> {
return result;
@@ -53,7 +56,8 @@ Client::call<GET_PLUGIN_CAPABILITIES>(
.call(
connection,
GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
-> Future<GetPluginCapabilitiesResponse> {
return result;
@@ -70,7 +74,8 @@ Client::call<PROBE>(
.call(
connection,
GRPC_CLIENT_METHOD(Identity, Probe),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ProbeResponse, StatusError>& result)
-> Future<ProbeResponse> {
return result;
@@ -87,7 +92,8 @@ Client::call<CREATE_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, CreateVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<CreateVolumeResponse, StatusError>& result)
-> Future<CreateVolumeResponse> {
return result;
@@ -104,7 +110,8 @@ Client::call<DELETE_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, DeleteVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<DeleteVolumeResponse, StatusError>& result)
-> Future<DeleteVolumeResponse> {
return result;
@@ -121,7 +128,8 @@ Client::call<CONTROLLER_PUBLISH_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
-> Future<ControllerPublishVolumeResponse> {
return result;
@@ -138,7 +146,8 @@ Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
-> Future<ControllerUnpublishVolumeResponse> {
return result;
@@ -155,7 +164,8 @@ Client::call<VALIDATE_VOLUME_CAPABILITIES>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
-> Future<ValidateVolumeCapabilitiesResponse> {
return result;
@@ -172,7 +182,8 @@ Client::call<LIST_VOLUMES>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, ListVolumes),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ListVolumesResponse, StatusError>& result)
-> Future<ListVolumesResponse> {
return result;
@@ -189,7 +200,8 @@ Client::call<GET_CAPACITY>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, GetCapacity),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<GetCapacityResponse, StatusError>& result)
-> Future<GetCapacityResponse> {
return result;
@@ -206,7 +218,8 @@ Client::call<CONTROLLER_GET_CAPABILITIES>(
.call(
connection,
GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
-> Future<ControllerGetCapabilitiesResponse> {
return result;
@@ -223,7 +236,8 @@ Client::call<NODE_STAGE_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodeStageVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodeStageVolumeResponse, StatusError>& result)
-> Future<NodeStageVolumeResponse> {
return result;
@@ -240,7 +254,8 @@ Client::call<NODE_UNSTAGE_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
-> Future<NodeUnstageVolumeResponse> {
return result;
@@ -257,7 +272,8 @@ Client::call<NODE_PUBLISH_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodePublishVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodePublishVolumeResponse, StatusError>& result)
-> Future<NodePublishVolumeResponse> {
return result;
@@ -274,7 +290,8 @@ Client::call<NODE_UNPUBLISH_VOLUME>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
-> Future<NodeUnpublishVolumeResponse> {
return result;
@@ -291,7 +308,8 @@ Client::call<NODE_GET_ID>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodeGetId),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodeGetIdResponse, StatusError>& result)
-> Future<NodeGetIdResponse> {
return result;
@@ -308,7 +326,8 @@ Client::call<NODE_GET_CAPABILITIES>(
.call(
connection,
GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
- std::move(request))
+ std::move(request),
+ CallOptions())
.then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
-> Future<NodeGetCapabilitiesResponse> {
return result;
[2/2] mesos git commit: Made gRPC call options configurable.
Posted by ch...@apache.org.
Made gRPC call options configurable.
Review: https://reviews.apache.org/r/67938
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a211b4ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a211b4ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a211b4ca
Branch: refs/heads/master
Commit: a211b4cadf289168464fc50987255d883c226e89
Parents: abf11a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Jul 16 18:18:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 35 ++++++---
3rdparty/libprocess/src/tests/grpc_tests.cpp | 88 ++++++++++++++++-------
2 files changed, 87 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 0ff8184..bdafeb3 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -30,6 +30,7 @@
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
@@ -113,6 +114,23 @@ public:
/**
+ * Defines the per-call gRPC options (wait-for-ready and timeout).
+ */
+struct CallOptions
+{
+  // Enable the gRPC wait-for-ready semantics by default so the call will be
+  // retried if the connection is not ready. See:
+  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+  bool wait_for_ready = true;
+
+  // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
+  // there is no response in the specified amount of time. This is required to
+  // prevent the call from pending forever.
+  Duration timeout = Seconds(60);
+};
+
+
+/**
* A copyable interface to manage an internal runtime process for asynchronous
* gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
* outstanding requests, a looper thread to wait for any incoming responses from
@@ -142,6 +160,7 @@ public:
* @param method The asynchronous gRPC call to make. This should be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
+ * @param options The gRPC options for the call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
template <
@@ -158,7 +177,8 @@ public:
Future<Try<Response, StatusError>> call(
const Connection& connection,
Method&& method,
- Request&& request)
+ Request&& request,
+ const CallOptions& options)
{
// Create a `Promise` that will be set upon receiving a response.
// TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
@@ -171,7 +191,7 @@ public:
// TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
// extra copy. We should capture it by forwarding once we get C++14.
dispatch(data->pid, &RuntimeProcess::send, std::bind(
- [connection, method, promise](
+ [connection, method, options, promise](
const Request& request,
bool terminating,
::grpc::CompletionQueue* queue) {
@@ -185,14 +205,11 @@ public:
std::shared_ptr<::grpc::ClientContext> context(
new ::grpc::ClientContext());
- // TODO(chhsiao): Allow the caller to specify a timeout.
- context->set_deadline(
- std::chrono::system_clock::now() + std::chrono::seconds(5));
+ context->set_wait_for_ready(options.wait_for_ready);
- // Enable the gRPC wait-for-ready semantics by default. See:
- // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
- // TODO(chhsiao): Allow the caller to set the option.
- context->set_wait_for_ready(true);
+ context->set_deadline(
+ std::chrono::system_clock::now() +
+ std::chrono::nanoseconds(options.timeout.ns()));
promise->future().onDiscard([=] { context->TryCancel(); });
http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index eb9621b..9677df6 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -48,6 +48,8 @@ using process::Promise;
using process::grpc::StatusError;
+using process::grpc::client::CallOptions;
+
using testing::_;
using testing::DoAll;
using testing::InvokeWithoutArgs;
@@ -109,6 +111,15 @@ protected:
{
return "unix://" + path::join(sandbox.get(), "socket");
}
+
+  static CallOptions call_options()
+  {
+    CallOptions options;
+    options.wait_for_ready = true;
+    options.timeout = Milliseconds(100); // Far below the 60s default.
+
+    return options;
+  }
};
@@ -122,8 +133,11 @@ TEST_F(GRPCClientTest, Success)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -169,14 +183,23 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong1 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong1 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- Future<Try<Pong, StatusError>> pong2 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong2 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- Future<Try<Pong, StatusError>> pong3 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong3 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -214,8 +237,11 @@ TEST_F(GRPCClientTest, StatusError)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -240,8 +266,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
client::Connection connection(server_address());
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
pong.discard();
@@ -278,8 +307,11 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed->future());
@@ -317,8 +349,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed->future());
@@ -327,9 +362,6 @@ TEST_F(GRPCClientTest, ClientShutdown)
shutdown->set(Nothing());
- // TODO(chhsiao): The gRPC library returns a failure after the default
- // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
- // is shut down. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -345,14 +377,15 @@ TEST_F(GRPCClientTest, ServerUnreachable)
client::Connection connection("nosuchhost");
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
- // TODO(chhsiao): The gRPC library returns a failure after the default timeout
- // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -379,11 +412,12 @@ TEST_F(GRPCClientTest, ServerTimeout)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- // TODO(chhsiao): The gRPC library returns a failure after the default timeout
- // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());