You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/17 20:57:50 UTC

[1/2] mesos git commit: Passed the default options when making gRPC calls.

Repository: mesos
Updated Branches:
  refs/heads/master abf11a951 -> 841f652b2


Passed the default options when making gRPC calls.

Review: https://reviews.apache.org/r/67947


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/841f652b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/841f652b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/841f652b

Branch: refs/heads/master
Commit: 841f652b2db90156c5cf1a9389af7b4ea862694b
Parents: a211b4c
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Jul 17 13:15:40 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700

----------------------------------------------------------------------
 src/csi/client.cpp | 53 +++++++++++++++++++++++++++++++++----------------
 1 file changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/841f652b/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index a36e622..61ed410 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -23,6 +23,8 @@ using process::Future;
 
 using process::grpc::StatusError;
 
+using process::grpc::client::CallOptions;
+
 namespace mesos {
 namespace csi {
 namespace v0 {
@@ -36,7 +38,8 @@ Client::call<GET_PLUGIN_INFO>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Identity, GetPluginInfo),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<GetPluginInfoResponse, StatusError>& result)
         -> Future<GetPluginInfoResponse> {
       return result;
@@ -53,7 +56,8 @@ Client::call<GET_PLUGIN_CAPABILITIES>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
         -> Future<GetPluginCapabilitiesResponse> {
       return result;
@@ -70,7 +74,8 @@ Client::call<PROBE>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Identity, Probe),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ProbeResponse, StatusError>& result)
         -> Future<ProbeResponse> {
       return result;
@@ -87,7 +92,8 @@ Client::call<CREATE_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, CreateVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<CreateVolumeResponse, StatusError>& result)
         -> Future<CreateVolumeResponse> {
       return result;
@@ -104,7 +110,8 @@ Client::call<DELETE_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, DeleteVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<DeleteVolumeResponse, StatusError>& result)
         -> Future<DeleteVolumeResponse> {
       return result;
@@ -121,7 +128,8 @@ Client::call<CONTROLLER_PUBLISH_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
         -> Future<ControllerPublishVolumeResponse> {
       return result;
@@ -138,7 +146,8 @@ Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
         -> Future<ControllerUnpublishVolumeResponse> {
       return result;
@@ -155,7 +164,8 @@ Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result)
         -> Future<ValidateVolumeCapabilitiesResponse> {
       return result;
@@ -172,7 +182,8 @@ Client::call<LIST_VOLUMES>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, ListVolumes),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ListVolumesResponse, StatusError>& result)
         -> Future<ListVolumesResponse> {
       return result;
@@ -189,7 +200,8 @@ Client::call<GET_CAPACITY>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, GetCapacity),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<GetCapacityResponse, StatusError>& result)
         -> Future<GetCapacityResponse> {
       return result;
@@ -206,7 +218,8 @@ Client::call<CONTROLLER_GET_CAPABILITIES>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
         -> Future<ControllerGetCapabilitiesResponse> {
       return result;
@@ -223,7 +236,8 @@ Client::call<NODE_STAGE_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodeStageVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
         -> Future<NodeStageVolumeResponse> {
       return result;
@@ -240,7 +254,8 @@ Client::call<NODE_UNSTAGE_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodeUnstageVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
         -> Future<NodeUnstageVolumeResponse> {
       return result;
@@ -257,7 +272,8 @@ Client::call<NODE_PUBLISH_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodePublishVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
         -> Future<NodePublishVolumeResponse> {
       return result;
@@ -274,7 +290,8 @@ Client::call<NODE_UNPUBLISH_VOLUME>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
         -> Future<NodeUnpublishVolumeResponse> {
       return result;
@@ -291,7 +308,8 @@ Client::call<NODE_GET_ID>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodeGetId),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodeGetIdResponse, StatusError>& result)
         -> Future<NodeGetIdResponse> {
       return result;
@@ -308,7 +326,8 @@ Client::call<NODE_GET_CAPABILITIES>(
     .call(
         connection,
         GRPC_CLIENT_METHOD(Node, NodeGetCapabilities),
-        std::move(request))
+        std::move(request),
+        CallOptions())
     .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
         -> Future<NodeGetCapabilitiesResponse> {
       return result;


[2/2] mesos git commit: Made gRPC call options configurable.

Posted by ch...@apache.org.
Made gRPC call options configurable.

Review: https://reviews.apache.org/r/67938


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a211b4ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a211b4ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a211b4ca

Branch: refs/heads/master
Commit: a211b4cadf289168464fc50987255d883c226e89
Parents: abf11a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Jul 16 18:18:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue Jul 17 13:50:59 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 35 ++++++---
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 88 ++++++++++++++++-------
 2 files changed, 87 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index 0ff8184..bdafeb3 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -30,6 +30,7 @@
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
@@ -113,6 +114,23 @@ public:
 
 
 /**
+ * Defines the gRPC options for each call.
+ */
+struct CallOptions
+{
+  // Enable the gRPC wait-for-ready semantics by default so the call will be
+  // retried if the connection is not ready. See:
+  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+  bool wait_for_ready = true;
+
+  // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
+  // there is no response in the specified amount of time. This is required to
+  // prevent the call from being pending forever.
+  Duration timeout = Seconds(60);
+};
+
+
+/**
  * A copyable interface to manage an internal runtime process for asynchronous
  * gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
  * outstanding requests, a looper thread to wait for any incoming responses from
@@ -142,6 +160,7 @@ public:
    * @param method The asynchronous gRPC call to make. This should be obtained
    *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
+   * @param options The gRPC options for the call.
    * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
   template <
@@ -158,7 +177,8 @@ public:
   Future<Try<Response, StatusError>> call(
       const Connection& connection,
       Method&& method,
-      Request&& request)
+      Request&& request,
+      const CallOptions& options)
   {
     // Create a `Promise` that will be set upon receiving a response.
     // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
@@ -171,7 +191,7 @@ public:
     // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
     // extra copy. We should capture it by forwarding once we get C++14.
     dispatch(data->pid, &RuntimeProcess::send, std::bind(
-        [connection, method, promise](
+        [connection, method, options, promise](
             const Request& request,
             bool terminating,
             ::grpc::CompletionQueue* queue) {
@@ -185,14 +205,11 @@ public:
           std::shared_ptr<::grpc::ClientContext> context(
               new ::grpc::ClientContext());
 
-          // TODO(chhsiao): Allow the caller to specify a timeout.
-          context->set_deadline(
-              std::chrono::system_clock::now() + std::chrono::seconds(5));
+          context->set_wait_for_ready(options.wait_for_ready);
 
-          // Enable the gRPC wait-for-ready semantics by default. See:
-          // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
-          // TODO(chhsiao): Allow the caller to set the option.
-          context->set_wait_for_ready(true);
+          context->set_deadline(
+              std::chrono::system_clock::now() +
+              std::chrono::nanoseconds(options.timeout.ns()));
 
           promise->future().onDiscard([=] { context->TryCancel(); });
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a211b4ca/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index eb9621b..9677df6 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -48,6 +48,8 @@ using process::Promise;
 
 using process::grpc::StatusError;
 
+using process::grpc::client::CallOptions;
+
 using testing::_;
 using testing::DoAll;
 using testing::InvokeWithoutArgs;
@@ -109,6 +111,15 @@ protected:
   {
     return "unix://" + path::join(sandbox.get(), "socket");
   }
+
+  static CallOptions call_options()
+  {
+    CallOptions options;
+    options.wait_for_ready = true;
+    options.timeout = Milliseconds(100);
+
+    return options;
+  }
 };
 
 
@@ -122,8 +133,11 @@ TEST_F(GRPCClientTest, Success)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_SOME(pong.get());
@@ -169,14 +183,23 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong1 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong1 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  Future<Try<Pong, StatusError>> pong2 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong2 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  Future<Try<Pong, StatusError>> pong3 =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong3 = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -214,8 +237,11 @@ TEST_F(GRPCClientTest, StatusError)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
@@ -240,8 +266,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Connection connection(server_address());
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   pong.discard();
 
@@ -278,8 +307,11 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed->future());
 
@@ -317,8 +349,11 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   AWAIT_READY(processed->future());
 
@@ -327,9 +362,6 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   shutdown->set(Nothing());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default
-  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
-  // is shut down. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -345,14 +377,15 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Connection connection("nosuchhost");
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection,
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
-  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
@@ -379,11 +412,12 @@ TEST_F(GRPCClientTest, ServerTimeout)
 
   client::Runtime runtime;
 
-  Future<Try<Pong, StatusError>> pong =
-    runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping());
+  Future<Try<Pong, StatusError>> pong = runtime.call(
+      connection.get(),
+      GRPC_CLIENT_METHOD(PingPong, Send),
+      Ping(),
+      call_options());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default timeout
-  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
   EXPECT_ERROR(pong.get());
   EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());