You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/15 21:21:12 UTC
[11/13] mesos git commit: Kept TaskInfo beyond first scheduler ack in
command executor.
Kept TaskInfo beyond first scheduler ack in command executor.
Prior to this patch, the command executor wiped the TaskInfo after
receiving a status update acknowledgement from the scheduler, as a way
to indicate that there are no unacknowledged tasks. Keeping the original
TaskInfo beyond the ack can be beneficial, hence we introduce a struct
TaskData that holds the TaskInfo and an explicit ack flag.
Review: https://reviews.apache.org/r/57596
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a67231b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a67231b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a67231b0
Branch: refs/heads/master
Commit: a67231b004202a196370d6e6ea83fd75f76c7beb
Parents: 6b332ad
Author: Alexander Rukletsov <al...@apache.org>
Authored: Tue Mar 14 12:36:09 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Mar 15 22:20:20 2017 +0100
----------------------------------------------------------------------
src/launcher/executor.cpp | 66 +++++++++++++++++++++++++-----------------
1 file changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a67231b0/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 57e4a32..02834c9 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -125,6 +125,7 @@ public:
const Duration& _shutdownGracePeriod)
: ProcessBase(process::ID::generate("command-executor")),
state(DISCONNECTED),
+ taskData(None()),
launched(false),
killed(false),
killedByHealthCheck(false),
@@ -143,7 +144,6 @@ public:
capabilities(_capabilities),
frameworkId(_frameworkId),
executorId(_executorId),
- unacknowledgedTask(None()),
lastTaskStatus(None())
{
#ifdef __WINDOWS__
@@ -229,8 +229,10 @@ public:
// Remove the corresponding update.
unacknowledgedUpdates.erase(uuid);
- // Remove the corresponding task.
- unacknowledgedTask = None();
+ // Mark task as acknowledged.
+ CHECK(taskData.isSome());
+ taskData->acknowledged = true;
+
break;
}
@@ -348,9 +350,9 @@ protected:
}
// Send the unacknowledged task.
- if (unacknowledgedTask.isSome()) {
+ if (taskData.isSome() && !taskData->acknowledged) {
subscribe->add_unacknowledged_tasks()->MergeFrom(
- unacknowledgedTask.get());
+ taskData->taskInfo);
}
mesos->send(evolve(call));
@@ -358,13 +360,13 @@ protected:
delay(Seconds(1), self(), &Self::doReliableRegistration);
}
- void launch(const TaskInfo& _task)
+ void launch(const TaskInfo& task)
{
CHECK_EQ(SUBSCRIBED, state);
if (launched) {
TaskStatus status = createTaskStatus(
- _task.task_id(),
+ task.task_id(),
TASK_FAILED,
None(),
"Attempted to run multiple tasks using a \"command\" executor");
@@ -374,14 +376,15 @@ protected:
}
// Capture the task.
- unacknowledgedTask = _task;
+ CHECK(taskData.isNone());
+ taskData = TaskData(task);
// Capture the TaskID.
- taskId = unacknowledgedTask->task_id();
+ taskId = task.task_id();
// Capture the kill policy.
- if (unacknowledgedTask->has_kill_policy()) {
- killPolicy = unacknowledgedTask->kill_policy();
+ if (task.has_kill_policy()) {
+ killPolicy = task.kill_policy();
}
// Determine the command to launch the task.
@@ -400,10 +403,10 @@ protected:
}
command = parse.get();
- } else if (unacknowledgedTask->has_command()) {
- command = unacknowledgedTask->command();
+ } else if (task.has_command()) {
+ command = task.command();
} else {
- LOG(FATAL) << "Expecting task '" << unacknowledgedTask->task_id()
+ LOG(FATAL) << "Expecting task '" << taskData->taskInfo.task_id()
<< "' to have a command";
}
@@ -414,11 +417,11 @@ protected:
// correct solution is to perform this validation at master side.
if (command.shell()) {
CHECK(command.has_value())
- << "Shell command of task '" << unacknowledgedTask->task_id()
+ << "Shell command of task '" << taskData->taskInfo.task_id()
<< "' is not specified!";
} else {
CHECK(command.has_value())
- << "Executable of task '" << unacknowledgedTask->task_id()
+ << "Executable of task '" << taskData->taskInfo.task_id()
<< "' is not specified!";
}
@@ -447,7 +450,7 @@ protected:
launchEnvironment.MergeFrom(command.environment());
}
- cout << "Starting task " << unacknowledgedTask->task_id() << endl;
+ cout << "Starting task " << taskData->taskInfo.task_id() << endl;
#ifndef __WINDOWS__
pid = launchTaskPosix(
@@ -475,10 +478,10 @@ protected:
cout << "Forked command at " << pid << endl;
- if (unacknowledgedTask->has_health_check()) {
+ if (task.has_health_check()) {
vector<string> namespaces;
if (rootfs.isSome() &&
- unacknowledgedTask->health_check().type() == HealthCheck::COMMAND) {
+ task.health_check().type() == HealthCheck::COMMAND) {
// Make sure command health checks are run from the task's mount
// namespace. Otherwise if rootfs is specified the command binary
// may not be available in the executor.
@@ -490,10 +493,10 @@ protected:
Try<Owned<checks::HealthChecker>> _healthChecker =
checks::HealthChecker::create(
- unacknowledgedTask->health_check(),
+ task.health_check(),
launcherDir,
defer(self(), &Self::taskHealthUpdated, lambda::_1),
- unacknowledgedTask->task_id(),
+ taskData->taskInfo.task_id(),
pid,
namespaces);
@@ -511,7 +514,7 @@ protected:
.onAny(defer(self(), &Self::reaped, pid, lambda::_1));
TaskStatus status =
- createTaskStatus(unacknowledgedTask->task_id(), TASK_RUNNING);
+ createTaskStatus(taskData->taskInfo.task_id(), TASK_RUNNING);
forward(status);
launched = true;
@@ -848,6 +851,21 @@ private:
SUBSCRIBED
} state;
+ struct TaskData
+ {
+ explicit TaskData(const TaskInfo& _taskInfo)
+ : taskInfo(_taskInfo), acknowledged(false) {}
+
+ TaskInfo taskInfo;
+
+ // Indicates whether a status update acknowledgement
+ // has been received for any status update.
+ bool acknowledged;
+ };
+
+ // Once `TaskInfo` is received, it is cached for later access.
+ Option<TaskData> taskData;
+
// TODO(alexr): Introduce a state enum and document transitions,
// see MESOS-5252.
bool launched;
@@ -880,10 +898,6 @@ private:
LinkedHashMap<UUID, Call::Update> unacknowledgedUpdates;
- // `None` if there is either no task yet or no status
- // update acknowledgements have been received yet.
- Option<TaskInfo> unacknowledgedTask;
-
Option<TaskStatus> lastTaskStatus;
Owned<checks::HealthChecker> healthChecker;