You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/08/21 06:43:36 UTC
[mesos] 02/04: Moved state serialization into separate function.
This is an automated email from the ASF dual-hosted git repository.
alexr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 943aa2c984bfab59e327e96e955a2de72b91218f
Author: Benno Evers <be...@mesosphere.com>
AuthorDate: Tue Aug 21 08:10:09 2018 +0200
Moved state serialization into separate function.
In preparation for the changes in the subsequent commit,
the serialization logic was moved from an inner lambda
to a separate function.
Review: https://reviews.apache.org/r/68343/
---
src/master/http.cpp | 246 +++++++++++++++++++++++++-------------------------
src/master/master.hpp | 25 +++++
2 files changed, 150 insertions(+), 121 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index e2773ed..1b6a840 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -2871,154 +2871,153 @@ Future<Response> Master::Http::deferStateRequest(
}
-void Master::Http::processStateRequestsBatch()
+process::http::Response Master::ReadOnlyHandler::state(
+ const process::http::Request& request,
+ const process::Owned<ObjectApprovers>& approvers) const
{
- CHECK(!batchedStateRequests.empty())
- << "Bug in state batching logic: No requests to process";
-
- // This lambda is consumed before the enclosed function returns,
- // hence capturing `this` is fine here.
- auto produceResponse = [this](
- const Request& request,
- const Owned<ObjectApprovers>& approvers) -> Response {
- // This lambda is consumed before the outer lambda returns,
- // hence capturing a reference is fine here.
- auto calculateState = [this, &approvers](JSON::ObjectWriter* writer) {
- writer->field("version", MESOS_VERSION);
-
- if (build::GIT_SHA.isSome()) {
- writer->field("git_sha", build::GIT_SHA.get());
- }
+ const Master* master = this->master;
+ auto calculateState = [master, &approvers](JSON::ObjectWriter* writer) {
+ writer->field("version", MESOS_VERSION);
- if (build::GIT_BRANCH.isSome()) {
- writer->field("git_branch", build::GIT_BRANCH.get());
- }
+ if (build::GIT_SHA.isSome()) {
+ writer->field("git_sha", build::GIT_SHA.get());
+ }
- if (build::GIT_TAG.isSome()) {
- writer->field("git_tag", build::GIT_TAG.get());
- }
+ if (build::GIT_BRANCH.isSome()) {
+ writer->field("git_branch", build::GIT_BRANCH.get());
+ }
- writer->field("build_date", build::DATE);
- writer->field("build_time", build::TIME);
- writer->field("build_user", build::USER);
- writer->field("start_time", master->startTime.secs());
+ if (build::GIT_TAG.isSome()) {
+ writer->field("git_tag", build::GIT_TAG.get());
+ }
- if (master->electedTime.isSome()) {
- writer->field("elected_time", master->electedTime->secs());
- }
+ writer->field("build_date", build::DATE);
+ writer->field("build_time", build::TIME);
+ writer->field("build_user", build::USER);
+ writer->field("start_time", master->startTime.secs());
- writer->field("id", master->info().id());
- writer->field("pid", string(master->self()));
- writer->field("hostname", master->info().hostname());
- writer->field("capabilities", master->info().capabilities());
- writer->field("activated_slaves", master->_slaves_active());
- writer->field("deactivated_slaves", master->_slaves_inactive());
- writer->field("unreachable_slaves", master->_slaves_unreachable());
+ if (master->electedTime.isSome()) {
+ writer->field("elected_time", master->electedTime->secs());
+ }
- if (master->info().has_domain()) {
- writer->field("domain", master->info().domain());
- }
+ writer->field("id", master->info().id());
+ writer->field("pid", string(master->self()));
+ writer->field("hostname", master->info().hostname());
+ writer->field("capabilities", master->info().capabilities());
+ writer->field("activated_slaves", master->_const_slaves_active());
+ writer->field("deactivated_slaves", master->_const_slaves_inactive());
+ writer->field("unreachable_slaves", master->_const_slaves_unreachable());
- // TODO(haosdent): Deprecated this in favor of `leader_info` below.
- if (master->leader.isSome()) {
- writer->field("leader", master->leader->pid());
- }
+ if (master->info().has_domain()) {
+ writer->field("domain", master->info().domain());
+ }
- if (master->leader.isSome()) {
- writer->field("leader_info", [this](JSON::ObjectWriter* writer) {
- json(writer, master->leader.get());
- });
- }
+ // TODO(haosdent): Deprecated this in favor of `leader_info` below.
+ if (master->leader.isSome()) {
+ writer->field("leader", master->leader->pid());
+ }
- if (approvers->approved<VIEW_FLAGS>()) {
- if (master->flags.cluster.isSome()) {
- writer->field("cluster", master->flags.cluster.get());
- }
+ if (master->leader.isSome()) {
+ writer->field("leader_info", [master](JSON::ObjectWriter* writer) {
+ json(writer, master->leader.get());
+ });
+ }
- if (master->flags.log_dir.isSome()) {
- writer->field("log_dir", master->flags.log_dir.get());
- }
+ if (approvers->approved<VIEW_FLAGS>()) {
+ if (master->flags.cluster.isSome()) {
+ writer->field("cluster", master->flags.cluster.get());
+ }
- if (master->flags.external_log_file.isSome()) {
- writer->field("external_log_file",
- master->flags.external_log_file.get());
- }
+ if (master->flags.log_dir.isSome()) {
+ writer->field("log_dir", master->flags.log_dir.get());
+ }
- writer->field("flags", [this](JSON::ObjectWriter* writer) {
- foreachvalue (const flags::Flag& flag, master->flags) {
- Option<string> value = flag.stringify(master->flags);
- if (value.isSome()) {
- writer->field(flag.effective_name().value, value.get());
- }
- }
- });
+ if (master->flags.external_log_file.isSome()) {
+ writer->field("external_log_file",
+ master->flags.external_log_file.get());
}
- // Model all of the registered slaves.
- writer->field(
- "slaves",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (Slave* slave, master->slaves.registered) {
- writer->element(SlaveWriter(*slave, approvers));
+ writer->field("flags", [master](JSON::ObjectWriter* writer) {
+ foreachvalue (const flags::Flag& flag, master->flags) {
+ Option<string> value = flag.stringify(master->flags);
+ if (value.isSome()) {
+ writer->field(flag.effective_name().value, value.get());
}
- });
+ }
+ });
+ }
- // Model all of the recovered slaves.
- writer->field(
- "recovered_slaves",
- [this](JSON::ArrayWriter* writer) {
- foreachvalue (
- const SlaveInfo& slaveInfo, master->slaves.recovered) {
- writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
- json(writer, slaveInfo);
- });
- }
- });
+ // Model all of the registered slaves.
+ writer->field(
+ "slaves",
+ [master, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (Slave* slave, master->slaves.registered) {
+ writer->element(SlaveWriter(*slave, approvers));
+ }
+ });
- // Model all of the frameworks.
- writer->field(
- "frameworks",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (
- Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
+ // Model all of the recovered slaves.
+ writer->field(
+ "recovered_slaves",
+ [master](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ const SlaveInfo& slaveInfo, master->slaves.recovered) {
+ writer->element([&slaveInfo](JSON::ObjectWriter* writer) {
+ json(writer, slaveInfo);
+ });
+ }
+ });
- writer->element(FullFrameworkWriter(approvers, framework));
+ // Model all of the frameworks.
+ writer->field(
+ "frameworks",
+ [master, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
}
- });
- // Model all of the completed frameworks.
- writer->field(
- "completed_frameworks",
- [this, &approvers](JSON::ArrayWriter* writer) {
- foreachvalue (
- const Owned<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
- continue;
- }
+ writer->element(FullFrameworkWriter(approvers, framework));
+ }
+ });
- writer->element(
- FullFrameworkWriter(approvers, framework.get()));
+ // Model all of the completed frameworks.
+ writer->field(
+ "completed_frameworks",
+ [master, &approvers](JSON::ArrayWriter* writer) {
+ foreachvalue (
+ const Owned<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approvers->approved<VIEW_FRAMEWORK>(framework->info)) {
+ continue;
}
- });
- // Orphan tasks are no longer possible. We emit an empty array
- // for the sake of backward compatibility.
- writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
+ writer->element(
+ FullFrameworkWriter(approvers, framework.get()));
+ }
+ });
- // Unregistered frameworks are no longer possible. We emit an
- // empty array for the sake of backward compatibility.
- writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
- };
+ // Orphan tasks are no longer possible. We emit an empty array
+ // for the sake of backward compatibility.
+ writer->field("orphan_tasks", [](JSON::ArrayWriter*) {});
- return OK(jsonify(calculateState), request.url.query.get("jsonp"));
+ // Unregistered frameworks are no longer possible. We emit an
+ // empty array for the sake of backward compatibility.
+ writer->field("unregistered_frameworks", [](JSON::ArrayWriter*) {});
};
+ return OK(jsonify(calculateState), request.url.query.get("jsonp"));
+}
+
+
+void Master::Http::processStateRequestsBatch()
+{
+ CHECK(!batchedStateRequests.empty())
+ << "Bug in state batching logic: No requests to process";
+
// Produce the responses in parallel.
//
// TODO(alexr): Consider abstracting this into `parallel_async` or
@@ -3028,7 +3027,12 @@ void Master::Http::processStateRequestsBatch()
// `process::async` once it supports moving.
foreach (BatchedStateRequest& request, batchedStateRequests) {
request.promise.associate(process::async(
- produceResponse, request.request, request.approvers));
+ [this](const process::http::Request& request,
+ const process::Owned<ObjectApprovers>& approvers) {
+ return readonlyHandler.state(request, approvers);
+ },
+ request.request,
+ request.approvers));
}
// Block the master actor until all workers have generated state responses.
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1020626..6312c29 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1384,12 +1384,35 @@ private:
Master* master;
};
+ // Inner class used to namespace HTTP handlers that do not change the
+ // underlying master object.
+ //
+ // NOTE: Most member functions of this class are not routed directly but
+ // dispatched from their corresponding handlers in the outer `Http` class.
+ // This is because deciding whether an incoming request is read-only often
+ // requires some inspection, e.g. distinguishing between "GET" and "POST"
+ // requests to the same endpoint.
+ class ReadOnlyHandler
+ {
+ public:
+ explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
+
+ // /state
+ process::http::Response state(
+ const process::http::Request& request,
+ const process::Owned<ObjectApprovers>& approvers) const;
+
+ private:
+ const Master* master;
+ };
+
// Inner class used to namespace HTTP route handlers (see
// master/http.cpp for implementations).
class Http
{
public:
explicit Http(Master* _master) : master(_master),
+ readonlyHandler(_master),
quotaHandler(_master),
weightsHandler(_master) {}
@@ -1837,6 +1860,8 @@ private:
Master* master;
+ ReadOnlyHandler readonlyHandler;
+
// NOTE: The quota specific pieces of the Operator API are factored
// out into this separate class.
QuotaHandler quotaHandler;