You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:05 UTC
[1/6] mesos git commit: Updated the CSI client to use the new
libprocess gRPC interface.
Repository: mesos
Updated Branches:
refs/heads/master 21305ab47 -> 3d2a1fd49
Updated the CSI client to use the new libprocess gRPC interface.
This patch makes the following changes:
1. Replace `GRPC_RPC` with `GRPC_CLIENT_METHOD`.
2. Replace `process::grpc::Channel` with
`process::grpc::client::Connection`.
3. Make the CSI client metods return a `Future` of a `Try`.
Review: https://reviews.apache.org/r/67158
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d2a1fd4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d2a1fd4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d2a1fd4
Branch: refs/heads/master
Commit: 3d2a1fd49dfe0325f4ca94e3abbab97366bcebd0
Parents: ddfea09
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed May 16 12:10:10 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
src/csi/client.cpp | 187 +++++++++++++-----------------------
src/csi/client.hpp | 6 +-
src/tests/csi_client_tests.cpp | 19 ++--
src/tests/mock_csi_plugin.cpp | 8 +-
src/tests/mock_csi_plugin.hpp | 2 +-
5 files changed, 84 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 559e805..a4ba1f1 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -19,7 +19,7 @@
using process::Failure;
using process::Future;
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
namespace mesos {
namespace csi {
@@ -29,14 +29,10 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo(
const GetPluginInfoRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
- .then([](const RpcResult<GetPluginInfoResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Identity, GetPluginInfo), request)
+ .then([](const Try<GetPluginInfoResponse, StatusError>& result)
-> Future<GetPluginInfoResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -45,14 +41,13 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
const GetPluginCapabilitiesRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Identity, GetPluginCapabilities), request)
- .then([](const RpcResult<GetPluginCapabilitiesResponse>& result)
+ .call(
+ connection,
+ GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+ request)
+ .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
-> Future<GetPluginCapabilitiesResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -61,14 +56,10 @@ Future<ProbeResponse> Client::Probe(
const ProbeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Identity, Probe), request)
- .then([](const RpcResult<ProbeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Identity, Probe), request)
+ .then([](const Try<ProbeResponse, StatusError>& result)
-> Future<ProbeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -77,14 +68,10 @@ Future<CreateVolumeResponse> Client::CreateVolume(
const CreateVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, CreateVolume), request)
- .then([](const RpcResult<CreateVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Controller, CreateVolume), request)
+ .then([](const Try<CreateVolumeResponse, StatusError>& result)
-> Future<CreateVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -93,14 +80,10 @@ Future<DeleteVolumeResponse> Client::DeleteVolume(
const DeleteVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, DeleteVolume), request)
- .then([](const RpcResult<DeleteVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Controller, DeleteVolume), request)
+ .then([](const Try<DeleteVolumeResponse, StatusError>& result)
-> Future<DeleteVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -109,14 +92,13 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
const ControllerPublishVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, ControllerPublishVolume), request)
- .then([](const RpcResult<ControllerPublishVolumeResponse>& result)
+ .call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+ request)
+ .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
-> Future<ControllerPublishVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -125,14 +107,13 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
const ControllerUnpublishVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, ControllerUnpublishVolume), request)
- .then([](const RpcResult<ControllerUnpublishVolumeResponse>& result)
+ .call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+ request)
+ .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
-> Future<ControllerUnpublishVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -141,14 +122,13 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
const ValidateVolumeCapabilitiesRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, ValidateVolumeCapabilities), request)
- .then([](const RpcResult<ValidateVolumeCapabilitiesResponse>& result)
+ .call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+ request)
+ .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
-> Future<ValidateVolumeCapabilitiesResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -157,14 +137,10 @@ Future<ListVolumesResponse> Client::ListVolumes(
const ListVolumesRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, ListVolumes), request)
- .then([](const RpcResult<ListVolumesResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Controller, ListVolumes), request)
+ .then([](const Try<ListVolumesResponse, StatusError>& result)
-> Future<ListVolumesResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -173,14 +149,10 @@ Future<GetCapacityResponse> Client::GetCapacity(
const GetCapacityRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, GetCapacity), request)
- .then([](const RpcResult<GetCapacityResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Controller, GetCapacity), request)
+ .then([](const Try<GetCapacityResponse, StatusError>& result)
-> Future<GetCapacityResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -189,14 +161,13 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
const ControllerGetCapabilitiesRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
- .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
+ .call(
+ connection,
+ GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+ request)
+ .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
-> Future<ControllerGetCapabilitiesResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -205,14 +176,10 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume(
const NodeStageVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodeStageVolume), request)
- .then([](const RpcResult<NodeStageVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodeStageVolume), request)
+ .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
-> Future<NodeStageVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -221,14 +188,10 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
const NodeUnstageVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodeUnstageVolume), request)
- .then([](const RpcResult<NodeUnstageVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), request)
+ .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
-> Future<NodeUnstageVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -237,14 +200,10 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume(
const NodePublishVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
- .then([](const RpcResult<NodePublishVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodePublishVolume), request)
+ .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
-> Future<NodePublishVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -253,14 +212,10 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
const NodeUnpublishVolumeRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
- .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), request)
+ .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
-> Future<NodeUnpublishVolumeResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -269,14 +224,10 @@ Future<NodeGetIdResponse> Client::NodeGetId(
const NodeGetIdRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodeGetId), request)
- .then([](const RpcResult<NodeGetIdResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetId), request)
+ .then([](const Try<NodeGetIdResponse, StatusError>& result)
-> Future<NodeGetIdResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
@@ -285,14 +236,10 @@ Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
const NodeGetCapabilitiesRequest& request)
{
return runtime
- .call(channel, GRPC_RPC(Node, NodeGetCapabilities), request)
- .then([](const RpcResult<NodeGetCapabilitiesResponse>& result)
+ .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), request)
+ .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
-> Future<NodeGetCapabilitiesResponse> {
- if (result.status.ok()) {
- return result.response;
- } else {
- return Failure(result.status.error_message());
- }
+ return result;
});
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d84674..9d7019a 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -30,9 +30,9 @@ namespace v0 {
class Client
{
public:
- Client(const process::grpc::Channel& _channel,
+ Client(const process::grpc::client::Connection& _connection,
const process::grpc::client::Runtime& _runtime)
- : channel(_channel), runtime(_runtime) {}
+ : connection(_connection), runtime(_runtime) {}
// RPCs for the Identity service.
process::Future<GetPluginInfoResponse>
@@ -90,7 +90,7 @@ public:
NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
private:
- process::grpc::Channel channel;
+ process::grpc::client::Connection connection;
process::grpc::client::Runtime runtime;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index f5b9eac..d5993d6 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -32,8 +32,7 @@ using mesos::csi::v0::Client;
using process::Future;
-using process::grpc::Channel;
-
+using process::grpc::client::Connection;
using process::grpc::client::Runtime;
using testing::TestParamInfo;
@@ -57,13 +56,13 @@ struct RPCParam
template <typename Request, typename Response>
RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&))
: name(_name),
- call([=](const Channel& channel, const Runtime runtime) {
- return (Client(channel, runtime).*rpc)(Request())
+ call([=](const Connection& connection, const Runtime runtime) {
+ return (Client(connection, runtime).*rpc)(Request())
.then([] { return Nothing(); });
}) {}
string name;
- lambda::function<Future<Nothing>(const Channel&, const Runtime&)> call;
+ lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call;
};
@@ -76,10 +75,10 @@ protected:
{
TemporaryDirectoryTest::SetUp();
- Try<Channel> _channel = plugin.startup();
- ASSERT_SOME(_channel);
+ Try<Connection> _connection = plugin.startup();
+ ASSERT_SOME(_connection);
- channel = _channel.get();
+ connection = _connection.get();
}
virtual void TearDown() override
@@ -91,7 +90,7 @@ protected:
}
MockCSIPlugin plugin;
- Option<process::grpc::Channel> channel;
+ Option<process::grpc::client::Connection> connection;
process::grpc::client::Runtime runtime;
};
@@ -139,7 +138,7 @@ INSTANTIATE_TEST_CASE_P(
// This test verifies that the all methods of CSI clients work.
TEST_P(CSIClientTest, Call)
{
- Future<Nothing> call = GetParam().call(channel.get(), runtime);
+ Future<Nothing> call = GetParam().call(connection.get(), runtime);
AWAIT_EXPECT_READY(call);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp
index 6983b84..17c6335 100644
--- a/src/tests/mock_csi_plugin.cpp
+++ b/src/tests/mock_csi_plugin.cpp
@@ -30,7 +30,7 @@ using mesos::csi::v0::Controller;
using mesos::csi::v0::Identity;
using mesos::csi::v0::Node;
-using process::grpc::Channel;
+using process::grpc::client::Connection;
using testing::_;
using testing::Return;
@@ -49,7 +49,7 @@ MockCSIPlugin::MockCSIPlugin()
}
-Try<Channel> MockCSIPlugin::startup(const Option<string>& address)
+Try<Connection> MockCSIPlugin::startup(const Option<string>& address)
{
ServerBuilder builder;
@@ -67,8 +67,8 @@ Try<Channel> MockCSIPlugin::startup(const Option<string>& address)
}
return address.isSome()
- ? Channel(address.get())
- : Channel(server->InProcessChannel(ChannelArguments()));
+ ? Connection(address.get())
+ : Connection(server->InProcessChannel(ChannelArguments()));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp
index 6f7a5ab..4642326 100644
--- a/src/tests/mock_csi_plugin.hpp
+++ b/src/tests/mock_csi_plugin.hpp
@@ -72,7 +72,7 @@ public:
CSI_METHOD_FOREACH(DECLARE_MOCK_CSI_METHOD)
- Try<process::grpc::Channel> startup(
+ Try<process::grpc::client::Connection> startup(
const Option<std::string>& address = None());
Try<Nothing> shutdown();
[2/6] mesos git commit: Renamed `GRPC_RPC` and adapted the new gRPC
async API.
Posted by ch...@apache.org.
Renamed `GRPC_RPC` and adapted the new gRPC async API.
To better describe the `GRPC_RPC` macro and to avoid future name
conflicts, we renamed it to `GRPC_CLIENT_METHOD`. Additionally,
we adapted the new gRPC asynchronous client API. See:
https://github.com/grpc/grpc/pull/12269
We also introduced the `MethodTraits` internal helper to simplify the
declaration of `Runtime::call`.
Review: https://reviews.apache.org/r/67155
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bac270b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bac270b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bac270b
Branch: refs/heads/master
Commit: 3bac270b0ed99315edf72d24653c1cb068247c20
Parents: 9a94eb6
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 16:43:33 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 44 ++++++++++++++++++-----
3rdparty/libprocess/src/tests/grpc_tests.cpp | 20 +++++------
2 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f7f3171..68bdeb1 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -41,8 +41,8 @@
// interface to create an asynchrous gRPC call and return a `Future`.
-#define GRPC_RPC(service, rpc) \
- (&service::Stub::Async##rpc)
+#define GRPC_CLIENT_METHOD(service, rpc) \
+ (&service::Stub::PrepareAsync##rpc)
namespace process {
namespace grpc {
@@ -94,6 +94,28 @@ public:
namespace client {
+// Internal helper utilities.
+namespace internal {
+
+template <typename T>
+struct MethodTraits; // Undefined.
+
+
+template <typename Stub, typename Request, typename Response>
+struct MethodTraits<
+ std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
+ ::grpc::ClientContext*,
+ const Request&,
+ ::grpc::CompletionQueue*)>
+{
+ typedef Stub stub_type;
+ typedef Request request_type;
+ typedef Response response_type;
+};
+
+} // namespace internal {
+
+
/**
* A copyable interface to manage an internal gRPC runtime instance for
* asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
@@ -123,17 +145,19 @@ public:
*
* @param channel A connection to a gRPC server.
* @param rpc The asynchronous gRPC call to make. This can be obtained
- * by the `GRPC_RPC(Service, RPC)` macro.
+ * by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
- template <typename Stub, typename Request, typename Response>
+ template <
+ typename Method,
+ typename Request =
+ typename internal::MethodTraits<Method>::request_type,
+ typename Response =
+ typename internal::MethodTraits<Method>::response_type>
Future<Try<Response, StatusError>> call(
const Channel& channel,
- std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
- ::grpc::ClientContext*,
- const Request&,
- ::grpc::CompletionQueue*),
+ Method&& method,
const Request& request)
{
static_assert(
@@ -170,8 +194,10 @@ public:
std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
- (Stub(channel.channel).*rpc)(context.get(), request, &data->queue));
+ (typename internal::MethodTraits<Method>::stub_type(
+ channel.channel).*method)(context.get(), request, &data->queue));
+ reader->StartCall();
reader->Finish(
response.get(),
status.get(),
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 07c2f3e..f1cdb5e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -124,7 +124,7 @@ TEST_F(GRPCClientTest, Success)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -171,13 +171,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong1 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong2 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong3 =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -216,7 +216,7 @@ TEST_F(GRPCClientTest, StatusError)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -242,7 +242,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
pong.discard();
@@ -280,7 +280,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -319,7 +319,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -347,7 +347,7 @@ TEST_F(GRPCClientTest, ServerUnreachable)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -381,7 +381,7 @@ TEST_F(GRPCClientTest, ServerTimeout)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+ runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
// TODO(chhsiao): The gRPC library returns a failure after the default timeout
// (5 seconds) is passed. The timeout should be lowered once we support it.
[5/6] mesos git commit: Renamed `grpc::Channel` to
`grpc::client::Connection`.
Posted by ch...@apache.org.
Renamed `grpc::Channel` to `grpc::client::Connection`.
This renaming is made to avoid name conflicts between `::grpc::Channel`
and libprocess' own wrapper. Also, since this wrapper is only used
at the client side, it is moved into the `client` namespace.
Review: https://reviews.apache.org/r/67156
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ddfea093
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ddfea093
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ddfea093
Branch: refs/heads/master
Commit: ddfea093f576cdd22d4fa221326406eaf8e59b49
Parents: 3bac270
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 17:54:24 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 70 +++++++++++------------
3rdparty/libprocess/src/tests/grpc_tests.cpp | 57 +++++++++---------
2 files changed, 60 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 68bdeb1..cf165f0 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -34,11 +34,11 @@
#include <stout/try.hpp>
-// This file provides libprocess "support" for using gRPC. In
-// particular, it defines two wrapper classes: `Channel` (representing a
-// connection to a gRPC server) and `client::Runtime`, which integrates
-// an event loop waiting for gRPC responses, and provides the `call`
-// interface to create an asynchrous gRPC call and return a `Future`.
+// This file provides libprocess "support" for using gRPC. In particular, it
+// defines two wrapper classes: `client::Connection` which represents a
+// connection to a gRPC server, and `client::Runtime` which integrates an event
+// loop waiting for gRPC responses and provides the `call` interface to create
+// an asynchronous gRPC call and return a `Future`.
#define GRPC_CLIENT_METHOD(service, rpc) \
@@ -47,34 +47,6 @@
namespace process {
namespace grpc {
-// Forward declarations.
-namespace client { class Runtime; }
-
-
-/**
- * A copyable interface to manage a connection to a gRPC server.
- * All `Channel` copies share the same connection. Note that the
- * connection is established lazily by the gRPC runtime library: the
- * actual connection is delayed till an RPC call is made.
- */
-class Channel
-{
-public:
- Channel(const std::string& uri,
- const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
- ::grpc::InsecureChannelCredentials())
- : channel(::grpc::CreateChannel(uri, credentials)) {}
-
- explicit Channel(std::shared_ptr<::grpc::Channel> _channel)
- : channel(std::move(_channel)) {}
-
-private:
- std::shared_ptr<::grpc::Channel> channel;
-
- friend class client::Runtime;
-};
-
-
/**
* Represents errors caused by non-OK gRPC statuses. See:
* https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
@@ -117,6 +89,28 @@ struct MethodTraits<
/**
+ * A copyable interface to manage a connection to a gRPC server. All
+ * `Connection` copies share the same gRPC channel which is thread safe. Note
+ * that the actual connection is established lazily by the gRPC library at the
+ * time an RPC is made to the channel.
+ */
+class Connection
+{
+public:
+ Connection(
+ const std::string& uri,
+ const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
+ ::grpc::InsecureChannelCredentials())
+ : channel(::grpc::CreateChannel(uri, credentials)) {}
+
+ explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
+ : channel(std::move(_channel)) {}
+
+ const std::shared_ptr<::grpc::Channel> channel;
+};
+
+
+/**
* A copyable interface to manage an internal gRPC runtime instance for
* asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
* `CompletionQueue` to manage outstanding requests, a looper thread to
@@ -143,7 +137,7 @@ public:
* future never fails; it will return a `StatusError` if a non-OK status is
* returned for the call, so the caller can handle the error programmatically.
*
- * @param channel A connection to a gRPC server.
+ * @param connection A connection to a gRPC server.
* @param rpc The asynchronous gRPC call to make. This can be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
@@ -156,7 +150,7 @@ public:
typename Response =
typename internal::MethodTraits<Method>::response_type>
Future<Try<Response, StatusError>> call(
- const Channel& channel,
+ const Connection& connection,
Method&& method,
const Request& request)
{
@@ -193,9 +187,9 @@ public:
std::shared_ptr<Response> response(new Response());
std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
- std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
- (typename internal::MethodTraits<Method>::stub_type(
- channel.channel).*method)(context.get(), request, &data->queue));
+ std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+ (typename internal::MethodTraits<Method>::stub_type(
+ connection.channel).*method)(context.get(), request, &data->queue);
reader->StartCall();
reader->Finish(
http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index f1cdb5e..eb9621b 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,7 +46,6 @@ using ::grpc::Status;
using process::Future;
using process::Promise;
-using process::grpc::Channel;
using process::grpc::StatusError;
using testing::_;
@@ -67,7 +66,7 @@ public:
MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong));
- Try<Channel> startup(const Option<string>& address = None())
+ Try<client::Connection> startup(const Option<string>& address = None())
{
ServerBuilder builder;
@@ -83,8 +82,8 @@ public:
}
return address.isSome()
- ? Channel(address.get())
- : Channel(server->InProcessChannel(ChannelArguments()));
+ ? client::Connection(address.get())
+ : client::Connection(server->InProcessChannel(ChannelArguments()));
}
Try<Nothing> shutdown()
@@ -104,7 +103,7 @@ class GRPCClientTest : public TemporaryDirectoryTest
{
protected:
// TODO(chhsiao): Consider removing this once we have a way to get a
- // channel before the server starts on Windows. See the
+ // connection before the server starts on Windows. See the
// `DiscardedBeforeServerStarted` test below.
string server_address() const
{
@@ -118,13 +117,13 @@ protected:
TEST_F(GRPCClientTest, Success)
{
PingPongServer server;
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_SOME(pong.get());
@@ -140,8 +139,8 @@ TEST_F(GRPCClientTest, Success)
TEST_F(GRPCClientTest, ConcurrentRPCs)
{
PingPongServer server;
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
shared_ptr<Promise<Nothing>> processed1(new Promise<Nothing>());
shared_ptr<Promise<Nothing>> processed2(new Promise<Nothing>());
@@ -171,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong1 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong2 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
Future<Try<Pong, StatusError>> pong3 =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed1->future());
AWAIT_READY(processed2->future());
@@ -210,13 +209,13 @@ TEST_F(GRPCClientTest, StatusError)
EXPECT_CALL(server, Send(_, _, _))
.WillOnce(Return(Status::CANCELLED));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
EXPECT_ERROR(pong.get());
@@ -238,11 +237,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
EXPECT_CALL(server, Send(_, _, _))
.Times(0);
- Channel channel(server_address());
+ client::Connection connection(server_address());
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
pong.discard();
@@ -274,13 +273,13 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -313,13 +312,13 @@ TEST_F(GRPCClientTest, ClientShutdown)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -343,11 +342,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
// to connect to the server.
TEST_F(GRPCClientTest, ServerUnreachable)
{
- Channel channel("nosuchhost");
+ client::Connection connection("nosuchhost");
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -375,13 +374,13 @@ TEST_F(GRPCClientTest, ServerTimeout)
}),
Return(Status::OK)));
- Try<Channel> channel = server.startup();
- ASSERT_SOME(channel);
+ Try<client::Connection> connection = server.startup();
+ ASSERT_SOME(connection);
client::Runtime runtime;
Future<Try<Pong, StatusError>> pong =
- runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+ runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
// TODO(chhsiao): The gRPC library returns a failure after the default timeout
// (5 seconds) is passed. The timeout should be lowered once we support it.
[6/6] mesos git commit: Replaced `RpcResult` with
`Try`.
Posted by ch...@apache.org.
Replaced `RpcResult<Response>` with `Try<Response, StatusError>`.
The `process::grpc::client::Runtime::call` method currently returns a
`RpcResult<Response>`, which contains both a `::grpc::Status` object
and the resulting response protobuf. However, if the `::grpc::Status`
represents a non-OK status, the gRPC library does not guarantee that
the response protobuf is valid. This patch replaces `RpcResult` with
`Try` to provide better type safety.
Review: https://reviews.apache.org/r/67154
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a94eb69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a94eb69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a94eb69
Branch: refs/heads/master
Commit: 9a94eb697f8a7b38955cdf2f21b81d3fda3e2845
Parents: f8829f8
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon May 14 22:00:35 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 42 +++++++++------
3rdparty/libprocess/src/tests/grpc_tests.cpp | 66 +++++++++++++----------
2 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 321a46e..f7f3171 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -18,6 +18,7 @@
#include <memory>
#include <thread>
#include <type_traits>
+#include <utility>
#include <google/protobuf/message.h>
@@ -75,20 +76,22 @@ private:
/**
- * The response of a RPC call. It includes the gRPC `Status`
- * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and
- * the actual protobuf response body.
+ * Represents errors caused by non-OK gRPC statuses. See:
+ * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
*/
-template <typename T>
-struct RpcResult
+class StatusError : public Error
{
- RpcResult(const ::grpc::Status& _status, const T& _response)
- : status(_status), response(_response) {}
+public:
+ StatusError(::grpc::Status _status)
+ : Error(_status.error_message()), status(std::move(_status))
+ {
+ CHECK(!status.ok());
+ }
- ::grpc::Status status;
- T response;
+ const ::grpc::Status status;
};
+
namespace client {
/**
@@ -112,14 +115,20 @@ public:
/**
* Sends an asynchronous gRPC call.
*
+ * This function returns a `Future` of a `Try` such that the response protobuf
+ * is returned only if the gRPC call returns an OK status to ensure type
+ * safety (see https://github.com/grpc/grpc/issues/12824). Note that the
+ * future never fails; it will return a `StatusError` if a non-OK status is
+ * returned for the call, so the caller can handle the error programmatically.
+ *
* @param channel A connection to a gRPC server.
* @param rpc The asynchronous gRPC call to make. This can be obtained
* by the `GRPC_RPC(Service, RPC)` macro.
* @param request The request protobuf for the gRPC call.
- * @return a `Future` waiting for a response protobuf.
+ * @return a `Future` of `Try` waiting for a response protobuf or an error.
*/
template <typename Stub, typename Request, typename Response>
- Future<RpcResult<Response>> call(
+ Future<Try<Response, StatusError>> call(
const Channel& channel,
std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
::grpc::ClientContext*,
@@ -152,8 +161,8 @@ public:
// an asynchronous gRPC call through the `CompletionQueue`
// managed by `data`. The `Promise` will be set by the callback
// upon server response.
- std::shared_ptr<Promise<RpcResult<Response>>> promise(
- new Promise<RpcResult<Response>>);
+ std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
+ new Promise<Try<Response, StatusError>>);
promise->future().onDiscard([=] { context->TryCancel(); });
@@ -175,10 +184,11 @@ public:
CHECK(promise->future().isPending());
if (promise->future().hasDiscard()) {
promise->discard();
- return;
+ } else {
+ promise->set(status->ok()
+ ? std::move(*response)
+ : Try<Response, StatusError>::error(std::move(*status)));
}
-
- promise->set(RpcResult<Response>(*status, *response));
}));
return promise->future();
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 38cd6c6..07c2f3e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -47,7 +47,7 @@ using process::Future;
using process::Promise;
using process::grpc::Channel;
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
using testing::_;
using testing::DoAll;
@@ -123,11 +123,11 @@ TEST_F(GRPCClientTest, Success)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
- EXPECT_TRUE(pong->status.ok());
+ EXPECT_SOME(pong.get());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -170,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong1 =
+ Future<Try<Pong, StatusError>> pong1 =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
- Future<RpcResult<Pong>> pong2 =
+ Future<Try<Pong, StatusError>> pong2 =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
- Future<RpcResult<Pong>> pong3 =
+ Future<Try<Pong, StatusError>> pong3 =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
AWAIT_READY(processed1->future());
@@ -186,13 +186,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
pinged->set(Nothing());
AWAIT_ASSERT_READY(pong1);
- EXPECT_TRUE(pong1->status.ok());
+ EXPECT_SOME(pong1.get());
AWAIT_ASSERT_READY(pong2);
- EXPECT_TRUE(pong2->status.ok());
+ EXPECT_SOME(pong2.get());
AWAIT_ASSERT_READY(pong3);
- EXPECT_TRUE(pong3->status.ok());
+ EXPECT_SOME(pong3.get());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -201,9 +201,9 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
}
-// This test verifies that a gRPC future fails when the server responds
-// with a status other than OK for the given call.
-TEST_F(GRPCClientTest, Failed)
+// This test verifies that a gRPC future is set with an error when the server
+// responds with a status other than OK for the given call.
+TEST_F(GRPCClientTest, StatusError)
{
PingPongServer server;
@@ -215,11 +215,12 @@ TEST_F(GRPCClientTest, Failed)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
AWAIT_ASSERT_READY(pong);
- EXPECT_FALSE(pong->status.ok());
+ EXPECT_ERROR(pong.get());
+ EXPECT_EQ(::grpc::CANCELLED, pong->error().status.error_code());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
@@ -240,7 +241,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
Channel channel(server_address());
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
pong.discard();
@@ -278,7 +279,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -295,7 +296,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
}
-// This test verifies that a gRPC future fails properly when the runtime
+// This test verifies that a gRPC future is set with an error when the runtime
// is shut down before the server responds.
TEST_F(GRPCClientTest, ClientShutdown)
{
@@ -317,7 +318,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
AWAIT_READY(processed->future());
@@ -327,32 +328,39 @@ TEST_F(GRPCClientTest, ClientShutdown)
shutdown->set(Nothing());
+ // TODO(chhsiao): The gRPC library returns a failure after the default
+ // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
+ // is shut down. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
- EXPECT_FALSE(pong->status.ok());
+ EXPECT_ERROR(pong.get());
+ EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
ASSERT_SOME(server.shutdown());
}
-// This test verifies that a gRPC future fails when it is unable to
-// connect to the server.
+// This test verifies that a gRPC future is set with an error when it is unable
+// to connect to the server.
TEST_F(GRPCClientTest, ServerUnreachable)
{
Channel channel("nosuchhost");
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
runtime.terminate();
AWAIT_ASSERT_READY(runtime.wait());
+ // TODO(chhsiao): The gRPC library returns a failure after the default timeout
+ // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
- EXPECT_FALSE(pong->status.ok());
+ EXPECT_ERROR(pong.get());
+ EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
}
-// This test verifies that a gRPC future fails properly when the server
+// This test verifies that a gRPC future is set with an error when the server
// hangs when processing the given call.
TEST_F(GRPCClientTest, ServerTimeout)
{
@@ -372,14 +380,14 @@ TEST_F(GRPCClientTest, ServerTimeout)
client::Runtime runtime;
- Future<RpcResult<Pong>> pong =
+ Future<Try<Pong, StatusError>> pong =
runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
- // TODO(chhsiao): The gRPC library returns a failure after the default
- // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
- // is shut down. The timeout should be lowered once we support it.
+ // TODO(chhsiao): The gRPC library returns a failure after the default timeout
+ // (5 seconds) is passed. The timeout should be lowered once we support it.
AWAIT_ASSERT_READY(pong);
- EXPECT_FALSE(pong->status.ok());
+ EXPECT_ERROR(pong.get());
+ EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
done->set(Nothing());
[4/6] mesos git commit: Supported custom error types for the
`Future(Try<...>)` constructor.
Posted by ch...@apache.org.
Supported custom error types for the `Future(Try<...>)` constructor.
Review: https://reviews.apache.org/r/67191
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f8829f87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f8829f87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f8829f87
Branch: refs/heads/master
Commit: f8829f87e7220eccbdcf2dec6f5e4a63af0a5a7b
Parents: d7a1fe4
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu May 17 12:13:10 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f8829f87/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index fb107d2..b8508c5 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -42,6 +42,7 @@
#include <stout/preprocessor.hpp>
#include <stout/result.hpp>
#include <stout/result_of.hpp>
+#include <stout/stringify.hpp>
#include <stout/synchronized.hpp>
#include <stout/try.hpp>
@@ -107,9 +108,11 @@ public:
/*implicit*/ Future(const Future<T>& that) = default;
/*implicit*/ Future(Future<T>&& that) = default;
- /*implicit*/ Future(const Try<T>& t);
+ template <typename E>
+ /*implicit*/ Future(const Try<T, E>& t);
- /*implicit*/ Future(const Try<Future<T>>& t);
+ template <typename E>
+ /*implicit*/ Future(const Try<Future<T>, E>& t);
~Future() = default;
@@ -1116,23 +1119,27 @@ Future<T>::Future(const ErrnoFailure& failure)
template <typename T>
-Future<T>::Future(const Try<T>& t)
+template <typename E>
+Future<T>::Future(const Try<T, E>& t)
: data(new Data())
{
if (t.isSome()){
set(t.get());
} else {
- fail(t.error());
+ // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
+ fail(stringify(t.error()));
}
}
template <typename T>
-Future<T>::Future(const Try<Future<T>>& t)
+template <typename E>
+Future<T>::Future(const Try<Future<T>, E>& t)
: data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
{
if (!t.isSome()) {
- fail(t.error());
+ // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
+ fail(stringify(t.error()));
}
}
[3/6] mesos git commit: Overloaded `stringify` for `Error`s to reduce
overheads.
Posted by ch...@apache.org.
Overloaded `stringify` for `Error`s to reduce overheads.
Review: https://reviews.apache.org/r/67190
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7a1fe47
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7a1fe47
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7a1fe47
Branch: refs/heads/master
Commit: d7a1fe47e06944dcb50d93b697f8cfcf743bb1a5
Parents: 21305ab
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu May 17 12:25:12 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700
----------------------------------------------------------------------
3rdparty/stout/include/stout/stringify.hpp | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d7a1fe47/3rdparty/stout/include/stout/stringify.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/stringify.hpp b/3rdparty/stout/include/stout/stringify.hpp
index 49bf684..ef7b26f 100644
--- a/3rdparty/stout/include/stout/stringify.hpp
+++ b/3rdparty/stout/include/stout/stringify.hpp
@@ -29,6 +29,7 @@
#endif // __WINDOWS__
#include "abort.hpp"
+#include "error.hpp"
#include "hashmap.hpp"
#include "set.hpp"
@@ -190,4 +191,14 @@ std::string stringify(const hashmap<K, V>& map)
return out.str();
}
+
+// TODO(chhsiao): This overload returns a non-const rvalue for consistency.
+// Consider the following overloads instead for better performance:
+// const std::string& stringify(const Error&);
+// std::string stringify(Error&&);
+inline std::string stringify(const Error& error)
+{
+ return error.message;
+}
+
#endif // __STOUT_STRINGIFY_HPP__