You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:06 UTC
[2/6] mesos git commit: Renamed `GRPC_RPC` and adapted the new gRPC
async API.
Renamed `GRPC_RPC` and adapted the new gRPC async API.
To better describe the `GRPC_RPC` macro and to avoid future name
conflicts, we renamed it to `GRPC_CLIENT_METHOD`. Additionally,
we adapted the new gRPC asynchronous client API. See:
https://github.com/grpc/grpc/pull/12269
We also introduced the `MethodTraits` internal helper to simplify the
declaration of `Runtime::call`.
Review: https://reviews.apache.org/r/67155
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bac270b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bac270b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bac270b
Branch: refs/heads/master
Commit: 3bac270b0ed99315edf72d24653c1cb068247c20
Parents: 9a94eb6
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 16:43:33 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 44 ++++++++++++++++++-----
3rdparty/libprocess/src/tests/grpc_tests.cpp | 20 +++++------
2 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f7f3171..68bdeb1 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -41,8 +41,8 @@
// interface to create an asynchrous gRPC call and return a `Future`.
-#define GRPC_RPC(service, rpc) \
- (&service::Stub::Async##rpc)
+#define GRPC_CLIENT_METHOD(service, rpc) \
+ (&service::Stub::PrepareAsync##rpc)
namespace process {
namespace grpc {
@@ -94,6 +94,28 @@ public:
namespace client {
+// Internal helper utilities.
+namespace internal {
+
+template <typename T>
+struct MethodTraits; // Undefined.
+
+
+template <typename Stub, typename Request, typename Response>
+struct MethodTraits<
+ std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
+ ::grpc::ClientContext*,
+ const Request&,
+ ::grpc::CompletionQueue*)>
+{
+ typedef Stub stub_type;
+ typedef Request request_type;
+ typedef Response response_type;
+};
+
+} // namespace internal {
+
+
/**
* A copyable interface to manage an internal gRPC runtime instance for
* asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
@@ -123,17 +145,19 @@ public:
*
* @param channel A connection to a gRPC server.
* @param rpc The asynchronous gRPC call to make. This can be obtained
- * by the `GRPC_RPC(Service, RPC)` macro.
+ * by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
- template <typename Stub, typename Request, typename Response>
+ template <
+ typename Method,
+ typename Request =
+ typename internal::MethodTraits<Method>::request_type,
+ typename Response =
+ typename internal::MethodTraits<Method>::response_type>
Future<Try<Response, StatusError>> call(
const Channel& channel,
- std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
- ::grpc::ClientContext*,
- const Request&,
- ::grpc::CompletionQueue*),
+ Method&& method,
const Request& request)
{
static_assert(
@@ -170,8 +194,10 @@ public:
std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
- (Stub(channel.channel).*rpc)(context.get(), request, &data->queue));
+ (typename internal::MethodTraits<Method>::stub_type(
+ channel.channel).*method)(context.get(), request, &data->queue));
+ reader->StartCall();
reader->Finish(
response.get(),
status.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 07c2f3e..f1cdb5e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -124,7 +124,7 @@ TEST_F(GRPCClientTest, Success)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -171,13 +171,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong1 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong2 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong3 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -216,7 +216,7 @@ TEST_F(GRPCClientTest, StatusError)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -242,7 +242,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
pong.discard();
@@ -280,7 +280,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -319,7 +319,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -347,7 +347,7 @@ TEST_F(GRPCClientTest, ServerUnreachable)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -381,7 +381,7 @@ TEST_F(GRPCClientTest, ServerTimeout)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
// TODO(chhsiao): The gRPC library returns a failure after the default timeout
// (5 seconds) is passed. The timeout should be lowered once we support it.