You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/07/06 21:53:41 UTC
mesos git commit: Refactor all `get*` helper functions for master API
to blocking version.
Repository: mesos
Updated Branches:
refs/heads/master 8cbc1aeff -> b16dbb2c2
Refactor all `get*` helper functions for master API to blocking version.
This makes sure `GetState()` collection is done in an
atomic fashion and avoids a race condition when doing `subscribe`.
Things can be improved later:
- `Swap` and RVO to avoid large data copy.
- refactor the common logic of `ObjectApprover` into another helper
function.
- craft a test case for the actual race condition.
Review: https://reviews.apache.org/r/49722/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b16dbb2c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b16dbb2c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b16dbb2c
Branch: refs/heads/master
Commit: b16dbb2c22faab5bdff088aef668f15a7dcf696f
Parents: 8cbc1ae
Author: Zhitao Li <zh...@gmail.com>
Authored: Wed Jul 6 16:48:31 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jul 6 16:53:15 2016 -0500
----------------------------------------------------------------------
src/master/http.cpp | 680 +++++++++++++++++++++++++--------------------
src/master/master.hpp | 28 +-
2 files changed, 393 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b16dbb2c/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 32b15d1..bff1fd5 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -588,41 +588,83 @@ Future<Response> Master::Http::api(
case mesos::master::Call::REMOVE_QUOTA:
return quotaHandler.remove(call, principal);
- case mesos::master::Call::SUBSCRIBE: {
- return _getState(principal)
- .then(defer(master->self(),
- [this, acceptType]
- (const mesos::master::Response::GetState& getState)
- -> Future<Response> {
- // TODO(zhitao): There is a possible race condition here: if an action
- // like `taskUpdate()` is queued between `_getState()` and this
- // continuation, neither the event will be sent to the subscriber
- // (because the connection is not in subscribers yet), nor
- // the effect of the change would be captured in the snapshot.
- Pipe pipe;
- OK ok;
-
- ok.headers["Content-Type"] = stringify(acceptType);
- ok.type = Response::PIPE;
- ok.reader = pipe.reader();
-
- HttpConnection http {pipe.writer(), acceptType, UUID::random()};
- master->subscribe(http);
-
- mesos::master::Event event;
- event.set_type(mesos::master::Event::SUBSCRIBED);
- event.mutable_subscribed()->mutable_get_state()->CopyFrom(getState);
- http.send<mesos::master::Event, v1::master::Event>(event);
-
- return ok;
- }));
- }
+ case mesos::master::Call::SUBSCRIBE:
+ return subscribe(call, principal, acceptType);
}
UNREACHABLE();
}
+Future<Response> Master::Http::subscribe(
+ const mesos::master::Call& call,
+ const Option<string>& principal,
+ ContentType contentType) const
+{
+ CHECK_EQ(mesos::master::Call::SUBSCRIBE, call.type());
+
+ // Retrieve Approvers for authorizing frameworks and tasks.
+ Future<Owned<ObjectApprover>> frameworksApprover;
+ Future<Owned<ObjectApprover>> tasksApprover;
+ Future<Owned<ObjectApprover>> executorsApprover;
+ if (master->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ frameworksApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_FRAMEWORK);
+
+ tasksApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_TASK);
+
+ executorsApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_EXECUTOR);
+ } else {
+ frameworksApprover = Owned<ObjectApprover>(
+ new AcceptingObjectApprover());
+ tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ executorsApprover = Owned<ObjectApprover>(
+ new AcceptingObjectApprover());
+ }
+
+ return collect(frameworksApprover, tasksApprover, executorsApprover)
+ .then(defer(master->self(),
+ [=](const tuple<Owned<ObjectApprover>,
+ Owned<ObjectApprover>,
+ Owned<ObjectApprover>>& approvers)
+ -> Future<Response> {
+ // Get approver from tuple.
+ Owned<ObjectApprover> frameworksApprover;
+ Owned<ObjectApprover> tasksApprover;
+ Owned<ObjectApprover> executorsApprover;
+ tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
+ Pipe pipe;
+ OK ok;
+
+ ok.headers["Content-Type"] = stringify(contentType);
+ ok.type = Response::PIPE;
+ ok.reader = pipe.reader();
+
+ HttpConnection http {pipe.writer(), contentType, UUID::random()};
+ master->subscribe(http);
+
+ mesos::master::Event event;
+ event.set_type(mesos::master::Event::SUBSCRIBED);
+ event.mutable_subscribed()->mutable_get_state()->CopyFrom(
+ _getState(frameworksApprover,
+ tasksApprover,
+ executorsApprover));
+
+ http.send<mesos::master::Event, v1::master::Event>(event);
+
+ return ok;
+ }));
+}
+
+
// TODO(ijimenez): Add some information or pointers to help
// users understand the HTTP Event/Call API.
string Master::Http::SCHEDULER_HELP()
@@ -1361,22 +1403,6 @@ Future<Response> Master::Http::getFrameworks(
{
CHECK_EQ(mesos::master::Call::GET_FRAMEWORKS, call.type());
- return _getFrameworks(principal)
- .then([contentType](
- const mesos::master::Response::GetFrameworks& getFrameworks)
- -> Future<Response> {
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_FRAMEWORKS);
- response.mutable_get_frameworks()->CopyFrom(getFrameworks);
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
- });
-}
-
-
-Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
- const Option<string>& principal) const
-{
// Retrieve `ObjectApprover`s for authorizing frameworks.
Future<Owned<ObjectApprover>> frameworksApprover;
@@ -1396,58 +1422,70 @@ Future<mesos::master::Response::GetFrameworks> Master::Http::_getFrameworks(
return frameworksApprover
.then(defer(master->self(),
[=](const Owned<ObjectApprover>& frameworksApprover)
- -> Future<mesos::master::Response::GetFrameworks> {
- mesos::master::Response::GetFrameworks getFrameworks;
+ -> Future<Response> {
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_FRAMEWORKS);
+ response.mutable_get_frameworks()->CopyFrom(
+ _getFrameworks(frameworksApprover));
- foreachvalue (const Framework* framework,
- master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
- getFrameworks.add_frameworks()->CopyFrom(model(*framework));
- }
- foreach (const std::shared_ptr<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+mesos::master::Response::GetFrameworks Master::Http::_getFrameworks(
+ const Owned<ObjectApprover>& frameworksApprover) const
+{
+ mesos::master::Response::GetFrameworks getFrameworks;
+ foreachvalue (const Framework* framework,
+ master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
- }
+ getFrameworks.add_frameworks()->CopyFrom(model(*framework));
+ }
- foreachvalue (const Slave* slave, master->slaves.registered) {
- foreachkey (const FrameworkID& frameworkId, slave->tasks) {
- if (!master->frameworks.registered.contains(frameworkId)) {
- // TODO(haosdent): This logic should be simplified after
- // a deprecation cycle starting with 1.0 as after that
- // we can rely on `master->frameworks.recovered` containing
- // all FrameworkInfos.
- // Until then there are 3 cases:
- // - No authorization enabled: show all orphaned frameworks.
- // - Authorization enabled, but no FrameworkInfo present:
- // do not show orphaned frameworks.
- // - Authorization enabled, FrameworkInfo present: filter
- // based on `approveViewFrameworkInfo`.
- if (master->authorizer.isSome() &&
- (!master->frameworks.recovered.contains(frameworkId) ||
- !approveViewFrameworkInfo(
- frameworksApprover,
- master->frameworks.recovered[frameworkId]))) {
- continue;
- }
+ foreach (const std::shared_ptr<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- getFrameworks.add_recovered_frameworks()->CopyFrom(
- master->frameworks.recovered[frameworkId]);
- }
+ getFrameworks.add_completed_frameworks()->CopyFrom(model(*framework));
+ }
+
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+ if (!master->frameworks.registered.contains(frameworkId)) {
+ // TODO(haosdent): This logic should be simplified after
+ // a deprecation cycle starting with 1.0 as after that
+ // we can rely on `master->frameworks.recovered` containing
+ // all FrameworkInfos.
+ // Until then there are 3 cases:
+ // - No authorization enabled: show all orphaned frameworks.
+ // - Authorization enabled, but no FrameworkInfo present:
+ // do not show orphaned frameworks.
+ // - Authorization enabled, FrameworkInfo present: filter
+ // based on `approveViewFrameworkInfo`.
+ if (master->authorizer.isSome() &&
+ (!master->frameworks.recovered.contains(frameworkId) ||
+ !approveViewFrameworkInfo(
+ frameworksApprover,
+ master->frameworks.recovered[frameworkId]))) {
+ continue;
}
+
+ getFrameworks.add_recovered_frameworks()->CopyFrom(
+ master->frameworks.recovered[frameworkId]);
}
+ }
+ }
- return getFrameworks;
- }));
+ return getFrameworks;
}
@@ -1458,23 +1496,6 @@ Future<Response> Master::Http::getExecutors(
{
CHECK_EQ(mesos::master::Call::GET_EXECUTORS, call.type());
- return _getExecutors(principal)
- .then(
- [contentType](const mesos::master::Response::GetExecutors& getExecutors)
- -> Future<Response> {
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_EXECUTORS);
- response.mutable_get_executors()->CopyFrom(getExecutors);
-
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
- });
-}
-
-
-Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
- const Option<std::string>& principal) const
-{
// Retrieve `ObjectApprover`s for authorizing frameworks and executors.
Future<Owned<ObjectApprover>> frameworksApprover;
Future<Owned<ObjectApprover>> executorsApprover;
@@ -1498,95 +1519,110 @@ Future<mesos::master::Response::GetExecutors> Master::Http::_getExecutors(
.then(defer(master->self(),
[=](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
- -> Future<mesos::master::Response::GetExecutors> {
+ -> Future<Response> {
// Get approver from tuple.
Owned<ObjectApprover> frameworksApprover;
Owned<ObjectApprover> executorsApprover;
tie(frameworksApprover, executorsApprover) = approvers;
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_EXECUTORS);
- frameworks.push_back(framework);
- }
+ response.mutable_get_executors()->CopyFrom(
+ _getExecutors(frameworksApprover, executorsApprover));
- foreach (const std::shared_ptr<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
- frameworks.push_back(framework.get());
- }
- mesos::master::Response::GetExecutors getExecutors;
+mesos::master::Response::GetExecutors Master::Http::_getExecutors(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& executorsApprover) const
+{
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- foreach (const Framework* framework, frameworks) {
- foreachpair (const SlaveID& slaveId,
- const auto& executorsMap,
- framework->executors) {
- foreachvalue (const ExecutorInfo& info, executorsMap) {
- // Skip unauthorized executors.
- if (!approveViewExecutorInfo(executorsApprover,
- info,
- framework->info)) {
- continue;
- }
+ frameworks.push_back(framework);
+ }
- mesos::master::Response::GetExecutors::Executor* executor =
- getExecutors.add_executors();
+ foreach (const std::shared_ptr<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- executor->mutable_executor_info()->CopyFrom(info);
- executor->mutable_slave_id()->CopyFrom(slaveId);
- }
+ frameworks.push_back(framework.get());
+ }
+
+ mesos::master::Response::GetExecutors getExecutors;
+
+ foreach (const Framework* framework, frameworks) {
+ foreachpair (const SlaveID& slaveId,
+ const auto& executorsMap,
+ framework->executors) {
+ foreachvalue (const ExecutorInfo& info, executorsMap) {
+ // Skip unauthorized executors.
+ if (!approveViewExecutorInfo(executorsApprover,
+ info,
+ framework->info)) {
+ continue;
}
- }
- // Orphan executors.
- foreachvalue (const Slave* slave, master->slaves.registered) {
- typedef hashmap<ExecutorID, ExecutorInfo> ExecutorMap;
- foreachpair (const FrameworkID& frameworkId,
- const ExecutorMap& executors,
- slave->executors) {
- foreachvalue (const ExecutorInfo& info, executors) {
- if (!master->frameworks.registered.contains(frameworkId)) {
- // TODO(haosdent): This logic should be simplified after
- // a deprecation cycle starting with 1.0 as after that
- // we can rely on `master->frameworks.recovered` containing
- // all FrameworkInfos.
- // Until then there are 3 cases:
- // - No authorization enabled: show all orphaned executors.
- // - Authorization enabled, but no FrameworkInfo present:
- // do not show orphaned executors.
- // - Authorization enabled, FrameworkInfo present: filter
- // based on `approveViewExecutorInfo`.
- if (master->authorizer.isSome() &&
- (!master->frameworks.recovered.contains(frameworkId) ||
- !approveViewExecutorInfo(
- executorsApprover,
- info,
- master->frameworks.recovered[frameworkId]))) {
- continue;
- }
+ mesos::master::Response::GetExecutors::Executor* executor =
+ getExecutors.add_executors();
- mesos::master::Response::GetExecutors::Executor* executor =
- getExecutors.add_orphan_executors();
+ executor->mutable_executor_info()->CopyFrom(info);
+ executor->mutable_slave_id()->CopyFrom(slaveId);
+ }
+ }
+ }
- executor->mutable_executor_info()->CopyFrom(info);
- executor->mutable_slave_id()->CopyFrom(slave->id);
- }
+ // Orphan executors.
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ typedef hashmap<ExecutorID, ExecutorInfo> ExecutorMap;
+ foreachpair (const FrameworkID& frameworkId,
+ const ExecutorMap& executors,
+ slave->executors) {
+ foreachvalue (const ExecutorInfo& info, executors) {
+ if (!master->frameworks.registered.contains(frameworkId)) {
+ // TODO(haosdent): This logic should be simplified after
+ // a deprecation cycle starting with 1.0 as after that
+ // we can rely on `master->frameworks.recovered` containing
+ // all FrameworkInfos.
+ // Until then there are 3 cases:
+ // - No authorization enabled: show all orphaned executors.
+ // - Authorization enabled, but no FrameworkInfo present:
+ // do not show orphaned executors.
+ // - Authorization enabled, FrameworkInfo present: filter
+ // based on `approveViewExecutorInfo`.
+ if (master->authorizer.isSome() &&
+ (!master->frameworks.recovered.contains(frameworkId) ||
+ !approveViewExecutorInfo(
+ executorsApprover,
+ info,
+ master->frameworks.recovered[frameworkId]))) {
+ continue;
}
+
+ mesos::master::Response::GetExecutors::Executor* executor =
+ getExecutors.add_orphan_executors();
+
+ executor->mutable_executor_info()->CopyFrom(info);
+ executor->mutable_slave_id()->CopyFrom(slave->id);
}
}
+ }
+ }
- return getExecutors;
- }));
+ return getExecutors;
}
@@ -1597,44 +1633,79 @@ Future<Response> Master::Http::getState(
{
CHECK_EQ(mesos::master::Call::GET_STATE, call.type());
- return _getState(principal)
- .then([contentType](const mesos::master::Response::GetState& getState)
- -> Future<Response> {
+ // Retrieve Approvers for authorizing frameworks and tasks.
+ Future<Owned<ObjectApprover>> frameworksApprover;
+ Future<Owned<ObjectApprover>> tasksApprover;
+ Future<Owned<ObjectApprover>> executorsApprover;
+ if (master->authorizer.isSome()) {
+ authorization::Subject subject;
+ if (principal.isSome()) {
+ subject.set_value(principal.get());
+ }
+
+ frameworksApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_FRAMEWORK);
+
+ tasksApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_TASK);
+
+ executorsApprover = master->authorizer.get()->getObjectApprover(
+ subject, authorization::VIEW_EXECUTOR);
+ } else {
+ frameworksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ tasksApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ executorsApprover = Owned<ObjectApprover>(new AcceptingObjectApprover());
+ }
+
+ return collect(frameworksApprover, tasksApprover, executorsApprover)
+ .then(defer(master->self(),
+ [=](const tuple<Owned<ObjectApprover>,
+ Owned<ObjectApprover>,
+ Owned<ObjectApprover>>& approvers)
+ -> Future<Response> {
+ // Get approver from tuple.
+ Owned<ObjectApprover> frameworksApprover;
+ Owned<ObjectApprover> tasksApprover;
+ Owned<ObjectApprover> executorsApprover;
+ tie(frameworksApprover, tasksApprover, executorsApprover) = approvers;
+
mesos::master::Response response;
response.set_type(mesos::master::Response::GET_STATE);
- response.mutable_get_state()->CopyFrom(getState);
+ response.mutable_get_state()->CopyFrom(
+ _getState(frameworksApprover,
+ tasksApprover,
+ executorsApprover));
return OK(serialize(contentType, evolve(response)),
stringify(contentType));
- });
+ }));
}
-Future<mesos::master::Response::GetState> Master::Http::_getState(
- const Option<string>& principal) const
+mesos::master::Response::GetState Master::Http::_getState(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& tasksApprover,
+ const Owned<ObjectApprover>& executorsApprover) const
{
- return collect(
- _getTasks(principal),
- _getExecutors(principal),
- _getFrameworks(principal),
- _getAgents(principal))
- .then(defer(master->self(),
- [](const tuple<mesos::master::Response::GetTasks,
- mesos::master::Response::GetExecutors,
- mesos::master::Response::GetFrameworks,
- mesos::master::Response::GetAgents>& results)
- -> mesos::master::Response::GetState {
- mesos::master::Response::GetState getState;
-
- // Use std::get instead of std::tie to avoid
- // unnecessary copy of large data structs.
- getState.mutable_get_tasks()->CopyFrom(std::get<0>(results));
- getState.mutable_get_executors()->CopyFrom(std::get<1>(results));
- getState.mutable_get_frameworks()->CopyFrom(std::get<2>(results));
- getState.mutable_get_agents()->CopyFrom(std::get<3>(results));
-
- return getState;
- }));
+ // NOTE: This function must be blocking instead of returning a
+ // `Future`. This is because `subscribe()` needs to atomically
+ // add subscriber to `subscribers` map and send the captured state
+ // in `SUBSCRIBED` without being interleaved by any other events.
+
+ mesos::master::Response::GetState getState;
+
+ getState.mutable_get_tasks()->CopyFrom(
+ _getTasks(frameworksApprover, tasksApprover));
+
+ getState.mutable_get_executors()->CopyFrom(
+ _getExecutors(frameworksApprover, executorsApprover));
+
+ getState.mutable_get_frameworks()->CopyFrom(
+ _getFrameworks(frameworksApprover));
+
+ getState.mutable_get_agents()->CopyFrom(_getAgents());
+
+ return getState;
}
@@ -2210,21 +2281,16 @@ Future<process::http::Response> Master::Http::getAgents(
{
CHECK_EQ(mesos::master::Call::GET_AGENTS, call.type());
- return _getAgents(principal)
- .then([contentType](const mesos::master::Response::GetAgents& getAgents)
- -> Future<Response> {
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_AGENTS);
- response.mutable_get_agents()->CopyFrom(getAgents);
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_AGENTS);
+ response.mutable_get_agents()->CopyFrom(_getAgents());
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
- });
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
}
-Future<mesos::master::Response::GetAgents> Master::Http::_getAgents(
- const Option<string>& principal) const
+mesos::master::Response::GetAgents Master::Http::_getAgents() const
{
mesos::master::Response::GetAgents getAgents;
foreachvalue (const Slave* slave, master->slaves.registered) {
@@ -3523,20 +3589,6 @@ Future<Response> Master::Http::getTasks(
{
CHECK_EQ(mesos::master::Call::GET_TASKS, call.type());
- return _getTasks(principal)
- .then([contentType](const mesos::master::Response::GetTasks& getTasks)
- -> Future<Response> {
- mesos::master::Response response;
- response.set_type(mesos::master::Response::GET_TASKS);
- response.mutable_get_tasks()->CopyFrom(getTasks);
- return OK(serialize(contentType, evolve(response)),
- stringify(contentType));
- });
-}
-
-
-Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
- const Option<string>& principal) const {
// Retrieve Approvers for authorizing frameworks and tasks.
Future<Owned<ObjectApprover>> frameworksApprover;
Future<Owned<ObjectApprover>> tasksApprover;
@@ -3560,105 +3612,123 @@ Future<mesos::master::Response::GetTasks> Master::Http::_getTasks(
.then(defer(master->self(),
[=](const tuple<Owned<ObjectApprover>,
Owned<ObjectApprover>>& approvers)
- -> Future<mesos::master::Response::GetTasks> {
+ -> Future<Response> {
// Get approver from tuple.
Owned<ObjectApprover> frameworksApprover;
Owned<ObjectApprover> tasksApprover;
tie(frameworksApprover, tasksApprover) = approvers;
- // Construct framework list with both active and completed frameworks.
- vector<const Framework*> frameworks;
- foreachvalue (Framework* framework, master->frameworks.registered) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+ mesos::master::Response response;
+ response.set_type(mesos::master::Response::GET_TASKS);
- frameworks.push_back(framework);
- }
+ response.mutable_get_tasks()->CopyFrom(
+ _getTasks(frameworksApprover,
+ tasksApprover));
- foreach (const std::shared_ptr<Framework>& framework,
- master->frameworks.completed) {
- // Skip unauthorized frameworks.
- if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
- continue;
- }
+ return OK(serialize(contentType, evolve(response)),
+ stringify(contentType));
+ }));
+}
- frameworks.push_back(framework.get());
- }
- mesos::master::Response::GetTasks getTasks;
- vector<const Task*> tasks;
- foreach (const Framework* framework, frameworks) {
- // Pending tasks.
- foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
- // Skip unauthorized tasks.
- if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
- continue;
- }
+mesos::master::Response::GetTasks Master::Http::_getTasks(
+ const Owned<ObjectApprover>& frameworksApprover,
+ const Owned<ObjectApprover>& tasksApprover) const
+{
+ // Construct framework list with both active and completed frameworks.
+ vector<const Framework*> frameworks;
+ foreachvalue (Framework* framework, master->frameworks.registered) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- const Task& task =
- protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+ frameworks.push_back(framework);
+ }
- getTasks.add_pending_tasks()->CopyFrom(task);
- }
+ foreach (const std::shared_ptr<Framework>& framework,
+ master->frameworks.completed) {
+ // Skip unauthorized frameworks.
+ if (!approveViewFrameworkInfo(frameworksApprover, framework->info)) {
+ continue;
+ }
- // Active tasks.
- foreachvalue (Task* task, framework->tasks) {
- CHECK_NOTNULL(task);
- // Skip unauthorized tasks.
- if (!approveViewTask(tasksApprover, *task, framework->info)) {
- continue;
- }
+ frameworks.push_back(framework.get());
+ }
- getTasks.add_tasks()->CopyFrom(*task);
- }
+ mesos::master::Response::GetTasks getTasks;
- // Completed tasks.
- foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
- // Skip unauthorized tasks.
- if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
- continue;
- }
+ vector<const Task*> tasks;
+ foreach (const Framework* framework, frameworks) {
+ // Pending tasks.
+ foreachvalue (const TaskInfo& taskInfo, framework->pendingTasks) {
+ // Skip unauthorized tasks.
+ if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {
+ continue;
+ }
- getTasks.add_completed_tasks()->CopyFrom(*task);
- }
+ const Task& task =
+ protobuf::createTask(taskInfo, TASK_STAGING, framework->id());
+
+ getTasks.add_pending_tasks()->CopyFrom(task);
+ }
+
+ // Active tasks.
+ foreachvalue (Task* task, framework->tasks) {
+ CHECK_NOTNULL(task);
+ // Skip unauthorized tasks.
+ if (!approveViewTask(tasksApprover, *task, framework->info)) {
+ continue;
}
- // Orphan tasks.
- foreachvalue (const Slave* slave, master->slaves.registered) {
- typedef hashmap<TaskID, Task*> TaskMap;
- foreachvalue (const TaskMap& tasks, slave->tasks) {
- foreachvalue (const Task* task, tasks) {
- CHECK_NOTNULL(task);
- const FrameworkID& frameworkId = task->framework_id();
- if (!master->frameworks.registered.contains(frameworkId)) {
- // TODO(joerg84): This logic should be simplified after
- // a deprecation cycle starting with 1.0 as after that
- // we can rely on `master->frameworks.recovered` containing
- // all FrameworkInfos.
- // Until then there are 3 cases:
- // - No authorization enabled: show all orphaned tasks.
- // - Authorization enabled, but no FrameworkInfo present:
- // do not show orphaned tasks.
- // - Authorization enabled, FrameworkInfo present: filter
- // based on `approveViewTask`.
- if (master->authorizer.isSome() &&
- (!master->frameworks.recovered.contains(frameworkId) ||
- !approveViewTask(
- tasksApprover,
- *task,
- master->frameworks.recovered[frameworkId]))) {
- continue;
- }
+ getTasks.add_tasks()->CopyFrom(*task);
+ }
- getTasks.add_orphan_tasks()->CopyFrom(*task);
+ // Completed tasks.
+ foreach (const std::shared_ptr<Task>& task, framework->completedTasks) {
+ // Skip unauthorized tasks.
+ if (!approveViewTask(tasksApprover, *task.get(), framework->info)) {
+ continue;
+ }
+
+ getTasks.add_completed_tasks()->CopyFrom(*task);
+ }
+
+ // Orphan tasks.
+ foreachvalue (const Slave* slave, master->slaves.registered) {
+ typedef hashmap<TaskID, Task*> TaskMap;
+ foreachvalue (const TaskMap& tasks, slave->tasks) {
+ foreachvalue (const Task* task, tasks) {
+ CHECK_NOTNULL(task);
+ const FrameworkID& frameworkId = task->framework_id();
+ if (!master->frameworks.registered.contains(frameworkId)) {
+ // TODO(joerg84): This logic should be simplified after
+ // a deprecation cycle starting with 1.0 as after that
+ // we can rely on `master->frameworks.recovered` containing
+ // all FrameworkInfos.
+ // Until then there are 3 cases:
+ // - No authorization enabled: show all orphaned tasks.
+ // - Authorization enabled, but no FrameworkInfo present:
+ // do not show orphaned tasks.
+ // - Authorization enabled, FrameworkInfo present: filter
+ // based on `approveViewTask`.
+ if (master->authorizer.isSome() &&
+ (!master->frameworks.recovered.contains(frameworkId) ||
+ !approveViewTask(
+ tasksApprover,
+ *task,
+ master->frameworks.recovered[frameworkId]))) {
+ continue;
}
+
+ getTasks.add_orphan_tasks()->CopyFrom(*task);
}
}
}
- return getTasks;
- }));
+ }
+ }
+
+ return getTasks;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b16dbb2c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0688ba3..60efd22 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1388,8 +1388,7 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
- process::Future<mesos::master::Response::GetAgents> _getAgents(
- const Option<std::string>& principal) const;
+ mesos::master::Response::GetAgents _getAgents() const;
process::Future<process::http::Response> getFlags(
const mesos::master::Call& call,
@@ -1461,8 +1460,9 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
- process::Future<mesos::master::Response::GetTasks> _getTasks(
- const Option<std::string>& principal) const;
+ mesos::master::Response::GetTasks _getTasks(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& tasksApprover) const;
process::Future<process::http::Response> createVolumes(
const mesos::master::Call& call,
@@ -1489,24 +1489,32 @@ private:
const Option<std::string>& principal,
ContentType contentType) const;
- process::Future<mesos::master::Response::GetFrameworks> _getFrameworks(
- const Option<std::string>& principal) const;
+ mesos::master::Response::GetFrameworks _getFrameworks(
+ const process::Owned<ObjectApprover>& frameworksApprover) const;
process::Future<process::http::Response> getExecutors(
const mesos::master::Call& call,
const Option<std::string>& principal,
ContentType contentType) const;
- process::Future<mesos::master::Response::GetExecutors> _getExecutors(
- const Option<std::string>& principal) const;
+ mesos::master::Response::GetExecutors _getExecutors(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& executorsApprover) const;
process::Future<process::http::Response> getState(
const mesos::master::Call& call,
const Option<std::string>& principal,
ContentType contentType) const;
- process::Future<mesos::master::Response::GetState> _getState(
- const Option<std::string>& principal) const;
+ mesos::master::Response::GetState _getState(
+ const process::Owned<ObjectApprover>& frameworksApprover,
+ const process::Owned<ObjectApprover>& taskApprover,
+ const process::Owned<ObjectApprover>& executorsApprover) const;
+
+ process::Future<process::http::Response> subscribe(
+ const mesos::master::Call& call,
+ const Option<std::string>& principal,
+ ContentType contentType) const;
Master* master;