You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/17 20:57:51 UTC
[2/2] mesos git commit: Made gRPC call options configurable.
Made gRPC call options configurable.
Review: https://reviews.apache.org/r/67938
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a211b4ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a211b4ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a211b4ca
Branch: refs/heads/master
Commit: a211b4cadf289168464fc50987255d883c226e89
Parents: abf11a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Jul 16 18:18:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 35 ++++++---
3rdparty/libprocess/src/tests/grpc_tests.cpp | 88 ++++++++++++++++-------
2 files changed, 87 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 0ff8184..bdafeb3 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -30,6 +30,7 @@
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
@@ -113,6 +114,23 @@ public:
/**
+ * Defines the gRPC options for each call.
+ */
+struct CallOptions
+{
+ // Enable the gRPC wait-for-ready semantics by default so the call will be
+ // retried if the connection is not ready. See:
+ // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+ bool wait_for_ready = true;
+
+ // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
+ // there is no response in the specified amount of time. This is required to
+ // avoid the call from being pending forever.
+ Duration timeout = Seconds(60);
+};
+
+
+/**
* A copyable interface to manage an internal runtime process for asynchronous
* gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
* outstanding requests, a looper thread to wait for any incoming responses from
@@ -142,6 +160,7 @@ public:
* @param method The asynchronous gRPC call to make. This should be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
+ * @param options The gRPC options for the call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
template <
@@ -158,7 +177,8 @@ public:
Future<Try<Response, StatusError>> call(
const Connection& connection,
Method&& method,
- Request&& request)
+ Request&& request,
+ const CallOptions& options)
{
// Create a `Promise` that will be set upon receiving a response.
// TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
@@ -171,7 +191,7 @@ public:
// TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
// extra copy. We should capture it by forwarding once we get C++14.
dispatch(data->pid, &RuntimeProcess::send, std::bind(
- [connection, method, promise](
+ [connection, method, options, promise](
const Request& request,
bool terminating,
::grpc::CompletionQueue* queue) {
@@ -185,14 +205,11 @@ public:
std::shared_ptr<::grpc::ClientContext> context(
new ::grpc::ClientContext());
- // TODO(chhsiao): Allow the caller to specify a timeout.
- context->set_deadline(
- std::chrono::system_clock::now() + std::chrono::seconds(5));
+ context->set_wait_for_ready(options.wait_for_ready);
- // Enable the gRPC wait-for-ready semantics by default. See:
- // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
- // TODO(chhsiao): Allow the caller to set the option.
- context->set_wait_for_ready(true);
+ context->set_deadline(
+ std::chrono::system_clock::now() +
+ std::chrono::nanoseconds(options.timeout.ns()));
promise->future().onDiscard([=] { context->TryCancel(); });
http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index eb9621b..9677df6 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -48,6 +48,8 @@ using process::Promise;
using process::grpc::StatusError;
+using process::grpc::client::CallOptions;
+
using testing::_;
using testing::DoAll;
using testing::InvokeWithoutArgs;
@@ -109,6 +111,15 @@ protected:
{
return "unix://" + path::join(sandbox.get(), "socket");
}
+
+ static CallOptions call_options()
+ {
+ CallOptions options;
+ options.wait_for_ready = true;
+ options.timeout = Milliseconds(100);
+
+ return options;
+ }
};
@@ -122,8 +133,11 @@ TEST_F(GRPCClientTest, Success)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -169,14 +183,23 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong1 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong1 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- Future<Try<Pong, StatusError>> pong2 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong2 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- Future<Try<Pong, StatusError>> pong3 =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong3 = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -214,8 +237,11 @@ TEST_F(GRPCClientTest, StatusError)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -240,8 +266,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
client::Connection connection(server_address());
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
pong.discard();
@@ -278,8 +307,11 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed->future());
@@ -317,8 +349,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
AWAIT_READY(processed->future());
@@ -327,9 +362,6 @@ TEST_F(GRPCClientTest, ClientShutdown)
shutdown->set(Nothing());
- // TODO(chhsiao): The gRPC library returns a failure after the default
- // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
- // is shut down. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -345,14 +377,15 @@ TEST_F(GRPCClientTest, ServerUnreachable)
client::Connection connection("nosuchhost");
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection,
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
- // TODO(chhsiao): The gRPC library returns a failure after the default timeout
- // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -379,11 +412,12 @@ TEST_F(GRPCClientTest, ServerTimeout)
client::Runtime runtime;
- Future<Try<Pong, StatusError>> pong =
- runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ Future<Try<Pong, StatusError>> pong = runtime.call(
+ connection.get(),
+ GRPC_CLIENT_METHOD(PingPong, Send),
+ Ping(),
+ call_options());
- // TODO(chhsiao): The gRPC library returns a failure after the default timeout
- // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());