You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/23 23:32:05 UTC

[1/6] mesos git commit: Updated the CSI client to use the new libprocess gRPC interface.

Repository: mesos
Updated Branches:
  refs/heads/master 21305ab47 -> 3d2a1fd49


Updated the CSI client to use the new libprocess gRPC interface.

This patch makes the following changes:
1. Replace `GRPC_RPC` with `GRPC_CLIENT_METHOD`.
2. Replace `process::grpc::Channel` with
   `process::grpc::client::Connection`.
3. Make the CSI client methods return a `Future` of a `Try`.

Review: https://reviews.apache.org/r/67158


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d2a1fd4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d2a1fd4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d2a1fd4

Branch: refs/heads/master
Commit: 3d2a1fd49dfe0325f4ca94e3abbab97366bcebd0
Parents: ddfea09
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed May 16 12:10:10 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 src/csi/client.cpp             | 187 +++++++++++++-----------------------
 src/csi/client.hpp             |   6 +-
 src/tests/csi_client_tests.cpp |  19 ++--
 src/tests/mock_csi_plugin.cpp  |   8 +-
 src/tests/mock_csi_plugin.hpp  |   2 +-
 5 files changed, 84 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 559e805..a4ba1f1 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -19,7 +19,7 @@
 using process::Failure;
 using process::Future;
 
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
 
 namespace mesos {
 namespace csi {
@@ -29,14 +29,10 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo(
     const GetPluginInfoRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
-    .then([](const RpcResult<GetPluginInfoResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Identity, GetPluginInfo), request)
+    .then([](const Try<GetPluginInfoResponse, StatusError>& result)
         -> Future<GetPluginInfoResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -45,14 +41,13 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
     const GetPluginCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetPluginCapabilities), request)
-    .then([](const RpcResult<GetPluginCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+        request)
+    .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
         -> Future<GetPluginCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -61,14 +56,10 @@ Future<ProbeResponse> Client::Probe(
     const ProbeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, Probe), request)
-    .then([](const RpcResult<ProbeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Identity, Probe), request)
+    .then([](const Try<ProbeResponse, StatusError>& result)
         -> Future<ProbeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -77,14 +68,10 @@ Future<CreateVolumeResponse> Client::CreateVolume(
     const CreateVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, CreateVolume), request)
-    .then([](const RpcResult<CreateVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, CreateVolume), request)
+    .then([](const Try<CreateVolumeResponse, StatusError>& result)
         -> Future<CreateVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -93,14 +80,10 @@ Future<DeleteVolumeResponse> Client::DeleteVolume(
     const DeleteVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, DeleteVolume), request)
-    .then([](const RpcResult<DeleteVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, DeleteVolume), request)
+    .then([](const Try<DeleteVolumeResponse, StatusError>& result)
         -> Future<DeleteVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -109,14 +92,13 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
     const ControllerPublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerPublishVolume), request)
-    .then([](const RpcResult<ControllerPublishVolumeResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+        request)
+    .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
         -> Future<ControllerPublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -125,14 +107,13 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
     const ControllerUnpublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerUnpublishVolume), request)
-    .then([](const RpcResult<ControllerUnpublishVolumeResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+        request)
+    .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
         -> Future<ControllerUnpublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -141,14 +122,13 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
     const ValidateVolumeCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ValidateVolumeCapabilities), request)
-    .then([](const RpcResult<ValidateVolumeCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+        request)
+    .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
         -> Future<ValidateVolumeCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -157,14 +137,10 @@ Future<ListVolumesResponse> Client::ListVolumes(
     const ListVolumesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ListVolumes), request)
-    .then([](const RpcResult<ListVolumesResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, ListVolumes), request)
+    .then([](const Try<ListVolumesResponse, StatusError>& result)
         -> Future<ListVolumesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -173,14 +149,10 @@ Future<GetCapacityResponse> Client::GetCapacity(
     const GetCapacityRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, GetCapacity), request)
-    .then([](const RpcResult<GetCapacityResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, GetCapacity), request)
+    .then([](const Try<GetCapacityResponse, StatusError>& result)
         -> Future<GetCapacityResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -189,14 +161,13 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
     const ControllerGetCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
-    .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+        request)
+    .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
         -> Future<ControllerGetCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -205,14 +176,10 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume(
     const NodeStageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeStageVolume), request)
-    .then([](const RpcResult<NodeStageVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeStageVolume), request)
+    .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
         -> Future<NodeStageVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -221,14 +188,10 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
     const NodeUnstageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeUnstageVolume), request)
-    .then([](const RpcResult<NodeUnstageVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), request)
+    .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
         -> Future<NodeUnstageVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -237,14 +200,10 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume(
     const NodePublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
-    .then([](const RpcResult<NodePublishVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodePublishVolume), request)
+    .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
         -> Future<NodePublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -253,14 +212,10 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
     const NodeUnpublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
-    .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), request)
+    .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
         -> Future<NodeUnpublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -269,14 +224,10 @@ Future<NodeGetIdResponse> Client::NodeGetId(
     const NodeGetIdRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeGetId), request)
-    .then([](const RpcResult<NodeGetIdResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetId), request)
+    .then([](const Try<NodeGetIdResponse, StatusError>& result)
         -> Future<NodeGetIdResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -285,14 +236,10 @@ Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
     const NodeGetCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeGetCapabilities), request)
-    .then([](const RpcResult<NodeGetCapabilitiesResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), request)
+    .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
         -> Future<NodeGetCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d84674..9d7019a 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -30,9 +30,9 @@ namespace v0 {
 class Client
 {
 public:
-  Client(const process::grpc::Channel& _channel,
+  Client(const process::grpc::client::Connection& _connection,
          const process::grpc::client::Runtime& _runtime)
-    : channel(_channel), runtime(_runtime) {}
+    : connection(_connection), runtime(_runtime) {}
 
   // RPCs for the Identity service.
   process::Future<GetPluginInfoResponse>
@@ -90,7 +90,7 @@ public:
     NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
 
 private:
-  process::grpc::Channel channel;
+  process::grpc::client::Connection connection;
   process::grpc::client::Runtime runtime;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index f5b9eac..d5993d6 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -32,8 +32,7 @@ using mesos::csi::v0::Client;
 
 using process::Future;
 
-using process::grpc::Channel;
-
+using process::grpc::client::Connection;
 using process::grpc::client::Runtime;
 
 using testing::TestParamInfo;
@@ -57,13 +56,13 @@ struct RPCParam
   template <typename Request, typename Response>
   RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&))
     : name(_name),
-      call([=](const Channel& channel, const Runtime runtime) {
-        return (Client(channel, runtime).*rpc)(Request())
+      call([=](const Connection& connection, const Runtime runtime) {
+        return (Client(connection, runtime).*rpc)(Request())
           .then([] { return Nothing(); });
       }) {}
 
   string name;
-  lambda::function<Future<Nothing>(const Channel&, const Runtime&)> call;
+  lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call;
 };
 
 
@@ -76,10 +75,10 @@ protected:
   {
     TemporaryDirectoryTest::SetUp();
 
-    Try<Channel> _channel = plugin.startup();
-    ASSERT_SOME(_channel);
+    Try<Connection> _connection = plugin.startup();
+    ASSERT_SOME(_connection);
 
-    channel = _channel.get();
+    connection = _connection.get();
   }
 
   virtual void TearDown() override
@@ -91,7 +90,7 @@ protected:
   }
 
   MockCSIPlugin plugin;
-  Option<process::grpc::Channel> channel;
+  Option<process::grpc::client::Connection> connection;
   process::grpc::client::Runtime runtime;
 };
 
@@ -139,7 +138,7 @@ INSTANTIATE_TEST_CASE_P(
 // This test verifies that the all methods of CSI clients work.
 TEST_P(CSIClientTest, Call)
 {
-  Future<Nothing> call = GetParam().call(channel.get(), runtime);
+  Future<Nothing> call = GetParam().call(connection.get(), runtime);
   AWAIT_EXPECT_READY(call);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp
index 6983b84..17c6335 100644
--- a/src/tests/mock_csi_plugin.cpp
+++ b/src/tests/mock_csi_plugin.cpp
@@ -30,7 +30,7 @@ using mesos::csi::v0::Controller;
 using mesos::csi::v0::Identity;
 using mesos::csi::v0::Node;
 
-using process::grpc::Channel;
+using process::grpc::client::Connection;
 
 using testing::_;
 using testing::Return;
@@ -49,7 +49,7 @@ MockCSIPlugin::MockCSIPlugin()
 }
 
 
-Try<Channel> MockCSIPlugin::startup(const Option<string>& address)
+Try<Connection> MockCSIPlugin::startup(const Option<string>& address)
 {
   ServerBuilder builder;
 
@@ -67,8 +67,8 @@ Try<Channel> MockCSIPlugin::startup(const Option<string>& address)
   }
 
   return address.isSome()
-    ? Channel(address.get())
-    : Channel(server->InProcessChannel(ChannelArguments()));
+    ? Connection(address.get())
+    : Connection(server->InProcessChannel(ChannelArguments()));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp
index 6f7a5ab..4642326 100644
--- a/src/tests/mock_csi_plugin.hpp
+++ b/src/tests/mock_csi_plugin.hpp
@@ -72,7 +72,7 @@ public:
 
   CSI_METHOD_FOREACH(DECLARE_MOCK_CSI_METHOD)
 
-  Try<process::grpc::Channel> startup(
+  Try<process::grpc::client::Connection> startup(
       const Option<std::string>& address = None());
   Try<Nothing> shutdown();
 


[2/6] mesos git commit: Renamed `GRPC_RPC` and adapted the new gRPC async API.

Posted by ch...@apache.org.
Renamed `GRPC_RPC` and adapted the new gRPC async API.

To better describe the `GRPC_RPC` macro and to avoid future name
conflicts, we renamed it to `GRPC_CLIENT_METHOD`. Additionally,
we adapted to the new gRPC asynchronous client API. See:
https://github.com/grpc/grpc/pull/12269

We also introduced the `MethodTraits` internal helper to simplify the
declaration of `Runtime::call`.

Review: https://reviews.apache.org/r/67155


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bac270b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bac270b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bac270b

Branch: refs/heads/master
Commit: 3bac270b0ed99315edf72d24653c1cb068247c20
Parents: 9a94eb6
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 16:43:33 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 44 ++++++++++++++++++-----
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 20 +++++------
 2 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f7f3171..68bdeb1 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -41,8 +41,8 @@
 // interface to create an asynchrous gRPC call and return a `Future`.
 
 
-#define GRPC_RPC(service, rpc) \
-  (&service::Stub::Async##rpc)
+#define GRPC_CLIENT_METHOD(service, rpc) \
+  (&service::Stub::PrepareAsync##rpc)
 
 namespace process {
 namespace grpc {
@@ -94,6 +94,28 @@ public:
 
 namespace client {
 
+// Internal helper utilities.
+namespace internal {
+
+template <typename T>
+struct MethodTraits; // Undefined.
+
+
+template <typename Stub, typename Request, typename Response>
+struct MethodTraits<
+    std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
+        ::grpc::ClientContext*,
+        const Request&,
+        ::grpc::CompletionQueue*)>
+{
+  typedef Stub stub_type;
+  typedef Request request_type;
+  typedef Response response_type;
+};
+
+} // namespace internal {
+
+
 /**
  * A copyable interface to manage an internal gRPC runtime instance for
  * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
@@ -123,17 +145,19 @@ public:
    *
    * @param channel A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
-   *     by the `GRPC_RPC(Service, RPC)` macro.
+   *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
    * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
-  template <typename Stub, typename Request, typename Response>
+  template <
+      typename Method,
+      typename Request =
+        typename internal::MethodTraits<Method>::request_type,
+      typename Response =
+        typename internal::MethodTraits<Method>::response_type>
   Future<Try<Response, StatusError>> call(
       const Channel& channel,
-      std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
-          ::grpc::ClientContext*,
-          const Request&,
-          ::grpc::CompletionQueue*),
+      Method&& method,
       const Request& request)
   {
     static_assert(
@@ -170,8 +194,10 @@ public:
       std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
 
       std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
-          (Stub(channel.channel).*rpc)(context.get(), request, &data->queue));
+          (typename internal::MethodTraits<Method>::stub_type(
+              channel.channel).*method)(context.get(), request, &data->queue));
 
+      reader->StartCall();
       reader->Finish(
           response.get(),
           status.get(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/3bac270b/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 07c2f3e..f1cdb5e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -124,7 +124,7 @@ TEST_F(GRPCClientTest, Success)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -171,13 +171,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -216,7 +216,7 @@ TEST_F(GRPCClientTest, StatusError)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -242,7 +242,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   pong.discard();
 
@@ -280,7 +280,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -319,7 +319,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -347,7 +347,7 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -381,7 +381,7 @@ TEST_F(GRPCClientTest, ServerTimeout)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
+    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default timeout
   // (5 seconds) is passed. The timeout should be lowered once we support it.


[5/6] mesos git commit: Renamed `grpc::Channel` to `grpc::client::Connection`.

Posted by ch...@apache.org.
Renamed `grpc::Channel` to `grpc::client::Connection`.

This renaming is made to avoid name conflicts between `::grpc::Channel`
and libprocess' own wrapper. Also, since this wrapper is only used
at the client side, it is moved into the `client` namespace.

Review: https://reviews.apache.org/r/67156


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ddfea093
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ddfea093
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ddfea093

Branch: refs/heads/master
Commit: ddfea093f576cdd22d4fa221326406eaf8e59b49
Parents: 3bac270
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 15 17:54:24 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 70 +++++++++++------------
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 57 +++++++++---------
 2 files changed, 60 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 68bdeb1..cf165f0 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -34,11 +34,11 @@
 #include <stout/try.hpp>
 
 
-// This file provides libprocess "support" for using gRPC. In
-// particular, it defines two wrapper classes: `Channel` (representing a
-// connection to a gRPC server) and `client::Runtime`, which integrates
-// an event loop waiting for gRPC responses, and provides the `call`
-// interface to create an asynchrous gRPC call and return a `Future`.
+// This file provides libprocess "support" for using gRPC. In particular, it
+// defines two wrapper classes: `client::Connection` which represents a
+// connection to a gRPC server, and `client::Runtime` which integrates an event
+// loop waiting for gRPC responses and provides the `call` interface to create
+// an asynchronous gRPC call and return a `Future`.
 
 
 #define GRPC_CLIENT_METHOD(service, rpc) \
@@ -47,34 +47,6 @@
 namespace process {
 namespace grpc {
 
-// Forward declarations.
-namespace client { class Runtime; }
-
-
-/**
- * A copyable interface to manage a connection to a gRPC server.
- * All `Channel` copies share the same connection. Note that the
- * connection is established lazily by the gRPC runtime library: the
- * actual connection is delayed till an RPC call is made.
- */
-class Channel
-{
-public:
-  Channel(const std::string& uri,
-          const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
-            ::grpc::InsecureChannelCredentials())
-    : channel(::grpc::CreateChannel(uri, credentials)) {}
-
-  explicit Channel(std::shared_ptr<::grpc::Channel> _channel)
-    : channel(std::move(_channel)) {}
-
-private:
-  std::shared_ptr<::grpc::Channel> channel;
-
-  friend class client::Runtime;
-};
-
-
 /**
  * Represents errors caused by non-OK gRPC statuses. See:
  * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
@@ -117,6 +89,28 @@ struct MethodTraits<
 
 
 /**
+ * A copyable interface to manage a connection to a gRPC server. All
+ * `Connection` copies share the same gRPC channel which is thread safe. Note
+ * that the actual connection is established lazily by the gRPC library at the
+ * time an RPC is made to the channel.
+ */
+class Connection
+{
+public:
+  Connection(
+      const std::string& uri,
+      const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
+        ::grpc::InsecureChannelCredentials())
+    : channel(::grpc::CreateChannel(uri, credentials)) {}
+
+  explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
+    : channel(std::move(_channel)) {}
+
+  const std::shared_ptr<::grpc::Channel> channel;
+};
+
+
+/**
  * A copyable interface to manage an internal gRPC runtime instance for
  * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
  * `CompletionQueue` to manage outstanding requests, a looper thread to
@@ -143,7 +137,7 @@ public:
    * future never fails; it will return a `StatusError` if a non-OK status is
    * returned for the call, so the caller can handle the error programmatically.
    *
-   * @param channel A connection to a gRPC server.
+   * @param connection A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
    *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
@@ -156,7 +150,7 @@ public:
       typename Response =
         typename internal::MethodTraits<Method>::response_type>
   Future<Try<Response, StatusError>> call(
-      const Channel& channel,
+      const Connection& connection,
       Method&& method,
       const Request& request)
   {
@@ -193,9 +187,9 @@ public:
       std::shared_ptr<Response> response(new Response());
       std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
 
-      std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
-          (typename internal::MethodTraits<Method>::stub_type(
-              channel.channel).*method)(context.get(), request, &data->queue));
+      std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+        (typename internal::MethodTraits<Method>::stub_type(
+            connection.channel).*method)(context.get(), request, &data->queue);
 
       reader->StartCall();
       reader->Finish(

http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index f1cdb5e..eb9621b 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,7 +46,6 @@ using ::grpc::Status;
 using process::Future;
 using process::Promise;
 
-using process::grpc::Channel;
 using process::grpc::StatusError;
 
 using testing::_;
@@ -67,7 +66,7 @@ public:
 
   MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong));
 
-  Try<Channel> startup(const Option<string>& address = None())
+  Try<client::Connection> startup(const Option<string>& address = None())
   {
     ServerBuilder builder;
 
@@ -83,8 +82,8 @@ public:
     }
 
     return address.isSome()
-      ? Channel(address.get())
-      : Channel(server->InProcessChannel(ChannelArguments()));
+      ? client::Connection(address.get())
+      : client::Connection(server->InProcessChannel(ChannelArguments()));
   }
 
   Try<Nothing> shutdown()
@@ -104,7 +103,7 @@ class GRPCClientTest : public TemporaryDirectoryTest
 {
 protected:
   // TODO(chhsiao): Consider removing this once we have a way to get a
-  // channel before the server starts on Windows. See the
+  // connection before the server starts on Windows. See the
   // `DiscardedBeforeServerStarted` test below.
   string server_address() const
   {
@@ -118,13 +117,13 @@ protected:
 TEST_F(GRPCClientTest, Success)
 {
   PingPongServer server;
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -140,8 +139,8 @@ TEST_F(GRPCClientTest, Success)
 TEST_F(GRPCClientTest, ConcurrentRPCs)
 {
   PingPongServer server;
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   shared_ptr<Promise<Nothing>> processed1(new Promise<Nothing>());
   shared_ptr<Promise<Nothing>> processed2(new Promise<Nothing>());
@@ -171,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -210,13 +209,13 @@ TEST_F(GRPCClientTest, StatusError)
   EXPECT_CALL(server, Send(_, _, _))
     .WillOnce(Return(Status::CANCELLED));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -238,11 +237,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   EXPECT_CALL(server, Send(_, _, _))
     .Times(0);
 
-  Channel channel(server_address());
+  client::Connection connection(server_address());
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   pong.discard();
 
@@ -274,13 +273,13 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -313,13 +312,13 @@ TEST_F(GRPCClientTest, ClientShutdown)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -343,11 +342,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
 // to connect to the server.
 TEST_F(GRPCClientTest, ServerUnreachable)
 {
-  Channel channel("nosuchhost");
+  client::Connection connection("nosuchhost");
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -375,13 +374,13 @@ TEST_F(GRPCClientTest, ServerTimeout)
         }),
         Return(Status::OK)));
 
-  Try<Channel> channel = server.startup();
-  ASSERT_SOME(channel);
+  Try<client::Connection> connection = server.startup();
+  ASSERT_SOME(connection);
 
   client::Runtime runtime;
 
   Future<Try<Pong, StatusError>> pong =
-    runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default timeout
   // (5 seconds) is passed. The timeout should be lowered once we support it.


[6/6] mesos git commit: Replaced `RpcResult` with `Try`.

Posted by ch...@apache.org.
Replaced `RpcResult<Response>` with `Try<Response, StatusError>`.

The `process::grpc::client::Runtime::call` method currently returns an
`RpcResult<Response>`, which contains both a `::grpc::Status` object
and the resulting response protobuf. However, if the `::grpc::Status`
represents a non-OK status, the gRPC library does not guarantee that
the response protobuf is valid. This patch replaces `RpcResult` with
`Try` to provide better type safety.

Review: https://reviews.apache.org/r/67154


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a94eb69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a94eb69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a94eb69

Branch: refs/heads/master
Commit: 9a94eb697f8a7b38955cdf2f21b81d3fda3e2845
Parents: f8829f8
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon May 14 22:00:35 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 42 +++++++++------
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 66 +++++++++++++----------
 2 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 321a46e..f7f3171 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -18,6 +18,7 @@
 #include <memory>
 #include <thread>
 #include <type_traits>
+#include <utility>
 
 #include <google/protobuf/message.h>
 
@@ -75,20 +76,22 @@ private:
 
 
 /**
- * The response of a RPC call. It includes the gRPC `Status`
- * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and
- * the actual protobuf response body.
+ * Represents errors caused by non-OK gRPC statuses. See:
+ * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
  */
-template <typename T>
-struct RpcResult
+class StatusError : public Error
 {
-  RpcResult(const ::grpc::Status& _status, const T& _response)
-    : status(_status), response(_response) {}
+public:
+  StatusError(::grpc::Status _status)
+    : Error(_status.error_message()), status(std::move(_status))
+  {
+    CHECK(!status.ok());
+  }
 
-  ::grpc::Status status;
-  T response;
+  const ::grpc::Status status;
 };
 
+
 namespace client {
 
 /**
@@ -112,14 +115,20 @@ public:
   /**
    * Sends an asynchronous gRPC call.
    *
+   * This function returns a `Future` of a `Try` such that the response protobuf
+   * is returned only if the gRPC call returns an OK status to ensure type
+   * safety (see https://github.com/grpc/grpc/issues/12824). Note that the
+   * future never fails; it will return a `StatusError` if a non-OK status is
+   * returned for the call, so the caller can handle the error programmatically.
+   *
    * @param channel A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
    *     by the `GRPC_RPC(Service, RPC)` macro.
    * @param request The request protobuf for the gRPC call.
-   * @return a `Future` waiting for a response protobuf.
+   * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
   template <typename Stub, typename Request, typename Response>
-  Future<RpcResult<Response>> call(
+  Future<Try<Response, StatusError>> call(
       const Channel& channel,
       std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
           ::grpc::ClientContext*,
@@ -152,8 +161,8 @@ public:
       // an asynchronous gRPC call through the `CompletionQueue`
       // managed by `data`. The `Promise` will be set by the callback
       // upon server response.
-      std::shared_ptr<Promise<RpcResult<Response>>> promise(
-          new Promise<RpcResult<Response>>);
+      std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
+          new Promise<Try<Response, StatusError>>);
 
       promise->future().onDiscard([=] { context->TryCancel(); });
 
@@ -175,10 +184,11 @@ public:
                 CHECK(promise->future().isPending());
                 if (promise->future().hasDiscard()) {
                   promise->discard();
-                  return;
+                } else {
+                  promise->set(status->ok()
+                    ? std::move(*response)
+                    : Try<Response, StatusError>::error(std::move(*status)));
                 }
-
-                promise->set(RpcResult<Response>(*status, *response));
               }));
 
       return promise->future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 38cd6c6..07c2f3e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -47,7 +47,7 @@ using process::Future;
 using process::Promise;
 
 using process::grpc::Channel;
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
 
 using testing::_;
 using testing::DoAll;
@@ -123,11 +123,11 @@ TEST_F(GRPCClientTest, Success)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
-  EXPECT_TRUE(pong->status.ok());
+  EXPECT_SOME(pong.get());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -170,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong1 =
+  Future<Try<Pong, StatusError>> pong1 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  Future<RpcResult<Pong>> pong2 =
+  Future<Try<Pong, StatusError>> pong2 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  Future<RpcResult<Pong>> pong3 =
+  Future<Try<Pong, StatusError>> pong3 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
@@ -186,13 +186,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   pinged->set(Nothing());
 
   AWAIT_ASSERT_READY(pong1);
-  EXPECT_TRUE(pong1->status.ok());
+  EXPECT_SOME(pong1.get());
 
   AWAIT_ASSERT_READY(pong2);
-  EXPECT_TRUE(pong2->status.ok());
+  EXPECT_SOME(pong2.get());
 
   AWAIT_ASSERT_READY(pong3);
-  EXPECT_TRUE(pong3->status.ok());
+  EXPECT_SOME(pong3.get());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -201,9 +201,9 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 }
 
 
-// This test verifies that a gRPC future fails when the server responds
-// with a status other than OK for the given call.
-TEST_F(GRPCClientTest, Failed)
+// This test verifies that a gRPC future is set with an error when the server
+// responds with a status other than OK for the given call.
+TEST_F(GRPCClientTest, StatusError)
 {
   PingPongServer server;
 
@@ -215,11 +215,12 @@ TEST_F(GRPCClientTest, Failed)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::CANCELLED, pong->error().status.error_code());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -240,7 +241,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   Channel channel(server_address());
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   pong.discard();
@@ -278,7 +279,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
@@ -295,7 +296,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 }
 
 
-// This test verifies that a gRPC future fails properly when the runtime
+// This test verifies that a gRPC future is set with an error when the runtime
 // is shut down before the server responds.
 TEST_F(GRPCClientTest, ClientShutdown)
 {
@@ -317,7 +318,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
@@ -327,32 +328,39 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   shutdown->set(Nothing());
 
+  // TODO(chhsiao): The gRPC library returns a failure after the default
+  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
+  // is shut down. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 
   ASSERT_SOME(server.shutdown());
 }
 
 
-// This test verifies that a gRPC future fails when it is unable to
-// connect to the server.
+// This test verifies that a gRPC future is set with an error when it is unable
+// to connect to the server.
 TEST_F(GRPCClientTest, ServerUnreachable)
 {
   Channel channel("nosuchhost");
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
+  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
+  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 }
 
 
-// This test verifies that a gRPC future fails properly when the server
+// This test verifies that a gRPC future is set with an error when the server
 // hangs when processing the given call.
 TEST_F(GRPCClientTest, ServerTimeout)
 {
@@ -372,14 +380,14 @@ TEST_F(GRPCClientTest, ServerTimeout)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default
-  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
-  // is shut down. The timeout should be lowered once we support it.
+  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
+  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 
   done->set(Nothing());
 


[4/6] mesos git commit: Supported custom error types for the `Future(Try<...>)` constructor.

Posted by ch...@apache.org.
Supported custom error types for the `Future(Try<...>)` constructor.

Review: https://reviews.apache.org/r/67191


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f8829f87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f8829f87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f8829f87

Branch: refs/heads/master
Commit: f8829f87e7220eccbdcf2dec6f5e4a63af0a5a7b
Parents: d7a1fe4
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu May 17 12:13:10 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f8829f87/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index fb107d2..b8508c5 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -42,6 +42,7 @@
 #include <stout/preprocessor.hpp>
 #include <stout/result.hpp>
 #include <stout/result_of.hpp>
+#include <stout/stringify.hpp>
 #include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
@@ -107,9 +108,11 @@ public:
   /*implicit*/ Future(const Future<T>& that) = default;
   /*implicit*/ Future(Future<T>&& that) = default;
 
-  /*implicit*/ Future(const Try<T>& t);
+  template <typename E>
+  /*implicit*/ Future(const Try<T, E>& t);
 
-  /*implicit*/ Future(const Try<Future<T>>& t);
+  template <typename E>
+  /*implicit*/ Future(const Try<Future<T>, E>& t);
 
   ~Future() = default;
 
@@ -1116,23 +1119,27 @@ Future<T>::Future(const ErrnoFailure& failure)
 
 
 template <typename T>
-Future<T>::Future(const Try<T>& t)
+template <typename E>
+Future<T>::Future(const Try<T, E>& t)
   : data(new Data())
 {
   if (t.isSome()){
     set(t.get());
   } else {
-    fail(t.error());
+    // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
+    fail(stringify(t.error()));
   }
 }
 
 
 template <typename T>
-Future<T>::Future(const Try<Future<T>>& t)
+template <typename E>
+Future<T>::Future(const Try<Future<T>, E>& t)
   : data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
 {
   if (!t.isSome()) {
-    fail(t.error());
+    // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
+    fail(stringify(t.error()));
   }
 }
 


[3/6] mesos git commit: Overloaded `stringify` for `Error`s to reduce overheads.

Posted by ch...@apache.org.
Overloaded `stringify` for `Error`s to reduce overheads.

Review: https://reviews.apache.org/r/67190


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7a1fe47
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7a1fe47
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7a1fe47

Branch: refs/heads/master
Commit: d7a1fe47e06944dcb50d93b697f8cfcf743bb1a5
Parents: 21305ab
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu May 17 12:25:12 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/stout/include/stout/stringify.hpp | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d7a1fe47/3rdparty/stout/include/stout/stringify.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/stringify.hpp b/3rdparty/stout/include/stout/stringify.hpp
index 49bf684..ef7b26f 100644
--- a/3rdparty/stout/include/stout/stringify.hpp
+++ b/3rdparty/stout/include/stout/stringify.hpp
@@ -29,6 +29,7 @@
 #endif // __WINDOWS__
 
 #include "abort.hpp"
+#include "error.hpp"
 #include "hashmap.hpp"
 #include "set.hpp"
 
@@ -190,4 +191,14 @@ std::string stringify(const hashmap<K, V>& map)
   return out.str();
 }
 
+
+// TODO(chhsiao): This overload returns a non-const rvalue for consistency.
+// Consider the following overloads instead for better performance:
+//   const std::string& stringify(const Error&);
+//   std::string stringify(Error&&);
+inline std::string stringify(const Error& error)
+{
+  return error.message;
+}
+
 #endif // __STOUT_STRINGIFY_HPP__