You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/11 01:33:47 UTC
[2/7] git commit: Authorized launch tasks.
Authorized launch tasks.
Review: https://reviews.apache.org/r/22151
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6c4ee70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6c4ee70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6c4ee70
Branch: refs/heads/vinod/authorize_tasks
Commit: a6c4ee70262380c3ecb3c43c8eee0108f6b224eb
Parents: b73ea62
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed May 28 16:45:22 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/common/protobuf_utils.hpp | 2 +
src/master/hierarchical_allocator_process.hpp | 1 -
src/master/master.cpp | 474 +++++++++++++++------
src/master/master.hpp | 36 +-
src/tests/master_authorization_tests.cpp | 209 +++++++++
6 files changed, 575 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1d49dca..c91b438 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -975,6 +975,7 @@ mesos_tests_SOURCES = \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
+ tests/master_authorization_tests.cpp \
tests/master_contender_detector_tests.cpp \
tests/master_tests.cpp \
tests/mesos.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0f65341..12ff00a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -48,6 +48,8 @@ inline bool isTerminalState(const TaskState& state)
}
+// TODO(vinod): Make SlaveID optional because 'StatusUpdate.SlaveID'
+// is optional.
inline StatusUpdate createStatusUpdate(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 0c5e2e0..1765e70 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -553,7 +553,6 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused(
// result of a valid task launch by an active
// framework that doesn't use the entire offer.
CHECK(frameworks.contains(frameworkId));
-
const std::string& role = frameworks[frameworkId].role();
sorters[role]->unallocated(frameworkId.value(), resources);
sorters[role]->remove(resources);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index df75c8a..1a2b219 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -25,6 +25,8 @@
#include <list>
#include <sstream>
+#include <process/check.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
@@ -68,6 +70,7 @@ using std::list;
using std::string;
using std::vector;
+using process::await;
using process::wait; // Necessary on some OS's to disambiguate.
using process::Clock;
using process::Failure;
@@ -576,6 +579,9 @@ void Master::finalize()
// allocator or the roles because it is unnecessary bookkeeping at
// this point since we are shutting down.
foreachvalue (Framework* framework, frameworks.activated) {
+ // Remove pending tasks from the framework.
+ framework->pendingTasks.clear();
+
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
@@ -1387,6 +1393,7 @@ struct TaskInfoVisitor
virtual ~TaskInfoVisitor() {}
};
+
// Checks that a task id is valid, i.e., contains only valid characters.
struct TaskIDChecker : TaskInfoVisitor
{
@@ -1445,22 +1452,21 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
{
const TaskID& taskId = task.task_id();
- if (ids.contains(taskId) || framework.tasks.contains(taskId)) {
+ if (framework.pendingTasks.contains(taskId) ||
+ framework.tasks.contains(taskId)) {
return "Task has duplicate ID: " + taskId.value();
}
-
- ids.insert(taskId);
-
return None();
}
-
- hashset<TaskID> ids;
};
-// Checks that the used resources by a task (and executor if
-// necessary) on each slave does not exceed the total resources
-// offered on that slave
+// Checks that the resources used by a task on each slave do not
+// exceed the total resources offered on that slave.
+// NOTE: We do not account for executor resources here because tasks
+// are launched asynchronously and an executor might exit between
+// validation and actual launch. Therefore executor resources are
+// accounted for in 'Master::_launchTasks()'.
struct ResourceUsageChecker : TaskInfoVisitor
{
virtual Option<Error> operator () (
@@ -1482,11 +1488,10 @@ struct ResourceUsageChecker : TaskInfoVisitor
// Check if this task uses more resources than offered.
Resources taskResources = task.resources();
- if (!((usedResources + taskResources) <= resources)) {
+ if (!(taskResources <= resources)) {
return "Task " + stringify(task.task_id()) + " attempted to use " +
- stringify(taskResources) + " combined with already used " +
- stringify(usedResources) + " is greater than offered " +
- stringify(resources);
+ stringify(taskResources) + " which is greater than offered " +
+ stringify(resources);
}
// Check this task's executor's resources.
@@ -1496,33 +1501,13 @@ struct ResourceUsageChecker : TaskInfoVisitor
if (!Resources::isAllocatable(resource)) {
// TODO(benh): Send back the invalid resources?
return "Executor for task " + stringify(task.task_id()) +
- " uses invalid resources " + stringify(resource);
- }
- }
-
- // Check if this task's executor is running, and if not check if
- // the task + the executor use more resources than offered.
- if (!executors.contains(task.executor().executor_id())) {
- if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
- taskResources += task.executor().resources();
- if (!((usedResources + taskResources) <= resources)) {
- return "Task " + stringify(task.task_id()) +
- " + executor attempted to use " + stringify(taskResources) +
- " combined with already used " + stringify(usedResources) +
- " is greater than offered " + stringify(resources);
- }
+ " uses invalid resources " + stringify(resource);
}
- executors.insert(task.executor().executor_id());
}
}
- usedResources += taskResources;
-
return None();
}
-
- Resources usedResources;
- hashset<ExecutorID> executors;
};
@@ -1544,11 +1529,26 @@ struct ExecutorInfoChecker : TaskInfoVisitor
if (task.has_executor()) {
const ExecutorID& executorId = task.executor().executor_id();
+ Option<ExecutorInfo> executorInfo = None();
+
if (slave.hasExecutor(framework.id, executorId)) {
- const Option<ExecutorInfo> executorInfo =
- slave.executors.get(framework.id).get().get(executorId);
+ executorInfo = slave.executors.get(framework.id).get().get(executorId);
+ } else {
+ // See if any of the pending tasks have the same executor.
+ // Note that picking the first matching executor is ok because
+ // all the matching executors have been added to
+ // 'framework.pendingTasks' after validation and hence have
+ // the same executor info.
+ foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
+ if (task_.has_executor() &&
+ task_.executor().executor_id() == executorId) {
+ executorInfo = task_.executor();
+ break;
+ }
+ }
+ }
- if (!(task.executor() == executorInfo.get())) {
+ if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
return "Task has invalid ExecutorInfo (existing ExecutorInfo"
" with same ExecutorID is not compatible).\n"
"------------------------------------------------------------\n"
@@ -1558,7 +1558,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
"Task's ExecutorInfo:\n" +
stringify(task.executor()) + "\n"
"------------------------------------------------------------\n";
- }
}
}
@@ -1854,9 +1853,55 @@ void Master::launchTasks(
<< " on slave " << *slave
<< " for framework " << framework->id;
- Resources usedResources; // Accumulated resources used.
+ // Validate each task and launch if valid.
+ list<Future<Option<Error> > > futures;
+ foreach (const TaskInfo& task, tasks) {
+ futures.push_back(validateTask(task, framework, slave, totalResources));
+
+ // Add to pending tasks.
+ // NOTE: This must happen after 'validateTask()' because validators
+ // consult 'framework->pendingTasks' (e.g., to detect duplicate IDs).
+ framework->pendingTasks[task.task_id()] = task;
+ }
+
+ // Wait for all the tasks to be validated.
+ // NOTE: We wait for all tasks because currently the allocator
+ // is expected to get 'resourcesUnused()' once per 'launchTasks()'.
+ await(futures)
+ .onAny(defer(self(),
+ &Master::_launchTasks,
+ framework->id,
+ slaveId.get(),
+ tasks,
+ totalResources,
+ filters,
+ lambda::_1));
+}
+
+
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+ if (authorized) {
+ return None();
+ }
+
+ return Error(message);
+}
+
+
+Future<Option<Error> > Master::validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave,
+ const Resources& totalResources)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
// Create task visitors.
+ // TODO(vinod): Create the visitors on the stack and make the visit
+ // operation const.
list<TaskInfoVisitor*> taskVisitors;
taskVisitors.push_back(new TaskIDChecker());
taskVisitors.push_back(new SlaveIDChecker());
@@ -1867,40 +1912,241 @@ void Master::launchTasks(
// TODO(benh): Add a HealthCheckChecker visitor.
- // Loop through each task and check it's validity.
- foreach (const TaskInfo& task, tasks) {
- // Possible error found while checking task's validity.
- Option<Error> error = None();
+ // Invoke each visitor.
+ Option<Error> error = None();
+ foreach (TaskInfoVisitor* visitor, taskVisitors) {
+ error = (*visitor)(task, totalResources, *framework, *slave);
+ if (error.isSome()) {
+ break;
+ }
+ }
- // Invoke each visitor.
- foreach (TaskInfoVisitor* visitor, taskVisitors) {
- error = (*visitor)(task, totalResources, *framework, *slave);
- if (error.isSome()) {
- break;
- }
+ // Cleanup visitors.
+ while (!taskVisitors.empty()) {
+ TaskInfoVisitor* visitor = taskVisitors.front();
+ taskVisitors.pop_front();
+ delete visitor;
+ };
+
+ if (error.isSome()) {
+ return Error(error.get().message);
+ }
+
+ if (authorizer.isNone()) {
+ // Authorization is disabled.
+ return None();
+ }
+
+ // Authorize the task.
+ string user = framework->info.user(); // Default user.
+ if (task.has_command() && task.command().has_user()) {
+ user = task.command().user();
+ } else if (task.has_executor() && task.executor().command().has_user()) {
+ user = task.executor().command().user();
+ }
+
+ LOG(INFO)
+ << "Authorizing framework principal '" << framework->info.principal()
+ << "' to launch task " << task.task_id() << " as user '" << user << "'";
+
+ mesos::ACL::RunTasks request;
+ if (framework->info.has_principal()) {
+ request.mutable_principals()->add_values(framework->info.principal());
+ } else {
+ // Framework doesn't have a principal set.
+ request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+ }
+ request.mutable_users()->add_values(user);
+
+ return authorizer.get()->authorize(request).then(
+ lambda::bind(&_authorize,
+ "Not authorized to launch as user '" + user + "'",
+ lambda::_1));
+}
+
+
+void Master::launchTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+ CHECK(!slave->disconnected);
+
+ // Determine if this task launches an executor, and if so make sure
+ // the slave and framework state has been updated accordingly.
+ Option<ExecutorID> executorId;
+
+ if (task.has_executor()) {
+ // TODO(benh): Refactor this code into Slave::addTask.
+ if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
+ CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
+ << "Executor " << task.executor().executor_id()
+ << " known to the framework " << framework->id
+ << " but unknown to the slave " << *slave;
+
+ slave->addExecutor(framework->id, task.executor());
+ framework->addExecutor(slave->id, task.executor());
}
- if (error.isNone()) {
- // Task looks good, get it running!
- usedResources += launchTask(task, framework, slave);
- } else {
- // Error validating task, send a failed status update.
- LOG(WARNING) << "Failed to validate task " << task.task_id()
- << " : " << error.get().message;
+ executorId = task.executor().executor_id();
+ }
+
+ // Add the task to the framework and slave.
+ Task* t = new Task();
+ t->mutable_framework_id()->MergeFrom(framework->id);
+ t->set_state(TASK_STAGING);
+ t->set_name(task.name());
+ t->mutable_task_id()->MergeFrom(task.task_id());
+ t->mutable_slave_id()->MergeFrom(task.slave_id());
+ t->mutable_resources()->MergeFrom(task.resources());
+
+ if (executorId.isSome()) {
+ t->mutable_executor_id()->MergeFrom(executorId.get());
+ }
+
+ framework->addTask(t);
+
+ slave->addTask(t);
+
+ // Tell the slave to launch the task!
+ LOG(INFO) << "Launching task " << task.task_id()
+ << " of framework " << framework->id
+ << " with resources " << task.resources()
+ << " on slave " << *slave;
+
+ RunTaskMessage message;
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(slave->pid, message);
+
+ stats.tasks[TASK_STAGING]++;
+ return;
+}
+
+
+void Master::_launchTasks(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const vector<TaskInfo>& tasks,
+ const Resources& totalResources,
+ const Filters& filters,
+ const Future<list<Future<Option<Error> > > >& validationErrors)
+{
+ CHECK_READY(validationErrors);
+ CHECK_EQ(validationErrors.get().size(), tasks.size());
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework == NULL) {
+ LOG(WARNING)
+ << "Ignoring launch tasks message for framework " << frameworkId
+ << " because the framework cannot be found";
+
+ // Tell the allocator about the recovered resources.
+ allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+ return;
+ }
+
+ Slave* slave = getSlave(slaveId);
+ if (slave == NULL || slave->disconnected) {
+ foreach (const TaskInfo& task, tasks) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
- slave->id,
+ task.slave_id(),
+ task.task_id(),
+ TASK_LOST,
+ (slave == NULL ? "Slave removed" : "Slave disconnected"));
+
+ LOG(INFO) << "Sending status update " << update << ": "
+ << (slave == NULL ? "Slave removed" : "Slave disconnected");
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+ }
+
+ // Tell the allocator about the recovered resources.
+ allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+ return;
+ }
+
+ Resources usedResources; // Accumulated resources used.
+
+ size_t index = 0;
+ foreach (const Future<Option<Error> >& future, validationErrors.get()) {
+ const TaskInfo& task = tasks[index++];
+
+ // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
+ // for the task was called before we are here.
+ if (!framework->pendingTasks.contains(task.task_id())) {
+ continue;
+ }
+
+ framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+
+ CHECK(!future.isDiscarded());
+ if (future.isFailed() || future.get().isSome()) {
+ const string error = future.isFailed()
+ ? "Authorization failure: " + future.failure()
+ : future.get().get().message;
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ framework->id,
+ task.slave_id(),
+ task.task_id(),
+ TASK_LOST,
+ error);
+
+ LOG(INFO)
+ << "Sending status update " << update << ": " << error;
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+
+ continue;
+ }
+
+ // Check if resources needed by the task (and its executor in case
+ // the executor is new) are available. These resources will be
+ // added by 'launchTask()' below.
+ Resources resources = task.resources();
+ if (task.has_executor() &&
+ !slave->hasExecutor(framework->id, task.executor().executor_id())) {
+ resources += task.executor().resources();
+ }
+
+ if (!(usedResources + resources <= totalResources)) {
+ const string error =
+ "Task uses more resources " + stringify(resources) +
+ " than available " + stringify(totalResources - usedResources);
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ framework->id,
+ task.slave_id(),
task.task_id(),
TASK_LOST,
- error.get().message);
+ error);
+
+ LOG(INFO) << "Sending status update " << update << " for invalid task: "
+ << error;
- LOG(INFO) << "Sending status update "
- << update << " for invalid task";
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
+
+ continue;
}
+
+ // Launch task.
+ launchTask(task, framework, slave);
+ usedResources += resources;
}
// All used resources should be allocatable, enforced by our validators.
@@ -1911,19 +2157,8 @@ void Master::launchTasks(
if (unusedResources.allocatable().size() > 0) {
// Tell the allocator about the unused (e.g., refused) resources.
- allocator->resourcesUnused(
- framework->id,
- slave->id,
- unusedResources,
- filters);
+ allocator->resourcesUnused(frameworkId, slaveId, unusedResources, filters);
}
-
- // Cleanup visitors.
- while (!taskVisitors.empty()) {
- TaskInfoVisitor* visitor = taskVisitors.front();
- taskVisitors.pop_front();
- delete visitor;
- };
}
@@ -1981,6 +2216,24 @@ void Master::killTask(
return;
}
+ if (framework->pendingTasks.contains(taskId)) {
+ // Remove from pending tasks.
+ framework->pendingTasks.erase(taskId);
+
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = update->mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->set_state(TASK_KILLED);
+ status->set_message("Killed pending task");
+ update->set_timestamp(Clock::now().secs());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
+
+ return;
+ }
+
Task* task = framework->getTask(taskId);
if (task == NULL) {
// TODO(bmahler): Per MESOS-1200, if we knew the SlaveID here we
@@ -2778,6 +3031,13 @@ void Master::reconcileTasks(
continue;
}
+ if (framework->pendingTasks.contains(status.task_id())) {
+ LOG(WARNING) << "Status for task " << status.task_id()
+ << " from framework " << frameworkId
+ << " is unknown since the task is pending";
+ continue;
+ }
+
Option<StatusUpdate> update;
// Check for a removed slave (case 1).
@@ -3052,73 +3312,6 @@ vector<Framework*> Master::getActiveFrameworks() const
}
-Resources Master::launchTask(const TaskInfo& task,
- Framework* framework,
- Slave* slave)
-{
- CHECK_NOTNULL(framework);
- CHECK_NOTNULL(slave);
-
- Resources resources; // Total resources used on slave by launching this task.
-
- // Determine if this task launches an executor, and if so make sure
- // the slave and framework state has been updated accordingly.
- Option<ExecutorID> executorId;
-
- if (task.has_executor()) {
- // TODO(benh): Refactor this code into Slave::addTask.
- if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
- CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
- << "Executor " << task.executor().executor_id()
- << " known to the framework " << framework->id
- << " but unknown to the slave " << *slave;
-
- slave->addExecutor(framework->id, task.executor());
- framework->addExecutor(slave->id, task.executor());
- resources += task.executor().resources();
- }
-
- executorId = task.executor().executor_id();
- }
-
- // Add the task to the framework and slave.
- Task* t = new Task();
- t->mutable_framework_id()->MergeFrom(framework->id);
- t->set_state(TASK_STAGING);
- t->set_name(task.name());
- t->mutable_task_id()->MergeFrom(task.task_id());
- t->mutable_slave_id()->MergeFrom(task.slave_id());
- t->mutable_resources()->MergeFrom(task.resources());
-
- if (executorId.isSome()) {
- t->mutable_executor_id()->MergeFrom(executorId.get());
- }
-
- framework->addTask(t);
-
- slave->addTask(t);
-
- resources += task.resources();
-
- // Tell the slave to launch the task!
- LOG(INFO) << "Launching task " << task.task_id()
- << " of framework " << framework->id
- << " with resources " << task.resources()
- << " on slave " << *slave;
-
- RunTaskMessage message;
- message.mutable_framework()->MergeFrom(framework->info);
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.set_pid(framework->pid);
- message.mutable_task()->MergeFrom(task);
- send(slave->pid, message);
-
- stats.tasks[TASK_STAGING]++;
-
- return resources;
-}
-
-
// NOTE: This function is only called when the slave re-registers
// with a master that already knows about it (i.e., not a failed
// over master).
@@ -3350,6 +3543,9 @@ void Master::removeFramework(Framework* framework)
send(slave->pid, message);
}
+ // Remove the pending tasks from the framework.
+ framework->pendingTasks.clear();
+
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
Slave* slave = getSlave(task->slave_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 75f0d49..0c68a5b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,10 +86,10 @@ class SlaveObserver;
class WhitelistWatcher;
struct Framework;
-struct Slave;
-struct Role;
struct OfferVisitor;
-
+struct Role;
+struct Slave;
+struct TaskInfoVisitor;
class Master : public ProtobufProcess<Master>
{
@@ -310,11 +310,27 @@ protected:
const std::vector<StatusUpdate>& updates,
const process::Future<bool>& removed);
- // Launch a task from a task description, and returned the consumed
- // resources for the task and possibly it's executor.
- Resources launchTask(const TaskInfo& task,
- Framework* framework,
- Slave* slave);
+ // Validates the task including authorization.
+ // Returns None if the task is valid.
+ // Returns Error if the task is invalid.
+ // Returns Failure if authorization returns 'Failure'.
+ process::Future<Option<Error> > validateTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave,
+ const Resources& totalResources);
+
+ // Launch a task from a task description.
+ void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
+ // 'launchTasks()' continuation.
+ void _launchTasks(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const std::vector<TaskInfo>& tasks,
+ const Resources& totalResources,
+ const Filters& filters,
+ const process::Future<std::list<process::Future<Option<Error> > > >& f);
// Remove a task.
void removeTask(Task* task);
@@ -901,6 +917,10 @@ struct Framework
process::Time reregisteredTime;
process::Time unregisteredTime;
+ // Tasks that have not yet been launched because they are being
+ // validated (e.g., authorized).
+ hashmap<TaskID, TaskInfo> pendingTasks;
+
hashmap<TaskID, Task*> tasks;
// NOTE: We use a shared pointer for Task because clang doesn't like
http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
new file mode 100644
index 0000000..17debaf
--- /dev/null
+++ b/src/tests/master_authorization_tests.cpp
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterAuthorizationTest : public MesosTest {};
+
+
+// This test verifies that a task launch permitted by the ACLs succeeds.
+TEST_F(MasterAuthorizationTest, AuthorizedTask)
+{
+ // Set up ACLs so that the framework can only launch tasks as "foo".
+ ACLs acls;
+ acls.set_permissive(false);
+ mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+ acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+ acl->mutable_users()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Create an authorized executor.
+ ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+ executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+ executor.mutable_command()->set_user("foo");
+
+ MockExecutor exec(executor.executor_id());
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size()); // 'offers.get()[0]' is used below.
+
+ // Create an authorized task.
+ TaskInfo task;
+ task.set_name("test");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(executor);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'exec' gets deallocated.
+}
+
+
+// This test verifies that a task launch denied by the ACLs is rejected.
+TEST_F(MasterAuthorizationTest, UnauthorizedTask)
+{
+ // Set up ACLs so that no framework can launch as "foo".
+ ACLs acls;
+ mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+ acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+ acl->mutable_users()->add_values("foo");
+
+ master::Flags flags = CreateMasterFlags();
+ flags.acls = JSON::Protobuf(acls);
+
+ Try<PID<Master> > master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Create an unauthorized executor.
+ ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+ executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+ executor.mutable_command()->set_user("foo");
+
+ MockExecutor exec(executor.executor_id());
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size()); // 'offers.get()[0]' is used below.
+
+ // Create an unauthorized task.
+ TaskInfo task;
+ task.set_name("test");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(executor);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown(); // Must shutdown before 'exec' gets deallocated.
+}