You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/08/11 18:22:59 UTC
[mesos] branch master updated: Batch '/state' requests on Master.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 2f4d9ae Batch '/state' requests on Master.
2f4d9ae is described below
commit 2f4d9ae0335b988f8a0bdab319ebd02f34519f3b
Author: Alexander Rukletsov <al...@apache.org>
AuthorDate: Thu Jul 19 12:56:46 2018 +0200
Batch '/state' requests on Master.
With this patch, handlers for '/state' requests are not scheduled
directly after authorization; instead, they are accumulated and then
scheduled for later parallel processing.
If there are N '/state' requests in the Master's mailbox and T is the
request response time, this approach blocks the Master actor only
once, for time O(T), instead of blocking it for time N*T as happened
before this patch.
This batching technique reduces both the time the Master spends
answering '/state' requests and the average request response time
in the presence of multiple requests in the Master's mailbox. However,
for infrequent '/state' requests that don't accumulate in the Master's
mailbox, the response time might increase due to an additional trip
through the mailbox.
The change preserves the read-your-writes consistency model.
Review: https://reviews.apache.org/r/68132
---
src/master/http.cpp | 301 +++++++++++++++++++++++++++++++-------------------
src/master/master.hpp | 27 ++++-
2 files changed, 212 insertions(+), 116 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index d43fbd6..e2773ed 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -36,8 +36,10 @@
#include <mesos/v1/master/master.hpp>
+#include <process/async.hpp>
#include <process/collect.hpp>
#include <process/defer.hpp>
+#include <process/future.hpp>
#include <process/help.hpp>
#include <process/logging.hpp>
@@ -96,6 +98,7 @@ using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
+using process::Promise;
using process::TLDR;
using process::http::Accepted;
@@ -2813,7 +2816,7 @@ string Master::Http::STATE_HELP()
Future<Response> Master::Http::state(
const Request& request,
- const Option<Principal>& principal) const
+ const Option<Principal>& principal)
{
// TODO(greggomann): Remove this check once the `Principal` type is used in
// `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
@@ -2829,150 +2832,218 @@ Future<Response> Master::Http::state(
return redirect(request);
}
+ // TODO(alexr): De-duplicate response processing when the principal is
+ // identical, e.g., if "bob" asks for state three times in one batch,
+ // ideally we only compute the response for "bob" once since they're all
+ // identical within a principal.
return ObjectApprovers::create(
master->authorizer,
principal,
{VIEW_ROLE, VIEW_FRAMEWORK, VIEW_TASK, VIEW_EXECUTOR, VIEW_FLAGS})
.then(defer(
master->self(),
- [this, request](const Owned<ObjectApprovers>& approvers) -> Response {
- // This lambda is consumed before the outer lambda
- // returns, hence capture by reference is fine here.
- auto state = [this, &approvers](JSON::ObjectWriter* writer) {
- writer->field("version", MESOS_VERSION);
+ [this, request](const Owned<ObjectApprovers>& approvers) {
+ return deferStateRequest(request, approvers);
+ }));
+}
- if (build::GIT_SHA.isSome()) {
- writer->field("git_sha", build::GIT_SHA.get());
- }
- if (build::GIT_BRANCH.isSome()) {
- writer->field("git_branch", build::GIT_BRANCH.get());
- }
+Future<Response> Master::Http::deferStateRequest(
+ const Request& request,
+ const Owned<ObjectApprovers>& approvers)
+{
+ bool scheduleBatch = batchedStateRequests.empty();
+
+ // Add an element to the batched state requests.
+ Promise<Response> promise;
+ Future<Response> future = promise.future();
+ batchedStateRequests.push_back(
+ BatchedStateRequest{request, approvers, std::move(promise)});
+
+ // Schedule processing of batched requests if not yet scheduled.
+ if (scheduleBatch) {
+ dispatch(master->self(), [this]() {
+ processStateRequestsBatch();
+ });
+ }
- if (build::GIT_TAG.isSome()) {
- writer->field("git_tag", build::GIT_TAG.get());
- }
+ return future;
+}
- writer->field("build_date", build::DATE);
- writer->field("build_time", build::TIME);
- writer->field("build_user", build::USER);
- writer->field("start_time", master->startTime.secs());
- if (master->electedTime.isSome()) {
- writer->field("elected_time", master->electedTime->secs());
- }
+void Master::Http::processStateRequestsBatch()
+{
+ CHECK(!batchedStateRequests.empty())
+ << "Bug in state batching logic: No requests to process";
+
+ // This lambda is consumed before the enclosed function returns,
+ // hence capturing `this` is fine here.
+ auto produceResponse = [this](
+ const Request& request,
+ const Owned<ObjectApprovers>& approvers) -> Response {
+ // This lambda is consumed before the outer lambda returns,
+ // hence capturing a reference is fine here.
+ auto calculateState = [this, &approvers](JSON::ObjectWriter* writer) {
+ writer->field("version", MESOS_VERSION);
+
+ if (build::GIT_SHA.isSome()) {
+ writer->field("git_sha", build::GIT_SHA.get());
+ }
- writer->field("id", master->info().id());
- writer->field("pid", string(master->self()));
- writer->field("hostname", master->info().hostname());
- writer->field("capabilities", master->info().capabilities());
- writer->field("activated_slaves", master->_slaves_active());
- writer->field("deactivated_slaves", master->_slaves_inactive());
- writer->field("unreachable_slaves", master->_slaves_unreachable());
+ if (build::GIT_BRANCH.isSome()) {
+ writer->field("git_branch", build::GIT_BRANCH.get());
+ }
- if (master->info().has_domain()) {
- writer->field("domain", master->info().domain());
- }
+ if (build::GIT_TAG.isSome()) {
+ writer->field("git_tag", build::GIT_TAG.get());
+ }
- // TODO(haosdent): Deprecated this in favor of `leader_info` below.
- if (master->leader.isSome()) {
- writer->field("leader", master->leader->pid());
- }
+ writer->field("build_date", build::DATE);
+ writer->field("build_time", build::TIME);
+ writer->field("build_user", build::USER);
+ writer->field("start_time", master->startTime.secs());
- if (master->leader.isSome()) {
- writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
- json(writer, master->leader.get());
- });
- }
+ if (master->electedTime.isSome()) {
+ writer->field("elected_time", master->electedTime->secs());
+ }
- if (approvers->approved<VIEW_FLAGS>()) {
- if (master->flags.cluster.isSome()) {
- writer->field("cluster", master->flags.cluster.get());
- }
+ writer->field("id", master->info().id());
+ writer->field("pid", string(master->self()));
+ writer->field("hostname", master->info().hostname());
+ writer->field("capabilities", master->info().capabilities());
+ writer->field("activated_slaves", master->_slaves_active());
+ writer->field("deactivated_slaves", master->_slaves_inactive());
+ writer->field("unreachable_slaves", master->_slaves_unreachable());
- if (master->flags.log_dir.isSome()) {
- writer->field("log_dir", master->flags.log_dir.get());
- }
+ if (master->info().has_domain()) {
+ writer->field("domain", master->info().domain());
+ }
- if (master->flags.external_log_file.isSome()) {
- writer->field("external_log_file",
- master->flags.external_log_file.get());
+ // TODO(haosdent): Deprecated this in favor of `leader_info` below.
+ if (master->leader.isSome()) {
+ writer->field("leader", master->leader->pid());
+ }
+
+ if (master->leader.isSome()) {
+ writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
+ json(writer, master->leader.get());
+ });
+ }
+
+ if (approvers->approved<VIEW_FLAGS>()) {
+ if (master->flags.cluster.isSome()) {
+ writer->field("cluster", master->flags.cluster.get());
+ }
+
+ if (master->flags.log_dir.isSome()) {
+ writer->field("log_dir", master->flags.log_dir.get());
+ }
+
+ if (master->flags.external_log_file.isSome()) {
+ writer->field("external_log_file",
+ master->flags.external_log_file.get());
+ }
+
+ writer->field("flags", [this](JSON::ObjectWriter* writer) {
+ foreachvalue (const flags::Flag& flag, master->flags) {
+ Option<string> value = flag.stringify(master->flags);
+ if (value.isSome()) {
+ writer->field(flag.effective_name().value, value.get());
}
+ }
+ });
+ }
- writer->field("flags", [this](JSON::ObjectWriter* writer) {
- foreachvalue (const flags::Flag& flag, master->flags) {
- Option<string> value = flag.stringify(master->flags);
- if (value.isSome()) {
- writer->field(flag.effective_name().value, value.get());
- }
- }
- });
+ // Model all of the registered slaves.
+ writer->field(
+ "slaves",
+ [this, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (Slave* slave, master->slaves.registered) {
+ writer->element(SlaveWriter(*slave, approvers));
}
+ });
- // Model all of the registered slaves.
- writer->field(
- "slaves",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (Slave* slave, master->slaves.registered) {
- writer->element(SlaveWriter(*slave, approvers));
- }
- });
+ // Model all of the recovered slaves.
+ writer->field(
+ "recovered_slaves",
+ [this](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ const SlaveInfo& slaveInfo, master->slaves.recovered) {
+ writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
+ json(writer, slaveInfo);
+ });
+ }
+ });
- // Model all of the recovered slaves.
- writer->field(
- "recovered_slaves",
- [this](JSON::ArrayWriter* writer) {
- foreachvalue (
- const SlaveInfo& slaveInfo, master->slaves.recovered) {
- writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
- json(writer, slaveInfo);
- });
- }
- });
+ // Model all of the frameworks.
+ writer->field(
+ "frameworks",
+ [this, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
- // Model all of the frameworks.
- writer->field(
- "frameworks",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (
- Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
+ writer->element(FullFrameworkWriter(approvers, framework));
+ }
+ });
- writer->element(FullFrameworkWriter(approvers, framework));
- }
- });
+ // Model all of the completed frameworks.
+ writer->field(
+ "completed_frameworks",
+ [this, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
+ }
- // Model all of the completed frameworks.
- writer->field(
- "completed_frameworks",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (
- const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
+ writer->element(
+ FullFrameworkWriter(approvers, framework.get()));
+ }
+ });
- writer->element(
- FullFrameworkWriter(approvers, framework.get()));
- }
- });
+ // Orphan tasks are no longer possible. We emit an empty array
+ // for the sake of backward compatibility.
+ writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
- // Orphan tasks are no longer possible. We emit an empty array
- // for the sake of backward compatibility.
- writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
+ // Unregistered frameworks are no longer possible. We emit an
+ // empty array for the sake of backward compatibility.
+ writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
+ };
- // Unregistered frameworks are no longer possible. We emit an
- // empty array for the sake of backward compatibility.
- writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
- };
+ return OK(jsonify(calculateState), request.url.query.get("jsonp"));
+ };
- return OK(jsonify(state), request.url.query.get("jsonp"));
- }));
+ // Produce the responses in parallel.
+ //
+ // TODO(alexr): Consider abstracting this into `parallel_async` or
+ // `foreach_parallel`, see MESOS-8587.
+ //
+ // TODO(alexr): Consider moving `BatchedStateRequest`'s fields into
+ // `process::async` once it supports moving.
+ foreach (BatchedStateRequest& request, batchedStateRequests) {
+ request.promise.associate(process::async(
+ produceResponse, request.request, request.approvers));
+ }
+
+ // Block the master actor until all workers have generated state responses.
+ // It is crucial not to allow the master actor to continue and possibly
+ // modify its state while a worker is reading it.
+ //
+ // NOTE: There is the potential for deadlock since we are blocking 1 working
+ // thread here, see MESOS-8256.
+ vector<Future<Response>> responses;
+ foreach (const BatchedStateRequest& request, batchedStateRequests) {
+ responses.push_back(request.promise.future());
+ }
+ process::await(responses).await();
+
+ batchedStateRequests.clear();
}
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 209b998..85ef14c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -44,6 +44,7 @@
#include <mesos/scheduler/scheduler.hpp>
+#include <process/future.hpp>
#include <process/limiter.hpp>
#include <process/http.hpp>
#include <process/owned.hpp>
@@ -1461,10 +1462,12 @@ private:
principal) const;
// /master/state
+ //
+ // NOTE: Requests to this endpoint are batched.
process::Future<process::http::Response> state(
const process::http::Request& request,
const Option<process::http::authentication::Principal>&
- principal) const;
+ principal);
// /master/state-summary
process::Future<process::http::Response> stateSummary(
@@ -1552,6 +1555,17 @@ private:
const Option<process::http::authentication::Principal>&
principal) const;
+ // A continuation for `state()`. Schedules request processing in a batch
+ // of other '/state' requests.
+ process::Future<process::http::Response> deferStateRequest(
+ const process::http::Request& request,
+ const process::Owned<ObjectApprovers>& approvers);
+
+ // A helper that responds to batched, i.e., accumulated, '/state'
+ // requests in parallel, i.e., a continuation for `deferStateRequest()`.
+ // See also `BatchedStateRequest`.
+ void processStateRequestsBatch();
+
process::Future<std::vector<const Task*>> _tasks(
const size_t limit,
const size_t offset,
@@ -1830,6 +1844,17 @@ private:
// NOTE: The weights specific pieces of the Operator API are factored
// out into this separate class.
WeightsHandler weightsHandler;
+
+ // TODO(alexr): Consider adding a `type` or `handler` field to expand
+ // batching to other heavy read-only requests, e.g., '/state-summary'.
+ struct BatchedStateRequest
+ {
+ process::http::Request request;
+ process::Owned<ObjectApprovers> approvers;
+ process::Promise<process::http::Response> promise;
+ };
+
+ std::vector<BatchedStateRequest> batchedStateRequests;
};
Master(const Master&); // No copying.