You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:49:53 UTC
[09/23] git commit: Fixed slave to properly handle duplicate terminal
updates for the same task.
Fixed slave to properly handle duplicate terminal updates for the
same task.
Review: https://reviews.apache.org/r/11798
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cfb83a4f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cfb83a4f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cfb83a4f
Branch: refs/heads/vinod/0.13.0
Commit: cfb83a4f98c67c3fd74ab4375f474989763d0b2e
Parents: bc2a09d
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Jun 10 22:44:42 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Jun 11 12:01:44 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 25 ++-
src/tests/status_update_manager_tests.cpp | 202 +++++++++++++++++++++++++
2 files changed, 220 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfb83a4f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b5b7e0e..fd96de9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1687,6 +1687,8 @@ void Slave::reregisterExecutorTimeout()
// 2) When slave generates task updates (e.g., LOST/KILLED/FAILED).
void Slave::statusUpdate(const StatusUpdate& update)
{
+ LOG(INFO) << "Handling status update " << update;
+
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
<< state;
@@ -1724,7 +1726,8 @@ void Slave::statusUpdate(const StatusUpdate& update)
// generated by the slave when the executor is unknown to it,
// e.g., killTask(), _runTask().
// TODO(vinod): Revisit these semantics when we disallow updates
- // sent by executors that are unknown to the slave.
+ // sent by executors that are unknown to the slave (e.g., when
+ // slave recovery is always enabled).
statusUpdateManager->update(update, info.id())
.onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
@@ -1737,15 +1740,17 @@ void Slave::statusUpdate(const StatusUpdate& update)
executor->state == Executor::TERMINATED)
<< executor->state;
- VLOG(1) << "Handling status update " << update;
-
stats.tasks[update.status().state()]++;
stats.validStatusUpdates++;
executor->updateTaskState(status.task_id(), status.state());
- // Handle the task appropriately if it's terminated.
- if (protobuf::isTerminalState(status.state())) {
+ // Handle the task appropriately if it is terminated.
+ // TODO(vinod): Revisit these semantics when we disallow duplicate
+ // terminal updates (e.g., when slave recovery is always enabled).
+ if (protobuf::isTerminalState(status.state()) &&
+ (executor->queuedTasks.contains(status.task_id()) ||
+ executor->launchedTasks.contains(status.task_id()))) {
executor->terminateTask(status.task_id(), status.state());
// Tell the isolator to update the resources.
@@ -2967,7 +2972,8 @@ void Executor::terminateTask(
void Executor::completeTask(const TaskID& taskId)
{
- CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+ CHECK(terminatedTasks.contains(taskId))
+ << "Failed to find terminated task " << taskId;
Task* task = terminatedTasks[taskId];
completedTasks.push_back(*task);
@@ -3018,7 +3024,12 @@ void Executor::recoverTask(const TaskState& state)
updateTaskState(state.id, update.status().state());
// Terminate the task if it received a terminal update.
- if (protobuf::isTerminalState(update.status().state())) {
+ // We ignore duplicate terminal updates by checking if
+ // the task is present in launchedTasks.
+ // TODO(vinod): Revisit these semantics when we disallow duplicate
+ // terminal updates (e.g., when slave recovery is always enabled).
+ if (protobuf::isTerminalState(update.status().state()) &&
+ launchedTasks.contains(state.id)) {
terminateTask(state.id, update.status().state());
// If the terminal update has been acknowledged, remove it.
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfb83a4f/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 61ccfcc..4239532 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -464,3 +464,205 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
Shutdown();
}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates when the second
+// update (TASK_KILLED) is received before the ACK for the first
+// update (TASK_FINISHED). The proper behavior here is for the status
+// update manager to drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true; // Enable checkpointing on the slave.
+
+ Try<PID<Slave> > slave = StartSlave(&exec, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing for the framework.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers)) // Capture the first set of offers.
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver)); // Kept to send the duplicate update below.
+
+ // Send a terminal update (TASK_FINISHED) as soon as the task is launched.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status)); // First update seen by the scheduler.
+
+ // Drop the first ACK from the scheduler to the slave.
+ Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+ DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+ Clock::pause(); // The clock is advanced explicitly below to trigger the retry.
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+ AWAIT_READY(statusUpdateAcknowledgementMessage); // Wait for the (dropped) ACK.
+
+ Future<Nothing> _statusUpdate =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+ // Now send a TASK_KILLED update for the same task, reusing the first status.
+ TaskStatus status2 = status.get();
+ status2.set_state(TASK_KILLED);
+ execDriver->sendStatusUpdate(status2);
+
+ // At this point the status update manager has enqueued
+ // TASK_FINISHED and TASK_KILLED updates.
+ AWAIT_READY(_statusUpdate);
+
+ // After we advance the clock, the scheduler should receive
+ // the retried TASK_FINISHED update and acknowledge it. The
+ // TASK_KILLED update should be dropped by the status update
+ // manager, as the stream is already terminated.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::settle();
+
+ // Ensure the scheduler receives TASK_FINISHED (the retry), not TASK_KILLED.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_FINISHED, update.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1)); // Tolerate zero or one shutdown call during teardown.
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate terminal status updates when the second
+// update (TASK_KILLED) is received after the ACK for the first
+// update (TASK_FINISHED). The proper behavior here is for the status
+// update manager to forward the duplicate update to the scheduler.
+TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true; // Enable checkpointing on the slave.
+
+ Try<PID<Slave> > slave = StartSlave(&exec, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing for the framework.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers)) // Capture the first set of offers.
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver)); // Kept to send the duplicate update below.
+
+ // Send a terminal update (TASK_FINISHED) as soon as the task is launched.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status)); // First update seen by the scheduler.
+
+ Future<Nothing> _statusUpdateAcknowledgement =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+ AWAIT_READY(_statusUpdateAcknowledgement); // ACK for TASK_FINISHED, not dropped here.
+
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update)); // Second update seen by the scheduler.
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ Clock::pause(); // Paused so that Clock::settle() can be used below.
+
+ // Now send a TASK_KILLED update for the same task, reusing the first status.
+ TaskStatus status2 = status.get();
+ status2.set_state(TASK_KILLED);
+ execDriver->sendStatusUpdate(status2);
+
+ // Ensure the scheduler receives TASK_KILLED.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_KILLED, update.get().state());
+
+ // Ensure the slave properly handles the ACK.
+ // Clock::settle() ensures that the slave successfully
+ // executes Slave::_statusUpdateAcknowledgement().
+ AWAIT_READY(_statusUpdateAcknowledgement2);
+ Clock::settle();
+
+ Clock::resume();
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1)); // Tolerate zero or one shutdown call during teardown.
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}