You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/10/16 19:26:16 UTC
mesos git commit: Only update the task status when its old status is
not terminal.
Repository: mesos
Updated Branches:
refs/heads/master b1f51b72f -> 0577040b4
Only update the task status when its old status is not terminal.
Review: https://reviews.apache.org/r/38051
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0577040b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0577040b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0577040b
Branch: refs/heads/master
Commit: 0577040b48ab8f3906a99e22bda23391a91d4f03
Parents: b1f51b7
Author: Yong Qiao Wang <yq...@cn.ibm.com>
Authored: Fri Oct 16 10:25:52 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 16 10:25:52 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 11 ++-
src/tests/status_update_manager_tests.cpp | 108 +++++++++++++++++++++++++
2 files changed, 113 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0577040b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ba12a83..2cc8147 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6025,12 +6025,11 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
// Get the unacknowledged status.
const TaskStatus& status = update.status();
- // Out-of-order updates should not occur, however in case they
- // do (e.g., due to bugs), prevent them here to ensure that the
- // resource accounting is not affected.
- if (protobuf::isTerminalState(task->state()) &&
- !protobuf::isTerminalState(status.state())) {
- LOG(ERROR) << "Ignoring out of order status update for task "
+ // Once a task has transitioned to a terminal state, no further updates
+ // (terminal or not) should result in a state change. These are the same
+ // semantics that are enforced by the slave.
+ if (protobuf::isTerminalState(task->state())) {
+ LOG(ERROR) << "Ignoring status update for the terminated task "
<< task->task_id()
<< " (" << task->state() << " -> " << status.state() << ")"
<< " of framework " << task->framework_id();
http://git-wip-us.apache.org/repos/asf/mesos/blob/0577040b/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 9970d71..b413b15 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -842,6 +842,114 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
Shutdown();
}
+
+// This test verifies that if the master receives a status update
+// for an already terminated task, it forwards the update without
+// changing the state of the task.
+TEST_F(StatusUpdateManagerTest, DuplicatedTerminalStatusUpdate)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave>> slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ // Send a terminal update right away.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<Nothing> _statusUpdateAcknowledgement =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_FINISHED, status.get().state());
+
+ AWAIT_READY(_statusUpdateAcknowledgement);
+
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ Clock::pause();
+
+ // Now send a TASK_KILLED update for the same task.
+ TaskStatus status2 = status.get();
+ status2.set_state(TASK_KILLED);
+ execDriver->sendStatusUpdate(status2);
+
+ // Ensure the scheduler receives TASK_KILLED.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_KILLED, update.get().state());
+
+ // Ensure the slave properly handles the ACK.
+ // Awaiting this future confirms that
+ // Slave::_statusUpdateAcknowledgement() is dispatched on the slave.
+ AWAIT_READY(_statusUpdateAcknowledgement2);
+
+ // Verify the latest task status.
+ Future<process::http::Response> tasks = process::http::get(master.get(), "tasks");
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, tasks);
+
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+ "application/json",
+ "Content-Type",
+ tasks);
+
+ Try<JSON::Object> parse = JSON::parse<JSON::Object>(tasks.get().body);
+ ASSERT_SOME(parse);
+
+ Result<JSON::String> state = parse.get().find<JSON::String>("tasks[0].state");
+
+ ASSERT_SOME_EQ(JSON::String("TASK_FINISHED"), state);
+
+ Clock::resume();
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {