You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/10/16 19:26:16 UTC

mesos git commit: Only update the task status when its old status is not terminal.

Repository: mesos
Updated Branches:
  refs/heads/master b1f51b72f -> 0577040b4


Only update the task status when its old status is not terminal.

Review: https://reviews.apache.org/r/38051


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0577040b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0577040b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0577040b

Branch: refs/heads/master
Commit: 0577040b48ab8f3906a99e22bda23391a91d4f03
Parents: b1f51b7
Author: Yong Qiao Wang <yq...@cn.ibm.com>
Authored: Fri Oct 16 10:25:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 16 10:25:52 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp                     |  11 ++-
 src/tests/status_update_manager_tests.cpp | 108 +++++++++++++++++++++++++
 2 files changed, 113 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0577040b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ba12a83..2cc8147 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6025,12 +6025,11 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
   // Get the unacknowledged status.
   const TaskStatus& status = update.status();
 
-  // Out-of-order updates should not occur, however in case they
-  // do (e.g., due to bugs), prevent them here to ensure that the
-  // resource accounting is not affected.
-  if (protobuf::isTerminalState(task->state()) &&
-      !protobuf::isTerminalState(status.state())) {
-    LOG(ERROR) << "Ignoring out of order status update for task "
+  // Once a task's state has been transitioned to a terminal state, no
+  // further updates (terminal or not) should result in a state change.
+  // These are the same semantics that are enforced by the slave.
+  if (protobuf::isTerminalState(task->state())) {
+    LOG(ERROR) << "Ignoring status update for the terminated task "
                << task->task_id()
                << " (" << task->state() << " -> " << status.state() << ")"
                << " of framework " << task->framework_id();

http://git-wip-us.apache.org/repos/asf/mesos/blob/0577040b/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 9970d71..b413b15 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -842,6 +842,114 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
   Shutdown();
 }
 
+
+// This test verifies that if the master receives a status update
+// for an already terminated task, it forwards the update to the
+// scheduler without changing the state of the task.
+TEST_F(StatusUpdateManagerTest, DuplicatedTerminalStatusUpdate)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+  Clock::pause();
+
+  // Now send a TASK_KILLED update for the same task.
+  TaskStatus status2 = status.get();
+  status2.set_state(TASK_KILLED);
+  execDriver->sendStatusUpdate(status2);
+
+  // Ensure the scheduler receives TASK_KILLED.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_KILLED, update.get().state());
+
+  // Wait until the ACK for the second update is dispatched to the
+  // slave, i.e., until the dispatch of
+  // Slave::_statusUpdateAcknowledgement() captured above is ready.
+  AWAIT_READY(_statusUpdateAcknowledgement2);
+
+  // Verify the master still reports TASK_FINISHED for the task.
+  Future<process::http::Response> tasks = process::http::get(master.get(), "tasks");
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, tasks);
+
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+      "application/json",
+      "Content-Type",
+      tasks);
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(tasks.get().body);
+  ASSERT_SOME(parse);
+
+  Result<JSON::String> state = parse.get().find<JSON::String>("tasks[0].state");
+
+  ASSERT_SOME_EQ(JSON::String("TASK_FINISHED"), state);
+
+  Clock::resume();
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {