You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this copy.)
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/08 23:37:34 UTC
[2/2] mesos git commit: Refactored the gRPC client runtime wrapper in libprocess.
Refactored the gRPC client runtime wrapper in libprocess.
The refactoring does the following:
1. Manages the gRPC completion queue and the looper thread inside the
   runtime process, which removes the need for the lock in `Runtime::Data`.
2. Moves the logic for sending a request (creating the client context and
   starting the asynchronous call) into the runtime process.
3. Lets libprocess manage the lifetime of the runtime process (it is now
   spawned as a managed process) instead of managing it in `Runtime::Data`.
Review: https://reviews.apache.org/r/67157
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db9b1738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db9b1738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db9b1738
Branch: refs/heads/master
Commit: db9b17389bf4e8da62a5aa99958c16acd72bdb12
Parents: 4bfe2db
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed May 16 11:24:01 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Fri Jun 8 15:09:20 2018 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/grpc.hpp | 203 +++++++++++++---------
3rdparty/libprocess/src/grpc.cpp | 113 ++++++++----
2 files changed, 196 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index cf165f0..0ff8184 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -13,9 +13,9 @@
#ifndef __PROCESS_GRPC_HPP__
#define __PROCESS_GRPC_HPP__
-#include <atomic>
#include <chrono>
#include <memory>
+#include <string>
#include <thread>
#include <type_traits>
#include <utility>
@@ -24,13 +24,15 @@
#include <grpcpp/grpcpp.h>
+#include <process/check.hpp>
+#include <process/dispatch.hpp>
#include <process/future.hpp>
-#include <process/owned.hpp>
+#include <process/pid.hpp>
#include <process/process.hpp>
-#include <stout/duration.hpp>
+#include <stout/error.hpp>
#include <stout/lambda.hpp>
-#include <stout/synchronized.hpp>
+#include <stout/nothing.hpp>
#include <stout/try.hpp>
@@ -111,17 +113,16 @@ public:
/**
- * A copyable interface to manage an internal gRPC runtime instance for
- * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC
- * `CompletionQueue` to manage outstanding requests, a looper thread to
- * wait for any incoming responses from the `CompletionQueue`, and a
- * process to handle the responses. All `Runtime` copies share the same
- * gRPC runtime instance. Usually we only need a single gRPC runtime
- * instance to handle all gRPC calls, but multiple instances can be
- * instantiated for more parallelism or isolation.
- * NOTE: The destruction of the internal gRPC runtime instance is a
- * blocking operation: it waits for the managed process to terminate.
- * The user should ensure that this only happens at shutdown.
+ * A copyable interface to manage an internal runtime process for asynchronous
+ * gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage
+ * outstanding requests, a looper thread to wait for any incoming responses from
+ * the `CompletionQueue`, and handles the requests and responses. All `Runtime`
+ * copies share the same runtime process. Usually we only need a single runtime
+ * process to handle all gRPC calls, but multiple runtime processes can be
+ * instantiated for better parallelism and isolation.
+ *
+ * NOTE: The caller must call `terminate` to drain the `CompletionQueue` before
+ * finalizing libprocess to gracefully terminate the gRPC runtime.
*/
class Runtime
{
@@ -138,7 +139,7 @@ public:
* returned for the call, so the caller can handle the error programmatically.
*
* @param connection A connection to a gRPC server.
- * @param rpc The asynchronous gRPC call to make. This can be obtained
+ * @param method The asynchronous gRPC call to make. This should be obtained
* by the `GRPC_CLIENT_METHOD(service, rpc)` macro.
* @param request The request protobuf for the gRPC call.
* @return a `Future` of `Try` waiting for a response protobuf or an error.
@@ -148,60 +149,72 @@ public:
typename Request =
typename internal::MethodTraits<Method>::request_type,
typename Response =
- typename internal::MethodTraits<Method>::response_type>
+ typename internal::MethodTraits<Method>::response_type,
+ typename std::enable_if<
+ std::is_convertible<
+ typename std::decay<Request>::type*,
+ google::protobuf::Message*>::value,
+ int>::type = 0>
Future<Try<Response, StatusError>> call(
const Connection& connection,
Method&& method,
- const Request& request)
+ Request&& request)
{
- static_assert(
- std::is_convertible<Request*, google::protobuf::Message*>::value,
- "Request must be a protobuf message");
-
- synchronized (data->lock) {
- if (data->terminating) {
- return Failure("Runtime has been terminated.");
- }
-
- std::shared_ptr<::grpc::ClientContext> context(
- new ::grpc::ClientContext());
-
- // TODO(chhsiao): Allow the caller to specify a timeout.
- context->set_deadline(
- std::chrono::system_clock::now() + std::chrono::seconds(5));
-
- // Enable the gRPC wait-for-ready semantics by default. See:
- // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
- // TODO(chhsiao): Allow the caller to set the option.
- context->set_wait_for_ready(true);
-
- // Create a `Promise` and a callback lambda as a tag and invokes
- // an asynchronous gRPC call through the `CompletionQueue`
- // managed by `data`. The `Promise` will be set by the callback
- // upon server response.
- std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
- new Promise<Try<Response, StatusError>>);
-
- promise->future().onDiscard([=] { context->TryCancel(); });
-
- std::shared_ptr<Response> response(new Response());
- std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
-
- std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
- (typename internal::MethodTraits<Method>::stub_type(
- connection.channel).*method)(context.get(), request, &data->queue);
-
- reader->StartCall();
- reader->Finish(
- response.get(),
- status.get(),
- new lambda::function<void()>(
- // NOTE: `context` and `reader` need to be held on in
- // order to get updates for the ongoing RPC, and thus
- // are captured here. The lambda itself will later be
- // retrieved and managed in `Data::loop()`.
+ // Create a `Promise` that will be set upon receiving a response.
+ // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
+ // to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
+ std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
+ new Promise<Try<Response, StatusError>>);
+ Future<Try<Response, StatusError>> future = promise->future();
+
+ // Send the request in the internal runtime process.
+ // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
+ // extra copy. We should capture it by forwarding once we get C++14.
+ dispatch(data->pid, &RuntimeProcess::send, std::bind(
+ [connection, method, promise](
+ const Request& request,
+ bool terminating,
+ ::grpc::CompletionQueue* queue) {
+ if (terminating) {
+ promise->fail("Runtime has been terminated");
+ return;
+ }
+
+ // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
+ // captured by the lambda below. Use `unique_ptr`s once we get C++14.
+ std::shared_ptr<::grpc::ClientContext> context(
+ new ::grpc::ClientContext());
+
+ // TODO(chhsiao): Allow the caller to specify a timeout.
+ context->set_deadline(
+ std::chrono::system_clock::now() + std::chrono::seconds(5));
+
+ // Enable the gRPC wait-for-ready semantics by default. See:
+ // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+ // TODO(chhsiao): Allow the caller to set the option.
+ context->set_wait_for_ready(true);
+
+ promise->future().onDiscard([=] { context->TryCancel(); });
+
+ std::shared_ptr<Response> response(new Response());
+ std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
+
+ std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
+ (typename internal::MethodTraits<Method>::stub_type(
+ connection.channel).*method)(context.get(), request, queue);
+
+ reader->StartCall();
+
+ // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
+ // the current asynchronous gRPC call. The callback will set up the
+ // above `Promise` upon receiving a response.
+ // NOTE: `context` and `reader` need to be held on in order to get
+ // updates for the ongoing RPC, and thus are captured here. The
+ // callback itself will later be retrieved and managed in the
+ // looper thread.
+ void* tag = new ReceiveCallback(
[context, reader, response, status, promise]() {
- CHECK(promise->future().isPending());
+ CHECK_PENDING(promise->future());
if (promise->future().hasDiscard()) {
promise->discard();
} else {
@@ -209,44 +222,70 @@ public:
? std::move(*response)
: Try<Response, StatusError>::error(std::move(*status)));
}
- }));
+ });
- return promise->future();
- }
+ reader->Finish(response.get(), status.get(), tag);
+ },
+ std::forward<Request>(request),
+ lambda::_1,
+ lambda::_2));
+
+ return future;
}
/**
- * Asks the internal gRPC runtime instance to shut down the
- * `CompletionQueue`, which would stop its looper thread, drain and
- * fail all pending gRPC calls in the `CompletionQueue`, then
- * asynchronously join the looper thread.
+ * Asks the internal runtime process to shut down the `CompletionQueue`, which
+ * would asynchronously drain and fail all pending gRPC calls in the
+ * `CompletionQueue`, then join the looper thread.
*/
void terminate();
/**
* @return A `Future` waiting for all pending gRPC calls in the
- * `CompletionQueue` of the internal gRPC runtime instance to be
- * drained and the looper thread to be joined.
+ * `CompletionQueue` of the internal runtime process to be drained and the
+ * looper thread to be joined.
*/
Future<Nothing> wait();
private:
- struct Data
+ // Type of the callback functions that can get invoked when sending a request
+ // or receiving a response.
+ typedef lambda::CallableOnce<
+ void(bool, ::grpc::CompletionQueue*)> SendCallback;
+ typedef lambda::CallableOnce<void()> ReceiveCallback;
+
+ class RuntimeProcess : public Process<RuntimeProcess>
{
- Data();
- ~Data();
+ public:
+ RuntimeProcess();
+ virtual ~RuntimeProcess();
- void loop();
+ void send(SendCallback callback);
+ void receive(ReceiveCallback callback);
void terminate();
+ Future<Nothing> wait();
+
+ private:
+ void initialize() override;
+ void finalize() override;
+
+ void loop();
- std::unique_ptr<std::thread> looper;
::grpc::CompletionQueue queue;
- ProcessBase process;
- std::atomic_flag lock = ATOMIC_FLAG_INIT;
- bool terminating = false;
+ std::unique_ptr<std::thread> looper;
+ bool terminating;
Promise<Nothing> terminated;
};
+ struct Data
+ {
+ Data();
+ ~Data();
+
+ PID<RuntimeProcess> pid;
+ Future<Nothing> terminated;
+ };
+
std::shared_ptr<Data> data;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/src/grpc.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/grpc.cpp b/3rdparty/libprocess/src/grpc.cpp
index a80bcb6..4e4f989 100644
--- a/3rdparty/libprocess/src/grpc.cpp
+++ b/3rdparty/libprocess/src/grpc.cpp
@@ -10,85 +10,122 @@
// See the License for the specific language governing permissions and
// limitations under the License
-#include <process/dispatch.hpp>
#include <process/grpc.hpp>
+
+#include <process/dispatch.hpp>
#include <process/id.hpp>
+#include <process/process.hpp>
namespace process {
namespace grpc {
-
namespace client {
void Runtime::terminate()
{
- data->terminate();
+ dispatch(data->pid, &RuntimeProcess::terminate);
}
Future<Nothing> Runtime::wait()
{
- return data->terminated.future();
+ return data->terminated;
}
-Runtime::Data::Data()
- : process(ID::generate("__grpc_client__"))
+Runtime::RuntimeProcess::RuntimeProcess()
+ : ProcessBase(ID::generate("__grpc_client__")), terminating(false) {}
+
+
+Runtime::RuntimeProcess::~RuntimeProcess()
+{
+ CHECK(!looper);
+}
+
+
+void Runtime::RuntimeProcess::send(SendCallback callback)
+{
+ std::move(callback)(terminating, &queue);
+}
+
+
+void Runtime::RuntimeProcess::receive(ReceiveCallback callback)
+{
+ std::move(callback)();
+}
+
+
+void Runtime::RuntimeProcess::terminate()
{
- spawn(process);
+ if (!terminating) {
+ terminating = true;
+ queue.Shutdown();
+ }
+}
+
+Future<Nothing> Runtime::RuntimeProcess::wait()
+{
+ return terminated.future();
+}
+
+
+void Runtime::RuntimeProcess::initialize()
+{
// The looper thread can only be created here since it need to happen
// after `queue` is initialized.
- looper.reset(new std::thread(&Runtime::Data::loop, this));
+ CHECK(!looper);
+ looper.reset(new std::thread(&RuntimeProcess::loop, this));
}
-Runtime::Data::~Data()
+void Runtime::RuntimeProcess::finalize()
{
- terminate();
- process::wait(process);
+ CHECK(terminating) << "Runtime has not yet been terminated";
+
+ // NOTE: This is a blocking call. However, the thread is guaranteed
+ // to be exiting, therefore the amount of blocking time should be
+ // short (just like other syscalls we invoke).
+ looper->join();
+ looper.reset();
+ terminated.set(Nothing());
}
-void Runtime::Data::loop()
+void Runtime::RuntimeProcess::loop()
{
void* tag;
bool ok;
while (queue.Next(&tag, &ok)) {
- // The returned callback object is managed by the `callback` shared
- // pointer, so if we get a regular event from the `CompletionQueue`,
- // then the object would be captured by the following lambda
- // dispatched to `process`; otherwise it would be reclaimed here.
- std::shared_ptr<lambda::function<void()>> callback(
- reinterpret_cast<lambda::function<void()>*>(tag));
- if (ok) {
- dispatch(process, [=] { (*callback)(); });
- }
+ // Currently only unary RPCs are supported, so `ok` should always be true.
+ // See: https://grpc.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a86d9810ced694e50f7987ac90b9f8c1a // NOLINT
+ CHECK(ok);
+
+ // Obtain the tag as a `ReceiveCallback` and dispatch it to the runtime
+ // process. The tag is then reclaimed here.
+ ReceiveCallback* callback = reinterpret_cast<ReceiveCallback*>(tag);
+ dispatch(self(), &RuntimeProcess::receive, std::move(*callback));
+ delete callback;
}
- dispatch(process, [this] {
- // NOTE: This is a blocking call. However, the thread is guaranteed
- // to be exiting, therefore the amount of blocking time should be
- // short (just like other syscalls we invoke).
- looper->join();
- // Terminate `process` after all events are drained.
- process::terminate(process, false);
- terminated.set(Nothing());
- });
+ // Terminate self after all events are drained.
+ process::terminate(self(), false);
}
-void Runtime::Data::terminate()
+Runtime::Data::Data()
{
- synchronized (lock) {
- if (!terminating) {
- terminating = true;
- queue.Shutdown();
- }
- }
+ RuntimeProcess* process = new RuntimeProcess();
+ terminated = process->wait();
+ pid = spawn(process, true);
}
-} // namespace client {
+Runtime::Data::~Data()
+{
+ dispatch(pid, &RuntimeProcess::terminate);
+}
+
+} // namespace client {
} // namespace grpc {
} // namespace process {