You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/17 20:57:51 UTC

[2/2] mesos git commit: Made gRPC call options configurable.

Made gRPC call options configurable.

Review: https://reviews.apache.org/r/67938


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a211b4ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a211b4ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a211b4ca

Branch: refs/heads/master
Commit: a211b4cadf289168464fc50987255d883c226e89
Parents: abf11a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Jul 16 18:18:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 35 ++++++---
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 88 ++++++++++++++++-------
 2 files changed, 87 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 0ff8184..bdafeb3 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -30,6 +30,7 @@
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
@@ -113,6 +114,23 @@ public:
 
 
 /**
+ * Defines the gRPC options for each call.
+ */
+struct CallOptions
+{
+  // Enable the gRPC wait-for-ready semantics by default so the call will be
+  // retried if the connection is not ready. See:
+  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+  bool wait_for_ready = true;
+
+  // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
+  // there is no response in the specified amount of time. This is required to
+  // avoid the call from being pending forever.
+  Duration timeout = Seconds(60);
+};
+
+
+/**
  * A copyable interface to manage an internal runtime process for asynchronous
  * gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
  * outstanding requests, a looper thread to wait for any incoming responses from
@@ -142,6 +160,7 @@ public:
    * @param method The asynchronous gRPC call to make. This should be obtained
    *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
+   * @param options The gRPC options for the call.
    * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
   template <
@@ -158,7 +177,8 @@ public:
   Future<Try<Response, StatusError>> call(
       const Connection& connection,
       Method&& method,
-      Request&& request)
+      Request&& request,
+      const CallOptions& options)
   {
     // Create a `Promise` that will be set upon receiving a response.
     // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
@@ -171,7 +191,7 @@ public:
     // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
     // extra copy. We should capture it by forwarding once we get C++14.
     dispatch(data->pid, &RuntimeProcess::send, std::bind(
-        [connection, method, promise](
+        [connection, method, options, promise](
             const Request& request,
             bool terminating,
             ::grpc::CompletionQueue* queue) {
@@ -185,14 +205,11 @@ public:
           std::shared_ptr<::grpc::ClientContext> context(
               new ::grpc::ClientContext());
 
-          // TODO(chhsiao): Allow the caller to specify a timeout.
-          context->set_deadline(
-              std::chrono::system_clock::now() + std::chrono::seconds(5));
+          context->set_wait_for_ready(options.wait_for_ready);
 
-          // Enable the gRPC wait-for-ready semantics by default. See:
-          // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
-          // TODO(chhsiao): Allow the caller to set the option.
-          context->set_wait_for_ready(true);
+          context->set_deadline(
+              std::chrono::system_clock::now() +
+              std::chrono::nanoseconds(options.timeout.ns()));
 
           promise->future().onDiscard([=] { context->TryCancel(); });
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index eb9621b..9677df6 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -48,6 +48,8 @@ using process::Promise;
 
 using process::grpc::StatusError;
 
+using process::grpc::client::CallOptions;
+
 using testing::_;
 using testing::DoAll;
 using testing::InvokeWithoutArgs;
@@ -109,6 +111,15 @@ protected:
   {
     return "unix://" + path::join(sandbox.get(), "socket");
   }
+
+  static CallOptions call_options()
+  {
+    CallOptions options;
+    options.wait_for_ready = true;  // Same as the `CallOptions` default.
+    options.timeout = Milliseconds(100);  // Well below the 60s default.
+
+    return options;
+  }
 };
 
 
@@ -122,8 +133,11 @@ TEST_F(GRPCClientTest, Success)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -169,14 +183,23 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong1 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong2 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong3 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -214,8 +237,11 @@ TEST_F(GRPCClientTest, StatusError)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -240,8 +266,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Connection connection(server_address());
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   pong.discard();
 
@@ -278,8 +307,11 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed->future());
 
@@ -317,8 +349,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed->future());
 
@@ -327,9 +362,6 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   shutdown->set(Nothing());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default
-  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
-  // is shut down. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -345,14 +377,15 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Connection connection("nosuchhost");
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
-  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -379,11 +412,12 @@ TEST_F(GRPCClientTest, ServerTimeout)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
-  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());