You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/21 01:26:22 UTC

[2/3] mesos git commit: Updated the gRPC client to return both status and payload.

Updated the gRPC client to return both status and payload.

Some users might want to behave differently based on the error code in
the gRPC Status. This patch exposes the gRPC Status by introducing a
struct called `RpcResult`, which contains both the gRPC Status and the
response body.

Note that gRPC allows the gRPC Status to be non-OK while still carrying
a response body.

Review: https://reviews.apache.org/r/63967


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7018f963
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7018f963
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7018f963

Branch: refs/heads/master
Commit: 7018f963257b55bf07a6afd317b727f50deeee1d
Parents: b3f81f7
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Nov 20 14:38:17 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 20 17:16:23 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 28 +++++++---
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 63 ++++++++++++++++-------
 2 files changed, 67 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7018f963/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f5dc8b5..0f63e57 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -71,6 +71,21 @@ private:
 };
 
 
+/**
+ * The response of an RPC call. It includes the gRPC `Status`
+ * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and
+ * the actual protobuf response body.
+ */
+template <typename T>
+struct RpcResult
+{
+  RpcResult(const ::grpc::Status& _status, const T& _response)
+    : status(_status), response(_response) {}
+
+  ::grpc::Status status;
+  T response;
+};
+
 namespace client {
 
 /**
@@ -101,7 +116,7 @@ public:
    * @return a `Future` waiting for a response protobuf.
    */
   template <typename Stub, typename Request, typename Response>
-  Future<Response> call(
+  Future<RpcResult<Response>> call(
       const Channel& channel,
       std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
           ::grpc::ClientContext*,
@@ -129,7 +144,9 @@ public:
       // an asynchronous gRPC call through the `CompletionQueue`
       // managed by `data`. The `Promise` will be set by the callback
       // upon server response.
-      std::shared_ptr<Promise<Response>> promise(new Promise<Response>);
+      std::shared_ptr<Promise<RpcResult<Response>>> promise(
+          new Promise<RpcResult<Response>>);
+
       promise->future().onDiscard([=] { context->TryCancel(); });
 
       std::shared_ptr<Response> response(new Response());
@@ -150,11 +167,10 @@ public:
                 CHECK(promise->future().isPending());
                 if (promise->future().hasDiscard()) {
                   promise->discard();
-                } else if (status->ok()) {
-                  promise->set(*response);
-                } else {
-                  promise->fail(status->error_message());
+                  return;
                 }
+
+                promise->set(RpcResult<Response>(*status, *response));
               }));
 
       return promise->future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/7018f963/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 7ac4ef6..8bb517f 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,6 +46,7 @@ using process::Future;
 using process::Promise;
 
 using process::grpc::Channel;
+using process::grpc::RpcResult;
 
 using testing::_;
 using testing::DoAll;
@@ -115,9 +116,11 @@ TEST_F(GRPCClientTest, Success)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
-  AWAIT_EXPECT_READY(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_TRUE(pong->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -160,9 +163,14 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong1 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
-  Future<Pong> pong2 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
-  Future<Pong> pong3 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong1 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
+  Future<RpcResult<Pong>> pong2 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
+  Future<RpcResult<Pong>> pong3 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -170,9 +178,14 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   pinged->set(Nothing());
 
-  AWAIT_EXPECT_READY(pong1);
-  AWAIT_EXPECT_READY(pong2);
-  AWAIT_EXPECT_READY(pong3);
+  AWAIT_ASSERT_READY(pong1);
+  EXPECT_TRUE(pong1->status.ok());
+
+  AWAIT_ASSERT_READY(pong2);
+  EXPECT_TRUE(pong2->status.ok());
+
+  AWAIT_ASSERT_READY(pong3);
+  EXPECT_TRUE(pong3->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -195,9 +208,11 @@ TEST_F(GRPCClientTest, Failed)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -218,7 +233,9 @@ TEST_F(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
   pong.discard();
 
   ASSERT_SOME(server.Startup(server_address()));
@@ -254,7 +271,9 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   ASSERT_SOME(server.Startup(server_address()));
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
   AWAIT_READY(processed->future());
 
   pong.discard();
@@ -291,7 +310,8 @@ TEST_F(GRPCClientTest, ClientShutdown)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -299,7 +319,9 @@ TEST_F(GRPCClientTest, ClientShutdown)
   AWAIT_ASSERT_READY(runtime.wait());
 
   shutdown->set(Nothing());
-  AWAIT_EXPECT_FAILED(pong);
+
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 
   ASSERT_SOME(server.Shutdown());
 }
@@ -312,12 +334,14 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Runtime runtime;
   Channel channel("nosuchhost");
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 }
 
 
@@ -341,12 +365,15 @@ TEST_F(GRPCClientTest, ServerTimeout)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default
   // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
   // is shut down. The timeout should be lowered once we support it.
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
+
   done->set(Nothing());
 
   runtime.terminate();