You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/11/13 19:45:48 UTC

[2/2] mesos git commit: Perform task validation after authorization is done.

Perform task validation after authorization is done.

Review: https://reviews.apache.org/r/27769


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0d2b582
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0d2b582
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0d2b582

Branch: refs/heads/master
Commit: a0d2b582e718c0bb84e23560f520075b92c1c984
Parents: e021a7b
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Nov 7 18:09:11 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Nov 13 10:45:13 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 91 +++++++++++++++++++---------------------------
 src/master/master.hpp |  5 +--
 2 files changed, 39 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fbf6375..00fb3e3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1927,8 +1927,7 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
   {
     const TaskID& taskId = task.task_id();
 
-    if (framework.pendingTasks.contains(taskId) ||
-        framework.tasks.contains(taskId)) {
+    if (framework.tasks.contains(taskId)) {
       return "Task has duplicate ID: " + taskId.value();
     }
     return None();
@@ -2054,21 +2053,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
 
       if (slave.hasExecutor(framework.id, executorId)) {
         executorInfo = slave.executors.get(framework.id).get().get(executorId);
-      } else {
-        // See if any of the pending tasks have the same executor
-        // on the same slave.
-        // Note that picking the first matching executor is ok because
-        // all the matching executors have been added to
-        // 'framework.pendingTasks' after validation and hence have
-        // the same executor info.
-        foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
-          if (task_.has_executor() &&
-              task_.executor().executor_id() == executorId &&
-              task_.slave_id() == task.slave_id()) {
-            executorInfo = task_.executor();
-            break;
-          }
-        }
       }
 
       if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
@@ -2345,26 +2329,25 @@ void Master::launchTasks(
             << " on slave " << *slave
             << " for framework " << *framework;
 
-  // Validate each task.
-  vector<Option<Error>> validations;
+  // Authorize each task. A task remains in 'framework->pendingTasks'
+  // while its authorization is outstanding.
+  list<Future<bool>> futures;
   foreach (const TaskInfo& task, tasks) {
-    validations.push_back(validateTask(task, framework, slave, used));
+    futures.push_back(authorizeTask(task, framework));
 
     // Add to pending tasks.
-    // NOTE: We need to do this here after validation because of the
-    // way task validators work.
-    framework->pendingTasks[task.task_id()] = task;
+    // NOTE: The task ID here hasn't been validated yet, but it
+    // doesn't matter. If the task ID is not valid, the task won't be
+    // launched anyway. If two tasks have the same ID, the second one
+    // will not be put into 'framework->pendingTasks' and therefore
+    // will not be launched.
+    if (!framework->pendingTasks.contains(task.task_id())) {
+      framework->pendingTasks[task.task_id()] = task;
+    }
 
     stats.tasks[TASK_STAGING]++;
   }
 
-  // Authorize each task.
-  list<Future<bool>> futures;
-  foreach (const TaskInfo& task, tasks) {
-    // TODO(dhamon): Only authorize if there's no validation error.
-    futures.push_back(authorizeTask(task, framework));
-  }
-
   // Wait for all the tasks to be authorized.
   await(futures)
     .onAny(defer(self(),
@@ -2374,7 +2357,6 @@ void Master::launchTasks(
                  tasks,
                  used,
                  filters,
-                 validations,
                  lambda::_1));
 }
 
@@ -2522,10 +2504,8 @@ void Master::_launchTasks(
     const vector<TaskInfo>& tasks,
     const Resources& totalResources,
     const Filters& filters,
-    const vector<Option<Error>>& validations,
     const Future<list<Future<bool>>>& authorizations)
 {
-  CHECK_EQ(validations.size(), tasks.size());
   CHECK_READY(authorizations);
   CHECK_EQ(authorizations.get().size(), tasks.size());
 
@@ -2571,10 +2551,7 @@ void Master::_launchTasks(
 
   size_t index = 0;
   foreach (const Future<bool>& authorization, authorizations.get()) {
-    const TaskInfo& task = tasks[index];
-    const Option<Error>& validation = validations[index];
-
-    ++index;
+    const TaskInfo& task = tasks[index++];
 
     // NOTE: The task will not be in 'pendingTasks' if 'killTask()'
     // for the task was called before we are here.
@@ -2582,17 +2559,30 @@ void Master::_launchTasks(
       continue;
     }
 
-    framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+    // Remove from pending tasks.
+    framework->pendingTasks.erase(task.task_id());
+
+    // Check authorization result.
+    CHECK(!authorization.isDiscarded());
+
+    if (authorization.isFailed() || !authorization.get()) {
+      string user = framework->info.user(); // Default user.
+      if (task.has_command() && task.command().has_user()) {
+        user = task.command().user();
+      } else if (task.has_executor() && task.executor().command().has_user()) {
+        user = task.executor().command().user();
+      }
 
-    if (validation.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_ERROR,
           TaskStatus::SOURCE_MASTER,
-          validation.get().message,
-          TaskStatus::REASON_TASK_INVALID);
+          authorization.isFailed() ?
+              "Authorization failure: " + authorization.failure() :
+              "Not authorized to launch as user '" + user + "'",
+          TaskStatus::REASON_TASK_UNAUTHORIZED);
 
       metrics.tasks_error++;
       stats.tasks[TASK_ERROR]++;
@@ -2602,26 +2592,19 @@ void Master::_launchTasks(
       continue;
     }
 
-    CHECK(!authorization.isDiscarded());
-
-    if (authorization.isFailed() || !authorization.get()) {
-      string user = framework->info.user(); // Default user.
-      if (task.has_command() && task.command().has_user()) {
-        user = task.command().user();
-      } else if (task.has_executor() && task.executor().command().has_user()) {
-        user = task.executor().command().user();
-      }
+    // Validate the task.
+    const Option<Error>& validation =
+      validateTask(task, framework, slave, totalResources);
 
+    if (validation.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_ERROR,
           TaskStatus::SOURCE_MASTER,
-          authorization.isFailed() ?
-              "Authorization failure: " + authorization.failure() :
-              "Not authorized to launch as user '" + user + "'",
-          TaskStatus::REASON_TASK_UNAUTHORIZED);
+          validation.get().message,
+          TaskStatus::REASON_TASK_INVALID);
 
       metrics.tasks_error++;
       stats.tasks[TASK_ERROR]++;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b3bdec6..a5e8e08 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -370,7 +370,6 @@ protected:
       const std::vector<TaskInfo>& tasks,
       const Resources& totalResources,
       const Filters& filters,
-      const std::vector<Option<Error>>& validations,
       const process::Future<std::list<process::Future<bool>>>& authorizations);
 
   // Transitions the task, and recovers resources if the task becomes
@@ -1172,8 +1171,8 @@ struct Framework
   process::Time reregisteredTime;
   process::Time unregisteredTime;
 
-  // Tasks that have not yet been launched because they are being
-  // validated (e.g., authorized).
+  // Tasks that have not yet been launched because they are currently
+  // being authorized.
   hashmap<TaskID, TaskInfo> pendingTasks;
 
   hashmap<TaskID, Task*> tasks;