You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/14 12:15:10 UTC
[5/5] mesos git commit: Made master set `launch_executor` in the
RunTask(Group)Message.
Made master set `launch_executor` in the RunTask(Group)Message.
By setting a new field `launch_executor` in the RunTask(Group)Message,
the master is able to control executor creation on the agent.
Also refactored the `addTask()` logic. Added two new functions:
`isLaunchExecutor()` checks if a new executor needs to be launched;
`addExecutor()` adds an executor to the framework and slave.
Review: https://reviews.apache.org/r/65504/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10aa875d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10aa875d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10aa875d
Branch: refs/heads/1.5.x
Commit: 10aa875df8947f8cbfb318820101984d99259070
Parents: 08e0ceb
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Tue Feb 13 22:44:58 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 14 03:41:16 2018 -0800
----------------------------------------------------------------------
src/master/master.cpp | 112 +++++++++++++++++++++++++++++++--------------
src/master/master.hpp | 19 ++++++--
2 files changed, 92 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2758000..2b093d6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3883,44 +3883,56 @@ Future<bool> Master::authorizeSlave(const Option<string>& principal)
}
-Resources Master::addTask(
- const TaskInfo& task,
+bool Master::isLaunchExecutor(
+ const ExecutorID& executorId,
Framework* framework,
- Slave* slave)
+ Slave* slave) const
{
CHECK_NOTNULL(framework);
CHECK_NOTNULL(slave);
- CHECK(slave->connected) << "Adding task " << task.task_id()
- << " to disconnected agent " << *slave;
- // The resources consumed.
- Resources resources = task.resources();
+ if (!slave->hasExecutor(framework->id(), executorId)) {
+ CHECK(!framework->hasExecutor(slave->id, executorId))
+ << "Executor '" << executorId
+ << "' known to the framework " << *framework
+ << " but unknown to the agent " << *slave;
+ return true;
+ }
+
+ return false;
+}
+
- // Determine if this task launches an executor, and if so make sure
- // the slave and framework state has been updated accordingly.
+void Master::addExecutor(
+ const ExecutorInfo& executorInfo,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+ CHECK(slave->connected) << "Adding executor " << executorInfo.executor_id()
+ << " to disconnected agent " << *slave;
- if (task.has_executor()) {
- // TODO(benh): Refactor this code into Slave::addTask.
- if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
- CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
- << "Executor '" << task.executor().executor_id()
- << "' known to the framework " << *framework
- << " but unknown to the agent " << *slave;
+ slave->addExecutor(framework->id(), executorInfo);
+ framework->addExecutor(slave->id, executorInfo);
+}
- slave->addExecutor(framework->id(), task.executor());
- framework->addExecutor(slave->id, task.executor());
- resources += task.executor().resources();
- }
- }
+void Master::addTask(
+ const TaskInfo& task,
+ Framework* framework,
+ Slave* slave)
+{
+ CHECK_NOTNULL(framework);
+ CHECK_NOTNULL(slave);
+ CHECK(slave->connected) << "Adding task " << task.task_id()
+ << " to disconnected agent " << *slave;
// Add the task to the framework and slave.
Task* t = new Task(protobuf::createTask(task, TASK_STAGING, framework->id()));
slave->addTask(t);
framework->addTask(t);
-
- return resources;
}
@@ -4953,7 +4965,23 @@ void Master::_accept(
// Add task.
if (pending) {
- const Resources consumed = addTask(task, framework, slave);
+ Resources consumed;
+
+ bool launchExecutor = true;
+ if (task.has_executor()) {
+ launchExecutor = isLaunchExecutor(
+ task.executor().executor_id(), framework, slave);
+
+ // Master tracks the new executor only if the task is not a
+ // command task.
+ if (launchExecutor) {
+ addExecutor(task.executor(), framework, slave);
+ consumed += task.executor().resources();
+ }
+ }
+
+ addTask(task, framework, slave);
+ consumed += task.resources();
CHECK(available.contains(consumed))
<< available << " does not contain " << consumed;
@@ -4995,6 +5023,8 @@ void Master::_accept(
message.set_pid(framework->pid.getOrElse(UPID()));
message.mutable_task()->MergeFrom(task);
+ message.set_launch_executor(launchExecutor);
+
if (HookManager::hooksAvailable()) {
// Set labels retrieved from label-decorator hooks.
message.mutable_task()->mutable_labels()->CopyFrom(
@@ -5013,11 +5043,11 @@ void Master::_accept(
CHECK_SOME(downgradeResources(&message));
}
- // TODO(bmahler): Consider updating this log message to
- // indicate when the executor is also being launched.
LOG(INFO) << "Launching task " << task.task_id() << " of framework "
<< *framework << " with resources " << task.resources()
- << " on agent " << *slave;
+ << " on agent " << *slave << " on "
+ << (launchExecutor ?
+ " new executor" : " existing executor");
send(slave->pid, message);
}
@@ -5176,18 +5206,25 @@ void Master::_accept(
set<TaskID> taskIds;
Resources totalResources;
+ Resources executorResources;
+
+ bool launchExecutor =
+ isLaunchExecutor(executor.executor_id(), framework, slave);
+
+ if (launchExecutor) {
+ addExecutor(executor, framework, slave);
+ executorResources = executor.resources();
+ totalResources += executorResources;
+ }
+
+ message.set_launch_executor(launchExecutor);
foreach (
TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) {
taskIds.insert(task.task_id());
totalResources += task.resources();
- const Resources consumed = addTask(task, framework, slave);
-
- CHECK(_offeredResources.contains(consumed))
- << _offeredResources << " does not contain " << consumed;
-
- _offeredResources -= consumed;
+ addTask(task, framework, slave);
if (HookManager::hooksAvailable()) {
// Set labels retrieved from label-decorator hooks.
@@ -5199,6 +5236,11 @@ void Master::_accept(
}
}
+ CHECK(_offeredResources.contains(totalResources))
+ << _offeredResources << " does not contain " << totalResources;
+
+ _offeredResources -= totalResources;
+
// If the agent does not support reservation refinement, downgrade
// the task and executor resources to the "pre-reservation-refinement"
// format. This cannot contain any refined reservations since
@@ -5210,7 +5252,9 @@ void Master::_accept(
LOG(INFO) << "Launching task group " << stringify(taskIds)
<< " of framework " << *framework << " with resources "
- << totalResources << " on agent " << *slave;
+ << totalResources - executorResources << " on agent "
+ << *slave << " on "
+ << (launchExecutor ? " new executor" : " existing executor");
send(slave->pid, message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a94ef38..9030cad 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -845,11 +845,20 @@ protected:
const Offer::Operation::Destroy& destroy,
const Option<process::http::authentication::Principal>& principal);
- // Add the task and its executor (if not already running) to the
- // framework and slave. Returns the resources consumed as a result,
- // which includes resources for the task and its executor
- // (if not already running).
- Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+ // Determine if a new executor needs to be launched.
+ bool isLaunchExecutor (
+ const ExecutorID& executorId,
+ Framework* framework,
+ Slave* slave) const;
+
+ // Add executor to the framework and slave.
+ void addExecutor(
+ const ExecutorInfo& executorInfo,
+ Framework* framework,
+ Slave* slave);
+
+ // Add task to the framework and slave.
+ void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
// Transitions the task, and recovers resources if the task becomes
// terminal.