You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/06/08 00:06:48 UTC
[1/2] git commit: Fixed master to properly do task reconciliation when slave re-registers.
Updated Branches:
refs/heads/master 9ae1d01d1 -> 3f07a9437
Fixed master to properly do task reconciliation when slave re-registers.
Review: https://reviews.apache.org/r/11229
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/3f07a943
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/3f07a943
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/3f07a943
Branch: refs/heads/master
Commit: 3f07a94375c79b3bc934b836102742b47fd6d52d
Parents: 15c0040
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri May 17 17:09:36 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:04:42 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 91 +++++++++++++++--------------
src/master/master.hpp | 17 ++++--
src/slave/slave.cpp | 5 ++
src/slave/slave.hpp | 2 -
src/tests/fault_tolerance_tests.cpp | 93 +++++++++++++++++++++++++++++-
5 files changed, 156 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a2e4b90..60c6d4f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -945,50 +945,8 @@ void Master::reregisterSlave(const SlaveID& slaveId,
<< ") is being allowed to re-register with an already"
<< " in use id (" << slaveId << ")";
- // Consolidate tasks between master and the slave.
- // Fist, look for the tasks present in the slave but not present
- // in the master.
- multihashmap<FrameworkID, TaskID> slaveTasks;
- foreach (const Task& task, tasks) {
- if (!slave->tasks.contains(
- std::make_pair(task.framework_id(), task.task_id()))) {
- // This might happen if a terminal status update for this task
- // came before the slave re-registered message.
- // TODO(vinod): Consider sending a KillTaskMessage.
- // TODO(vinod): Export a statistic for these tasks.
- LOG(WARNING) << "Slave " << slaveId << " attempted to re-register"
- << " with unknown task " << task.task_id()
- << " of framework " << task.framework_id();
- }
- slaveTasks.put(task.framework_id(), task.task_id());
- }
-
- // Send TASK_LOST updates for tasks present in the master but
- // missing from the slave. This could happen if the task was
- // dropped by the slave (e.g., slave exited before getting the
- // task or the task was launched while slave was in recovery).
- foreachvalue (Task* task, utils::copy(slave->tasks)) {
- if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
- LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
- << " of framework " << task->framework_id()
- << " unknown to the slave " << slaveId;
-
- Framework* framework = getFramework(task->framework_id());
- if (framework != NULL) {
- const StatusUpdate& update = protobuf::createStatusUpdate(
- task->framework_id(),
- slaveId,
- task->task_id(),
- TASK_LOST,
- "Task was not received by the slave");
-
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(update);
- send(framework->pid, message);
- }
- removeTask(task);
- }
- }
+ // Reconcile tasks between master and the slave.
+ reconcileTasks(slave, tasks);
SlaveReregisteredMessage message;
message.mutable_slave_id()->MergeFrom(slave->id);
@@ -1648,6 +1606,46 @@ Resources Master::launchTask(const TaskInfo& task,
}
+void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
+{
+ CHECK_NOTNULL(slave);
+
+ // We convert the 'tasks' into a map for easier lookup below.
+ // TODO(vinod): Check if the tasks are known to the master.
+ multihashmap<FrameworkID, TaskID> slaveTasks;
+ foreach (const Task& task, tasks) {
+ slaveTasks.put(task.framework_id(), task.task_id());
+ }
+
+ // Send TASK_LOST updates for tasks present in the master but
+ // missing from the slave. This could happen if the task was
+ // dropped by the slave (e.g., slave exited before getting the
+ // task or the task was launched while slave was in recovery).
+ foreachvalue (Task* task, utils::copy(slave->tasks)) {
+ if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
+ LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+ << " of framework " << task->framework_id()
+ << " unknown to the slave " << slave->id;
+
+ Framework* framework = getFramework(task->framework_id());
+ if (framework != NULL) {
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ slave->id,
+ task->task_id(),
+ TASK_LOST,
+ "Task was not received by the slave");
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ send(framework->pid, message);
+ }
+ removeTask(task);
+ }
+ }
+}
+
+
void Master::addFramework(Framework* framework, bool reregister)
{
CHECK(frameworks.count(framework->id) == 0);
@@ -1920,6 +1918,11 @@ void Master::readdSlave(Slave* slave,
}
foreach (const Task& task, tasks) {
+ // Ignore tasks that have reached terminal state.
+ if (protobuf::isTerminalState(task.state())) {
+ continue;
+ }
+
Task* t = new Task(task);
// Add the task to the slave.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8e7b74c..86c5232 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -132,11 +132,18 @@ protected:
// Process a launch tasks request (for a non-cancelled offer) by
// launching the desired tasks (if the offer contains a valid set of
// tasks) and reporting any unused resources to the allocator.
- void processTasks(Offer* offer,
- Framework* framework,
- Slave* slave,
- const std::vector<TaskInfo>& tasks,
- const Filters& filters);
+ void processTasks(
+ Offer* offer,
+ Framework* framework,
+ Slave* slave,
+ const std::vector<TaskInfo>& tasks,
+ const Filters& filters);
+
+ // Reconciles a re-registering slave's tasks and sends TASK_LOST
+ // updates for tasks known to the master but unknown to the slave.
+ void reconcileTasks(
+ Slave* slave,
+ const std::vector<Task>& tasks);
// Add a framework.
void addFramework(Framework* framework, bool reregister = false);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 840c64d..b5b7e0e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -703,6 +703,11 @@ void Slave::doReliableRegistration()
message.add_tasks()->CopyFrom(t);
}
+
+ // Add terminated tasks.
+ foreachvalue (Task* task, executor->terminatedTasks) {
+ message.add_tasks()->CopyFrom(*task);
+ }
}
}
send(master, message);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5aba7ed..d1ba82e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -376,8 +376,6 @@ struct Executor
hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
- multihashmap<TaskID, UUID> updates; // Pending updates.
-
private:
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index ef570b7..4afbbec 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1378,7 +1378,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
// This test verifies that the master sends TASK_LOST updates
// for tasks in the master absent from the re-registered slave.
// We do this by dropping RunTaskMessage from master to the slave.
-TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
+TEST_F(FaultToleranceTest, ReconcileLostTasks)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
@@ -1448,3 +1448,94 @@ TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
Shutdown();
}
+
+
+// This test verifies that when the slave re-registers, the master
+// does not send TASK_LOST update for a task that has reached terminal
+// state but is waiting for an acknowledgement.
+TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("test task");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ // Send a terminal update right away.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ // Drop the status update from slave to the master, so that
+ // the slave has a pending terminal update when it re-registers.
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Clock::pause();
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(_statusUpdate);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ NewMasterDetectedMessage message;
+ message.set_pid(master.get());
+
+ process::post(slave.get(), message);
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The master should not send a TASK_LOST after the slave
+ // re-registers. We check this by advancing the clock so that
+ // the only update the scheduler receives is the retried
+ // TASK_FINISHED update.
+ Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);
+
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
[2/2] git commit: Fixed slave to properly handle terminated tasks that have pending updates.
Posted by vi...@apache.org.
Fixed slave to properly handle terminated tasks that have pending
updates.
Review: https://reviews.apache.org/r/11694
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/15c00409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/15c00409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/15c00409
Branch: refs/heads/master
Commit: 15c00409e498ad119c9c92571980c048dbc7a2ef
Parents: 9ae1d01
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 5 22:47:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:04:42 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 116 ++++++++++++++++++++++++++--------------------
src/slave/slave.hpp | 14 ++++--
2 files changed, 75 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/15c00409/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ce1646..840c64d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1041,21 +1041,14 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
switch (executor->state) {
case Executor::REGISTERING: {
- if (executor->queuedTasks.contains(taskId)) {
- // We remove the task here so that if this executor registers at
- // a later point in time it won't be sent this task.
- LOG(WARNING) << "Removing queued task " << taskId
- << " from executor '" << executor->id
- << "' of framework " << frameworkId
- << " because the executor hasn't registered yet";
- executor->removeTask(taskId);
- } else {
- LOG(WARNING) << "Cannot kill task " << taskId
- << " of framework " << frameworkId
- << " because the executor '" << executor->id
- << "' hasn't registered yet";
- }
+ LOG(WARNING) << "Removing queued task " << taskId
+ << " of framework " << frameworkId
+ << " because the executor '" << executor->id
+ << "' hasn't registered yet";
+ // NOTE: Sending a TASK_KILLED update removes the task from
+ // Executor::queuedTasks, so that if the executor registers at
+ // a later point in time, it won't get this task.
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
info.id(),
@@ -1376,11 +1369,13 @@ void Slave::_statusUpdateAcknowledgement(
executor->state == Executor::TERMINATED)
<< executor->state;
- executor->updates.remove(taskId, uuid);
+ if (executor->terminatedTasks.contains(taskId)) {
+ executor->completeTask(taskId);
+ }
// Remove the executor if it has terminated and there are no more
- // pending updates.
- if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+ // incomplete tasks.
+ if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
remove(framework, executor);
}
}
@@ -1594,15 +1589,13 @@ void Slave::reregisterExecutor(
send(executor->pid, message);
// Handle all the pending updates.
+ // The status update manager might have already checkpointed some
+ // of these pending updates (for example, if the slave died right
+ // after it checkpointed the update but before it could send the
+ // ACK to the executor). This is ok because the status update
+ // manager correctly handles duplicate updates.
foreach (const StatusUpdate& update, updates) {
- // The status update manager might have already checkpointed some
- // of these pending updates (for e.g: if the slave died right
- // after it checkpointed the update but before it could send the
- // ACK to the executor). If so, we can just ignore those updates.
- if (!executor->updates.contains(
- update.status().task_id(), UUID::fromBytes(update.uuid()))) {
- statusUpdate(update); // This also updates the executor's resources!
- }
+ statusUpdate(update); // This also updates the executor's resources!
}
// Now, if there is any task still in STAGING state and not in
@@ -1720,8 +1713,13 @@ void Slave::statusUpdate(const StatusUpdate& update)
if (executor == NULL) {
LOG(WARNING) << "Could not find the executor for "
<< "status update " << update;
- stats.invalidStatusUpdates++;
+ stats.validStatusUpdates++;
+ // NOTE: We forward the update here because this update could be
+ // generated by the slave when the executor is unknown to it,
+ // e.g., killTask(), _runTask().
+ // TODO(vinod): Revisit these semantics when we disallow updates
+ // sent by executors that are unknown to the slave.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
@@ -1740,11 +1738,10 @@ void Slave::statusUpdate(const StatusUpdate& update)
stats.validStatusUpdates++;
executor->updateTaskState(status.task_id(), status.state());
- executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
// Handle the task appropriately if it's terminated.
if (protobuf::isTerminalState(status.state())) {
- executor->removeTask(status.task_id());
+ executor->terminateTask(status.task_id(), status.state());
// Tell the isolator to update the resources.
dispatch(isolator,
@@ -2152,10 +2149,10 @@ void Slave::executorTerminated(
send(master, message);
}
- // Remove the executor if either there are no pending updates
- // or the framework is terminating.
- if (executor->updates.empty() ||
- framework->state == Framework::TERMINATING) {
+ // Remove the executor if either the framework is terminating or
+ // there are no incomplete tasks.
+ if (framework->state == Framework::TERMINATING ||
+ !executor->incompleteTasks()) {
remove(framework, executor);
}
break;
@@ -2186,8 +2183,8 @@ void Slave::remove(Framework* framework, Executor* executor)
// care for pending updates when a framework is terminating
// because the framework cannot ACK them.
CHECK(executor->state == Executor::TERMINATED) << executor->state;
- CHECK (executor->updates.empty() ||
- framework->state == Framework::TERMINATING);
+ CHECK(framework->state == Framework::TERMINATING ||
+ !executor->incompleteTasks());
// TODO(vinod): Move the responsibility of gc'ing to the
// Executor struct.
@@ -2789,7 +2786,7 @@ Executor* Framework::getExecutor(const TaskID& taskId)
foreachvalue (Executor* executor, executors) {
if (executor->queuedTasks.contains(taskId) ||
executor->launchedTasks.contains(taskId) ||
- executor->updates.contains(taskId)) {
+ executor->terminatedTasks.contains(taskId)) {
return executor;
}
}
@@ -2941,23 +2938,36 @@ Task* Executor::addTask(const TaskInfo& task)
}
-void Executor::removeTask(const TaskID& taskId)
+void Executor::terminateTask(
+ const TaskID& taskId,
+ const mesos::TaskState& state)
{
+ Task* task = NULL;
// Remove the task if it's queued.
- queuedTasks.erase(taskId);
-
- // Update the resources if it's been launched.
- if (launchedTasks.contains(taskId)) {
- Task* task = launchedTasks[taskId];
+ if (queuedTasks.contains(taskId)) {
+ task = new Task(
+ protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+ } else if (launchedTasks.contains(taskId)) {
+ // Update the resources if it's been launched.
+ task = launchedTasks[taskId];
foreach (const Resource& resource, task->resources()) {
resources -= resource;
}
launchedTasks.erase(taskId);
+ }
- completedTasks.push_back(*task);
+ terminatedTasks[taskId] = CHECK_NOTNULL(task);
+}
- delete task;
- }
+
+void Executor::completeTask(const TaskID& taskId)
+{
+ CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+
+ Task* task = terminatedTasks[taskId];
+ completedTasks.push_back(*task);
+ terminatedTasks.erase(taskId);
+ delete task;
}
@@ -3001,16 +3011,14 @@ void Executor::recoverTask(const TaskState& state)
// Read updates to get the latest state of the task.
foreach (const StatusUpdate& update, state.updates) {
updateTaskState(state.id, update.status().state());
- updates.put(state.id, UUID::fromBytes(update.uuid()));
- // Remove the task if it received a terminal update.
+ // Terminate the task if it received a terminal update.
if (protobuf::isTerminalState(update.status().state())) {
- removeTask(state.id);
+ terminateTask(state.id, update.status().state());
- // If the terminal update has been acknowledged, remove it
- // from pending tasks.
+ // If the terminal update has been acknowledged, remove it.
if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
- updates.remove(state.id, UUID::fromBytes(update.uuid()));
+ completeTask(state.id);
}
break;
}
@@ -3026,6 +3034,14 @@ void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
}
+bool Executor::incompleteTasks()
+{
+ return !queuedTasks.empty() ||
+ !launchedTasks.empty() ||
+ !terminatedTasks.empty();
+}
+
+
std::ostream& operator << (std::ostream& stream, Framework::State state) {
switch (state) {
case Framework::RUNNING: return stream << "RUNNING";
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/15c00409/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 26dc96e..5aba7ed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -335,11 +335,15 @@ struct Executor
~Executor();
Task* addTask(const TaskInfo& task);
- void removeTask(const TaskID& taskId);
+ void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
+ void completeTask(const TaskID& taskId);
void checkpointTask(const TaskInfo& task);
void recoverTask(const state::TaskState& state);
void updateTaskState(const TaskID& taskId, TaskState state);
+ // Returns true if there are any queued/launched/terminated tasks.
+ bool incompleteTasks();
+
enum State {
REGISTERING, // Executor is launched but not (re-)registered yet.
RUNNING, // Executor has (re-)registered.
@@ -367,13 +371,13 @@ struct Executor
Resources resources; // Currently consumed resources.
- hashmap<TaskID, TaskInfo> queuedTasks;
- hashmap<TaskID, Task*> launchedTasks;
+ hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+ hashmap<TaskID, Task*> launchedTasks; // Running.
+ hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+ boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
multihashmap<TaskID, UUID> updates; // Pending updates.
- boost::circular_buffer<Task> completedTasks;
-
private:
Executor(const Executor&); // No copying.
Executor& operator = (const Executor&); // No assigning.