You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:33 UTC

[4/6] git commit: Updated slave to include latest task state in update.

Updated slave to include latest task state in update.

Review: https://reviews.apache.org/r/26700


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da669702
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da669702
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da669702

Branch: refs/heads/master
Commit: da669702e4c6c4050ef49d9a1e399a837a77c143
Parents: ca14f37
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 10 14:48:49 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:09 2014 -0700

----------------------------------------------------------------------
 src/messages/messages.proto               |  7 +++
 src/slave/slave.cpp                       | 36 ++++++++----
 src/slave/slave.hpp                       |  7 ++-
 src/slave/status_update_manager.cpp       |  8 +--
 src/slave/status_update_manager.hpp       |  2 +-
 src/tests/status_update_manager_tests.cpp | 80 ++++++++++++++++++++++++++
 6 files changed, 122 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 196d1d4..6e49fe7 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -73,6 +73,13 @@ message StatusUpdate {
   required TaskStatus status = 4;
   required double timestamp = 5;
   required bytes uuid = 6;
+
+  // This corresponds to the latest state of the task according to the
+  // slave. Note that this state might differ from the state in 'status'
+  // because the status update manager queues updates. In other words,
+  // 'status' corresponds to the update at the top of the queue and
+  // 'latest_state' corresponds to the update at the bottom of the queue.
+  optional TaskState latest_state = 7;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a5761ed..55e5264 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2268,6 +2268,16 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
   stats.validStatusUpdates++;
   metrics.valid_status_updates++;
 
+  // We set the latest state of the task here so that the slave can
+  // inform the master about the latest state (via status update or
+  // ReregisterSlaveMessage message) as soon as possible. Master can
+  // use this information, for example, to release resources as soon
+  // as the latest state of the task reaches a terminal state. This
+  // is important because status update manager queues updates and
+  // only sends one update per task at a time; the next update for a
+  // task is sent only after the acknowledgement for the previous one
+  // is received, which could take a long time if the framework is
+  // backed up or is down.
   executor->updateTaskState(status);
 
   // Handle the task appropriately if it is terminated.
@@ -2372,7 +2382,7 @@ void Slave::__statusUpdate(
 
 // NOTE: An acknowledgement for this update might have already been
 // processed by the slave but not the status update manager.
-void Slave::forward(const StatusUpdate& update)
+void Slave::forward(StatusUpdate update)
 {
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
@@ -2385,7 +2395,8 @@ void Slave::forward(const StatusUpdate& update)
     return;
   }
 
-  // Update the status update state of the task.
+  // Update the status update state of the task and include the latest
+  // state of the task in the status update.
   Framework* framework = getFramework(update.framework_id());
   if (framework != NULL) {
     const TaskID& taskId = update.status().task_id();
@@ -2402,17 +2413,22 @@ void Slave::forward(const StatusUpdate& update)
         task = executor->terminatedTasks[taskId];
       }
 
-      // We set the status update state of the task here because in
-      // steady state master updates the status update state of the
-      // task when it receives this update. If the master fails over,
-      // slave re-registers with this task with this status update
-      // state. Note that an acknowledgement for this update might be
-      // enqueued on status update manager when we are here. But that
-      // is ok because the status update state will be updated when
-      // the next update is forwarded to the slave.
       if (task != NULL) {
+        // We set the status update state of the task here because in
+        // steady state master updates the status update state of the
+        // task when it receives this update. If the master fails over,
+        // slave re-registers with this task in this status update
+        // state. Note that an acknowledgement for this update might
+        // be enqueued on status update manager when we are here. But
+        // that is ok because the status update state will be updated
+        // when the next update is forwarded to the slave.
         task->set_status_update_state(update.status().state());
         task->set_status_update_uuid(update.uuid());
+
+        // Include the latest state of the task in the update. See the
+        // comments in 'statusUpdate()' on why informing the master
+        // about the latest state of the task is important.
+        update.set_latest_state(task->state());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 439052e..eb5de73 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -197,9 +197,10 @@ public:
       const StatusUpdate& update,
       const process::UPID& pid);
 
-  // This is called by status update manager to forward a
-  // status update to the master.
-  void forward(const StatusUpdate& update);
+  // This is called by status update manager to forward a status
+  // update to the master. Note that the latest state of the task is
+  // added to the update before forwarding.
+  void forward(StatusUpdate update);
 
   void statusUpdateAcknowledgement(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 9bdbf5e..fab8c22 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -71,7 +71,7 @@ public:
   using process::ProcessBase::initialize;
 
   // StatusUpdateManager implementation.
-  void initialize(const function<void(const StatusUpdate&)>& forward);
+  void initialize(const function<void(StatusUpdate)>& forward);
 
   Future<Nothing> update(
       const StatusUpdate& update,
@@ -138,7 +138,7 @@ private:
   const Flags flags;
   bool paused;
 
-  function<void(const StatusUpdate&)> forward_;
+  function<void(StatusUpdate)> forward_;
 
   hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
 };
@@ -160,7 +160,7 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
 
 
 void StatusUpdateManagerProcess::initialize(
-    const function<void(const StatusUpdate&)>& forward)
+    const function<void(StatusUpdate)>& forward)
 {
   forward_ = forward;
 }
@@ -559,7 +559,7 @@ StatusUpdateManager::~StatusUpdateManager()
 
 
 void StatusUpdateManager::initialize(
-    const function<void(const StatusUpdate&)>& forward)
+    const function<void(StatusUpdate)>& forward)
 {
   dispatch(process, &StatusUpdateManagerProcess::initialize, forward);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 2852884..1c8a54e 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -78,7 +78,7 @@ public:
 
   // Expects a callback 'forward' which gets called whenever there is
   // a new status update that needs to be forwarded to the master.
-  void initialize(const lambda::function<void(const StatusUpdate&)>& forward);
+  void initialize(const lambda::function<void(StatusUpdate)>& forward);
 
   // TODO(vinod): Come up with better names/signatures for the
   // checkpointing and non-checkpointing 'update()' functions.

http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index e9ef1e2..b0b1316 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -782,3 +782,83 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave correctly includes the latest
+// state of the task in the status update it forwards to the master.
+TEST_F(StatusUpdateManagerTest, LatestTaskState)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Signal when the first update is dropped.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  driver.start();
+
+  // Wait until TASK_RUNNING is sent to the master.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Ensure the status update manager handles the TASK_RUNNING update.
+  AWAIT_READY(__statusUpdate);
+
+  // Pause the clock to prevent the status update manager from retrying.
+  Clock::pause();
+
+  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  // Now send TASK_FINISHED update.
+  TaskStatus finishedStatus;
+  finishedStatus = statusUpdateMessage.get().update().status();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure the status update manager handles the TASK_FINISHED update.
+  AWAIT_READY(__statusUpdate2);
+
+  // Signal when the second update is dropped.
+  Future<StatusUpdateMessage> statusUpdateMessage2 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  // Advance the clock for the status update manager to send a retry.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(statusUpdateMessage2);
+
+  // The update should correspond to TASK_RUNNING.
+  ASSERT_EQ(TASK_RUNNING, statusUpdateMessage2.get().update().status().state());
+
+  // The update should include TASK_FINISHED as the latest state.
+  ASSERT_EQ(TASK_FINISHED,
+            statusUpdateMessage2.get().update().latest_state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}