You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:09 UTC
[5/6] mesos git commit: Renamed `grpc::Channel` to
`grpc::client::Connection`.
Renamed `grpc::Channel` to `grpc::client::Connection`.
This renaming is made to avoid name conflicts between `::grpc::Channel`
and libprocess' own wrapper. Also, since this wrapper is only used
at the client side, it is moved into the `client` namespace.
Review: https://reviews.apache.org/r/67156
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ddfea093
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ddfea093
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ddfea093
Branch: refs/heads/master
Commit: ddfea093f576cdd22d4fa221326406eaf8e59b49
Parents: 3bac270
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 17:54:24 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 70 +++++++++++------------
3rdparty/libprocess/src/tests/grpc_tests.cpp | 57 +++++++++---------
2 files changed, 60 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 68bdeb1..cf165f0 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -34,11 +34,11 @@
#include <stout/try.hpp>
-// This file provides libprocess "support" for using gRPC. In
-// particular, it defines two wrapper classes: `Channel` (representing a
-// connection to a gRPC server) and `client::Runtime`, which integrates
-// an event loop waiting for gRPC responses, and provides the `call`
-// interface to create an asynchrous gRPC call and return a `Future`.
+// This file provides libprocess "support" for using gRPC. In particular, it
+// defines two wrapper classes: `client::Connection` which represents a
+// connection to a gRPC server, and `client::Runtime` which integrates an event
+// loop waiting for gRPC responses and provides the `call` interface to create
+// an asynchronous gRPC call and return a `Future`.
#define GRPC_CLIENT_METHOD(service, rpc) \
@@ -47,34 +47,6 @@
namespace process {
namespace grpc {
-// Forward declarations.
-namespace client { class Runtime; }
-
-
-/**
- * A copyable interface to manage a connection to a gRPC server.
- * All `Channel` copies share the same connection. Note that the
- * connection is established lazily by the gRPC runtime library: the
- * actual connection is delayed till an RPC call is made.
- */
-class Channel
-{
-public:
- Channel(const std::string& uri,
- const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
- ::grpc::InsecureChannelCredentials())
- : channel(::grpc::CreateChannel(uri, credentials)) {}
-
- explicit Channel(std::shared_ptr<::grpc::Channel> _channel)
- : channel(std::move(_channel)) {}
-
-private:
- std::shared_ptr<::grpc::Channel> channel;
-
- friend class client::Runtime;
-};
-
-
/**
* Represents errors caused by non-OK gRPC statuses. See:
* https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
@@ -117,6 +89,28 @@ struct MethodTraits<
/**
+ * A copyable interface to manage a connection to a gRPC server. All
+ * `Connection` copies share the same gRPC channel which is thread safe. Note
+ * that the actual connection is established lazily by the gRPC library at the
+ * time an RPC is made to the channel.
+ */
+class Connection
+{
+public:
+ Connection(
+ const std::string& uri,
+ const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
+ ::grpc::InsecureChannelCredentials())
+ : channel(::grpc::CreateChannel(uri, credentials)) {}
+
+ explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
+ : channel(std::move(_channel)) {}
+
+ const std::shared_ptr<::grpc::Channel> channel;
+};
+
+
+/**
* A copyable interface to manage an internal gRPC runtime instance for
* asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
* `CompletionQueue` to manage outstanding requests, a looper thread to
@@ -143,7 +137,7 @@ public:
* future never fails; it will return a `StatusError` if a non-OK status is
* returned for the call, so the caller can handle the error programmatically.
*
- * @param channel A connection to a gRPC server.
+ * @param connection A connection to a gRPC server.
* @param rpc The asynchronous gRPC call to make. This can be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
@@ -156,7 +150,7 @@ public:
typename Response =
typename internal::MethodTraits<Method>::response_type>
Future<Try<Response, StatusError>> call(
- const Channel& channel,
+ const Connection& connection,
Method&& method,
const Request& request)
{
@@ -193,9 +187,9 @@ public:
std::shared_ptr<Response> response(new Response());
std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
- std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
- (typename internal::MethodTraits<Method>::stub_type(
- channel.channel).*method)(context.get(), request, &data->queue));
+ std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+ (typename internal::MethodTraits<Method>::stub_type(
+ connection.channel).*method)(context.get(), request, &data->queue);
reader->StartCall();
reader->Finish(
http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index f1cdb5e..eb9621b 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,7 +46,6 @@ using ::grpc::Status;
using process::Future;
using process::Promise;
-using process::grpc::Channel;
using process::grpc::StatusError;
using testing::_;
@@ -67,7 +66,7 @@ public:
MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong));
- Try<Channel> startup(const Option<string>& address = None())
+ Try<client::Connection> startup(const Option<string>& address = None())
{
ServerBuilder builder;
@@ -83,8 +82,8 @@ public:
}
return address.isSome()
- ? Channel(address.get())
- : Channel(server->InProcessChannel(ChannelArguments()));
+ ? client::Connection(address.get())
+ : client::Connection(server->InProcessChannel(ChannelArguments()));
}
Try<Nothing> shutdown()
@@ -104,7 +103,7 @@ class GRPCClientTest : public TemporaryDirectoryTest
{
protected:
// TODO(chhsiao): Consider removing this once we have a way to get a
- // channel before the server starts on Windows. See the
+ // connection before the server starts on Windows. See the
// `DiscardedBeforeServerStarted` test below.
string server_address() const
{
@@ -118,13 +117,13 @@ protected:
TEST_F(GRPCClientTest, Success)
{
PingPongServer server;
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -140,8 +139,8 @@ TEST_F(GRPCClientTest, Success)
TEST_F(GRPCClientTest, ConcurrentRPCs)
{
PingPongServer server;
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
shared_ptr<Promise<Nothing>> processed1(new Promise<Nothing>());
shared_ptr<Promise<Nothing>> processed2(new Promise<Nothing>());
@@ -171,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong1 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong2 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong3 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -210,13 +209,13 @@ TEST_F(GRPCClientTest, StatusError)
EXPECT_CALL(server, Send(_, _, _))
.WillOnce(Return(Status::CANCELLED));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -238,11 +237,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
EXPECT_CALL(server, Send(_, _, _))
.Times(0);
- Channel channel(server_address());
+ client::Connection connection(server_address());
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
pong.discard();
@@ -274,13 +273,13 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -313,13 +312,13 @@ TEST_F(GRPCClientTest, ClientShutdown)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -343,11 +342,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
// to connect to the server.
TEST_F(GRPCClientTest, ServerUnreachable)
{
- Channel channel("nosuchhost");
+ client::Connection connection("nosuchhost");
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -375,13 +374,13 @@ TEST_F(GRPCClientTest, ServerTimeout)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
// TODO(chhsiao): The gRPC library returns a failure after the default timeout
// (5 seconds) is passed. The timeout should be lowered once we support it.