You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/19 03:24:59 UTC
[2/2] git commit: Hold on to unacknowledged terminal tasks in the
Master.
Hold on to unacknowledged terminal tasks in the Master.
Review: https://reviews.apache.org/r/25567
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3182fbd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3182fbd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3182fbd
Branch: refs/heads/master
Commit: b3182fbd138137b7864d7bc26cb45e67deaabac0
Parents: 3636c0e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:49:24 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Sep 18 18:24:51 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 131 +++++++++++++++++++++++++++++----------------
src/master/master.hpp | 11 +++-
2 files changed, 94 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 52a7409..34803f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -651,7 +651,7 @@ void Master::finalize()
// Remove the slaves.
foreachvalue (Slave* slave, slaves.registered) {
- // Remove tasks.
+ // Remove tasks, don't bother recovering resources.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
removeTask(task);
@@ -685,7 +685,8 @@ void Master::finalize()
// allocator or the roles because it is unnecessary bookkeeping at
// this point since we are shutting down.
foreachvalue (Framework* framework, frameworks.registered) {
- // Remove pending tasks from the framework.
+ // Remove pending tasks from the framework. Don't bother
+ // recovering the resources in the allocator.
framework->pendingTasks.clear();
// No tasks/executors/offers should remain since the slaves
@@ -2717,6 +2718,12 @@ void Master::statusUpdateAcknowledgement(
return;
}
+ Task* task = slave->getTask(frameworkId, taskId);
+
+ if (task != NULL && protobuf::isTerminalState(task->state())) {
+ removeTask(task);
+ }
+
LOG(INFO) << "Forwarding status update acknowledgement "
<< UUID::fromBytes(uuid) << " for task " << taskId
<< " of framework " << frameworkId << " to slave " << *slave;
@@ -3192,8 +3199,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
forward(update, pid, framework);
// Lookup the task and see if we need to update anything locally.
- const TaskStatus& status = update.status();
- Task* task = slave->getTask(update.framework_id(), status.task_id());
+ Task* task = slave->getTask(update.framework_id(), update.status().task_id());
if (task == NULL) {
LOG(WARNING) << "Could not lookup task for status update " << update
<< " from slave " << *slave;
@@ -3204,20 +3210,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
LOG(INFO) << "Status update " << update << " from slave " << *slave;
- // TODO(brenden) Consider wiping the `data` and `message` fields?
- if (task->statuses_size() > 0 &&
- task->statuses(task->statuses_size() - 1).state() == status.state()) {
- task->mutable_statuses()->RemoveLast();
- }
- task->add_statuses()->CopyFrom(status);
- task->set_state(status.state());
+ updateTask(task, update.status());
- // Handle the task appropriately if it's terminated.
- if (protobuf::isTerminalState(status.state())) {
+ // If the task is terminal and no acknowledgement is needed,
+ // then remove the task now.
+ if (protobuf::isTerminalState(task->state()) && pid == UPID()) {
removeTask(task);
}
- stats.tasks[status.state()]++;
stats.validStatusUpdates++;
metrics.valid_status_updates++;
}
@@ -3763,14 +3763,14 @@ void Master::reconcile(
// missing from the slave. This could happen if the task was
// dropped by the slave (e.g., slave exited before getting the
// task or the task was launched while slave was in recovery).
- // NOTE: copies are used because statusUpdate() modifies
- // slave->tasks.
+ // NOTE: copies are needed because removeTask modifies slave->tasks.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
- LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+ LOG(WARNING) << "Task " << task->task_id()
<< " of framework " << task->framework_id()
- << " unknown to the slave " << *slave;
+ << " unknown to the slave " << *slave
+ << " during re-registration";
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
@@ -3779,7 +3779,13 @@ void Master::reconcile(
TASK_LOST,
"Task is unknown to the slave");
- statusUpdate(update, UPID());
+ updateTask(task, update.status());
+ removeTask(task);
+
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ forward(update, UPID(), framework);
+ }
}
}
}
@@ -4079,8 +4085,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// Remove pointers to framework's tasks in slaves, and send status
// updates.
- // NOTE: A copy is used because statusUpdate() modifies
- // slave->tasks.
+ // NOTE: A copy is needed because removeTask modifies slave->tasks.
foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
// Remove tasks that belong to this framework.
if (task->framework_id() == framework->id) {
@@ -4096,7 +4101,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
(task->has_executor_id()
? Option<ExecutorID>(task->executor_id()) : None()));
- statusUpdate(update, UPID());
+ updateTask(task, update.status());
+ removeTask(task);
+ forward(update, UPID(), framework);
}
}
@@ -4184,11 +4191,6 @@ void Master::readdSlave(
}
foreach (const Task& task, tasks) {
- // Ignore tasks that have reached terminal state.
- if (protobuf::isTerminalState(task.state())) {
- continue;
- }
-
Task* t = new Task(task);
// Add the task to the slave.
@@ -4211,7 +4213,10 @@ void Master::readdSlave(
<< " running on slave " << *slave;
}
- resources[task.framework_id()] += task.resources();
+ // Terminal tasks do not consume resources.
+ if (!protobuf::isTerminalState(task.state())) {
+ resources[task.framework_id()] += task.resources();
+ }
}
// Re-add completed tasks reported by the slave.
@@ -4269,8 +4274,7 @@ void Master::removeSlave(Slave* slave)
(task->has_executor_id() ?
Option<ExecutorID>(task->executor_id()) : None()));
- task->add_statuses()->CopyFrom(update.status());
- task->set_state(update.status().state());
+ updateTask(task, update.status());
removeTask(task);
updates.push_back(update);
@@ -4369,6 +4373,49 @@ void Master::_removeSlave(
}
+void Master::updateTask(Task* task, const TaskStatus& status)
+{
+ // Out-of-order updates should not occur, however in case they
+ // do (e.g. MESOS-1799), prevent them here to ensure that the
+ // resource accounting is not affected.
+ if (protobuf::isTerminalState(task->state()) &&
+ !protobuf::isTerminalState(status.state())) {
+ LOG(ERROR) << "Ignoring out of order status update for task "
+ << task->task_id()
+ << " (" << task->state() << " -> " << status.state() << ")";
+ return;
+ }
+
+ // Once the task becomes terminal, we recover the resources.
+ if (!protobuf::isTerminalState(task->state()) &&
+ protobuf::isTerminalState(status.state())) {
+ allocator->resourcesRecovered(
+ task->framework_id(),
+ task->slave_id(),
+ task->resources(),
+ None());
+
+ switch (status.state()) {
+ case TASK_FINISHED: ++metrics.tasks_finished; break;
+ case TASK_FAILED: ++metrics.tasks_failed; break;
+ case TASK_KILLED: ++metrics.tasks_killed; break;
+ case TASK_LOST: ++metrics.tasks_lost; break;
+ default: break;
+ }
+ }
+
+ // TODO(brenden) Consider wiping the `data` and `message` fields?
+ if (task->statuses_size() > 0 &&
+ task->statuses(task->statuses_size() - 1).state() == status.state()) {
+ task->mutable_statuses()->RemoveLast();
+ }
+ task->add_statuses()->CopyFrom(status);
+ task->set_state(status.state());
+
+ stats.tasks[status.state()]++;
+}
+
+
void Master::removeTask(Task* task)
{
CHECK_NOTNULL(task);
@@ -4381,6 +4428,14 @@ void Master::removeTask(Task* task)
<< " of framework " << task->framework_id()
<< " on slave " << *slave
<< " in non-terminal state " << task->state();
+
+ // If the task is not terminal, then the resources have
+ // not yet been released.
+ allocator->resourcesRecovered(
+ task->framework_id(),
+ task->slave_id(),
+ task->resources(),
+ None());
} else {
LOG(INFO) << "Removing task " << task->task_id()
<< " with resources " << task->resources()
@@ -4397,22 +4452,6 @@ void Master::removeTask(Task* task)
// Remove from slave.
slave->removeTask(task);
- // Tell the allocator about the recovered resources.
- allocator->resourcesRecovered(
- task->framework_id(),
- task->slave_id(),
- task->resources(),
- None());
-
- // Update the task state metric.
- switch (task->state()) {
- case TASK_FINISHED: ++metrics.tasks_finished; break;
- case TASK_FAILED: ++metrics.tasks_failed; break;
- case TASK_KILLED: ++metrics.tasks_killed; break;
- case TASK_LOST: ++metrics.tasks_lost; break;
- default: break;
- }
-
delete task;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 80d7535..41da240 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -47,6 +47,7 @@
#include <stout/multihashmap.hpp>
#include <stout/option.hpp>
+#include "common/protobuf_utils.hpp"
#include "common/type_utils.hpp"
#include "files/files.hpp"
@@ -368,7 +369,11 @@ protected:
const Filters& filters,
const process::Future<std::list<process::Future<Option<Error> > > >& f);
- // Remove a task and recover its resources.
+ // Transitions the task, and recovers resources if the task becomes
+ // terminal.
+ void updateTask(Task* task, const TaskStatus& status);
+
+ // Removes the task.
void removeTask(Task* task);
// Remove an executor and recover its resources.
@@ -908,7 +913,9 @@ struct Slave
foreachkey (const FrameworkID& frameworkId, tasks) {
foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
- used += task->resources();
+ if (!protobuf::isTerminalState(task->state())) {
+ used += task->resources();
+ }
}
}