You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:06 UTC

[2/6] mesos git commit: Renamed `GRPC_RPC` and adapted the new gRPC async API.

Renamed `GRPC_RPC` and adapted the new gRPC async API.

To better describe the `GRPC_RPC` macro and to avoid future name
conflicts, we renamed it to `GRPC_CLIENT_METHOD`. Additionally,
we adapted the new gRPC asynchronous client API. See:
https://github.com/grpc/grpc/pull/12269

We also introduced the `MethodTraits` internal helper to simplify the
declaration of `Runtime::call`.

Review: https://reviews.apache.org/r/67155


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bac270b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bac270b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bac270b

Branch: refs/heads/master
Commit: 3bac270b0ed99315edf72d24653c1cb068247c20
Parents: 9a94eb6
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 16:43:33 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 44 ++++++++++++++++++-----
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 20 +++++------
 2 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f7f3171..68bdeb1 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -41,8 +41,8 @@
 // interface to create an asynchrous gRPC call and return a `Future`.
 
 
-#define GRPC_RPC(service, rpc) \
-  (&service::Stub::Async##rpc)
+#define GRPC_CLIENT_METHOD(service, rpc) \
+  (&service::Stub::PrepareAsync##rpc)
 
 namespace process {
 namespace grpc {
@@ -94,6 +94,28 @@ public:
 
 namespace client {
 
+// Internal helper utilities.
+namespace internal {
+
+template <typename T>
+struct MethodTraits; // Undefined.
+
+
+template <typename Stub, typename Request, typename Response>
+struct MethodTraits<
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
+        ::grpc::ClientContext*,
+        const Request&,
+        ::grpc::CompletionQueue*)>
+{
+  typedef Stub stub_type;
+  typedef Request request_type;
+  typedef Response response_type;
+};
+
+} // namespace internal {
+
+
 /**
  * A copyable interface to manage an internal gRPC runtime instance for
  * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
@@ -123,17 +145,19 @@ public:
    *
    * @param channel A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
-   *     by the `GRPC_RPC(Service, RPC)` macro.
+   *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
    * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
-  template <typename Stub, typename Request, typename Response>
+  template <
+      typename Method,
+      typename Request =
+        typename internal::MethodTraits<Method>::request_type,
+      typename Response =
+        typename internal::MethodTraits<Method>::response_type>
   Future<Try<Response, StatusError>> call(
       const Channel& channel,
-      std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
-          ::grpc::ClientContext*,
-          const Request&,
-          ::grpc::CompletionQueue*),
+      Method&& method,
       const Request& request)
   {
     static_assert(
@@ -170,8 +194,10 @@ public:
       std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
 
       std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
-          (Stub(channel.channel).*rpc)(context.get(), request, &data->queue));
+          (typename internal::MethodTraits<Method>::stub_type(
+              channel.channel).*method)(context.get(), request, &data->queue));
 
+      reader->StartCall();
       reader->Finish(
           response.get(),
           status.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 07c2f3e..f1cdb5e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -124,7 +124,7 @@ TEST_F(GRPCClientTest, Success)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -171,13 +171,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -216,7 +216,7 @@ TEST_F(GRPCClientTest, StatusError)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -242,7 +242,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   pong.discard();
 
@@ -280,7 +280,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -319,7 +319,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -347,7 +347,7 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -381,7 +381,7 @@ TEST_F(GRPCClientTest, ServerTimeout)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default timeout
   // (5 seconds) is passed. The timeout should be lowered once we support it.