You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:09 UTC

[5/6] mesos git commit: Renamed `grpc::Channel` to `grpc::client::Connection`.

Renamed `grpc::Channel` to `grpc::client::Connection`.

This rename avoids name conflicts between `::grpc::Channel` and
libprocess' own wrapper. Also, since this wrapper is only used on the
client side, it is moved into the `client` namespace.

Review: https://reviews.apache.org/r/67156


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ddfea093
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ddfea093
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ddfea093

Branch: refs/heads/master
Commit: ddfea093f576cdd22d4fa221326406eaf8e59b49
Parents: 3bac270
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 17:54:24 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 70 +++++++++++------------
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 57 +++++++++---------
 2 files changed, 60 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 68bdeb1..cf165f0 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -34,11 +34,11 @@
 #include <stout/try.hpp>
 
 
-// This file provides libprocess "support" for using gRPC. In
-// particular, it defines two wrapper classes: `Channel` (representing a
-// connection to a gRPC server) and `client::Runtime`, which integrates
-// an event loop waiting for gRPC responses, and provides the `call`
-// interface to create an asynchrous gRPC call and return a `Future`.
+// This file provides libprocess "support" for using gRPC. In particular, it
+// defines two wrapper classes: `client::Connection` which represents a
+// connection to a gRPC server, and `client::Runtime` which integrates an event
+// loop waiting for gRPC responses and provides the `call` interface to create
+// an asynchronous gRPC call and return a `Future`.
 
 
 #define GRPC_CLIENT_METHOD(service, rpc) \
@@ -47,34 +47,6 @@
 namespace process {
 namespace grpc {
 
-// Forward declarations.
-namespace client { class Runtime; }
-
-
-/**
- * A copyable interface to manage a connection to a gRPC server.
- * All `Channel` copies share the same connection. Note that the
- * connection is established lazily by the gRPC runtime library: the
- * actual connection is delayed till an RPC call is made.
- */
-class Channel
-{
-public:
-  Channel(const std::string& uri,
-          const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
-            ::grpc::InsecureChannelCredentials())
-    : channel(::grpc::CreateChannel(uri, credentials)) {}
-
-  explicit Channel(std::shared_ptr<::grpc::Channel> _channel)
-    : channel(std::move(_channel)) {}
-
-private:
-  std::shared_ptr<::grpc::Channel> channel;
-
-  friend class client::Runtime;
-};
-
-
 /**
  * Represents errors caused by non-OK gRPC statuses. See:
  * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
@@ -117,6 +89,28 @@ struct MethodTraits<
 
 
 /**
+ * A copyable interface to manage a connection to a gRPC server. All
+ * `Connection` copies share the same gRPC channel which is thread safe. Note
+ * that the actual connection is established lazily by the gRPC library at the
+ * time an RPC is made to the channel.
+ */
+class Connection
+{
+public:
+  Connection(
+      const std::string& uri,
+      const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
+        ::grpc::InsecureChannelCredentials())
+    : channel(::grpc::CreateChannel(uri, credentials)) {}
+
+  explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
+    : channel(std::move(_channel)) {}
+
+  const std::shared_ptr<::grpc::Channel> channel;
+};
+
+
+/**
  * A copyable interface to manage an internal gRPC runtime instance for
  * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
  * `CompletionQueue` to manage outstanding requests, a looper thread to
@@ -143,7 +137,7 @@ public:
    * future never fails; it will return a `StatusError` if a non-OK status is
    * returned for the call, so the caller can handle the error programmatically.
    *
-   * @param channel A connection to a gRPC server.
+   * @param connection A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
    *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
@@ -156,7 +150,7 @@ public:
       typename Response =
         typename internal::MethodTraits<Method>::response_type>
   Future<Try<Response, StatusError>> call(
-      const Channel& channel,
+      const Connection& connection,
       Method&& method,
       const Request& request)
   {
@@ -193,9 +187,9 @@ public:
       std::shared_ptr<Response> response(new Response());
       std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
 
-      std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
-          (typename internal::MethodTraits<Method>::stub_type(
-              channel.channel).*method)(context.get(), request, &data->queue));
+      std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+        (typename internal::MethodTraits<Method>::stub_type(
+            connection.channel).*method)(context.get(), request, &data->queue);
 
       reader->StartCall();
       reader->Finish(

http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index f1cdb5e..eb9621b 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,7 +46,6 @@ using ::grpc::Status;
 using process::Future;
 using process::Promise;
 
-using process::grpc::Channel;
 using process::grpc::StatusError;
 
 using testing::_;
@@ -67,7 +66,7 @@ public:
 
   MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong));
 
-  Try<Channel> startup(const Option<string>& address = None())
+  Try<client::Connection> startup(const Option<string>& address = None())
   {
     ServerBuilder builder;
 
@@ -83,8 +82,8 @@ public:
     }
 
     return address.isSome()
-      ? Channel(address.get())
-      : Channel(server->InProcessChannel(ChannelArguments()));
+      ? client::Connection(address.get())
+      : client::Connection(server->InProcessChannel(ChannelArguments()));
   }
 
   Try<Nothing> shutdown()
@@ -104,7 +103,7 @@ class GRPCClientTest : public TemporaryDirectoryTest
 {
 protected:
   // TODO(chhsiao): Consider removing this once we have a way to get a
-  // channel before the server starts on Windows. See the
+  // connection before the server starts on Windows. See the
   // `DiscardedBeforeServerStarted` test below.
   string server_address() const
   {
@@ -118,13 +117,13 @@ protected:
 TEST_F(GRPCClientTest, Success)
 {
   PingPongServer server;
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -140,8 +139,8 @@ TEST_F(GRPCClientTest, Success)
 TEST_F(GRPCClientTest, ConcurrentRPCs)
 {
   PingPongServer server;
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   shared_ptr<Promise<Nothing>> processed1(new Promise<Nothing>());
   shared_ptr<Promise<Nothing>> processed2(new Promise<Nothing>());
@@ -171,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -210,13 +209,13 @@ TEST_F(GRPCClientTest, StatusError)
   EXPECT_CALL(server, Send(_, _, _))
     .WillOnce(Return(Status::CANCELLED));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -238,11 +237,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   EXPECT_CALL(server, Send(_, _, _))
     .Times(0);
 
-  Channel channel(server_address());
+  client::Connection connection(server_address());
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   pong.discard();
 
@@ -274,13 +273,13 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -313,13 +312,13 @@ TEST_F(GRPCClientTest, ClientShutdown)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -343,11 +342,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
 // to connect to the server.
 TEST_F(GRPCClientTest, ServerUnreachable)
 {
-  Channel channel("nosuchhost");
+  client::Connection connection("nosuchhost");
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -375,13 +374,13 @@ TEST_F(GRPCClientTest, ServerTimeout)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default timeout
   // (5 seconds) is passed. The timeout should be lowered once we support it.