You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:49:46 UTC
[02/23] git commit: Fixed slave to properly handle terminated tasks that have pending updates.
Fixed slave to properly handle terminated tasks that have pending
updates.
Review: https://reviews.apache.org/r/11694
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22e64506
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22e64506
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22e64506
Branch: refs/heads/vinod/0.13.0
Commit: 22e64506da12dc29ecd101bf73702c8287811bf2
Parents: 3f809e0
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 5 22:47:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 116 +++++++++++++++++++++++++++--------------------
src/slave/slave.hpp | 14 ++++--
2 files changed, 75 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/22e64506/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ce1646..840c64d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1041,21 +1041,14 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
switch (executor->state) {
case Executor::REGISTERING: {
- if (executor->queuedTasks.contains(taskId)) {
- // We remove the task here so that if this executor registers at
- // a later point in time it won't be sent this task.
- LOG(WARNING) << "Removing queued task " << taskId
- << " from executor '" << executor->id
- << "' of framework " << frameworkId
- << " because the executor hasn't registered yet";
- executor->removeTask(taskId);
- } else {
- LOG(WARNING) << "Cannot kill task " << taskId
- << " of framework " << frameworkId
- << " because the executor '" << executor->id
- << "' hasn't registered yet";
- }
+ LOG(WARNING) << "Removing queued task " << taskId
+ << " of framework " << frameworkId
+ << " because the executor '" << executor->id
+ << "' hasn't registered yet";
+ // NOTE: Sending a TASK_KILLED update removes the task from
+ // Executor::queuedTasks, so that if the executor registers at
+ // a later point in time, it won't get this task.
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
@@ -1376,11 +1369,13 @@ void Slave::_statusUpdateAcknowledgement(
executor->state == Executor::TERMINATED)
<< executor->state;
- executor->updates.remove(taskId, uuid);
+ if (executor->terminatedTasks.contains(taskId)) {
+ executor->completeTask(taskId);
+ }
// Remove the executor if it has terminated and there are no more
- // pending updates.
- if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+ // incomplete tasks.
+ if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
remove(framework, executor);
}
}
@@ -1594,15 +1589,13 @@ void Slave::reregisterExecutor(
send(executor->pid, message);
// Handle all the pending updates.
+ // The status update manager might have already checkpointed some
+ // of these pending updates (for example, if the slave died right
+ // after it checkpointed the update but before it could send the
+ // ACK to the executor). This is ok because the status update
+ // manager correctly handles duplicate updates.
foreach (const StatusUpdate& update, updates) {
- // The status update manager might have already checkpointed some
- // of these pending updates (for e.g: if the slave died right
- // after it checkpointed the update but before it could send the
- // ACK to the executor). If so, we can just ignore those updates.
- if (!executor->updates.contains(
- update.status().task_id(), UUID::fromBytes(update.uuid()))) {
- statusUpdate(update); // This also updates the executor's resources!
- }
+ statusUpdate(update); // This also updates the executor's resources!
}
// Now, if there is any task still in STAGING state and not in
@@ -1720,8 +1713,13 @@ void Slave::statusUpdate(const StatusUpdate& update)
if (executor == NULL) {
LOG(WARNING) << "Could not find the executor for "
<< "status update " << update;
- stats.invalidStatusUpdates++;
+ stats.validStatusUpdates++;
+ // NOTE: We forward the update here because this update could be
+ // generated by the slave when the executor is unknown to it,
+ // e.g., killTask(), _runTask().
+ // TODO(vinod): Revisit these semantics when we disallow updates
+ // sent by executors that are unknown to the slave.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
@@ -1740,11 +1738,10 @@ void Slave::statusUpdate(const StatusUpdate& update)
stats.validStatusUpdates++;
executor->updateTaskState(status.task_id(), status.state());
- executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
// Handle the task appropriately if it's terminated.
if (protobuf::isTerminalState(status.state())) {
- executor->removeTask(status.task_id());
+ executor->terminateTask(status.task_id(), status.state());
// Tell the isolator to update the resources.
dispatch(isolator,
@@ -2152,10 +2149,10 @@ void Slave::executorTerminated(
send(master, message);
}
- // Remove the executor if either there are no pending updates
- // or the framework is terminating.
- if (executor->updates.empty() ||
- framework->state == Framework::TERMINATING) {
+ // Remove the executor if either the framework is terminating or
+ // there are no incomplete tasks.
+ if (framework->state == Framework::TERMINATING ||
+ !executor->incompleteTasks()) {
remove(framework, executor);
}
break;
@@ -2186,8 +2183,8 @@ void Slave::remove(Framework* framework, Executor* executor)
// care for pending updates when a framework is terminating
// because the framework cannot ACK them.
CHECK(executor->state == Executor::TERMINATED) << executor->state;
- CHECK (executor->updates.empty() ||
- framework->state == Framework::TERMINATING);
+ CHECK(framework->state == Framework::TERMINATING ||
+ !executor->incompleteTasks());
// TODO(vinod): Move the responsibility of gc'ing to the
// Executor struct.
@@ -2789,7 +2786,7 @@ Executor* Framework::getExecutor(const TaskID& taskId)
foreachvalue (Executor* executor, executors) {
if (executor->queuedTasks.contains(taskId) ||
executor->launchedTasks.contains(taskId) ||
- executor->updates.contains(taskId)) {
+ executor->terminatedTasks.contains(taskId)) {
return executor;
}
}
@@ -2941,23 +2938,38 @@ Task* Executor::addTask(const TaskInfo& task)
}
-void Executor::removeTask(const TaskID& taskId)
+void Executor::terminateTask(
+ const TaskID& taskId,
+ const mesos::TaskState& state)
{
+ Task* task = NULL;
// Remove the task if it's queued.
- queuedTasks.erase(taskId);
-
- // Update the resources if it's been launched.
- if (launchedTasks.contains(taskId)) {
- Task* task = launchedTasks[taskId];
+ if (queuedTasks.contains(taskId)) {
+ task = new Task(
+ protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+ // Erase it so a later executor registration won't launch it and
+ // incompleteTasks() stops counting it once its update is acked.
+ queuedTasks.erase(taskId);
+ } else if (launchedTasks.contains(taskId)) {
+ // Update the resources if it's been launched.
+ task = launchedTasks[taskId];
foreach (const Resource& resource, task->resources()) {
resources -= resource;
}
launchedTasks.erase(taskId);
+ }
- completedTasks.push_back(*task);
+ terminatedTasks[taskId] = CHECK_NOTNULL(task);
+}
- delete task;
- }
+
+void Executor::completeTask(const TaskID& taskId)
+{
+ CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+
+ Task* task = terminatedTasks[taskId];
+ completedTasks.push_back(*task);
+ terminatedTasks.erase(taskId);
+ delete task;
}
@@ -3001,16 +3011,14 @@ void Executor::recoverTask(const TaskState& state)
// Read updates to get the latest state of the task.
foreach (const StatusUpdate& update, state.updates) {
updateTaskState(state.id, update.status().state());
- updates.put(state.id, UUID::fromBytes(update.uuid()));
- // Remove the task if it received a terminal update.
+ // Terminate the task if it received a terminal update.
if (protobuf::isTerminalState(update.status().state())) {
- removeTask(state.id);
+ terminateTask(state.id, update.status().state());
- // If the terminal update has been acknowledged, remove it
- // from pending tasks.
+ // If the terminal update has been acknowledged, remove it.
if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
- updates.remove(state.id, UUID::fromBytes(update.uuid()));
+ completeTask(state.id);
}
break;
}
@@ -3026,6 +3034,14 @@ void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
}
+bool Executor::incompleteTasks()
+{
+ return !queuedTasks.empty() ||
+ !launchedTasks.empty() ||
+ !terminatedTasks.empty();
+}
+
+
std::ostream& operator << (std::ostream& stream, Framework::State state) {
switch (state) {
case Framework::RUNNING: return stream << "RUNNING";
http://git-wip-us.apache.org/repos/asf/mesos/blob/22e64506/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 26dc96e..5aba7ed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -335,11 +335,15 @@ struct Executor
~Executor();
Task* addTask(const TaskInfo& task);
- void removeTask(const TaskID& taskId);
+ void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
+ void completeTask(const TaskID& taskId);
void checkpointTask(const TaskInfo& task);
void recoverTask(const state::TaskState& state);
void updateTaskState(const TaskID& taskId, TaskState state);
+ // Returns true if there are any queued/launched/terminated tasks.
+ bool incompleteTasks();
+
enum State {
REGISTERING, // Executor is launched but not (re-)registered yet.
RUNNING, // Executor has (re-)registered.
@@ -367,13 +371,13 @@ struct Executor
Resources resources; // Currently consumed resources.
- hashmap<TaskID, TaskInfo> queuedTasks;
- hashmap<TaskID, Task*> launchedTasks;
+ hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+ hashmap<TaskID, Task*> launchedTasks; // Running.
+ hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+ boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
multihashmap<TaskID, UUID> updates; // Pending updates.
- boost::circular_buffer<Task> completedTasks;
-
private:
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.