You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:35 UTC
[6/6] git commit: Updated reconciliation semantics to take the task's
unacknowledged state into account.
Updated reconciliation semantics to take the task's unacknowledged state into account.
Review: https://reviews.apache.org/r/26702
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e960cdff
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e960cdff
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e960cdff
Branch: refs/heads/master
Commit: e960cdffec20d54b4f57f552d13cd92004f8e437
Parents: 3c4e3fd
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 10 19:09:35 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:09 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 23 +++++--
src/tests/reconciliation_tests.cpp | 115 +++++++++++++++++++++++++++++++-
2 files changed, 133 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9743eab..e70cdbf 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3535,11 +3535,15 @@ void Master::reconcileTasks(
}
foreachvalue (Task* task, framework->tasks) {
+ const TaskState& state = task->has_status_update_state()
+ ? task->status_update_state()
+ : task->state();
+
const StatusUpdate& update = protobuf::createStatusUpdate(
frameworkId,
task->slave_id(),
task->task_id(),
- task->state(),
+ state,
"Reconciliation: Latest task state");
VLOG(1) << "Sending implicit reconciliation state "
@@ -3593,12 +3597,16 @@ void Master::reconcileTasks(
TASK_STAGING,
"Reconciliation: Latest task state");
} else if (task != NULL) {
- // (2) Task is known: send the latest state.
+ // (2) Task is known: send the latest status update state.
+ const TaskState& state = task->has_status_update_state()
+ ? task->status_update_state()
+ : task->state();
+
update = protobuf::createStatusUpdate(
frameworkId,
task->slave_id(),
task->task_id(),
- task->state(),
+ state,
"Reconciliation: Latest task state");
} else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
// (3) Task is unknown, slave is registered: TASK_LOST.
@@ -3948,10 +3956,17 @@ void Master::reconcile(
: ": sending TASK_LOST");
if (slave->version.isSome()) {
+ // NOTE: Currently the slave doesn't look at the task state
+ // when it reconciles the task state; we include the correct
+ // state for correctness and consistency.
+ const TaskState& state = task->has_status_update_state()
+ ? task->status_update_state()
+ : task->state();
+
TaskStatus* status = reconcile.add_statuses();
status->mutable_task_id()->CopyFrom(task->task_id());
status->mutable_slave_id()->CopyFrom(slave->id);
- status->set_state(task->state());
+ status->set_state(state);
status->set_message("Reconciliation request");
status->set_timestamp(Clock::now().secs());
} else {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 400c5c0..4ba5394 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -63,7 +63,7 @@ using testing::An;
using testing::AtMost;
using testing::DoAll;
using testing::Return;
-
+using testing::SaveArg;
class ReconciliationTest : public MesosTest {};
@@ -742,3 +742,116 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
+
+
+// This test verifies that when the task's latest and status update
+// states differ, master responds to reconciliation request with the
+// status update state.
+TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
+{
+ // Start a master.
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start a slave.
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ StandaloneMasterDetector slaveDetector(master.get());
+ Try<PID<Slave> > slave = StartSlave(&exec, &slaveDetector);
+ ASSERT_SOME(slave);
+
+ // Start a scheduler.
+ MockScheduler sched;
+ StandaloneMasterDetector schedulerDetector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, &schedulerDetector);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Signal when the first update is dropped.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+ driver.start();
+
+ // Pause the clock to avoid status update retries.
+ Clock::pause();
+
+ // Wait until TASK_RUNNING is sent to the master.
+ AWAIT_READY(statusUpdateMessage);
+
+ // Ensure status update manager handles TASK_RUNNING update.
+ AWAIT_READY(__statusUpdate);
+
+ Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+ // Now send TASK_FINISHED update.
+ TaskStatus finishedStatus;
+ finishedStatus = statusUpdateMessage.get().update().status();
+ finishedStatus.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(finishedStatus);
+
+ // Ensure status update manager handles TASK_FINISHED update.
+ AWAIT_READY(__statusUpdate2);
+
+ EXPECT_CALL(sched, disconnected(&driver))
+ .WillOnce(Return());
+
+ // Simulate master failover by restarting the master.
+ this->Stop(master.get());
+ master = StartMaster();
+
+ Clock::resume();
+
+ Future<Nothing> registered;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered));
+
+ // Re-register the framework.
+ schedulerDetector.appoint(master.get());
+
+ AWAIT_READY(registered);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
+
+ // Drop all updates to the second master.
+ DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+ // Re-register the slave.
+ slaveDetector.appoint(master.get());
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // Framework should receive a TASK_RUNNING update, since that is the
+ // latest status update state of the task.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ // Reconcile the state of the task.
+ vector<TaskStatus> statuses;
+ driver.reconcileTasks(statuses);
+
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}