You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/08/21 06:43:38 UTC
[mesos] 04/04: Added 'handler' field to batched requests.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit c6c078e94865ad07abb4a12c5478a2bc46fc9eb4
Author: Benno Evers <be...@mesosphere.com>
AuthorDate: Tue Aug 21 08:10:38 2018 +0200
Added 'handler' field to batched requests.
This commit adds a handler field to be able to batch
arbitrary requests, instead of having a hard-coded
dispatch to `state`.
Review: https://reviews.apache.org/r/68296/
---
src/master/http.cpp | 32 +++++++++++++++++++-------------
src/master/master.hpp | 35 ++++++++++++++++++++---------------
2 files changed, 39 insertions(+), 28 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 48944e2..ae28d52 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2843,27 +2843,31 @@ Future<Response> Master::Http::state(
.then(defer(
master->self(),
[this, request](const Owned<ObjectApprovers>& approvers) {
- return deferStateRequest(request, approvers);
+ return deferBatchedRequest(
+ &Master::ReadOnlyHandler::state,
+ request,
+ approvers);
}));
}
-Future<Response> Master::Http::deferStateRequest(
+Future<Response> Master::Http::deferBatchedRequest(
+ ReadOnlyRequestHandler handler,
const Request& request,
const Owned<ObjectApprovers>& approvers) const
{
- bool scheduleBatch = batchedStateRequests.empty();
+ bool scheduleBatch = batchedRequests.empty();
// Add an element to the batched state requests.
Promise<Response> promise;
Future<Response> future = promise.future();
- batchedStateRequests.push_back(
- BatchedStateRequest{request, approvers, std::move(promise)});
+ batchedRequests.push_back(
+ BatchedRequest{handler, request, approvers, std::move(promise)});
// Schedule processing of batched requests if not yet scheduled.
if (scheduleBatch) {
dispatch(master->self(), [this]() {
- processStateRequestsBatch();
+ processRequestsBatch();
});
}
@@ -3013,9 +3017,9 @@ process::http::Response Master::ReadOnlyHandler::state(
}
-void Master::Http::processStateRequestsBatch() const
+void Master::Http::processRequestsBatch() const
{
- CHECK(!batchedStateRequests.empty())
+ CHECK(!batchedRequests.empty())
<< "Bug in state batching logic: No requests to process";
// Produce the responses in parallel.
@@ -3025,12 +3029,14 @@ void Master::Http::processStateRequestsBatch() const
//
// TODO(alexr): Consider moving `BatchedStateRequest`'s fields into
// `process::async` once it supports moving.
- foreach (BatchedStateRequest& request, batchedStateRequests) {
+ foreach (BatchedRequest& request, batchedRequests) {
request.promise.associate(process::async(
- [this](const process::http::Request& request,
+ [this](ReadOnlyRequestHandler handler,
+ const process::http::Request& request,
const process::Owned<ObjectApprovers>& approvers) {
- return readonlyHandler.state(request, approvers);
+ return (readonlyHandler.*handler)(request, approvers);
},
+ request.handler,
request.request,
request.approvers));
}
@@ -3042,12 +3048,12 @@ void Master::Http::processStateRequestsBatch() const
// NOTE: There is the potential for deadlock since we are blocking 1 working
// thread here, see MESOS-8256.
vector<Future<Response>> responses;
- foreach (const BatchedStateRequest& request, batchedStateRequests) {
+ foreach (const BatchedRequest& request, batchedRequests) {
responses.push_back(request.promise.future());
}
process::await(responses).await();
- batchedStateRequests.clear();
+ batchedRequests.clear();
}
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7faaaae..dc0080b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1578,17 +1578,6 @@ private:
const Option<process::http::authentication::Principal>&
principal) const;
- // A continuation for `state()`. Schedules request processing in a batch
- // of other '/state' requests.
- process::Future<process::http::Response> deferStateRequest(
- const process::http::Request& request,
- const process::Owned<ObjectApprovers>& approvers) const;
-
- // A helper that responds to batched, i.e., accumulated, '/state'
- // requests in parallel, i.e., a continuation for `deferStateRequest()`.
- // See also `BatchedStateRequest`.
- void processStateRequestsBatch() const;
-
process::Future<std::vector<const Task*>> _tasks(
const size_t limit,
const size_t offset,
@@ -1870,16 +1859,32 @@ private:
// out into this separate class.
WeightsHandler weightsHandler;
- // TODO(alexr): Consider adding a `type` or `handler` field to expand
- // batching to other heavy read-only requests, e.g., '/state-summary'.
- struct BatchedStateRequest
+ // Since the Master actor is one of the most loaded in a typical Mesos
+ // installation, we take some extra care to keep the backlog small.
+ // In particular, all read-only requests are batched and executed in
+ // parallel, instead of going through the master queue separately.
+
+ typedef process::http::Response
+ (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
+ const process::http::Request&,
+ const process::Owned<ObjectApprovers>&) const;
+
+ process::Future<process::http::Response> deferBatchedRequest(
+ ReadOnlyRequestHandler handler,
+ const process::http::Request& request,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ void processRequestsBatch() const;
+
+ struct BatchedRequest
{
+ ReadOnlyRequestHandler handler;
process::http::Request request;
process::Owned<ObjectApprovers> approvers;
process::Promise<process::http::Response> promise;
};
- mutable std::vector<BatchedStateRequest> batchedStateRequests;
+ mutable std::vector<BatchedRequest> batchedRequests;
};
Master(const Master&); // No copying.