You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/03/18 00:01:05 UTC
mesos git commit: Ensured TaskStatus::source field is set for
executor status updates.
Repository: mesos
Updated Branches:
refs/heads/master b6d2f2def -> 51656c8cd
Ensured TaskStatus::source field is set for executor status updates.
A status update originating from an executor should have the
TaskStatus::source field set to TaskStatus::SOURCE_EXECUTOR. Set this
field in the slave to be future-proof (for a future in which there is no
executor driver). The previous code had a bug: it set the source on a
copy of the status, and that copy was never forwarded. Also add checks
for source and reason to status updates in existing tests.
Review: https://reviews.apache.org/r/32130
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51656c8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51656c8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51656c8c
Branch: refs/heads/master
Commit: 51656c8cdf9c2b58ad4c3381e4131dfa269d46ab
Parents: b6d2f2d
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Tue Mar 17 15:38:49 2015 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Tue Mar 17 15:38:49 2015 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 10 ++++++----
src/slave/slave.hpp | 4 +++-
src/tests/slave_tests.cpp | 39 +++++++++++++++++++++++++++++++--------
3 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0f99e4e..f1f2100 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2479,7 +2479,7 @@ void Slave::reregisterExecutorTimeout()
// reliable delivery of status updates. Since executor driver caches
// unacked updates it is important that whoever sent the update gets
// acknowledgement for it.
-void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
+void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
{
LOG(INFO) << "Handling status update " << update << " from " << pid;
@@ -2487,9 +2487,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
state == RUNNING || state == TERMINATING)
<< state;
- TaskStatus status = update.status();
- status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE
- : TaskStatus::SOURCE_EXECUTOR);
+ // Set the source before forwarding the status update.
+ update.mutable_status()->set_source(
+ pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
@@ -2512,6 +2512,8 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
return;
}
+ TaskStatus status = update.status();
+
Executor* executor = framework->getExecutor(status.task_id());
if (executor == NULL) {
LOG(WARNING) << "Could not find the executor for "
http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 989832f..19e6b44 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -181,7 +181,9 @@ public:
// after the update is successfully handled. If pid == UPID()
// no ACK is sent. The latter is used by the slave to send
// status updates it generated (e.g., TASK_LOST).
- void statusUpdate(const StatusUpdate& update, const process::UPID& pid);
+ // NOTE: StatusUpdate is passed by value because it is modified
+ // to ensure source field is set.
+ void statusUpdate(StatusUpdate update, const process::UPID& pid);
// Continue handling the status update after optionally updating the
// container's resources.
http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index a975305..fd09d65 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -176,6 +176,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
AWAIT_READY(status);
ASSERT_EQ(TASK_FAILED, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
Clock::resume();
@@ -247,6 +248,8 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
// We use 'gc.schedule' as a signal for the executor being cleaned
// up by the slave.
@@ -348,10 +351,10 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
// Expect two status updates, one for once the mesos-executor says
// the task is running and one for after our overridden command
// above finishes.
- Future<TaskStatus> status1, status2;
+ Future<TaskStatus> statusRunning, statusFinished;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status1))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished));
Try<Subprocess> executor =
subprocess(
@@ -363,12 +366,15 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
ASSERT_SOME(executor);
- // Scheduler should receive the TASK_RUNNING update.
- AWAIT_READY(status1);
- ASSERT_EQ(TASK_RUNNING, status1.get().state());
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
- AWAIT_READY(status2);
- ASSERT_EQ(TASK_FINISHED, status2.get().state());
+ AWAIT_READY(statusFinished);
+ ASSERT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
AWAIT_READY(wait);
@@ -461,11 +467,15 @@ TEST_F(SlaveTest, ComamndTaskWithArguments)
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -592,11 +602,15 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -684,11 +698,15 @@ TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser)
driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should first receive TASK_RUNNING followed by the
+ // TASK_FINISHED from the executor.
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
AWAIT_READY(statusFinished);
EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
driver.stop();
driver.join();
@@ -1092,9 +1110,12 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
AWAIT_READY(status3);
EXPECT_EQ(TASK_KILLED, status3.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
AWAIT_READY(status4);
EXPECT_EQ(TASK_LOST, status4.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason());
driver.stop();
driver.join();
@@ -1204,6 +1225,8 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails)
AWAIT_READY(status);
EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+ EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
driver.stop();
driver.join();