You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/06/11 01:33:47 UTC

[2/7] git commit: Authorized launch tasks.

Authorized launch tasks.

Review: https://reviews.apache.org/r/22151


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6c4ee70
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6c4ee70
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6c4ee70

Branch: refs/heads/vinod/authorize_tasks
Commit: a6c4ee70262380c3ecb3c43c8eee0108f6b224eb
Parents: b73ea62
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed May 28 16:45:22 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 10 16:33:36 2014 -0700

----------------------------------------------------------------------
 src/Makefile.am                               |   1 +
 src/common/protobuf_utils.hpp                 |   2 +
 src/master/hierarchical_allocator_process.hpp |   1 -
 src/master/master.cpp                         | 474 +++++++++++++++------
 src/master/master.hpp                         |  36 +-
 src/tests/master_authorization_tests.cpp      | 209 +++++++++
 6 files changed, 575 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1d49dca..c91b438 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -975,6 +975,7 @@ mesos_tests_SOURCES =				\
   tests/log_tests.cpp				\
   tests/logging_tests.cpp			\
   tests/main.cpp				\
+  tests/master_authorization_tests.cpp		\
   tests/master_contender_detector_tests.cpp	\
   tests/master_tests.cpp			\
   tests/mesos.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 0f65341..12ff00a 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -48,6 +48,8 @@ inline bool isTerminalState(const TaskState& state)
 }
 
 
+// TODO(vinod): Make SlaveID optional because 'StatusUpdate.SlaveID'
+// is optional.
 inline StatusUpdate createStatusUpdate(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 0c5e2e0..1765e70 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -553,7 +553,6 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused(
   // result of a valid task launch by an active
   // framework that doesn't use the entire offer.
   CHECK(frameworks.contains(frameworkId));
-
   const std::string& role = frameworks[frameworkId].role();
   sorters[role]->unallocated(frameworkId.value(), resources);
   sorters[role]->remove(resources);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index df75c8a..1a2b219 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -25,6 +25,8 @@
 #include <list>
 #include <sstream>
 
+#include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
@@ -68,6 +70,7 @@ using std::list;
 using std::string;
 using std::vector;
 
+using process::await;
 using process::wait; // Necessary on some OS's to disambiguate.
 using process::Clock;
 using process::Failure;
@@ -576,6 +579,9 @@ void Master::finalize()
   // allocator or the roles because it is unnecessary bookkeeping at
   // this point since we are shutting down.
   foreachvalue (Framework* framework, frameworks.activated) {
+    // Remove pending tasks from the framework.
+    framework->pendingTasks.clear();
+
     // Remove pointers to the framework's tasks in slaves.
     foreachvalue (Task* task, utils::copy(framework->tasks)) {
       Slave* slave = getSlave(task->slave_id());
@@ -1387,6 +1393,7 @@ struct TaskInfoVisitor
   virtual ~TaskInfoVisitor() {}
 };
 
+
 // Checks that a task id is valid, i.e., contains only valid characters.
 struct TaskIDChecker : TaskInfoVisitor
 {
@@ -1445,22 +1452,21 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
   {
     const TaskID& taskId = task.task_id();
 
-    if (ids.contains(taskId) || framework.tasks.contains(taskId)) {
+    if (framework.pendingTasks.contains(taskId) ||
+        framework.tasks.contains(taskId)) {
       return "Task has duplicate ID: " + taskId.value();
     }
-
-    ids.insert(taskId);
-
     return None();
   }
-
-  hashset<TaskID> ids;
 };
 
 
-// Checks that the used resources by a task (and executor if
-// necessary) on each slave does not exceed the total resources
-// offered on that slave
+// Checks that the resources used by a task on each slave do not
+// exceed the total resources offered on that slave.
+// NOTE: We do not account for executor resources here because tasks
+// are launched asynchronously and an executor might exit between
+// validation and actual launch. Therefore executor resources are
+// accounted for in 'Master::_launchTasks()'.
 struct ResourceUsageChecker : TaskInfoVisitor
 {
   virtual Option<Error> operator () (
@@ -1482,11 +1488,10 @@ struct ResourceUsageChecker : TaskInfoVisitor
     // Check if this task uses more resources than offered.
     Resources taskResources = task.resources();
 
-    if (!((usedResources + taskResources) <= resources)) {
+    if (!(taskResources <= resources)) {
       return "Task " + stringify(task.task_id()) + " attempted to use " +
-          stringify(taskResources) + " combined with already used " +
-          stringify(usedResources) + " is greater than offered " +
-          stringify(resources);
+             stringify(taskResources) + " which is greater than offered " +
+             stringify(resources);
     }
 
     // Check this task's executor's resources.
@@ -1496,33 +1501,13 @@ struct ResourceUsageChecker : TaskInfoVisitor
         if (!Resources::isAllocatable(resource)) {
           // TODO(benh): Send back the invalid resources?
           return "Executor for task " + stringify(task.task_id()) +
-              " uses invalid resources " + stringify(resource);
-        }
-      }
-
-      // Check if this task's executor is running, and if not check if
-      // the task + the executor use more resources than offered.
-      if (!executors.contains(task.executor().executor_id())) {
-        if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
-          taskResources += task.executor().resources();
-          if (!((usedResources + taskResources) <= resources)) {
-            return "Task " + stringify(task.task_id()) +
-                   " + executor attempted to use " + stringify(taskResources) +
-                   " combined with already used " + stringify(usedResources) +
-                   " is greater than offered " + stringify(resources);
-          }
+                 " uses invalid resources " + stringify(resource);
         }
-        executors.insert(task.executor().executor_id());
       }
     }
 
-    usedResources += taskResources;
-
     return None();
   }
-
-  Resources usedResources;
-  hashset<ExecutorID> executors;
 };
 
 
@@ -1544,11 +1529,26 @@ struct ExecutorInfoChecker : TaskInfoVisitor
 
     if (task.has_executor()) {
       const ExecutorID& executorId = task.executor().executor_id();
+      Option<ExecutorInfo> executorInfo = None();
+
       if (slave.hasExecutor(framework.id, executorId)) {
-        const Option<ExecutorInfo> executorInfo =
-          slave.executors.get(framework.id).get().get(executorId);
+        executorInfo = slave.executors.get(framework.id).get().get(executorId);
+      } else {
+        // See if any of the pending tasks have the same executor.
+        // Note that picking the first matching executor is ok because
+        // all the matching executors have been added to
+        // 'framework.pendingTasks' after validation and hence have
+        // the same executor info.
+        foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
+          if (task_.has_executor() &&
+              task_.executor().executor_id() == executorId) {
+            executorInfo = task_.executor();
+            break;
+          }
+        }
+      }
 
-        if (!(task.executor() == executorInfo.get())) {
+      if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
           return "Task has invalid ExecutorInfo (existing ExecutorInfo"
               " with same ExecutorID is not compatible).\n"
               "------------------------------------------------------------\n"
@@ -1558,7 +1558,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
               "Task's ExecutorInfo:\n" +
               stringify(task.executor()) + "\n"
               "------------------------------------------------------------\n";
-        }
       }
     }
 
@@ -1854,9 +1853,55 @@ void Master::launchTasks(
             << " on slave " << *slave
             << " for framework " << framework->id;
 
-  Resources usedResources; // Accumulated resources used.
+  // Validate each task and launch if valid.
+  list<Future<Option<Error> > > futures;
+  foreach (const TaskInfo& task, tasks) {
+    futures.push_back(validateTask(task, framework, slave, totalResources));
+
+    // Add to pending tasks.
+    // NOTE: This must happen after validation because, e.g.,
+    // 'UniqueTaskIDChecker' rejects any ID already in 'pendingTasks'.
+    framework->pendingTasks[task.task_id()] = task;
+  }
+
+  // Wait for all the tasks to be validated.
+  // NOTE: We wait for all tasks because currently the allocator
+  // is expected to get 'resourcesUnused()' once per 'launchTasks()'.
+  await(futures)
+    .onAny(defer(self(),
+                 &Master::_launchTasks,
+                 framework->id,
+                 slaveId.get(),
+                 tasks,
+                 totalResources,
+                 filters,
+                 lambda::_1));
+}
+
+
+// Helper to convert authorization result to Future<Option<Error> >.
+static Future<Option<Error> > _authorize(const string& message, bool authorized)
+{
+  if (authorized) {
+    return None();
+  }
+
+  return Error(message);
+}
+
+
+Future<Option<Error> > Master::validateTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave,
+    const Resources& totalResources)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
 
   // Create task visitors.
+  // TODO(vinod): Create the visitors on the stack and make the visit
+  // operation const.
   list<TaskInfoVisitor*> taskVisitors;
   taskVisitors.push_back(new TaskIDChecker());
   taskVisitors.push_back(new SlaveIDChecker());
@@ -1867,40 +1912,241 @@ void Master::launchTasks(
 
   // TODO(benh): Add a HealthCheckChecker visitor.
 
-  // Loop through each task and check it's validity.
-  foreach (const TaskInfo& task, tasks) {
-    // Possible error found while checking task's validity.
-    Option<Error> error = None();
+  // Invoke each visitor.
+  Option<Error> error = None();
+  foreach (TaskInfoVisitor* visitor, taskVisitors) {
+    error = (*visitor)(task, totalResources, *framework, *slave);
+    if (error.isSome()) {
+      break;
+    }
+  }
 
-    // Invoke each visitor.
-    foreach (TaskInfoVisitor* visitor, taskVisitors) {
-      error = (*visitor)(task, totalResources, *framework, *slave);
-      if (error.isSome()) {
-        break;
-      }
+  // Cleanup visitors.
+  while (!taskVisitors.empty()) {
+    TaskInfoVisitor* visitor = taskVisitors.front();
+    taskVisitors.pop_front();
+    delete visitor;
+  };
+
+  if (error.isSome()) {
+    return Error(error.get().message);
+  }
+
+  if (authorizer.isNone()) {
+    // Authorization is disabled.
+    return None();
+  }
+
+  // Authorize the task.
+  string user = framework->info.user(); // Default user.
+  if (task.has_command() && task.command().has_user()) {
+    user = task.command().user();
+  } else if (task.has_executor() && task.executor().command().has_user()) {
+    user = task.executor().command().user();
+  }
+
+  LOG(INFO)
+    << "Authorizing framework principal '" << framework->info.principal()
+    << "' to launch task " << task.task_id() << " as user '" << user << "'";
+
+  mesos::ACL::RunTasks request;
+  if (framework->info.has_principal()) {
+    request.mutable_principals()->add_values(framework->info.principal());
+  } else {
+    // Framework doesn't have a principal set.
+    request.mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  }
+  request.mutable_users()->add_values(user);
+
+  return authorizer.get()->authorize(request).then(
+      lambda::bind(&_authorize,
+                   "Not authorized to launch as user '" + user + "'",
+                   lambda::_1));
+}
+
+
+void Master::launchTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK(!slave->disconnected);
+
+  // Determine if this task launches an executor, and if so make sure
+  // the slave and framework state has been updated accordingly.
+  Option<ExecutorID> executorId;
+
+  if (task.has_executor()) {
+    // TODO(benh): Refactor this code into Slave::addTask.
+    if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
+      CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
+        << "Executor " << task.executor().executor_id()
+        << " known to the framework " << framework->id
+        << " but unknown to the slave " << *slave;
+
+      slave->addExecutor(framework->id, task.executor());
+      framework->addExecutor(slave->id, task.executor());
     }
 
-    if (error.isNone()) {
-      // Task looks good, get it running!
-      usedResources += launchTask(task, framework, slave);
-    } else {
-      // Error validating task, send a failed status update.
-      LOG(WARNING) << "Failed to validate task " << task.task_id()
-                   << " : " << error.get().message;
+    executorId = task.executor().executor_id();
+  }
+
+  // Add the task to the framework and slave.
+  Task* t = new Task();
+  t->mutable_framework_id()->MergeFrom(framework->id);
+  t->set_state(TASK_STAGING);
+  t->set_name(task.name());
+  t->mutable_task_id()->MergeFrom(task.task_id());
+  t->mutable_slave_id()->MergeFrom(task.slave_id());
+  t->mutable_resources()->MergeFrom(task.resources());
+
+  if (executorId.isSome()) {
+    t->mutable_executor_id()->MergeFrom(executorId.get());
+  }
+
+  framework->addTask(t);
+
+  slave->addTask(t);
+
+  // Tell the slave to launch the task!
+  LOG(INFO) << "Launching task " << task.task_id()
+            << " of framework " << framework->id
+            << " with resources " << task.resources()
+            << " on slave " << *slave;
+
+  RunTaskMessage message;
+  message.mutable_framework()->MergeFrom(framework->info);
+  message.mutable_framework_id()->MergeFrom(framework->id);
+  message.set_pid(framework->pid);
+  message.mutable_task()->MergeFrom(task);
+  send(slave->pid, message);
+
+  stats.tasks[TASK_STAGING]++;
 
+  return;
+}
+
+
+void Master::_launchTasks(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const vector<TaskInfo>& tasks,
+    const Resources& totalResources,
+    const Filters& filters,
+    const Future<list<Future<Option<Error> > > >& validationErrors)
+{
+  CHECK_READY(validationErrors);
+  CHECK_EQ(validationErrors.get().size(), tasks.size());
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING)
+      << "Ignoring launch tasks message for framework " << frameworkId
+      << " because the framework cannot be found";
+
+    // Tell the allocator about the recovered resources.
+    allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+    return;
+  }
+
+  Slave* slave = getSlave(slaveId);
+  if (slave == NULL || slave->disconnected) {
+    foreach (const TaskInfo& task, tasks) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
-          slave->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          (slave == NULL ? "Slave removed" : "Slave disconnected"));
+
+      LOG(INFO) << "Sending status update " << update << ": "
+                << (slave == NULL ? "Slave removed" : "Slave disconnected");
+
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
+    // Tell the allocator about the recovered resources.
+    allocator->resourcesRecovered(frameworkId, slaveId, totalResources);
+
+    return;
+  }
+
+  Resources usedResources; // Accumulated resources used.
+
+  size_t index = 0;
+  foreach (const Future<Option<Error> >& future, validationErrors.get()) {
+    const TaskInfo& task = tasks[index++];
+
+    // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
+    // for the task was called before we are here.
+    if (!framework->pendingTasks.contains(task.task_id())) {
+      continue;
+    }
+
+    framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+
+    CHECK(!future.isDiscarded());
+    if (future.isFailed() || future.get().isSome()) {
+      const string error = future.isFailed()
+          ? "Authorization failure: " + future.failure()
+          : future.get().get().message;
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          error);
+
+      LOG(INFO)
+        << "Sending status update " << update << ": " << error;
+
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+
+      continue;
+    }
+
+    // Check if resources needed by the task (and its executor in case
+    // the executor is new) are available. These resources will be
+    // added by 'launchTask()' below.
+    Resources resources = task.resources();
+    if (task.has_executor() &&
+        !slave->hasExecutor(framework->id, task.executor().executor_id())) {
+      resources += task.executor().resources();
+    }
+
+    if (!(usedResources + resources <= totalResources)) {
+      const string error =
+        "Task uses more resources " + stringify(resources) +
+        " than available " + stringify(totalResources - usedResources);
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
           task.task_id(),
           TASK_LOST,
-          error.get().message);
+          error);
+
+      LOG(INFO) << "Sending status update " << update << " for invalid task: "
+                << error;
 
-      LOG(INFO) << "Sending status update "
-                << update << " for invalid task";
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
       send(framework->pid, message);
+
+      continue;
     }
+
+    // Launch task.
+    launchTask(task, framework, slave);
+    usedResources += resources;
   }
 
   // All used resources should be allocatable, enforced by our validators.
@@ -1911,19 +2157,8 @@ void Master::launchTasks(
 
   if (unusedResources.allocatable().size() > 0) {
     // Tell the allocator about the unused (e.g., refused) resources.
-    allocator->resourcesUnused(
-        framework->id,
-        slave->id,
-        unusedResources,
-        filters);
+    allocator->resourcesUnused(frameworkId, slaveId, unusedResources, filters);
   }
-
-  // Cleanup visitors.
-  while (!taskVisitors.empty()) {
-    TaskInfoVisitor* visitor = taskVisitors.front();
-    taskVisitors.pop_front();
-    delete visitor;
-  };
 }
 
 
@@ -1981,6 +2216,24 @@ void Master::killTask(
     return;
   }
 
+  if (framework->pendingTasks.contains(taskId)) {
+    // Remove from pending tasks.
+    framework->pendingTasks.erase(taskId);
+
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->set_state(TASK_KILLED);
+    status->set_message("Killed pending task");
+    update->set_timestamp(Clock::now().secs());
+    update->set_uuid(UUID::random().toBytes());
+    send(framework->pid, message);
+
+    return;
+  }
+
   Task* task = framework->getTask(taskId);
   if (task == NULL) {
     // TODO(bmahler): Per MESOS-1200, if we knew the SlaveID here we
@@ -2778,6 +3031,13 @@ void Master::reconcileTasks(
       continue;
     }
 
+    if (framework->pendingTasks.contains(status.task_id())) {
+      LOG(WARNING) << "Status for task " << status.task_id()
+                   << " from framework " << frameworkId
+                   << " is unknown since the task is pending";
+      continue;
+    }
+
     Option<StatusUpdate> update;
 
     // Check for a removed slave (case 1).
@@ -3052,73 +3312,6 @@ vector<Framework*> Master::getActiveFrameworks() const
 }
 
 
-Resources Master::launchTask(const TaskInfo& task,
-                             Framework* framework,
-                             Slave* slave)
-{
-  CHECK_NOTNULL(framework);
-  CHECK_NOTNULL(slave);
-
-  Resources resources; // Total resources used on slave by launching this task.
-
-  // Determine if this task launches an executor, and if so make sure
-  // the slave and framework state has been updated accordingly.
-  Option<ExecutorID> executorId;
-
-  if (task.has_executor()) {
-    // TODO(benh): Refactor this code into Slave::addTask.
-    if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
-      CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
-        << "Executor " << task.executor().executor_id()
-        << " known to the framework " << framework->id
-        << " but unknown to the slave " << *slave;
-
-      slave->addExecutor(framework->id, task.executor());
-      framework->addExecutor(slave->id, task.executor());
-      resources += task.executor().resources();
-    }
-
-    executorId = task.executor().executor_id();
-  }
-
-  // Add the task to the framework and slave.
-  Task* t = new Task();
-  t->mutable_framework_id()->MergeFrom(framework->id);
-  t->set_state(TASK_STAGING);
-  t->set_name(task.name());
-  t->mutable_task_id()->MergeFrom(task.task_id());
-  t->mutable_slave_id()->MergeFrom(task.slave_id());
-  t->mutable_resources()->MergeFrom(task.resources());
-
-  if (executorId.isSome()) {
-    t->mutable_executor_id()->MergeFrom(executorId.get());
-  }
-
-  framework->addTask(t);
-
-  slave->addTask(t);
-
-  resources += task.resources();
-
-  // Tell the slave to launch the task!
-  LOG(INFO) << "Launching task " << task.task_id()
-            << " of framework " << framework->id
-            << " with resources " << task.resources()
-            << " on slave " << *slave;
-
-  RunTaskMessage message;
-  message.mutable_framework()->MergeFrom(framework->info);
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  message.set_pid(framework->pid);
-  message.mutable_task()->MergeFrom(task);
-  send(slave->pid, message);
-
-  stats.tasks[TASK_STAGING]++;
-
-  return resources;
-}
-
-
 // NOTE: This function is only called when the slave re-registers
 // with a master that already knows about it (i.e., not a failed
 // over master).
@@ -3350,6 +3543,9 @@ void Master::removeFramework(Framework* framework)
     send(slave->pid, message);
   }
 
+  // Remove the pending tasks from the framework.
+  framework->pendingTasks.clear();
+
   // Remove pointers to the framework's tasks in slaves.
   foreachvalue (Task* task, utils::copy(framework->tasks)) {
     Slave* slave = getSlave(task->slave_id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 75f0d49..0c68a5b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -86,10 +86,10 @@ class SlaveObserver;
 class WhitelistWatcher;
 
 struct Framework;
-struct Slave;
-struct Role;
 struct OfferVisitor;
-
+struct Role;
+struct Slave;
+struct TaskInfoVisitor;
 
 class Master : public ProtobufProcess<Master>
 {
@@ -310,11 +310,27 @@ protected:
       const std::vector<StatusUpdate>& updates,
       const process::Future<bool>& removed);
 
-  // Launch a task from a task description, and returned the consumed
-  // resources for the task and possibly it's executor.
-  Resources launchTask(const TaskInfo& task,
-                       Framework* framework,
-                       Slave* slave);
+  // Validates the task including authorization.
+  // Returns None if the task is valid.
+  // Returns Error if the task is invalid.
+  // Returns Failure if authorization returns 'Failure'.
+  process::Future<Option<Error> > validateTask(
+      const TaskInfo& task,
+      Framework* framework,
+      Slave* slave,
+      const Resources& totalResources);
+
+  // Launch a task from a task description.
+  void launchTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
+  // 'launchTasks()' continuation.
+  void _launchTasks(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const std::vector<TaskInfo>& tasks,
+      const Resources& totalResources,
+      const Filters& filters,
+      const process::Future<std::list<process::Future<Option<Error> > > >& f);
 
   // Remove a task.
   void removeTask(Task* task);
@@ -901,6 +917,10 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
+  // Tasks that have not yet been launched because they are being
+  // validated (e.g., authorized).
+  hashmap<TaskID, TaskInfo> pendingTasks;
+
   hashmap<TaskID, Task*> tasks;
 
   // NOTE: We use a shared pointer for Task because clang doesn't like

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6c4ee70/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
new file mode 100644
index 0000000..17debaf
--- /dev/null
+++ b/src/tests/master_authorization_tests.cpp
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/try.hpp>
+
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterAuthorizationTest : public MesosTest {};
+
+
+// This test verifies that an authorized task launch is successful.
+TEST_F(MasterAuthorizationTest, AuthorizedTask)
+{
+  // Set up ACLs so that the framework can only launch tasks as "foo".
+  ACLs acls;
+  acls.set_permissive(false);
+  mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+  acl->mutable_principals()->add_values(DEFAULT_FRAMEWORK_INFO.principal());
+  acl->mutable_users()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Create an authorized executor.
+  ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+  executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+  executor.mutable_command()->set_user("foo");
+
+  MockExecutor exec(executor.executor_id());
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create an authorized task.
+  TaskInfo task;
+  task.set_name("test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(executor);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
+// This test verifies that an unauthorized task launch is rejected.
+TEST_F(MasterAuthorizationTest, UnauthorizedTask)
+{
+  // Set up ACLs so that no framework can launch as "foo".
+  ACLs acls;
+  mesos::ACL::RunTasks* acl = acls.add_run_tasks();
+  acl->mutable_principals()->set_type(mesos::ACL::Entity::NONE);
+  acl->mutable_users()->add_values("foo");
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = JSON::Protobuf(acls);
+
+  Try<PID<Master> > master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  // Create an unauthorized executor.
+  ExecutorInfo executor; // Bug in gcc 4.1.*, must assign on next line.
+  executor = CREATE_EXECUTOR_INFO("test-executor", "exit 1");
+  executor.mutable_command()->set_user("foo");
+
+  MockExecutor exec(executor.executor_id());
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Create an unauthorized task.
+  TaskInfo task;
+  task.set_name("test");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(executor);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}