You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/08 23:37:34 UTC

[2/2] mesos git commit: Refactored the gRPC client runtime wrapper in libprocess.

Refactored the gRPC client runtime wrapper in libprocess.

This refactoring does the following:
1. Manages the gRPC completion queue and the looper thread in the runtime
   process, which gets rid of a lock in `Runtime::Data`.
2. Moves the work of sending a request into the runtime process.
3. Lets libprocess manage the runtime process automatically instead of
   managing its lifecycle in `Runtime::Data`.

Review: https://reviews.apache.org/r/67157


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db9b1738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db9b1738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db9b1738

Branch: refs/heads/master
Commit: db9b17389bf4e8da62a5aa99958c16acd72bdb12
Parents: 4bfe2db
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed May 16 11:24:01 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Fri Jun 8 15:09:20 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 203 +++++++++++++---------
 3rdparty/libprocess/src/grpc.cpp             | 113 ++++++++----
 2 files changed, 196 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index cf165f0..0ff8184 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -13,9 +13,9 @@
 #ifndef __PROCESS_GRPC_HPP__
 #define __PROCESS_GRPC_HPP__
 
-#include <atomic>
 #include <chrono>
 #include <memory>
+#include <string>
 #include <thread>
 #include <type_traits>
 #include <utility>
@@ -24,13 +24,15 @@
 
 #include <grpcpp/grpcpp.h>
 
+#include <process/check.hpp>
+#include <process/dispatch.hpp>
 #include <process/future.hpp>
-#include <process/owned.hpp>
+#include <process/pid.hpp>
 #include <process/process.hpp>
 
-#include <stout/duration.hpp>
+#include <stout/error.hpp>
 #include <stout/lambda.hpp>
-#include <stout/synchronized.hpp>
+#include <stout/nothing.hpp>
 #include <stout/try.hpp>
 
 
@@ -111,17 +113,16 @@ public:
 
 
 /**
- * A copyable interface to manage an internal gRPC runtime instance for
- * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
- * `CompletionQueue` to manage outstanding requests, a looper thread to
- * wait for any incoming responses from the `CompletionQueue`, and a
- * process to handle the responses. All `Runtime` copies share the same
- * gRPC runtime instance. Usually we only need a single gRPC runtime
- * instance to handle all gRPC calls, but multiple instances can be
- * instantiated for more parallelism or isolation.
- * NOTE: The destruction of the internal gRPC runtime instance is a
- * blocking operation: it waits for the managed process to terminate.
- * The user should ensure that this only happens at shutdown.
+ * A copyable interface to manage an internal runtime process for asynchronous
+ * gRPC calls. A runtime process owns a gRPC `CompletionQueue` that tracks
+ * outstanding requests and a looper thread that waits for incoming responses
+ * from the `CompletionQueue`; the process itself handles both the requests
+ * and the responses. All `Runtime` copies share the same runtime process.
+ * Usually a single runtime process suffices for all gRPC calls, but multiple
+ * runtime processes can be instantiated for better parallelism and isolation.
+ *
+ * NOTE: The caller must call `terminate` to drain the `CompletionQueue` before
+ * finalizing libprocess to gracefully terminate the gRPC runtime.
  */
 class Runtime
 {
@@ -138,7 +139,7 @@ public:
    * returned for the call, so the caller can handle the error programmatically.
    *
    * @param connection A connection to a gRPC server.
-   * @param rpc The asynchronous gRPC call to make. This can be obtained
+   * @param method The asynchronous gRPC call to make. This should be obtained
    *     by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
    * @param request The request protobuf for the gRPC call.
    * @return a `Future` of `Try` waiting for a response protobuf or an error.
@@ -148,60 +149,72 @@ public:
       typename Request =
         typename internal::MethodTraits<Method>::request_type,
       typename Response =
-        typename internal::MethodTraits<Method>::response_type>
+        typename internal::MethodTraits<Method>::response_type,
+      typename std::enable_if<
+          std::is_convertible<
+              typename std::decay<Request>::type*,
+              google::protobuf::Message*>::value,
+          int>::type = 0>
   Future<Try<Response, StatusError>> call(
       const Connection& connection,
       Method&& method,
-      const Request& request)
+      Request&& request)
   {
-    static_assert(
-        std::is_convertible<Request*, google::protobuf::Message*>::value,
-        "Request must be a protobuf message");
-
-    synchronized (data->lock) {
-      if (data->terminating) {
-        return Failure("Runtime has been terminated.");
-      }
-
-      std::shared_ptr<::grpc::ClientContext> context(
-          new ::grpc::ClientContext());
-
-      // TODO(chhsiao): Allow the caller to specify a timeout.
-      context->set_deadline(
-          std::chrono::system_clock::now() + std::chrono::seconds(5));
-
-      // Enable the gRPC wait-for-ready semantics by default. See:
-      // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
-      // TODO(chhsiao): Allow the caller to set the option.
-      context->set_wait_for_ready(true);
-
-      // Create a `Promise` and a callback lambda as a tag and invokes
-      // an asynchronous gRPC call through the `CompletionQueue`
-      // managed by `data`. The `Promise` will be set by the callback
-      // upon server response.
-      std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
-          new Promise<Try<Response, StatusError>>);
-
-      promise->future().onDiscard([=] { context->TryCancel(); });
-
-      std::shared_ptr<Response> response(new Response());
-      std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
-
-      std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
-        (typename internal::MethodTraits<Method>::stub_type(
-            connection.channel).*method)(context.get(), request, &data->queue);
-
-      reader->StartCall();
-      reader->Finish(
-          response.get(),
-          status.get(),
-          new lambda::function<void()>(
-              // NOTE: `context` and `reader` need to be held on in
-              // order to get updates for the ongoing RPC, and thus
-              // are captured here. The lambda itself will later be
-              // retrieved and managed in `Data::loop()`.
+    // Create a `Promise` that will be set upon receiving a response.
+    // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
+    // to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
+    std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
+        new Promise<Try<Response, StatusError>>);
+    Future<Try<Response, StatusError>> future = promise->future();
+
+    // Send the request in the internal runtime process.
+    // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
+    // extra copy. We should capture it by forwarding once we get C++14.
+    dispatch(data->pid, &RuntimeProcess::send, std::bind(
+        [connection, method, promise](
+            const Request& request,
+            bool terminating,
+            ::grpc::CompletionQueue* queue) {
+          if (terminating) {
+            promise->fail("Runtime has been terminated");
+            return;
+          }
+
+          // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
+          // captured by the lambda below. Use `unique_ptr`s once we get C++14.
+          std::shared_ptr<::grpc::ClientContext> context(
+              new ::grpc::ClientContext());
+
+          // TODO(chhsiao): Allow the caller to specify a timeout.
+          context->set_deadline(
+              std::chrono::system_clock::now() + std::chrono::seconds(5));
+
+          // Enable the gRPC wait-for-ready semantics by default. See:
+          // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+          // TODO(chhsiao): Allow the caller to set the option.
+          context->set_wait_for_ready(true);
+
+          promise->future().onDiscard([=] { context->TryCancel(); });
+
+          std::shared_ptr<Response> response(new Response());
+          std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
+
+          std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+            (typename internal::MethodTraits<Method>::stub_type(
+                connection.channel).*method)(context.get(), request, queue);
+
+          reader->StartCall();
+
+          // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
+          // the current asynchronous gRPC call. The callback will set (or
+          // discard) the above `Promise` upon receiving a response.
+          // NOTE: `context` and `reader` must be kept alive in order to get
+          // updates for the ongoing RPC, and thus are captured here. The
+          // callback itself will later be retrieved and managed in the
+          // looper thread.
+          void* tag = new ReceiveCallback(
               [context, reader, response, status, promise]() {
-                CHECK(promise->future().isPending());
+                CHECK_PENDING(promise->future());
                 if (promise->future().hasDiscard()) {
                   promise->discard();
                 } else {
@@ -209,44 +222,70 @@ public:
                     ? std::move(*response)
                     : Try<Response, StatusError>::error(std::move(*status)));
                 }
-              }));
+              });
 
-      return promise->future();
-    }
+          reader->Finish(response.get(), status.get(), tag);
+        },
+        std::forward<Request>(request),
+        lambda::_1,
+        lambda::_2));
+
+    return future;
   }
 
   /**
-   * Asks the internal gRPC runtime instance to shut down the
-   * `CompletionQueue`, which would stop its looper thread, drain and
-   * fail all pending gRPC calls in the `CompletionQueue`, then
-   * asynchronously join the looper thread.
+   * Asks the internal runtime process to shut down the `CompletionQueue`, which
+   * would asynchronously drain and fail all pending gRPC calls in the
+   * `CompletionQueue`, then join the looper thread.
    */
   void terminate();
 
   /**
    * @return A `Future` waiting for all pending gRPC calls in the
-   *     `CompletionQueue` of the internal gRPC runtime instance to be
-   *     drained and the looper thread to be joined.
+   *     `CompletionQueue` of the internal runtime process to be drained and the
+   *     looper thread to be joined.
    */
   Future<Nothing> wait();
 
 private:
-  struct Data
+  // Types of the callback functions that are invoked when sending a request
+  // and when receiving a response, respectively.
+  typedef lambda::CallableOnce<
+      void(bool, ::grpc::CompletionQueue*)> SendCallback;
+  typedef lambda::CallableOnce<void()> ReceiveCallback;
+
+  class RuntimeProcess : public Process<RuntimeProcess>
   {
-    Data();
-    ~Data();
+  public:
+    RuntimeProcess();
+    virtual ~RuntimeProcess();
 
-    void loop();
+    void send(SendCallback callback);
+    void receive(ReceiveCallback callback);
     void terminate();
+    Future<Nothing> wait();
+
+  private:
+    void initialize() override;
+    void finalize() override;
+
+    void loop();
 
-    std::unique_ptr<std::thread> looper;
     ::grpc::CompletionQueue queue;
-    ProcessBase process;
-    std::atomic_flag lock = ATOMIC_FLAG_INIT;
-    bool terminating = false;
+    std::unique_ptr<std::thread> looper;
+    bool terminating;
     Promise<Nothing> terminated;
   };
 
+  struct Data
+  {
+     Data();
+     ~Data();
+
+     PID<RuntimeProcess> pid;
+     Future<Nothing> terminated;
+  };
+
   std::shared_ptr<Data> data;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/src/grpc.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/grpc.cpp b/3rdparty/libprocess/src/grpc.cpp
index a80bcb6..4e4f989 100644
--- a/3rdparty/libprocess/src/grpc.cpp
+++ b/3rdparty/libprocess/src/grpc.cpp
@@ -10,85 +10,122 @@
 // See the License for the specific language governing permissions and
 // limitations under the License
 
-#include <process/dispatch.hpp>
 #include <process/grpc.hpp>
+
+#include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/process.hpp>
 
 namespace process {
 namespace grpc {
-
 namespace client {
 
 void Runtime::terminate()
 {
-  data->terminate();
+  dispatch(data->pid, &RuntimeProcess::terminate);
 }
 
 
 Future<Nothing> Runtime::wait()
 {
-  return data->terminated.future();
+  return data->terminated;
 }
 
 
-Runtime::Data::Data()
-  : process(ID::generate("__grpc_client__"))
+Runtime::RuntimeProcess::RuntimeProcess()
+  : ProcessBase(ID::generate("__grpc_client__")), terminating(false) {}
+
+
+Runtime::RuntimeProcess::~RuntimeProcess()
+{
+  CHECK(!looper);
+}
+
+
+void Runtime::RuntimeProcess::send(SendCallback callback)
+{
+  std::move(callback)(terminating, &queue);
+}
+
+
+void Runtime::RuntimeProcess::receive(ReceiveCallback callback)
+{
+  std::move(callback)();
+}
+
+
+void Runtime::RuntimeProcess::terminate()
 {
-  spawn(process);
+  if (!terminating) {
+    terminating = true;
+    queue.Shutdown();
+  }
+}
 
+
+Future<Nothing> Runtime::RuntimeProcess::wait()
+{
+  return terminated.future();
+}
+
+
+void Runtime::RuntimeProcess::initialize()
+{
   // The looper thread can only be created here since it need to happen
   // after `queue` is initialized.
-  looper.reset(new std::thread(&Runtime::Data::loop, this));
+  CHECK(!looper);
+  looper.reset(new std::thread(&RuntimeProcess::loop, this));
 }
 
 
-Runtime::Data::~Data()
+void Runtime::RuntimeProcess::finalize()
 {
-  terminate();
-  process::wait(process);
+  CHECK(terminating) << "Runtime has not yet been terminated";
+
+  // NOTE: This is a blocking call. However, the thread is guaranteed
+  // to be exiting, therefore the amount of blocking time should be
+  // short (just like other syscalls we invoke).
+  looper->join();
+  looper.reset();
+  terminated.set(Nothing());
 }
 
 
-void Runtime::Data::loop()
+void Runtime::RuntimeProcess::loop()
 {
   void* tag;
   bool ok;
 
   while (queue.Next(&tag, &ok)) {
-    // The returned callback object is managed by the `callback` shared
-    // pointer, so if we get a regular event from the `CompletionQueue`,
-    // then the object would be captured by the following lambda
-    // dispatched to `process`; otherwise it would be reclaimed here.
-    std::shared_ptr<lambda::function<void()>> callback(
-        reinterpret_cast<lambda::function<void()>*>(tag));
-    if (ok) {
-      dispatch(process, [=] { (*callback)(); });
-    }
+    // Currently only unary RPCs are supported, so `ok` should always be true.
+    // See: https://grpc.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a86d9810ced694e50f7987ac90b9f8c1a // NOLINT
+    CHECK(ok);
+
+    // Obtain the tag as a `ReceiveCallback` and dispatch it to the runtime
+    // process. The tag is then reclaimed here.
+    ReceiveCallback* callback = reinterpret_cast<ReceiveCallback*>(tag);
+    dispatch(self(), &RuntimeProcess::receive, std::move(*callback));
+    delete callback;
   }
 
-  dispatch(process, [this] {
-    // NOTE: This is a blocking call. However, the thread is guaranteed
-    // to be exiting, therefore the amount of blocking time should be
-    // short (just like other syscalls we invoke).
-    looper->join();
-    // Terminate `process` after all events are drained.
-    process::terminate(process, false);
-    terminated.set(Nothing());
-  });
+  // Terminate self after all events are drained.
+  process::terminate(self(), false);
 }
 
 
-void Runtime::Data::terminate()
+Runtime::Data::Data()
 {
-  synchronized (lock) {
-    if (!terminating) {
-      terminating = true;
-      queue.Shutdown();
-    }
-  }
+  RuntimeProcess* process = new RuntimeProcess();
+  terminated = process->wait();
+  pid = spawn(process, true);
 }
 
-} // namespace client {
 
+Runtime::Data::~Data()
+{
+  dispatch(pid, &RuntimeProcess::terminate);
+}
+
+} // namespace client {
 } // namespace grpc {
 } // namespace process {