You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/29 01:40:11 UTC
git commit: Updated the slave to send terminal tasks for terminated
executors during re-registration.
Repository: mesos
Updated Branches:
refs/heads/master 4ef409dee -> 31e382e95
Updated the slave to send terminal tasks for terminated executors
during re-registration.
The slave must send terminal, unacknowledged tasks for terminated
executors so that the master can correctly reconcile tasks when
the slave re-registers.
Otherwise, the master may see a missing task
(terminal, unacknowledged) on the slave and incorrectly inform
the framework via TASK_LOST, per MESOS-1388.
Review: https://reviews.apache.org/r/21991
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31e382e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31e382e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31e382e9
Branch: refs/heads/master
Commit: 31e382e95faf1ee1ebb8901bb94ba2b5cefa8607
Parents: 4ef409d
Author: Ben Mahler <be...@gmail.com>
Authored: Wed May 28 15:51:42 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed May 28 15:52:35 2014 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 27 ++++++++++----------
src/tests/fault_tolerance_tests.cpp | 44 +++++++++++++++++++++-----------
2 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/31e382e9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7fbedb1..c5c0513 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -803,8 +803,7 @@ void Slave::doReliableRegistration(const Duration& duration)
CHECK(state == DISCONNECTED || state == TERMINATING) << state;
if (info.id() == "") {
- // Slave started before master.
- // (Vinod): Is the above comment true?
+ // Registering for the first time.
RegisterSlaveMessage message;
message.mutable_slave()->CopyFrom(info);
send(master.get(), message);
@@ -816,13 +815,9 @@ void Slave::doReliableRegistration(const Duration& duration)
foreachvalue (Framework* framework, frameworks) {
foreachvalue (Executor* executor, framework->executors) {
- // Ignore terminated executors because they do not consume
- // any resources.
- if (executor->state == Executor::TERMINATED) {
- continue;
- }
-
// Add launched, terminated, and queued tasks.
+ // Note that terminated executors will only have terminated
+ // unacknowledged tasks.
foreach (Task* task, executor->launchedTasks.values()) {
message.add_tasks()->CopyFrom(*task);
}
@@ -844,12 +839,16 @@ void Slave::doReliableRegistration(const Duration& duration)
message.mutable_tasks(i)->clear_executor_id();
}
} else {
- ExecutorInfo* executorInfo = message.add_executor_infos();
- executorInfo->MergeFrom(executor->info);
-
- // Scheduler Driver will ensure the framework id is set in
- // ExecutorInfo, effectively making it a required field.
- CHECK(executorInfo->has_framework_id());
+ // Ignore terminated executors because they do not consume
+ // any resources.
+ if (executor->state != Executor::TERMINATED) {
+ ExecutorInfo* executorInfo = message.add_executor_infos();
+ executorInfo->MergeFrom(executor->info);
+
+ // Scheduler Driver will ensure the framework id is set in
+ // ExecutorInfo, effectively making it a required field.
+ CHECK(executorInfo->has_framework_id());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/31e382e9/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 3f26393..e484a8a 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1850,11 +1850,11 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
}
-// This test verifies that a re-registering slave does not inform
-// the master about a terminated executor (and its tasks), when the
-// executor has pending updates. We check this by ensuring that the
-// master sends a TASK_LOST update for the task belonging to the
-// terminated executor.
+// This test verifies that a re-registering slave sends the terminal
+// unacknowledged tasks for a terminal executor. This is required
+// for the master to correctly reconcile its view with the slave's
+// view of tasks. This test drops a terminal update to the master
+// and then forces the slave to re-register.
TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
{
Try<PID<Master> > master = StartMaster();
@@ -1880,7 +1880,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
.WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
.WillRepeatedly(Return()); // Ignore subsequent offers.
- EXPECT_CALL(exec, registered(_, _, _, _));
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
EXPECT_CALL(exec, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
@@ -1894,29 +1896,41 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
- // Drop the TASK_LOST status update(s) sent to the master.
- // This ensures that the TASK_LOST received by the scheduler
- // is generated by the master.
- DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+ // Drop the TASK_FINISHED status update sent to the master.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
Future<ExitedExecutorMessage> executorExitedMessage =
FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+ Clock::pause();
+
+ TaskStatus finishedStatus;
+ finishedStatus = status.get();
+ finishedStatus.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(finishedStatus);
+
+ // Ensure the update was sent.
+ AWAIT_READY(statusUpdateMessage);
+
// Now kill the executor.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
- AWAIT_READY(executorExitedMessage);
-
- // Simulate a spurious master change event (e.g., due to ZooKeeper
- // expiration) at the slave to force re-registration.
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status2));
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
+
detector.appoint(master.get());
+ AWAIT_READY(slaveReregisteredMessage);
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
AWAIT_READY(status2);
- EXPECT_EQ(TASK_LOST, status2.get().state());
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
driver.stop();
driver.join();