You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/03/15 00:03:55 UTC
[1/2] mesos git commit: Check task user before allowing a task to be
launched on the agent.
Repository: mesos
Updated Branches:
refs/heads/master 4c7f33baf -> c7817fb46
Check task user before allowing a task to be launched on the agent.
Added support for action `run_tasks` on the agent's flag `acls`. Based on
the ACL configured for `run_tasks`, a task can be allowed or disallowed
to launch on the agent.
If a task or task group cannot be launched due to failed authorization,
a `TASK_ERROR` status update shall be sent with a reason code of
`REASON_TASK_UNAUTHORIZED` or `REASON_TASK_GROUP_UNAUTHORIZED` as
applicable.
Note that in the case of a task group, all tasks fail if any task
within the task group encounters an authorization error.
Review: https://reviews.apache.org/r/55887/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9c3e4c70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9c3e4c70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9c3e4c70
Branch: refs/heads/master
Commit: 9c3e4c709f1f3f93f673e2764daf9bedbea375dd
Parents: 4c7f33b
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Mar 14 16:34:57 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Mar 14 17:00:15 2017 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 347 +++++++++++++++++++++++----
src/slave/slave.hpp | 27 ++-
src/tests/slave_authorization_tests.cpp | 110 +++++++++
3 files changed, 429 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2308d5b..425d33e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1774,6 +1774,8 @@ void Slave::_run(
const Option<TaskInfo>& task,
const Option<TaskGroupInfo>& taskGroup)
{
+ // TODO(anindya_sinha): Consider refactoring the initial steps common
+ // to `_run()` and `__run()`.
CHECK_NE(task.isSome(), taskGroup.isSome())
<< "Either task or task group should be set but not both";
@@ -1787,10 +1789,6 @@ void Slave::_run(
}
const FrameworkID& frameworkId = frameworkInfo.id();
-
- LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
- << " for framework " << frameworkId;
-
Framework* framework = getFramework(frameworkId);
if (framework == nullptr) {
LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
@@ -1801,29 +1799,34 @@ void Slave::_run(
const ExecutorID& executorId = executorInfo.executor_id();
- // Remove the task/task group from being pending. If any of the
- // tasks in the task group have been killed in the interim, we
- // send a TASK_KILLED for all the other tasks in the group.
+ // We don't send a status update here because a terminating
+ // framework cannot send acknowledgements.
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+ << " of framework " << frameworkId
+ << " because the framework is terminating";
+
+ // Although we cannot send a status update in this case, we remove
+ // the affected tasks from the pending tasks.
+ foreach (const TaskInfo& _task, tasks) {
+ framework->removePendingTask(_task, executorInfo);
+ }
+
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
+
+ // If any of the tasks in the task group have been killed in the interim,
+ // we send a TASK_KILLED for all the other tasks in the group.
bool killed = false;
foreach (const TaskInfo& _task, tasks) {
- if (framework->pending.contains(executorId) &&
- framework->pending[executorId].contains(_task.task_id())) {
- framework->pending[executorId].erase(_task.task_id());
- if (framework->pending[executorId].empty()) {
- framework->pending.erase(executorId);
- // NOTE: Ideally we would perform the following check here:
- //
- // if (framework->executors.empty() &&
- // framework->pending.empty()) {
- // removeFramework(framework);
- // }
- //
- // However, we need 'framework' to stay valid for the rest of
- // this function. As such, we perform the check before each of
- // the 'return' statements below.
- }
- } else {
+ if (!framework->pending.contains(executorId) ||
+ !framework->pending.at(executorId).contains(_task.task_id())) {
killed = true;
+ break;
}
}
@@ -1833,6 +1836,8 @@ void Slave::_run(
<< " because it has been killed in the meantime";
foreach (const TaskInfo& _task, tasks) {
+ framework->removePendingTask(_task, executorInfo);
+
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
@@ -1841,11 +1846,14 @@ void Slave::_run(
TaskStatus::SOURCE_SLAVE,
UUID::random(),
"Task killed before it was launched");
+
+ // TODO(vinod): Ensure that the status update manager reliably
+ // delivers this update. Currently, we don't guarantee this
+ // because removal of the framework causes the status update
+ // manager to stop retrying for its un-acked updates.
statusUpdate(update, UPID());
}
- // Refer to the comment after 'framework->pending.erase' above
- // for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
}
@@ -1853,21 +1861,7 @@ void Slave::_run(
return;
}
- // We don't send a status update here because a terminating
- // framework cannot send acknowledgements.
- if (framework->state == Framework::TERMINATING) {
- LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
- << " of framework " << frameworkId
- << " because the framework is terminating";
-
- // Refer to the comment after 'framework->pending.erase' above
- // for why we need this.
- if (framework->executors.empty() && framework->pending.empty()) {
- removeFramework(framework);
- }
-
- return;
- }
+ CHECK(!future.isDiscarded());
if (!future.isReady()) {
LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
@@ -1883,6 +1877,8 @@ void Slave::_run(
}
foreach (const TaskInfo& _task, tasks) {
+ framework->removePendingTask(_task, executorInfo);
+
const StatusUpdate update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
@@ -1901,7 +1897,197 @@ void Slave::_run(
statusUpdate(update, UPID());
}
- // Refer to the comment after 'framework->pending.erase' above
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
+
+ // Authorize the task or tasks (as in a task group) to ensure that the
+ // task user is allowed to launch tasks on the agent. If authorization
+ // fails, the task (or all tasks in a task group) are not launched.
+ list<Future<bool>> authorizations;
+
+ LOG(INFO) << "Authorizing " << taskOrTaskGroup(task, taskGroup)
+ << " for framework " << frameworkId;
+
+ foreach (const TaskInfo& _task, tasks) {
+ authorizations.push_back(authorizeTask(_task, frameworkInfo));
+ }
+
+ collect(authorizations)
+ .onAny(defer(self(),
+ &Self::__run,
+ lambda::_1,
+ frameworkInfo,
+ executorInfo,
+ task,
+ taskGroup));
+}
+
+
+void Slave::__run(
+ const Future<list<bool>>& future,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const Option<TaskInfo>& task,
+ const Option<TaskGroupInfo>& taskGroup)
+{
+ CHECK_NE(task.isSome(), taskGroup.isSome())
+ << "Either task or task group should be set but not both";
+
+ vector<TaskInfo> tasks;
+ if (task.isSome()) {
+ tasks.push_back(task.get());
+ } else {
+ foreach (const TaskInfo& _task, taskGroup->tasks()) {
+ tasks.push_back(_task);
+ }
+ }
+
+ const FrameworkID& frameworkId = frameworkInfo.id();
+ Framework* framework = getFramework(frameworkId);
+ if (framework == nullptr) {
+ LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+ << " because the framework " << frameworkId
+ << " does not exist";
+ return;
+ }
+
+ const ExecutorID& executorId = executorInfo.executor_id();
+
+ // We don't send a status update here because a terminating
+ // framework cannot send acknowledgements.
+ if (framework->state == Framework::TERMINATING) {
+ LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+ << " of framework " << frameworkId
+ << " because the framework is terminating";
+
+ // Although we cannot send a status update in this case, we remove
+ // the affected tasks from the list of pending tasks.
+ foreach (const TaskInfo& _task, tasks) {
+ framework->removePendingTask(_task, executorInfo);
+ }
+
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
+
+ // Remove the task/task group from being pending. If any of the
+ // tasks in the task group have been killed in the interim, we
+ // send a TASK_KILLED for all the other tasks in the group.
+ bool killed = false;
+ foreach (const TaskInfo& _task, tasks) {
+ if (framework->removePendingTask(_task, executorInfo)) {
+ // NOTE: Ideally we would perform the following check here:
+ //
+ // if (framework->executors.empty() &&
+ // framework->pending.empty()) {
+ // removeFramework(framework);
+ // }
+ //
+ // However, we need 'framework' to stay valid for the rest of
+ // this function. As such, we perform the check before each of
+ // the 'return' statements below.
+ } else {
+ killed = true;
+ }
+ }
+
+ if (killed) {
+ LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+ << " of framework " << frameworkId
+ << " because it has been killed in the meantime";
+
+ foreach (const TaskInfo& _task, tasks) {
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ _task.task_id(),
+ TASK_KILLED,
+ TaskStatus::SOURCE_SLAVE,
+ UUID::random(),
+ "Task killed before it was launched");
+ statusUpdate(update, UPID());
+ }
+
+ // Refer to the comment after 'framework->removePendingTask' above
+ // for why we need this.
+ if (framework->executors.empty() && framework->pending.empty()) {
+ removeFramework(framework);
+ }
+
+ return;
+ }
+
+ CHECK(!future.isDiscarded());
+
+ // Validate that the task (or tasks in case of task group) are authorized
+ // to be run on this agent.
+ Option<Error> error = None();
+ if (!future.isReady()) {
+ error = Error("Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
+ ": " + future.failure());
+ }
+
+ if (error.isNone()) {
+ list<bool> authorizations = future.get();
+
+ foreach (const TaskInfo& _task, tasks) {
+ bool authorized = authorizations.front();
+ authorizations.pop_front();
+
+ // If authorization for this task fails, we fail all tasks (in case of
+ // a task group) with this specific error.
+ if (!authorized) {
+ string user = frameworkInfo.user();
+
+ if (_task.has_command() && _task.command().has_user()) {
+ user = _task.command().user();
+ } else if (executorInfo.has_command() &&
+ executorInfo.command().has_user()) {
+ user = executorInfo.command().user();
+ }
+
+ error = Error("Task '" + stringify(_task.task_id()) + "'"
+ " is not authorized to launch as"
+ " user '" + user + "'");
+
+ break;
+ }
+ }
+ }
+
+ // For failed authorization, we send a TASK_ERROR status update for
+ // all tasks.
+ if (error.isSome()) {
+ const TaskStatus::Reason reason = task.isSome()
+ ? TaskStatus::REASON_TASK_UNAUTHORIZED
+ : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+ LOG(ERROR) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+ << " of framework " << frameworkId
+ << ": " << error->message;
+
+ foreach (const TaskInfo& _task, tasks) {
+ const StatusUpdate update = protobuf::createStatusUpdate(
+ frameworkId,
+ info.id(),
+ _task.task_id(),
+ TASK_ERROR,
+ TaskStatus::SOURCE_SLAVE,
+ UUID::random(),
+ error->message,
+ reason);
+
+ statusUpdate(update, UPID());
+ }
+
+ // Refer to the comment after 'framework->removePendingTask' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
@@ -1910,6 +2096,9 @@ void Slave::_run(
return;
}
+ LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
+ << " for framework " << frameworkId;
+
auto unallocated = [](const Resources& resources) {
Resources result = resources;
result.unallocate();
@@ -1967,7 +2156,7 @@ void Slave::_run(
statusUpdate(update, UPID());
}
- // Refer to the comment after 'framework->pending.erase' above
+ // Refer to the comment after 'framework->removePendingTask' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
@@ -2020,7 +2209,7 @@ void Slave::_run(
statusUpdate(update, UPID());
}
- // Refer to the comment after 'framework->pending.erase' above
+ // Refer to the comment after 'framework->removePendingTask' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
@@ -2039,7 +2228,7 @@ void Slave::_run(
<< " of framework " << frameworkId
<< " because the agent is terminating";
- // Refer to the comment after 'framework->pending.erase' above
+ // Refer to the comment after 'framework->removePendingTask' above
// for why we need this.
if (framework->executors.empty() && framework->pending.empty()) {
removeFramework(framework);
@@ -2158,7 +2347,7 @@ void Slave::_run(
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
- &Self::__run,
+ &Self::___run,
lambda::_1,
frameworkId,
executorId,
@@ -2183,7 +2372,7 @@ void Slave::_run(
}
-void Slave::__run(
+void Slave::___run(
const Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
@@ -3314,7 +3503,7 @@ void Slave::subscribe(
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
- &Self::__run,
+ &Self::___run,
lambda::_1,
framework->id(),
executor->id,
@@ -3536,7 +3725,7 @@ void Slave::registerExecutor(
containerizer->update(executor->containerId, resources)
.onAny(defer(self(),
- &Self::__run,
+ &Self::___run,
lambda::_1,
frameworkId,
executorId,
@@ -6150,6 +6339,45 @@ double Slave::_executor_directory_max_allowed_age_secs()
}
+// Authorizes launching `task` on this agent as the `RUN_TASK` action.
+//
+// Actions that the master has already authorized are normally not
+// re-authorized on the agent. `RUN_TASK` is the exception, because:
+//   a) when hosts have heterogeneous user-account configurations, the
+//      ACL is better placed on the agent than on the master, and
+//   b) a malicious task launched as a superuser on the agent is a
+//      greater security risk than, e.g., killing a task or shutting
+//      down a framework.
+//
+// Resolves to `true` when the agent has no authorizer configured;
+// otherwise resolves to the authorizer's decision.
+Future<bool> Slave::authorizeTask(
+    const TaskInfo& task,
+    const FrameworkInfo& frameworkInfo)
+{
+  // Without an authorizer every task is allowed.
+  if (authorizer.isNone()) {
+    return true;
+  }
+
+  const bool hasPrincipal = frameworkInfo.has_principal();
+
+  authorization::Request request;
+  request.set_action(authorization::RUN_TASK);
+
+  // The subject is only populated when the framework has a principal.
+  if (hasPrincipal) {
+    request.mutable_subject()->set_value(frameworkInfo.principal());
+  }
+
+  // The object carries both the task and the framework it belongs to.
+  authorization::Object* object = request.mutable_object();
+  object->mutable_task_info()->CopyFrom(task);
+  object->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  LOG(INFO)
+    << "Authorizing framework principal '"
+    << (hasPrincipal ? frameworkInfo.principal() : "ANY")
+    << "' to launch task " << task.task_id();
+
+  return authorizer.get()->authorized(request);
+}
+
+
Future<bool> Slave::authorizeLogAccess(const Option<Principal>& principal)
{
if (authorizer.isNone()) {
@@ -6636,6 +6864,27 @@ Executor* Framework::getExecutor(const TaskID& taskId) const
}
+// Removes `task` from the pending tasks of the executor identified by
+// `executorInfo`. Once that executor has no pending tasks left, its
+// entry is dropped so that `pending.empty()` reflects whether anything
+// is still pending for this framework (callers use that check to decide
+// whether the framework can be removed).
+//
+// Returns `true` if `task` was a pending task of this framework before
+// the removal; `false` otherwise.
+bool Framework::removePendingTask(
+    const TaskInfo& task,
+    const ExecutorInfo& executorInfo)
+{
+  // Bind by reference: no need to copy the ExecutorID.
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  if (!pending.contains(executorId)) {
+    return false;
+  }
+
+  // Look up the executor's pending tasks once instead of on every access.
+  auto& executorPending = pending.at(executorId);
+  if (!executorPending.contains(task.task_id())) {
+    return false;
+  }
+
+  executorPending.erase(task.task_id());
+
+  if (executorPending.empty()) {
+    // NOTE: This invalidates `executorPending`; it is not used afterwards.
+    pending.erase(executorId);
+  }
+
+  return true;
+}
+
+
Executor* Slave::getExecutor(const ContainerID& containerId) const
{
const ContainerID rootContainerId = protobuf::getRootContainerId(containerId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 3ff9281..e2de66c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -150,11 +150,11 @@ public:
// Made 'virtual' for Slave mocking.
virtual void _run(
- const process::Future<bool>& future,
- const FrameworkInfo& frameworkInfo,
- const ExecutorInfo& executorInfo,
- const Option<TaskInfo>& task,
- const Option<TaskGroupInfo>& taskGroup);
+ const process::Future<bool>& future,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const Option<TaskInfo>& task,
+ const Option<TaskGroupInfo>& taskGroup);
// Made 'virtual' for Slave mocking.
virtual void runTaskGroup(
@@ -338,11 +338,18 @@ public:
virtual void finalize();
virtual void exited(const process::UPID& pid);
+ // Continuation of `_run()`: called once the authorization futures for
+ // the task (or for every task in the task group) have been collected.
+ // `future` holds one authorization result per task, in task order.
+ void __run(
+ const process::Future<std::list<bool>>& future,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const Option<TaskInfo>& task,
+ const Option<TaskGroupInfo>& taskGroup);
+
// This is called when the resource limits of the container have
// been updated for the given tasks and task groups. If the update is
// successful, we flush the given tasks to the executor by sending
// RunTaskMessages or `LAUNCH_GROUP` events.
- void __run(
+ void ___run(
const process::Future<Nothing>& future,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
@@ -465,6 +472,10 @@ private:
Try<Nothing> syncCheckpointedResources(
const Resources& newCheckpointedResources);
+ // Authorizes the `RUN_TASK` action for `task` of the given framework
+ // against the agent's authorizer. Resolves to `true` if the agent has
+ // no authorizer configured.
+ process::Future<bool> authorizeTask(
+ const TaskInfo& task,
+ const FrameworkInfo& frameworkInfo);
+
process::Future<bool> authorizeLogAccess(
const Option<process::http::authentication::Principal>& principal);
@@ -1102,6 +1113,10 @@ struct Framework
void checkpointFramework() const;
+ // Removes `task` from the pending tasks of the executor identified by
+ // `executorInfo`, dropping the executor's entry once it has no pending
+ // tasks left. Returns `true` if the task was pending before removal.
+ bool removePendingTask(
+ const TaskInfo& task,
+ const ExecutorInfo& executorInfo);
+
const FrameworkID id() const { return info.id(); }
enum State
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 8998ff7..3657e0a 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -369,6 +369,116 @@ TYPED_TEST(SlaveAuthorizerTest, ViewFlags)
}
+// This test verifies that a task is launched on the agent if the task
+// user is authorized based on `run_tasks` ACL configured on the agent
+// to only allow whitelisted users to run tasks on the agent.
+TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
+{
+  // Get the current user.
+  Result<string> user = os::user();
+  ASSERT_SOME(user) << "Failed to get the current user name"
+                    << (user.isError() ? ": " + user.error() : "");
+
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  // Start a slave with `bar` and the current user being the only authorized
+  // users to launch tasks on the agent.
+  ACLs acls;
+  acls.set_permissive(false); // Restrictive.
+  mesos::ACL::RunTask* acl = acls.add_run_tasks();
+  acl->mutable_principals()->set_type(ACL::Entity::ANY);
+  acl->mutable_users()->add_values("bar");
+  acl->mutable_users()->add_values(user.get());
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.acls = acls;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = this->StartSlave(
+      detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Create a framework with user `foo`.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_user("foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Framework is registered since the master admits frameworks of any user.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  // ASSERT (not EXPECT): the first offer is indexed below, which would be
+  // undefined behavior on an empty vector.
+  ASSERT_FALSE(offers->empty());
+
+  Offer offer = offers.get()[0];
+
+  // Launch the first task with no user, so it defaults to the
+  // framework user `foo`.
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+
+  // Launch the second task as the current user.
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+  task2.mutable_command()->set_user(user.get());
+
+  // The first task should fail since the task user `foo` is not an
+  // authorized user that can launch a task. However, the second task
+  // should succeed.
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {LAUNCH({task1, task2})});
+
+  // Wait for TASK_ERROR for 1st task, and TASK_RUNNING for 2nd task.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+
+  // Validate both the statuses. Note that the order of receiving the
+  // status updates for the 2 tasks is not deterministic.
+  hashmap<TaskID, TaskStatus> statuses {
+    {status1->task_id(), status1.get()},
+    {status2->task_id(), status2.get()}
+  };
+
+  ASSERT_TRUE(statuses.contains(task1.task_id()));
+  EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statuses.at(task1.task_id()).source());
+  EXPECT_EQ(TaskStatus::REASON_TASK_UNAUTHORIZED,
+            statuses.at(task1.task_id()).reason());
+
+  ASSERT_TRUE(statuses.contains(task2.task_id()));
+  EXPECT_EQ(TASK_RUNNING, statuses.at(task2.task_id()).state());
+
+  driver.stop();
+  driver.join();
+}
+
+
// Parameterized fixture for agent-specific authorization tests. The
// path of the tested endpoint is passed as the only parameter.
class SlaveEndpointTest:
[2/2] mesos git commit: Test to ensure non-authorized users cannot
launch tasks on agents.
Posted by ya...@apache.org.
Test to ensure non-authorized users cannot launch tasks on agents.
Review: https://reviews.apache.org/r/55888/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7817fb4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7817fb4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7817fb4
Branch: refs/heads/master
Commit: c7817fb461b21aa3eaca5721521e22dbab01df24
Parents: 9c3e4c7
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Mar 14 17:00:43 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Mar 14 17:00:43 2017 -0700
----------------------------------------------------------------------
src/tests/slave_authorization_tests.cpp | 110 +++++++++++++++++++++++++++
1 file changed, 110 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c7817fb4/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 3657e0a..bf19336 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -479,6 +479,116 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
}
+// This test verifies that a task is launched on the agent if the task
+// user is authorized based on `run_tasks` ACL configured on the agent
+// to only allow whitelisted users to run tasks on the agent.
+//
+// NOTE(review): This hunk is inserted after `AuthorizeRunTaskOnAgent`
+// (see the hunk header), yet it declares a test with that same name and
+// the same body, which would be a redefinition. The commit message
+// describes a test for non-authorized users, so the archived diff may
+// be wrong. Confirm the intended content against the canonical commit.
+TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
+{
+  // Get the current user.
+  Result<string> user = os::user();
+  ASSERT_SOME(user) << "Failed to get the current user name"
+                    << (user.isError() ? ": " + user.error() : "");
+
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  // Start a slave with `bar` and the current user being the only authorized
+  // users to launch tasks on the agent.
+  ACLs acls;
+  acls.set_permissive(false); // Restrictive.
+  mesos::ACL::RunTask* acl = acls.add_run_tasks();
+  acl->mutable_principals()->set_type(ACL::Entity::ANY);
+  acl->mutable_users()->add_values("bar");
+  acl->mutable_users()->add_values(user.get());
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.acls = acls;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = this->StartSlave(
+      detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Create a framework with user `foo`.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_user("foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Framework is registered since the master admits frameworks of any user.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  // ASSERT (not EXPECT): the first offer is indexed below, which would be
+  // undefined behavior on an empty vector.
+  ASSERT_FALSE(offers->empty());
+
+  Offer offer = offers.get()[0];
+
+  // Launch the first task with no user, so it defaults to the
+  // framework user `foo`.
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+
+  // Launch the second task as the current user.
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+  task2.mutable_command()->set_user(user.get());
+
+  // The first task should fail since the task user `foo` is not an
+  // authorized user that can launch a task. However, the second task
+  // should succeed.
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {LAUNCH({task1, task2})});
+
+  // Wait for TASK_ERROR for 1st task, and TASK_RUNNING for 2nd task.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+
+  // Validate both the statuses. Note that the order of receiving the
+  // status updates for the 2 tasks is not deterministic.
+  hashmap<TaskID, TaskStatus> statuses {
+    {status1->task_id(), status1.get()},
+    {status2->task_id(), status2.get()}
+  };
+
+  ASSERT_TRUE(statuses.contains(task1.task_id()));
+  EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statuses.at(task1.task_id()).source());
+  EXPECT_EQ(TaskStatus::REASON_TASK_UNAUTHORIZED,
+            statuses.at(task1.task_id()).reason());
+
+  ASSERT_TRUE(statuses.contains(task2.task_id()));
+  EXPECT_EQ(TASK_RUNNING, statuses.at(task2.task_id()).state());
+
+  driver.stop();
+  driver.join();
+}
+
+
// Parameterized fixture for agent-specific authorization tests. The
// path of the tested endpoint is passed as the only parameter.
class SlaveEndpointTest: