You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:35 UTC

[6/6] git commit: Updated reconciliation semantics to take the task's unacknowledged state into account.

Updated reconciliation semantics to take the task's unacknowledged state into account.

Review: https://reviews.apache.org/r/26702


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e960cdff
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e960cdff
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e960cdff

Branch: refs/heads/master
Commit: e960cdffec20d54b4f57f552d13cd92004f8e437
Parents: 3c4e3fd
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 10 19:09:35 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:09 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp              |  23 +++++--
 src/tests/reconciliation_tests.cpp | 115 +++++++++++++++++++++++++++++++-
 2 files changed, 133 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9743eab..e70cdbf 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3535,11 +3535,15 @@ void Master::reconcileTasks(
     }
 
     foreachvalue (Task* task, framework->tasks) {
+      const TaskState& state = task->has_status_update_state()
+          ? task->status_update_state()
+          : task->state();
+
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
           task->slave_id(),
           task->task_id(),
-          task->state(),
+          state,
           "Reconciliation: Latest task state");
 
       VLOG(1) << "Sending implicit reconciliation state "
@@ -3593,12 +3597,16 @@ void Master::reconcileTasks(
           TASK_STAGING,
           "Reconciliation: Latest task state");
     } else if (task != NULL) {
-      // (2) Task is known: send the latest state.
+      // (2) Task is known: send the latest status update state.
+      const TaskState& state = task->has_status_update_state()
+          ? task->status_update_state()
+          : task->state();
+
       update = protobuf::createStatusUpdate(
           frameworkId,
           task->slave_id(),
           task->task_id(),
-          task->state(),
+          state,
           "Reconciliation: Latest task state");
     } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) {
       // (3) Task is unknown, slave is registered: TASK_LOST.
@@ -3948,10 +3956,17 @@ void Master::reconcile(
                          : ": sending TASK_LOST");
 
         if (slave->version.isSome()) {
+          // NOTE: Currently the slave doesn't look at the task state
+          // when it reconciles the task state; we include the correct
+          // state for correctness and consistency.
+          const TaskState& state = task->has_status_update_state()
+              ? task->status_update_state()
+              : task->state();
+
           TaskStatus* status = reconcile.add_statuses();
           status->mutable_task_id()->CopyFrom(task->task_id());
           status->mutable_slave_id()->CopyFrom(slave->id);
-          status->set_state(task->state());
+          status->set_state(state);
           status->set_message("Reconciliation request");
           status->set_timestamp(Clock::now().secs());
         } else {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 400c5c0..4ba5394 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -63,7 +63,7 @@ using testing::An;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Return;
-
+using testing::SaveArg;
 
 class ReconciliationTest : public MesosTest {};
 
@@ -742,3 +742,116 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask)
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
+
+
+// Verifies that when a task's latest state (TASK_FINISHED) differs from
+// its status update state (TASK_RUNNING, which never reached the master),
+// the master answers reconciliation with the status update state.
+TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
+{
+  // Start a master.
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start a slave; its detector is re-appointed after master failover.
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  StandaloneMasterDetector slaveDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(&exec, &slaveDetector);
+  ASSERT_SOME(slave);
+
+  // Start a scheduler with its own detector (re-appointed below).
+  MockScheduler sched;
+  StandaloneMasterDetector schedulerDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &schedulerDetector);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver; // Captured by exec's registered() below.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop the first status update (TASK_RUNNING) bound for the master.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  driver.start();
+
+  // Pause the clock so the slave does not retry the dropped update.
+  Clock::pause();
+
+  // Wait until TASK_RUNNING is sent towards the master and dropped.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Ensure status update manager handles TASK_RUNNING update.
+  AWAIT_READY(__statusUpdate);
+
+  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  // Send TASK_FINISHED for the same task, reusing the dropped status.
+  TaskStatus finishedStatus;
+  finishedStatus = statusUpdateMessage.get().update().status();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure status update manager handles TASK_FINISHED update.
+  AWAIT_READY(__statusUpdate2);
+
+  EXPECT_CALL(sched, disconnected(&driver))
+    .WillOnce(Return());
+
+  // Simulate master failover by restarting the master.
+  this->Stop(master.get());
+  master = StartMaster(); // NOTE(review): consider ASSERT_SOME(master).
+
+  Clock::resume();
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  // Re-register the framework.
+  schedulerDetector.appoint(master.get());
+
+  AWAIT_READY(registered);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
+
+  // Drop all updates to the new master; sched hears only reconciliation.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+  // Re-register the slave.
+  slaveDetector.appoint(master.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Framework should receive a TASK_RUNNING update, since that is the
+  // latest status update state of the task.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  // An empty 'statuses' list requests implicit reconciliation of all tasks.
+  vector<TaskStatus> statuses;
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}