You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2015/03/18 00:01:05 UTC

mesos git commit: Ensured TaskStatus::source field is set for executor status updates.

Repository: mesos
Updated Branches:
  refs/heads/master b6d2f2def -> 51656c8cd


Ensured TaskStatus::source field is set for executor status updates.

A status update originating from an executor should have the
TaskStatus::source field set to TaskStatus::SOURCE_EXECUTOR. Set this
field in the slave to be future-proof (i.e., for a future in which there
is no executor driver). The previous code had a bug: it set the source
on a copy of the update's status, and that copy was never forwarded.
Also add checks for the source and reason of status updates in
existing tests.

Review: https://reviews.apache.org/r/32130


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51656c8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51656c8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51656c8c

Branch: refs/heads/master
Commit: 51656c8cdf9c2b58ad4c3381e4131dfa269d46ab
Parents: b6d2f2d
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Tue Mar 17 15:38:49 2015 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Tue Mar 17 15:38:49 2015 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 10 ++++++----
 src/slave/slave.hpp       |  4 +++-
 src/tests/slave_tests.cpp | 39 +++++++++++++++++++++++++++++++--------
 3 files changed, 40 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0f99e4e..f1f2100 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2479,7 +2479,7 @@ void Slave::reregisterExecutorTimeout()
 // reliable delivery of status updates. Since executor driver caches
 // unacked updates it is important that whoever sent the update gets
 // acknowledgement for it.
-void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
+void Slave::statusUpdate(StatusUpdate update, const UPID& pid)
 {
   LOG(INFO) << "Handling status update " << update << " from " << pid;
 
@@ -2487,9 +2487,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
         state == RUNNING || state == TERMINATING)
     << state;
 
-  TaskStatus status = update.status();
-  status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE
-                                  : TaskStatus::SOURCE_EXECUTOR);
+  // Set the source before forwarding the status update.
+  update.mutable_status()->set_source(
+      pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
 
   Framework* framework = getFramework(update.framework_id());
   if (framework == NULL) {
@@ -2512,6 +2512,8 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
     return;
   }
 
+  TaskStatus status = update.status();
+
   Executor* executor = framework->getExecutor(status.task_id());
   if (executor == NULL) {
     LOG(WARNING)  << "Could not find the executor for "

http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 989832f..19e6b44 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -181,7 +181,9 @@ public:
   // after the update is successfully handled. If pid == UPID()
   // no ACK is sent. The latter is used by the slave to send
   // status updates it generated (e.g., TASK_LOST).
-  void statusUpdate(const StatusUpdate& update, const process::UPID& pid);
+  // NOTE: StatusUpdate is passed by value because it is modified
+  // to ensure source field is set.
+  void statusUpdate(StatusUpdate update, const process::UPID& pid);
 
   // Continue handling the status update after optionally updating the
   // container's resources.

http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index a975305..fd09d65 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -176,6 +176,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
 
   AWAIT_READY(status);
   ASSERT_EQ(TASK_FAILED, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
 
   Clock::resume();
 
@@ -247,6 +248,8 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
 
   // We use 'gc.schedule' as a signal for the executor being cleaned
   // up by the slave.
@@ -348,10 +351,10 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
   // Expect two status updates, one for once the mesos-executor says
   // the task is running and one for after our overridden command
   // above finishes.
-  Future<TaskStatus> status1, status2;
+  Future<TaskStatus> statusRunning, statusFinished;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status1))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished));
 
   Try<Subprocess> executor =
     subprocess(
@@ -363,12 +366,15 @@ TEST_F(SlaveTest, CommandExecutorWithOverride)
 
   ASSERT_SOME(executor);
 
-  // Scheduler should receive the TASK_RUNNING update.
-  AWAIT_READY(status1);
-  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
-  AWAIT_READY(status2);
-  ASSERT_EQ(TASK_FINISHED, status2.get().state());
+  AWAIT_READY(statusFinished);
+  ASSERT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   AWAIT_READY(wait);
 
@@ -461,11 +467,15 @@ TEST_F(SlaveTest, ComamndTaskWithArguments)
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -592,11 +602,15 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser)
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -684,11 +698,15 @@ TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser)
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
+  // Scheduler should first receive TASK_RUNNING followed by the
+  // TASK_FINISHED from the executor.
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source());
 
   AWAIT_READY(statusFinished);
   EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source());
 
   driver.stop();
   driver.join();
@@ -1092,9 +1110,12 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   AWAIT_READY(status3);
   EXPECT_EQ(TASK_KILLED, status3.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
 
   AWAIT_READY(status4);
   EXPECT_EQ(TASK_LOST, status4.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason());
 
   driver.stop();
   driver.join();
@@ -1204,6 +1225,8 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason());
 
   driver.stop();
   driver.join();