You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:33 UTC
[4/6] git commit: Updated slave to include latest task state in update.
Updated slave to include latest task state in update.
Review: https://reviews.apache.org/r/26700
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da669702
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da669702
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da669702
Branch: refs/heads/master
Commit: da669702e4c6c4050ef49d9a1e399a837a77c143
Parents: ca14f37
Author: Vinod Kone <vi...@gmail.com>
Authored: Fri Oct 10 14:48:49 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:09 2014 -0700
----------------------------------------------------------------------
src/messages/messages.proto | 7 +++
src/slave/slave.cpp | 36 ++++++++----
src/slave/slave.hpp | 7 ++-
src/slave/status_update_manager.cpp | 8 +--
src/slave/status_update_manager.hpp | 2 +-
src/tests/status_update_manager_tests.cpp | 80 ++++++++++++++++++++++++++
6 files changed, 122 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 196d1d4..6e49fe7 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -73,6 +73,13 @@ message StatusUpdate {
required TaskStatus status = 4;
required double timestamp = 5;
required bytes uuid = 6;
+
+ // This corresponds to the latest state of the task according to the
+ // slave. Note that this state might be different than the state in
+ // 'status' because status update manager queues updates. In other
+ // words, 'status' corresponds to the update at top of the queue and
+ // 'latest_state' corresponds to the update at bottom of the queue.
+ optional TaskState latest_state = 7;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a5761ed..55e5264 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2268,6 +2268,16 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid)
stats.validStatusUpdates++;
metrics.valid_status_updates++;
+ // We set the latest state of the task here so that the slave can
+ // inform the master about the latest state (via status update or
+ // ReregisterSlaveMessage message) as soon as possible. Master can
+ // use this information, for example, to release resources as soon
+ // as the latest state of the task reaches a terminal state. This
+ // is important because status update manager queues updates and
+ // only sends one update per task at a time; the next update for a
+ // task is sent only after the acknowledgement for the previous one
+ // is received, which could take a long time if the framework is
+ // backed up or is down.
executor->updateTaskState(status);
// Handle the task appropriately if it is terminated.
@@ -2372,7 +2382,7 @@ void Slave::__statusUpdate(
// NOTE: An acknowledgement for this update might have already been
// processed by the slave but not the status update manager.
-void Slave::forward(const StatusUpdate& update)
+void Slave::forward(StatusUpdate update)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
@@ -2385,7 +2395,8 @@ void Slave::forward(const StatusUpdate& update)
return;
}
- // Update the status update state of the task.
+ // Update the status update state of the task and include the latest
+ // state of the task in the status update.
Framework* framework = getFramework(update.framework_id());
if (framework != NULL) {
const TaskID& taskId = update.status().task_id();
@@ -2402,17 +2413,22 @@ void Slave::forward(const StatusUpdate& update)
task = executor->terminatedTasks[taskId];
}
- // We set the status update state of the task here because in
- // steady state master updates the status update state of the
- // task when it receives this update. If the master fails over,
- // slave re-registers with this task with this status update
- // state. Note that an acknowledgement for this update might be
- // enqueued on status update manager when we are here. But that
- // is ok because the status update state will be updated when
- // the next update is forwarded to the slave.
if (task != NULL) {
+ // We set the status update state of the task here because in
+ // steady state master updates the status update state of the
+ // task when it receives this update. If the master fails over,
+ // slave re-registers with this task in this status update
+ // state. Note that an acknowledgement for this update might
+ // be enqueued on status update manager when we are here. But
+ // that is ok because the status update state will be updated
+ // when the next update is forwarded to the slave.
task->set_status_update_state(update.status().state());
task->set_status_update_uuid(update.uuid());
+
+ // Include the latest state of task in the update. See the
+ // comments in 'statusUpdate()' on why informing the master
+ // about the latest state of the task is important.
+ update.set_latest_state(task->state());
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 439052e..eb5de73 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -197,9 +197,10 @@ public:
const StatusUpdate& update,
const process::UPID& pid);
- // This is called by status update manager to forward a
- // status update to the master.
- void forward(const StatusUpdate& update);
+ // This is called by status update manager to forward a status
+ // update to the master. Note that the latest state of the task is
+ // added to the update before forwarding.
+ void forward(StatusUpdate update);
void statusUpdateAcknowledgement(
const process::UPID& from,
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 9bdbf5e..fab8c22 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -71,7 +71,7 @@ public:
using process::ProcessBase::initialize;
// StatusUpdateManager implementation.
- void initialize(const function<void(const StatusUpdate&)>& forward);
+ void initialize(const function<void(StatusUpdate)>& forward);
Future<Nothing> update(
const StatusUpdate& update,
@@ -138,7 +138,7 @@ private:
const Flags flags;
bool paused;
- function<void(const StatusUpdate&)> forward_;
+ function<void(StatusUpdate)> forward_;
hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
};
@@ -160,7 +160,7 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
void StatusUpdateManagerProcess::initialize(
- const function<void(const StatusUpdate&)>& forward)
+ const function<void(StatusUpdate)>& forward)
{
forward_ = forward;
}
@@ -559,7 +559,7 @@ StatusUpdateManager::~StatusUpdateManager()
void StatusUpdateManager::initialize(
- const function<void(const StatusUpdate&)>& forward)
+ const function<void(StatusUpdate)>& forward)
{
dispatch(process, &StatusUpdateManagerProcess::initialize, forward);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 2852884..1c8a54e 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -78,7 +78,7 @@ public:
// Expects a callback 'forward' which gets called whenever there is
// a new status update that needs to be forwarded to the master.
- void initialize(const lambda::function<void(const StatusUpdate&)>& forward);
+ void initialize(const lambda::function<void(StatusUpdate)>& forward);
// TODO(vinod): Come up with better names/signatures for the
// checkpointing and non-checkpointing 'update()' functions.
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index e9ef1e2..b0b1316 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -782,3 +782,83 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
Shutdown();
}
+
+
+// This test verifies that the slave includes the latest state of the
+// task ('latest_state') in a status update forwarded to the master.
+TEST_F(StatusUpdateManagerTest, LatestTaskState)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ Try<PID<Slave> > slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ ExecutorDriver* execDriver; // Captured by the 'registered' expectation.
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Drop the first update to the master (so it is never acknowledged).
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+ driver.start();
+
+ // Wait until TASK_RUNNING has been sent (and dropped) on its way to the master.
+ AWAIT_READY(statusUpdateMessage);
+
+ // Wait until the slave has processed the TASK_RUNNING update.
+ AWAIT_READY(__statusUpdate);
+
+ // Pause the clock so the status update manager cannot retry on its own.
+ Clock::pause();
+
+ Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+ // Now send a TASK_FINISHED update for the same task.
+ TaskStatus finishedStatus;
+ finishedStatus = statusUpdateMessage.get().update().status();
+ finishedStatus.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(finishedStatus);
+
+ // Wait until the slave has processed the TASK_FINISHED update.
+ AWAIT_READY(__statusUpdate2);
+
+ // Drop the next update to the master, i.e., the retry, and capture it.
+ Future<StatusUpdateMessage> statusUpdateMessage2 =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ // Advance the clock so the status update manager retries the update.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+ AWAIT_READY(statusUpdateMessage2);
+
+ // The retried update is still TASK_RUNNING (the head of the queue)...
+ ASSERT_EQ(TASK_RUNNING, statusUpdateMessage2.get().update().status().state());
+
+ // ...but it carries TASK_FINISHED (the tail) as 'latest_state'.
+ ASSERT_EQ(TASK_FINISHED,
+ statusUpdateMessage2.get().update().latest_state());
+ // NOTE(review): consider EXPECT_CALL(exec, shutdown(_)).Times(AtMost(1)).
+ driver.stop();
+ driver.join();
+ // NOTE(review): the clock is still paused here; confirm teardown resumes it.
+ Shutdown();
+}