You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/23 22:23:46 UTC
[03/13] mesos git commit: Fixed a bug where the agent kills and still
launches a task.
Fixed a bug where the agent kills and still launches a task.
The following race leads to the agent both killing and launching a task:
(1) Slave::__run completes, task is now within Executor::queuedTasks.
(2) Slave::killTask locates the executor based on the task ID residing
in queuedTasks and calls Slave::statusUpdate() with TASK_KILLED.
(3) Slave::___run assumes that killed tasks have been removed from
Executor::queuedTasks, but this now occurs asynchronously in
Slave::_statusUpdate. As a result, the agent still sees the queued
task, delivers it to the executor, and adds it to Executor::launchedTasks.
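(4) Slave::_statusUpdate runs, removes the task from
Executor::launchedTasks and adds it to Executor::terminatedTasks.
By this point the task has already been delivered to the executor,
so it has been both killed and launched.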
The fix applied here is to synchronously transition queued tasks to
a terminal state when statusUpdate is called. This can be done because
for queued tasks, we do not need to retrieve the container status (the
task never reached the container).
Review: https://reviews.apache.org/r/61639
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0b9c3ded
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0b9c3ded
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0b9c3ded
Branch: refs/heads/master
Commit: 0b9c3dedb04e9bf2c3d1f1663cf9cd4f47cb674b
Parents: f961c9d
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Aug 10 18:34:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 82 ++++++++++++++++++++++++++++--------------------
src/slave/slave.hpp | 2 +-
2 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 50d2a10..1521d5d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4516,26 +4516,36 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
metrics.valid_status_updates++;
- // Before sending update, we need to retrieve the container status.
- //
- // NOTE: If the executor sets the ContainerID inside the
- // ContainerStatus, that indicates that the Task this status update
- // is associated with is tied to that container (could be nested).
- // Therefore, we need to get the status of that container, instead
- // of the top level executor container.
- ContainerID containerId = executor->containerId;
- if (update.status().has_container_status() &&
- update.status().container_status().has_container_id()) {
- containerId = update.status().container_status().container_id();
- }
-
- containerizer->status(containerId)
- .onAny(defer(self(),
- &Slave::_statusUpdate,
- update,
- pid,
- executor->id,
- lambda::_1));
+ // Before sending update, we need to retrieve the container status
+ // if the task reached the executor. For tasks that are queued, we
+ // do not need to send the container status and we must
+ // synchronously transition the task to ensure that it is removed
+ // from the queued tasks before the run task path continues.
+ if (executor->queuedTasks.contains(status.task_id())) {
+ CHECK(protobuf::isTerminalState(status.state()))
+ << "Queued tasks can only be transitioned to terminal states";
+
+ _statusUpdate(update, pid, executor->id, None());
+ } else {
+ // NOTE: If the executor sets the ContainerID inside the
+ // ContainerStatus, that indicates that the Task this status update
+ // is associated with is tied to that container (could be nested).
+ // Therefore, we need to get the status of that container, instead
+ // of the top level executor container.
+ ContainerID containerId = executor->containerId;
+ if (update.status().has_container_status() &&
+ update.status().container_status().has_container_id()) {
+ containerId = update.status().container_status().container_id();
+ }
+
+ containerizer->status(containerId)
+ .onAny(defer(self(),
+ &Slave::_statusUpdate,
+ update,
+ pid,
+ executor->id,
+ lambda::_1));
+ }
}
@@ -4543,26 +4553,26 @@ void Slave::_statusUpdate(
StatusUpdate update,
const Option<process::UPID>& pid,
const ExecutorID& executorId,
- const Future<ContainerStatus>& future)
+ const Option<Future<ContainerStatus>>& containerStatus)
{
- ContainerStatus* containerStatus =
- update.mutable_status()->mutable_container_status();
-
// There can be cases where a container is already removed from the
// containerizer before the `status` call is dispatched to the
// containerizer, leading to the failure of the returned `Future`.
// In such a case we should simply not update the `ContainerStatus`
// with the return `Future` but continue processing the
// `StatusUpdate`.
- if (future.isReady()) {
- containerStatus->MergeFrom(future.get());
+ if (containerStatus.isSome() && containerStatus->isReady()) {
+ ContainerStatus* status =
+ update.mutable_status()->mutable_container_status();
+
+ status->MergeFrom(containerStatus->get());
// Fill in the container IP address with the IP from the agent
// PID, if not already filled in.
//
// TODO(karya): Fill in the IP address by looking up the executor PID.
- if (containerStatus->network_infos().size() == 0) {
- NetworkInfo* networkInfo = containerStatus->add_network_infos();
+ if (status->network_infos().size() == 0) {
+ NetworkInfo* networkInfo = status->add_network_infos();
NetworkInfo::IPAddress* ipAddress = networkInfo->add_ip_addresses();
// Set up IPv4 address.
@@ -5385,18 +5395,22 @@ void Slave::executorTerminated(
// the status update manager should have already cleaned up all the
// status update streams for a framework that is terminating.
if (framework->state != Framework::TERMINATING) {
- // Transition all live launched tasks.
- foreachvalue (Task* task, executor->launchedTasks) {
+ // Transition all live launched tasks. Note that the map is
+ // removed from within the loop due terminal status updates.
+ foreach (const TaskID& taskId, executor->launchedTasks.keys()) {
+ Task* task = executor->launchedTasks.at(taskId);
+
if (!protobuf::isTerminalState(task->state())) {
sendExecutorTerminatedStatusUpdate(
- task->task_id(), termination, frameworkId, executor);
+ taskId, termination, frameworkId, executor);
}
}
- // Transition all queued tasks.
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ // Transition all queued tasks. Note that the map is removed
+ // from within the loop due terminal status updates.
+ foreach (const TaskID& taskId, executor->queuedTasks.keys()) {
sendExecutorTerminatedStatusUpdate(
- task.task_id(), termination, frameworkId, executor);
+ taskId, termination, frameworkId, executor);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0e07a1a..ca9f3da 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -252,7 +252,7 @@ public:
StatusUpdate update,
const Option<process::UPID>& pid,
const ExecutorID& executorId,
- const process::Future<ContainerStatus>& future);
+ const Option<process::Future<ContainerStatus>>& containerStatus);
// Continue handling the status update after optionally updating the
// container's resources.