You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/11/13 19:45:48 UTC
[2/2] mesos git commit: Perform task validation after authorization
is done.
Perform task validation after authorization is done.
Review: https://reviews.apache.org/r/27769
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0d2b582
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0d2b582
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0d2b582
Branch: refs/heads/master
Commit: a0d2b582e718c0bb84e23560f520075b92c1c984
Parents: e021a7b
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Nov 7 18:09:11 2014 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Nov 13 10:45:13 2014 -0800
----------------------------------------------------------------------
src/master/master.cpp | 91 +++++++++++++++++++---------------------------
src/master/master.hpp | 5 +--
2 files changed, 39 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fbf6375..00fb3e3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1927,8 +1927,7 @@ struct UniqueTaskIDChecker : TaskInfoVisitor
{
const TaskID& taskId = task.task_id();
- if (framework.pendingTasks.contains(taskId) ||
- framework.tasks.contains(taskId)) {
+ if (framework.tasks.contains(taskId)) {
return "Task has duplicate ID: " + taskId.value();
}
return None();
@@ -2054,21 +2053,6 @@ struct ExecutorInfoChecker : TaskInfoVisitor
if (slave.hasExecutor(framework.id, executorId)) {
executorInfo = slave.executors.get(framework.id).get().get(executorId);
- } else {
- // See if any of the pending tasks have the same executor
- // on the same slave.
- // Note that picking the first matching executor is ok because
- // all the matching executors have been added to
- // 'framework.pendingTasks' after validation and hence have
- // the same executor info.
- foreachvalue (const TaskInfo& task_, framework.pendingTasks) {
- if (task_.has_executor() &&
- task_.executor().executor_id() == executorId &&
- task_.slave_id() == task.slave_id()) {
- executorInfo = task_.executor();
- break;
- }
- }
}
if (executorInfo.isSome() && !(task.executor() == executorInfo.get())) {
@@ -2345,26 +2329,25 @@ void Master::launchTasks(
<< " on slave " << *slave
<< " for framework " << *framework;
- // Validate each task.
- vector<Option<Error>> validations;
+ // Authorize each task. A task is in 'framework->pendingTasks'
+ // before it is authorized.
+ list<Future<bool>> futures;
foreach (const TaskInfo& task, tasks) {
- validations.push_back(validateTask(task, framework, slave, used));
+ futures.push_back(authorizeTask(task, framework));
// Add to pending tasks.
- // NOTE: We need to do this here after validation because of the
- // way task validators work.
- framework->pendingTasks[task.task_id()] = task;
+ // NOTE: The task ID here hasn't been validated yet, but it
+ // doesn't matter. If the task ID is not valid, the task won't be
+ // launched anyway. If two tasks have the same ID, the second one
+ // will not be put into 'framework->pendingTasks', therefore will
+ // not be launched.
+ if (!framework->pendingTasks.contains(task.task_id())) {
+ framework->pendingTasks[task.task_id()] = task;
+ }
stats.tasks[TASK_STAGING]++;
}
- // Authorize each task.
- list<Future<bool>> futures;
- foreach (const TaskInfo& task, tasks) {
- // TODO(dhamon): Only authorize if there's no validation error.
- futures.push_back(authorizeTask(task, framework));
- }
-
// Wait for all the tasks to be authorized.
await(futures)
.onAny(defer(self(),
@@ -2374,7 +2357,6 @@ void Master::launchTasks(
tasks,
used,
filters,
- validations,
lambda::_1));
}
@@ -2522,10 +2504,8 @@ void Master::_launchTasks(
const vector<TaskInfo>& tasks,
const Resources& totalResources,
const Filters& filters,
- const vector<Option<Error>>& validations,
const Future<list<Future<bool>>>& authorizations)
{
- CHECK_EQ(validations.size(), tasks.size());
CHECK_READY(authorizations);
CHECK_EQ(authorizations.get().size(), tasks.size());
@@ -2571,10 +2551,7 @@ void Master::_launchTasks(
size_t index = 0;
foreach (const Future<bool>& authorization, authorizations.get()) {
- const TaskInfo& task = tasks[index];
- const Option<Error>& validation = validations[index];
-
- ++index;
+ const TaskInfo& task = tasks[index++];
// NOTE: The task will not be in 'pendingTasks' if 'killTask()'
// for the task was called before we are here.
@@ -2582,17 +2559,30 @@ void Master::_launchTasks(
continue;
}
- framework->pendingTasks.erase(task.task_id()); // Remove from pending tasks.
+ // Remove from pending tasks.
+ framework->pendingTasks.erase(task.task_id());
+
+ // Check authorization result.
+ CHECK(!authorization.isDiscarded());
+
+ if (authorization.isFailed() || !authorization.get()) {
+ string user = framework->info.user(); // Default user.
+ if (task.has_command() && task.command().has_user()) {
+ user = task.command().user();
+ } else if (task.has_executor() && task.executor().command().has_user()) {
+ user = task.executor().command().user();
+ }
- if (validation.isSome()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_ERROR,
TaskStatus::SOURCE_MASTER,
- validation.get().message,
- TaskStatus::REASON_TASK_INVALID);
+ authorization.isFailed() ?
+ "Authorization failure: " + authorization.failure() :
+ "Not authorized to launch as user '" + user + "'",
+ TaskStatus::REASON_TASK_UNAUTHORIZED);
metrics.tasks_error++;
stats.tasks[TASK_ERROR]++;
@@ -2602,26 +2592,19 @@ void Master::_launchTasks(
continue;
}
- CHECK(!authorization.isDiscarded());
-
- if (authorization.isFailed() || !authorization.get()) {
- string user = framework->info.user(); // Default user.
- if (task.has_command() && task.command().has_user()) {
- user = task.command().user();
- } else if (task.has_executor() && task.executor().command().has_user()) {
- user = task.executor().command().user();
- }
+ // Validate the task.
+ const Option<Error>& validation =
+ validateTask(task, framework, slave, totalResources);
+ if (validation.isSome()) {
const StatusUpdate& update = protobuf::createStatusUpdate(
framework->id,
task.slave_id(),
task.task_id(),
TASK_ERROR,
TaskStatus::SOURCE_MASTER,
- authorization.isFailed() ?
- "Authorization failure: " + authorization.failure() :
- "Not authorized to launch as user '" + user + "'",
- TaskStatus::REASON_TASK_UNAUTHORIZED);
+ validation.get().message,
+ TaskStatus::REASON_TASK_INVALID);
metrics.tasks_error++;
stats.tasks[TASK_ERROR]++;
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0d2b582/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b3bdec6..a5e8e08 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -370,7 +370,6 @@ protected:
const std::vector<TaskInfo>& tasks,
const Resources& totalResources,
const Filters& filters,
- const std::vector<Option<Error>>& validations,
const process::Future<std::list<process::Future<bool>>>& authorizations);
// Transitions the task, and recovers resources if the task becomes
@@ -1172,8 +1171,8 @@ struct Framework
process::Time reregisteredTime;
process::Time unregisteredTime;
- // Tasks that have not yet been launched because they are being
- // validated (e.g., authorized).
+ // Tasks that have not yet been launched because they are currently
+ // being authorized.
hashmap<TaskID, TaskInfo> pendingTasks;
hashmap<TaskID, Task*> tasks;