Posted to commits@mesos.apache.org by ya...@apache.org on 2017/03/15 00:03:55 UTC

[1/2] mesos git commit: Check task user before allowing a task to be launched on the agent.

Repository: mesos
Updated Branches:
  refs/heads/master 4c7f33baf -> c7817fb46


Check task user before allowing a task to be launched on the agent.

Added support for the `run_tasks` action in the agent's `--acls` flag.
Based on the ACL configured for `run_tasks`, a task can be allowed or
disallowed to launch on the agent. If a task or task group cannot be
launched due to failed authorization, a `TASK_ERROR` status update is
sent with a reason of `REASON_TASK_UNAUTHORIZED` or
`REASON_TASK_GROUP_UNAUTHORIZED`, as applicable.
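
For illustration, the `--acls` flag accepts a JSON representation of
the `ACLs` protobuf message. A minimal sketch (the user names are
hypothetical) that lets only `alice` and `bob` launch tasks on the
agent:

  {
    "permissive": false,
    "run_tasks": [
      {
        "principals": { "type": "ANY" },
        "users": { "values": ["alice", "bob"] }
      }
    ]
  }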

Note that in the case of a task group, all tasks fail if any task
within the group encounters an authorization error.
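
Frameworks can detect these rejections when processing status
updates. A minimal C++ sketch (the helper name is hypothetical,
assuming the Mesos scheduler headers are available):

  #include <mesos/mesos.hpp>

  // Returns true if `status` reports an agent-side authorization
  // rejection. Every task in a rejected task group carries
  // REASON_TASK_GROUP_UNAUTHORIZED.
  static bool isAgentAuthorizationRejection(const mesos::TaskStatus& status)
  {
    return status.state() == mesos::TASK_ERROR &&
           status.source() == mesos::TaskStatus::SOURCE_SLAVE &&
           (status.reason() == mesos::TaskStatus::REASON_TASK_UNAUTHORIZED ||
            status.reason() ==
              mesos::TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED);
  }

Since retrying under the same ACLs will fail again, a framework
should adjust the task user rather than resubmit the task as-is.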

Review: https://reviews.apache.org/r/55887/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9c3e4c70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9c3e4c70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9c3e4c70

Branch: refs/heads/master
Commit: 9c3e4c709f1f3f93f673e2764daf9bedbea375dd
Parents: 4c7f33b
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Mar 14 16:34:57 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Mar 14 17:00:15 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                     | 347 +++++++++++++++++++++++----
 src/slave/slave.hpp                     |  27 ++-
 src/tests/slave_authorization_tests.cpp | 110 +++++++++
 3 files changed, 429 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2308d5b..425d33e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1774,6 +1774,8 @@ void Slave::_run(
     const Option<TaskInfo>& task,
     const Option<TaskGroupInfo>& taskGroup)
 {
+  // TODO(anindya_sinha): Consider refactoring the initial steps common
+  // to `_run()` and `__run()`.
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
 
@@ -1787,10 +1789,6 @@ void Slave::_run(
   }
 
   const FrameworkID& frameworkId = frameworkInfo.id();
-
-  LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
-            << " for framework " << frameworkId;
-
   Framework* framework = getFramework(frameworkId);
   if (framework == nullptr) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
@@ -1801,29 +1799,34 @@ void Slave::_run(
 
   const ExecutorID& executorId = executorInfo.executor_id();
 
-  // Remove the task/task group from being pending. If any of the
-  // tasks in the task group have been killed in the interim, we
-  // send a TASK_KILLED for all the other tasks in the group.
+  // We don't send a status update here because a terminating
+  // framework cannot send acknowledgements.
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because the framework is terminating";
+
+    // Although we cannot send a status update in this case, we remove
+    // the affected tasks from the pending tasks.
+    foreach (const TaskInfo& _task, tasks) {
+      framework->removePendingTask(_task, executorInfo);
+    }
+
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
+    return;
+  }
+
+  // If any of the tasks in the task group have been killed in the interim,
+  // we send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
   foreach (const TaskInfo& _task, tasks) {
-    if (framework->pending.contains(executorId) &&
-        framework->pending[executorId].contains(_task.task_id())) {
-      framework->pending[executorId].erase(_task.task_id());
-      if (framework->pending[executorId].empty()) {
-        framework->pending.erase(executorId);
-        // NOTE: Ideally we would perform the following check here:
-        //
-        //   if (framework->executors.empty() &&
-        //       framework->pending.empty()) {
-        //     removeFramework(framework);
-        //   }
-        //
-        // However, we need 'framework' to stay valid for the rest of
-        // this function. As such, we perform the check before each of
-        // the 'return' statements below.
-      }
-    } else {
+    if (!framework->pending.contains(executorId) ||
+        !framework->pending.at(executorId).contains(_task.task_id())) {
       killed = true;
+      break;
     }
   }
 
@@ -1833,6 +1836,8 @@ void Slave::_run(
                  << " because it has been killed in the meantime";
 
     foreach (const TaskInfo& _task, tasks) {
+      framework->removePendingTask(_task, executorInfo);
+
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
@@ -1841,11 +1846,14 @@ void Slave::_run(
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
           "Task killed before it was launched");
+
+      // TODO(vinod): Ensure that the status update manager reliably
+      // delivers this update. Currently, we don't guarantee this
+      // because removal of the framework causes the status update
+      // manager to stop retrying for its un-acked updates.
       statusUpdate(update, UPID());
     }
 
-    // Refer to the comment after 'framework->pending.erase' above
-    // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
     }
@@ -1853,21 +1861,7 @@ void Slave::_run(
     return;
   }
 
-  // We don't send a status update here because a terminating
-  // framework cannot send acknowledgements.
-  if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because the framework is terminating";
-
-    // Refer to the comment after 'framework->pending.erase' above
-    // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
-      removeFramework(framework);
-    }
-
-    return;
-  }
+  CHECK(!future.isDiscarded());
 
   if (!future.isReady()) {
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
@@ -1883,6 +1877,8 @@ void Slave::_run(
     }
 
     foreach (const TaskInfo& _task, tasks) {
+      framework->removePendingTask(_task, executorInfo);
+
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
@@ -1901,7 +1897,197 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    // Refer to the comment after 'framework->pending.erase' above
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
+    return;
+  }
+
+  // Authorize the task or tasks (as in a task group) to ensure that the
+  // task user is allowed to launch tasks on the agent. If authorization
+  // fails, the task (or all tasks in a task group) are not launched.
+  list<Future<bool>> authorizations;
+
+  LOG(INFO) << "Authorizing " << taskOrTaskGroup(task, taskGroup)
+            << " for framework " << frameworkId;
+
+  foreach (const TaskInfo& _task, tasks) {
+    authorizations.push_back(authorizeTask(_task, frameworkInfo));
+  }
+
+  collect(authorizations)
+    .onAny(defer(self(),
+                 &Self::__run,
+                 lambda::_1,
+                 frameworkInfo,
+                 executorInfo,
+                 task,
+                 taskGroup));
+}
+
+
+void Slave::__run(
+    const Future<list<bool>>& future,
+    const FrameworkInfo& frameworkInfo,
+    const ExecutorInfo& executorInfo,
+    const Option<TaskInfo>& task,
+    const Option<TaskGroupInfo>& taskGroup)
+{
+  CHECK_NE(task.isSome(), taskGroup.isSome())
+    << "Either task or task group should be set but not both";
+
+  vector<TaskInfo> tasks;
+  if (task.isSome()) {
+    tasks.push_back(task.get());
+  } else {
+    foreach (const TaskInfo& _task, taskGroup->tasks()) {
+      tasks.push_back(_task);
+    }
+  }
+
+  const FrameworkID& frameworkId = frameworkInfo.id();
+  Framework* framework = getFramework(frameworkId);
+  if (framework == nullptr) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " because the framework " << frameworkId
+                 << " does not exist";
+    return;
+  }
+
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  // We don't send a status update here because a terminating
+  // framework cannot send acknowledgements.
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because the framework is terminating";
+
+    // Although we cannot send a status update in this case, we remove
+    // the affected tasks from the list of pending tasks.
+    foreach (const TaskInfo& _task, tasks) {
+      framework->removePendingTask(_task, executorInfo);
+    }
+
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
+    return;
+  }
+
+  // Remove the task/task group from being pending. If any of the
+  // tasks in the task group have been killed in the interim, we
+  // send a TASK_KILLED for all the other tasks in the group.
+  bool killed = false;
+  foreach (const TaskInfo& _task, tasks) {
+    if (framework->removePendingTask(_task, executorInfo)) {
+      // NOTE: Ideally we would perform the following check here:
+      //
+      //   if (framework->executors.empty() &&
+      //       framework->pending.empty()) {
+      //     removeFramework(framework);
+      //   }
+      //
+      // However, we need 'framework' to stay valid for the rest of
+      // this function. As such, we perform the check before each of
+      // the 'return' statements below.
+    } else {
+      killed = true;
+    }
+  }
+
+  if (killed) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because it has been killed in the meantime";
+
+    foreach (const TaskInfo& _task, tasks) {
+      const StatusUpdate update = protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          _task.task_id(),
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          UUID::random(),
+          "Task killed before it was launched");
+      statusUpdate(update, UPID());
+    }
+
+    // Refer to the comment after 'framework->removePendingTask' above
+    // for why we need this.
+    if (framework->executors.empty() && framework->pending.empty()) {
+      removeFramework(framework);
+    }
+
+    return;
+  }
+
+  CHECK(!future.isDiscarded());
+
+  // Validate that the task (or tasks in case of task group) are authorized
+  // to be run on this agent.
+  Option<Error> error = None();
+  if (!future.isReady()) {
+    error = Error("Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
+                  ": " + future.failure());
+  }
+
+  if (error.isNone()) {
+    list<bool> authorizations = future.get();
+
+    foreach (const TaskInfo& _task, tasks) {
+      bool authorized = authorizations.front();
+      authorizations.pop_front();
+
+      // If authorization for this task fails, we fail all tasks (in case of
+      // a task group) with this specific error.
+      if (!authorized) {
+        string user = frameworkInfo.user();
+
+        if (_task.has_command() && _task.command().has_user()) {
+          user = _task.command().user();
+        } else if (executorInfo.has_command() &&
+                   executorInfo.command().has_user()) {
+          user = executorInfo.command().user();
+        }
+
+        error = Error("Task '" + stringify(_task.task_id()) + "'"
+                      " is not authorized to launch as"
+                      " user '" + user + "'");
+
+        break;
+      }
+    }
+  }
+
+  // For failed authorization, we send a TASK_ERROR status update for
+  // all tasks.
+  if (error.isSome()) {
+    const TaskStatus::Reason reason = task.isSome()
+      ? TaskStatus::REASON_TASK_UNAUTHORIZED
+      : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+    LOG(ERROR) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+               << " of framework " << frameworkId
+               << ": " << error->message;
+
+    foreach (const TaskInfo& _task, tasks) {
+      const StatusUpdate update = protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          _task.task_id(),
+          TASK_ERROR,
+          TaskStatus::SOURCE_SLAVE,
+          UUID::random(),
+          error->message,
+          reason);
+
+      statusUpdate(update, UPID());
+    }
+
+    // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
@@ -1910,6 +2096,9 @@ void Slave::_run(
     return;
   }
 
+  LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
+            << " for framework " << frameworkId;
+
   auto unallocated = [](const Resources& resources) {
     Resources result = resources;
     result.unallocate();
@@ -1967,7 +2156,7 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    // Refer to the comment after 'framework->pending.erase' above
+    // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
@@ -2020,7 +2209,7 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    // Refer to the comment after 'framework->pending.erase' above
+    // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
@@ -2039,7 +2228,7 @@ void Slave::_run(
                  << " of framework " << frameworkId
                  << " because the agent is terminating";
 
-    // Refer to the comment after 'framework->pending.erase' above
+    // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
     if (framework->executors.empty() && framework->pending.empty()) {
       removeFramework(framework);
@@ -2158,7 +2347,7 @@ void Slave::_run(
 
       containerizer->update(executor->containerId, resources)
         .onAny(defer(self(),
-                     &Self::__run,
+                     &Self::___run,
                      lambda::_1,
                      frameworkId,
                      executorId,
@@ -2183,7 +2372,7 @@ void Slave::_run(
 }
 
 
-void Slave::__run(
+void Slave::___run(
     const Future<Nothing>& future,
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
@@ -3314,7 +3503,7 @@ void Slave::subscribe(
 
       containerizer->update(executor->containerId, resources)
         .onAny(defer(self(),
-                     &Self::__run,
+                     &Self::___run,
                      lambda::_1,
                      framework->id(),
                      executor->id,
@@ -3536,7 +3725,7 @@ void Slave::registerExecutor(
 
       containerizer->update(executor->containerId, resources)
         .onAny(defer(self(),
-                     &Self::__run,
+                     &Self::___run,
                      lambda::_1,
                      frameworkId,
                      executorId,
@@ -6150,6 +6339,45 @@ double Slave::_executor_directory_max_allowed_age_secs()
 }
 
 
+// As a principle, we do not need to re-authorize actions that have already
+// been authorized by the master. However, we re-authorize the RUN_TASK action
+// on the agent even though the master has already authorized it because:
+// a) in cases where hosts have heterogeneous user-account configurations,
+//    it makes sense to set the ACL on the agent instead of on the master
+// b) compared to other actions such as killing a task and shutting down a
+//    framework, it's a greater security risk if malicious tasks are launched
+//    as a superuser on the agent.
+Future<bool> Slave::authorizeTask(
+    const TaskInfo& task,
+    const FrameworkInfo& frameworkInfo)
+{
+  if (authorizer.isNone()) {
+    return true;
+  }
+
+  // Authorize the task.
+  authorization::Request request;
+
+  if (frameworkInfo.has_principal()) {
+    request.mutable_subject()->set_value(frameworkInfo.principal());
+  }
+
+  request.set_action(authorization::RUN_TASK);
+
+  authorization::Object* object = request.mutable_object();
+
+  object->mutable_task_info()->CopyFrom(task);
+  object->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+  LOG(INFO)
+    << "Authorizing framework principal '"
+    << (frameworkInfo.has_principal() ? frameworkInfo.principal() : "ANY")
+    << "' to launch task " << task.task_id();
+
+  return authorizer.get()->authorized(request);
+}
+
+
 Future<bool> Slave::authorizeLogAccess(const Option<Principal>& principal)
 {
   if (authorizer.isNone()) {
@@ -6636,6 +6864,27 @@ Executor* Framework::getExecutor(const TaskID& taskId) const
 }
 
 
+// Return `true` if `task` was a pending task of this framework
+// before the removal; `false` otherwise.
+bool Framework::removePendingTask(
+    const TaskInfo& task,
+    const ExecutorInfo& executorInfo)
+{
+  const ExecutorID executorId = executorInfo.executor_id();
+
+  if (pending.contains(executorId) &&
+      pending.at(executorId).contains(task.task_id())) {
+    pending.at(executorId).erase(task.task_id());
+    if (pending.at(executorId).empty()) {
+      pending.erase(executorId);
+    }
+    return true;
+  }
+
+  return false;
+}
+
+
 Executor* Slave::getExecutor(const ContainerID& containerId) const
 {
   const ContainerID rootContainerId = protobuf::getRootContainerId(containerId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 3ff9281..e2de66c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -150,11 +150,11 @@ public:
 
   // Made 'virtual' for Slave mocking.
   virtual void _run(
-    const process::Future<bool>& future,
-    const FrameworkInfo& frameworkInfo,
-    const ExecutorInfo& executorInfo,
-    const Option<TaskInfo>& task,
-    const Option<TaskGroupInfo>& taskGroup);
+      const process::Future<bool>& future,
+      const FrameworkInfo& frameworkInfo,
+      const ExecutorInfo& executorInfo,
+      const Option<TaskInfo>& task,
+      const Option<TaskGroupInfo>& taskGroup);
 
   // Made 'virtual' for Slave mocking.
   virtual void runTaskGroup(
@@ -338,11 +338,18 @@ public:
   virtual void finalize();
   virtual void exited(const process::UPID& pid);
 
+  void __run(
+      const process::Future<std::list<bool>>& future,
+      const FrameworkInfo& frameworkInfo,
+      const ExecutorInfo& executorInfo,
+      const Option<TaskInfo>& task,
+      const Option<TaskGroupInfo>& taskGroup);
+
   // This is called when the resource limits of the container have
   // been updated for the given tasks and task groups. If the update is
   // successful, we flush the given tasks to the executor by sending
   // RunTaskMessages or `LAUNCH_GROUP` events.
-  void __run(
+  void ___run(
       const process::Future<Nothing>& future,
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
@@ -465,6 +472,10 @@ private:
   Try<Nothing> syncCheckpointedResources(
       const Resources& newCheckpointedResources);
 
+  process::Future<bool> authorizeTask(
+      const TaskInfo& task,
+      const FrameworkInfo& frameworkInfo);
+
   process::Future<bool> authorizeLogAccess(
       const Option<process::http::authentication::Principal>& principal);
 
@@ -1102,6 +1113,10 @@ struct Framework
 
   void checkpointFramework() const;
 
+  bool removePendingTask(
+      const TaskInfo& task,
+      const ExecutorInfo& executorInfo);
+
   const FrameworkID id() const { return info.id(); }
 
   enum State

http://git-wip-us.apache.org/repos/asf/mesos/blob/9c3e4c70/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 8998ff7..3657e0a 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -369,6 +369,116 @@ TYPED_TEST(SlaveAuthorizerTest, ViewFlags)
 }
 
 
+// This test verifies that a task is launched on the agent only if the
+// task user is authorized by the `run_tasks` ACL configured on the
+// agent, which whitelists the users allowed to run tasks.
+TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
+{
+  // Get the current user.
+  Result<string> user = os::user();
+  ASSERT_SOME(user) << "Failed to get the current user name"
+                    << (user.isError() ? ": " + user.error() : "");
+
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  // Start a slave with `bar` and the current user as the only users
+  // authorized to launch tasks on the agent.
+  ACLs acls;
+  acls.set_permissive(false); // Restrictive.
+  mesos::ACL::RunTask* acl = acls.add_run_tasks();
+  acl->mutable_principals()->set_type(ACL::Entity::ANY);
+  acl->mutable_users()->add_values("bar");
+  acl->mutable_users()->add_values(user.get());
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.acls = acls;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = this->StartSlave(
+      detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Create a framework with user `foo`.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_user("foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Framework is registered since the master admits frameworks of any user.
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  // Launch the first task with no user, so it defaults to the
+  // framework user `foo`.
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+
+  // Launch the second task as the current user.
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 1000");
+  task2.mutable_command()->set_user(user.get());
+
+  // The first task should fail since the task user `foo` is not
+  // authorized to launch tasks on the agent. However, the second
+  // task should succeed.
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {LAUNCH({task1, task2})});
+
+  // Wait for TASK_ERROR for the 1st task, and TASK_RUNNING for the 2nd task.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+
+  // Validate both statuses. Note that the order in which the status
+  // updates for the 2 tasks arrive is not deterministic.
+  hashmap<TaskID, TaskStatus> statuses {
+    {status1->task_id(), status1.get()},
+    {status2->task_id(), status2.get()}
+  };
+
+  ASSERT_TRUE(statuses.contains(task1.task_id()));
+  EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statuses.at(task1.task_id()).source());
+  EXPECT_EQ(TaskStatus::REASON_TASK_UNAUTHORIZED,
+            statuses.at(task1.task_id()).reason());
+
+  ASSERT_TRUE(statuses.contains(task2.task_id()));
+  EXPECT_EQ(TASK_RUNNING, statuses.at(task2.task_id()).state());
+
+  driver.stop();
+  driver.join();
+}
+
+
 // Parameterized fixture for agent-specific authorization tests. The
 // path of the tested endpoint is passed as the only parameter.
 class SlaveEndpointTest:


[2/2] mesos git commit: Test to ensure non-authorized users cannot launch tasks on agents.

Posted by ya...@apache.org.
Test to ensure non-authorized users cannot launch tasks on agents.

Review: https://reviews.apache.org/r/55888/
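
One way to run just this test from a completed build tree (standard
googletest filtering; the exact invocation may vary with the build
setup):

  make check GTEST_FILTER='*AuthorizeRunTaskOnAgent*'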


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c7817fb4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c7817fb4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c7817fb4

Branch: refs/heads/master
Commit: c7817fb461b21aa3eaca5721521e22dbab01df24
Parents: 9c3e4c7
Author: Anindya Sinha <an...@apple.com>
Authored: Tue Mar 14 17:00:43 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Mar 14 17:00:43 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_authorization_tests.cpp | 110 +++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)
----------------------------------------------------------------------

