You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/13 07:23:09 UTC
svn commit: r1455811 - in /incubator/mesos/trunk: include/mesos/ src/
src/slave/ src/tests/
Author: vinodkone
Date: Wed Mar 13 06:23:08 2013
New Revision: 1455811
URL: http://svn.apache.org/r1455811
Log:
Integrated status update manager into the slave.
Review: https://reviews.apache.org/r/7655
Added:
incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
Modified:
incubator/mesos/trunk/include/mesos/mesos.proto
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/slave/gc.cpp
incubator/mesos/trunk/src/slave/paths.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/slave/status_update_manager.hpp
incubator/mesos/trunk/src/tests/master_tests.cpp
Modified: incubator/mesos/trunk/include/mesos/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/mesos.proto?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos/mesos.proto Wed Mar 13 06:23:08 2013
@@ -91,12 +91,17 @@ message ExecutorID {
* performing failover). The amount of time that the master will wait
* for the scheduler to failover before removing the framework is
* specified by failover_timeout.
+ * If checkpoint is set, framework pid, executor pids and status updates
+ * are checkpointed to disk.
+ * Checkpointing allows a restarted slave to reconnect with old executors
+ * and recover status updates, at the cost of disk I/O.
*/
message FrameworkInfo {
required string user = 1;
required string name = 2;
optional FrameworkID id = 3;
optional double failover_timeout = 4 [default = 0.0];
+ optional bool checkpoint = 5 [default = false];
}
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Mar 13 06:23:08 2013
@@ -781,10 +781,11 @@ balloon_executor_LDADD = libmesos.la
check_PROGRAMS += mesos-tests
-mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp tests/filter.cpp \
+mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp tests/filter.cpp \
tests/environment.cpp \
tests/master_tests.cpp tests/state_tests.cpp \
tests/slave_state_tests.cpp \
+ tests/status_update_manager_tests.cpp \
tests/gc_tests.cpp \
tests/resource_offers_tests.cpp \
tests/fault_tolerance_tests.cpp \
Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Wed Mar 13 06:23:08 2013
@@ -196,7 +196,7 @@ Future<Nothing> GarbageCollector::schedu
void GarbageCollector::prune(const Duration& d)
{
- return dispatch(process, &GarbageCollectorProcess::prune, d);
+ dispatch(process, &GarbageCollectorProcess::prune, d);
}
} // namespace mesos {
Modified: incubator/mesos/trunk/src/slave/paths.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/paths.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/paths.hpp (original)
+++ incubator/mesos/trunk/src/slave/paths.hpp Wed Mar 13 06:23:08 2013
@@ -39,53 +39,42 @@ namespace paths {
const std::string EXECUTOR_LATEST_SYMLINK = "latest";
-// Path layout templates.
-const std::string ROOT_PATH =
- "%s";
-
-const std::string SLAVEID_PATH =
- ROOT_PATH + "/slaves/slave.id";
-
-const std::string SLAVE_PATH =
- ROOT_PATH + "/slaves/%s";
+// Helper functions to generate paths.
-const std::string FRAMEWORK_PATH =
- SLAVE_PATH + "/frameworks/%s";
+// File names.
+const std::string SLAVEID_FILE = "slave.id";
+const std::string FRAMEWORK_PID_FILE = "framework.pid";
+const std::string LIBPROCESS_PID_FILE = "libprocess.pid";
+const std::string EXECED_PID_FILE = "execed.pid";
+const std::string FORKED_PID_FILE = "forked.pid";
+const std::string TASK_INFO_FILE = "info";
+const std::string TASK_UPDATES_FILE = "updates";
+// Path layout templates.
+const std::string ROOT_PATH = "%s";
+const std::string SLAVEID_PATH = ROOT_PATH + "/slaves/" + SLAVEID_FILE;
+const std::string SLAVE_PATH = ROOT_PATH + "/slaves/%s";
+const std::string FRAMEWORK_PATH = SLAVE_PATH + "/frameworks/%s";
const std::string FRAMEWORK_PID_PATH =
- FRAMEWORK_PATH + "/framework.pid";
-
-const std::string EXECUTOR_PATH =
- FRAMEWORK_PATH + "/executors/%s";
-
-const std::string EXECUTOR_RUN_PATH =
- EXECUTOR_PATH + "/runs/%s";
-
+ FRAMEWORK_PATH + "/" + FRAMEWORK_PID_FILE;
+const std::string EXECUTOR_PATH = FRAMEWORK_PATH + "/executors/%s";
+const std::string EXECUTOR_RUN_PATH = EXECUTOR_PATH + "/runs/%s";
const std::string EXECUTOR_LATEST_RUN_PATH =
EXECUTOR_PATH + "/runs/" + EXECUTOR_LATEST_SYMLINK;
+const std::string PIDS_PATH = EXECUTOR_RUN_PATH + "/pids";
+const std::string LIBPROCESS_PID_PATH = PIDS_PATH + "/" + LIBPROCESS_PID_FILE;
+const std::string EXECED_PID_PATH = PIDS_PATH + "/" + EXECED_PID_FILE;
+const std::string FORKED_PID_PATH = PIDS_PATH + "/" + FORKED_PID_FILE;
+const std::string TASK_PATH = EXECUTOR_RUN_PATH + "/tasks/%s";
+const std::string TASK_INFO_PATH = TASK_PATH + "/" + TASK_INFO_FILE;
+const std::string TASK_UPDATES_PATH = TASK_PATH + "/" + TASK_UPDATES_FILE;
-const std::string PIDS_PATH =
- EXECUTOR_RUN_PATH + "/pids";
-const std::string LIBPROCESS_PID_PATH =
- PIDS_PATH + "/libprocess.pid";
-
-const std::string EXECED_PID_PATH =
- PIDS_PATH + "/execed.pid";
-
-const std::string FORKED_PID_PATH =
- PIDS_PATH + "/forked.pid";
-
-const std::string TASK_PATH =
- EXECUTOR_RUN_PATH + "/tasks/%s";
-
-const std::string TASK_INFO_PATH =
- TASK_PATH + "/info";
-
-const std::string TASK_UPDATES_PATH =
- TASK_PATH + "/updates";
+inline std::string getMetaRootDir(const std::string rootDir)
+{
+ return path::join(rootDir, "meta");
+}
-// Helper functions to generate paths.
inline std::string getSlaveIDPath(const std::string& rootDir)
{
@@ -93,128 +82,182 @@ inline std::string getSlaveIDPath(const
}
-inline std::string getSlavePath(const std::string& rootDir,
- const SlaveID& slaveId)
+inline std::string getSlavePath(
+ const std::string& rootDir,
+ const SlaveID& slaveId)
{
return strings::format(SLAVE_PATH, rootDir, slaveId).get();
}
-inline std::string getFrameworkPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId)
+inline std::string getFrameworkPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId)
{
- return strings::format(FRAMEWORK_PATH, rootDir, slaveId,
- frameworkId).get();
+ return strings::format(
+ FRAMEWORK_PATH, rootDir, slaveId, frameworkId).get();
}
-inline std::string getFrameworkPIDPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId)
+inline std::string getFrameworkPIDPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId)
{
- return strings::format(FRAMEWORK_PID_PATH, rootDir, slaveId,
- frameworkId).get();
+ return strings::format(
+ FRAMEWORK_PID_PATH, rootDir, slaveId, frameworkId).get();
}
-inline std::string getExecutorPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId)
+inline std::string getExecutorPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
{
- return strings::format(EXECUTOR_PATH, rootDir, slaveId, frameworkId,
- executorId).get();
+ return strings::format(
+ EXECUTOR_PATH, rootDir, slaveId, frameworkId, executorId).get();
}
-inline std::string getExecutorRunPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID)
+inline std::string getExecutorRunPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID)
{
- return strings::format(EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString()).get();
+ return strings::format(
+ EXECUTOR_RUN_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString()).get();
}
-inline std::string getExecutorLatestRunPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId)
+inline std::string getExecutorLatestRunPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
{
- return strings::format(EXECUTOR_LATEST_RUN_PATH, rootDir, slaveId,
- frameworkId, executorId).get();
+ return strings::format(
+ EXECUTOR_LATEST_RUN_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId).get();
}
-inline std::string getLibprocessPIDPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID)
+inline std::string getLibprocessPIDPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID)
{
- return strings::format(LIBPROCESS_PID_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString()).get();
+ return strings::format(
+ LIBPROCESS_PID_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString()).get();
}
-inline std::string getExecedPIDPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID)
+inline std::string getExecedPIDPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID)
{
- return strings::format(EXECED_PID_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString()).get();
+ return strings::format(
+ EXECED_PID_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString()).get();
}
-inline std::string getForkedPIDPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID)
+inline std::string getForkedPIDPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID)
{
- return strings::format(FORKED_PID_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString()).get();
+ return strings::format(
+ FORKED_PID_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString()).get();
}
-inline std::string getTaskPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID,
- const TaskID& taskId)
+inline std::string getTaskPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID,
+ const TaskID& taskId)
{
- return strings::format(TASK_PATH, rootDir, slaveId, frameworkId, executorId,
- executorUUID.toString(), taskId).get();
+ return strings::format(
+ TASK_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString(),
+ taskId).get();
}
-inline std::string getTaskInfoPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID,
- const TaskID& taskId)
+inline std::string getTaskInfoPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID,
+ const TaskID& taskId)
{
- return strings::format(TASK_INFO_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString(), taskId).get();
+ return strings::format(
+ TASK_INFO_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString(),
+ taskId).get();
}
-inline std::string getTaskUpdatesPath(const std::string& rootDir,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& executorUUID,
- const TaskID& taskId)
+inline std::string getTaskUpdatesPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& executorUUID,
+ const TaskID& taskId)
{
- return strings::format(TASK_UPDATES_PATH, rootDir, slaveId, frameworkId,
- executorId, executorUUID.toString(), taskId).get();
+ return strings::format(
+ TASK_UPDATES_PATH,
+ rootDir,
+ slaveId,
+ frameworkId,
+ executorId,
+ executorUUID.toString(),
+ taskId).get();
}
@@ -237,7 +280,10 @@ inline std::string createExecutorDirecto
// Remove the previous "latest" symlink.
std::string latest =
getExecutorLatestRunPath(rootDir, slaveId, frameworkId, executorId);
- os::rm(latest);
+
+ if (os::exists(latest)) {
+ CHECK_SOME(os::rm(latest)) << "Failed to remove latest symlink " << latest;
+ }
// Symlink the new executor directory to "latest".
Try<Nothing> symlink = fs::symlink(directory, latest);
@@ -248,7 +294,6 @@ inline std::string createExecutorDirecto
return directory;
}
-
} // namespace paths {
} // namespace slave {
} // namespace internal {
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Mar 13 06:23:08 2013
@@ -49,6 +49,7 @@
#include "slave/flags.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
+#include "slave/status_update_manager.hpp"
namespace params = std::tr1::placeholders;
@@ -75,7 +76,8 @@ Slave::Slave(const Resources& _resources
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
isolationModule(_isolationModule),
files(_files),
- monitor(isolationModule) {}
+ monitor(isolationModule),
+ statusUpdateManager(new StatusUpdateManager()) {}
Slave::Slave(const flags::Flags<logging::Flags, slave::Flags>& _flags,
@@ -88,7 +90,8 @@ Slave::Slave(const flags::Flags<logging:
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
isolationModule(_isolationModule),
files(_files),
- monitor(isolationModule)
+ monitor(isolationModule),
+ statusUpdateManager(new StatusUpdateManager())
{
// TODO(benh): Move this computation into Flags as the "default".
@@ -184,6 +187,8 @@ Slave::~Slave()
foreachvalue (Framework* framework, frameworks) {
delete framework;
}
+
+ delete statusUpdateManager;
}
@@ -227,6 +232,8 @@ void Slave::initialize()
local,
self());
+ statusUpdateManager->initialize(self());
+
// Start disk monitoring.
// NOTE: We send a delayed message here instead of directly calling
// checkDiskUsage, to make disabling this feature easy (e.g by specifying
@@ -353,6 +360,7 @@ void Slave::finalize()
// immediately). Of course, this still isn't sufficient
// because those status updates might get lost and we won't
// resend them unless we build that into the system.
+ // TODO(vinod): Kill this shutdown when slave recovery is in place.
shutdownFramework(frameworkId);
}
@@ -406,6 +414,9 @@ void Slave::newMasterDetected(const UPID
connected = false;
doReliableRegistration();
+
+ // Inform the status update manager about the new master.
+ statusUpdateManager->newMasterDetected(master);
}
@@ -532,16 +543,10 @@ void Slave::runTask(
<< " with executor '" << executorId
<< "' which is being shut down";
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(frameworkId);
- update->mutable_slave_id()->MergeFrom(id);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_LOST);
- update->set_timestamp(Clock::now());
- update->set_uuid(UUID::random().toBytes());
- send(master, message);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId, id, task.task_id(), TASK_LOST, "Executor shutting down");
+
+ statusUpdate(update);
} else if (!executor->pid) {
// Queue task until the executor starts up.
LOG(INFO) << "Queuing task '" << task.task_id()
@@ -559,7 +564,9 @@ void Slave::runTask(
// the resources before the executor acts on its RunTaskMessage.
dispatch(isolationModule,
&IsolationModule::resourcesChanged,
- framework->id, executor->id, executor->resources);
+ framework->id,
+ executor->id,
+ executor->resources);
LOG(INFO) << "Sending task '" << task.task_id()
<< "' to executor '" << executorId
@@ -585,13 +592,16 @@ void Slave::runTask(
// Queue task until the executor starts up.
executor->queuedTasks[task.task_id()] = task;
- // Tell the isolation module to launch the executor. (TODO(benh):
- // Make the isolation module a process so that it can block while
- // trying to launch the executor.)
+ // Tell the isolation module to launch the executor.
+ // TODO(benh): Make the isolation module a process so that it
+ // can block while trying to launch the executor.
dispatch(isolationModule,
&IsolationModule::launchExecutor,
- framework->id, framework->info, executor->info,
- executor->directory, executor->resources);
+ framework->id,
+ framework->info,
+ executor->info,
+ executor->directory,
+ executor->resources);
}
}
@@ -607,59 +617,37 @@ void Slave::killTask(const FrameworkID&
<< " of framework " << frameworkId
<< " because no such framework is running";
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(frameworkId);
- update->mutable_slave_id()->MergeFrom(id);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->set_state(TASK_LOST);
- update->set_timestamp(Clock::now());
- update->set_uuid(UUID::random().toBytes());
- send(master, message);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId, id, taskId, TASK_LOST, "Cannot find framework");
+ statusUpdate(update);
return;
}
-
// Tell the executor to kill the task if it is up and
// running, otherwise, consider the task lost.
Executor* executor = framework->getExecutor(taskId);
if (executor == NULL) {
LOG(WARNING) << "WARNING! Cannot kill task " << taskId
<< " of framework " << frameworkId
- << " because no such task is running";
+ << " because no corresponding executor is running";
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(framework->id);
- update->mutable_slave_id()->MergeFrom(id);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->set_state(TASK_LOST);
- update->set_timestamp(Clock::now());
- update->set_uuid(UUID::random().toBytes());
- send(master, message);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId, id, taskId, TASK_LOST, "Cannot find executor");
+
+ statusUpdate(update);
} else if (!executor->pid) {
- // Remove the task.
- executor->removeTask(taskId);
+ // We are here if the executor hasn't registered with the slave yet.
- // Tell the isolation module to update the resources.
- dispatch(isolationModule,
- &IsolationModule::resourcesChanged,
- framework->id, executor->id, executor->resources);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ frameworkId,
+ id,
+ taskId,
+ TASK_KILLED,
+ "Unregistered executor",
+ executor->id);
- StatusUpdateMessage message;
- StatusUpdate* update = message.mutable_update();
- update->mutable_framework_id()->MergeFrom(framework->id);
- update->mutable_executor_id()->MergeFrom(executor->id);
- update->mutable_slave_id()->MergeFrom(id);
- TaskStatus* status = update->mutable_status();
- status->mutable_task_id()->MergeFrom(taskId);
- status->set_state(TASK_KILLED);
- update->set_timestamp(Clock::now());
- update->set_uuid(UUID::random().toBytes());
- send(master, message);
+ statusUpdate(update);
} else {
// Otherwise, send a message to the executor and wait for
// it to send us a status update.
@@ -678,24 +666,33 @@ void Slave::killTask(const FrameworkID&
// therefore never processed.
void Slave::shutdownFramework(const FrameworkID& frameworkId)
{
- if (from != master) {
+ // Allow shutdownFramework() only if
+ // it's called directly (e.g. Slave::finalize()) or
+ // it's a message from the currently registered master.
+ if (from && from != master) {
LOG(WARNING) << "Ignoring shutdown framework message from " << from
- << "because it is not from the registered master ("
+ << " because it is not from the registered master ("
<< master << ")";
return;
}
- LOG(INFO) << "Asked to shut down framework " << frameworkId;
+ LOG(INFO) << "Asked to shut down framework " << frameworkId
+ << " by " << from;
Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Shutting down framework " << framework->id;
// Shut down all executors of this framework.
+ // Note that the framework and its corresponding executors are removed from
+ // the frameworks map by shutdownExecutorTimeout() or executorTerminated().
foreachvalue (Executor* executor, framework->executors) {
shutdownExecutor(framework, executor);
}
}
+
+ // Close all status update streams for this framework.
+ statusUpdateManager->cleanup(frameworkId);
}
@@ -757,25 +754,42 @@ void Slave::statusUpdateAcknowledgement(
const TaskID& taskId,
const string& uuid)
{
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- if (framework->updates.contains(UUID::fromBytes(uuid))) {
- LOG(INFO) << "Got acknowledgement of status update"
- << " for task " << taskId
- << " of framework " << frameworkId;
-
- framework->updates.erase(UUID::fromBytes(uuid));
-
- // Cleanup if this framework has no executors running and no pending updates.
- if (framework->executors.size() == 0 && framework->updates.empty()) {
- frameworks.erase(framework->id);
-
- // Pass ownership of the framework pointer.
- completedFrameworks.push_back(
- std::tr1::shared_ptr<Framework>(framework));
- }
- }
+ LOG(INFO) << "Got acknowledgement of status update"
+ << " for task " << taskId
+ << " of framework " << frameworkId;
+
+ statusUpdateManager->acknowledgement(taskId, frameworkId, uuid)
+ .onAny(defer(self(),
+ &Slave::_statusUpdateAcknowledgement,
+ params::_1,
+ taskId,
+ frameworkId,
+ uuid));
+}
+
+
+void Slave::_statusUpdateAcknowledgement(
+ const Future<Try<Nothing> >& future,
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const string& uuid)
+{
+ if (!future.isReady()) {
+ LOG(FATAL) << "Failed to handle status update acknowledgement " << uuid
+ << " for task " << taskId
+ << " of framework " << frameworkId
+ << (future.isFailed() ? future.failure() : "future discarded");
+ }
+
+ if (future.get().isError()) {
+ LOG(ERROR) << "Failed to handle the status update acknowledgement " << uuid
+ << " for task " << taskId
+ << " of framework " << frameworkId
+ << future.get().error();
+ return;
}
+
+ // TODO(vinod): Garbage collect the task meta directory.
}
@@ -860,41 +874,21 @@ void Slave::registerExecutor(
}
}
-
+// This can be called in two ways:
+// 1) When a status update from the executor is received.
+// 2) When the slave generates task updates (e.g., LOST/KILLED/FAILED).
void Slave::statusUpdate(const StatusUpdate& update)
{
const TaskStatus& status = update.status();
- LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << update.framework_id()
- << " is now in state " << status.state();
+ LOG(INFO) << "Handling status update" << update;
Framework* framework = getFramework(update.framework_id());
- if (framework != NULL) {
- // Send message and record the status for possible resending.
- // TODO(vinod): Revisit the strategy of always sending a status update
- // upstream, when we have persistent state at the master and slave.
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(self());
- send(master, message);
-
- UUID uuid = UUID::fromBytes(update.uuid());
-
- // Send us a message to try and resend after some delay.
- delay(STATUS_UPDATE_RETRY_INTERVAL,
- self(),
- &Slave::statusUpdateTimeout,
- framework->id,
- uuid);
-
- framework->updates[uuid] = update;
-
- stats.tasks[status.state()]++;
+ Executor* executor = NULL;
- stats.validStatusUpdates++;
+ if (framework != NULL) {
+ executor = framework->getExecutor(status.task_id());
- Executor* executor = framework->getExecutor(status.task_id());
if (executor != NULL) {
executor->updateTaskState(status.task_id(), status.state());
@@ -902,22 +896,108 @@ void Slave::statusUpdate(const StatusUpd
if (protobuf::isTerminalState(status.state())) {
executor->removeTask(status.task_id());
- dispatch(isolationModule,
- &IsolationModule::resourcesChanged,
- framework->id,
- executor->id,
- executor->resources);
+ dispatch(
+ isolationModule,
+ &IsolationModule::resourcesChanged,
+ framework->id,
+ executor->id,
+ executor->resources);
}
} else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "executor for framework " << update.framework_id();
+ LOG(WARNING) << "Could not find executor for task " << status.task_id()
+ << " of framework " << update.framework_id();
+
stats.invalidStatusUpdates++;
}
} else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << update.framework_id();
+ LOG(WARNING) << "Could not find framework " << update.framework_id()
+ << " for task " << status.task_id();
+
stats.invalidStatusUpdates++;
}
+
+ // Forward the update to the status update manager.
+ // NOTE: We forward the update even if the framework/executor is unknown
+ // because currently there is no persistent state in the master.
+ // The lack of persistence might lead frameworks to use out-of-band means
+ // to figure out the task state mismatch and use status updates to reconcile.
+ // We need to revisit this issue once master has persistent state.
+ forwardUpdate(update, executor);
+}
+
+
+void Slave::forwardUpdate(const StatusUpdate& update, Executor* executor)
+{
+ LOG(INFO) << "Forwarding status update " << update
+ << " to the status update manager";
+
+ const FrameworkID& frameworkId = update.framework_id();
+ const TaskID& taskId = update.status().task_id();
+
+ Option<UPID> pid;
+ Option<string> path;
+ bool checkpoint = false;
+
+ if (executor != NULL) {
+ // Get the executor pid.
+ if (executor->pid) {
+ pid = executor->pid;
+ }
+
+ // Check whether we need to do checkpointing.
+ Framework* framework = getFramework(frameworkId);
+ CHECK_NOTNULL(framework);
+ checkpoint = framework->info.checkpoint();
+
+ if (checkpoint) {
+ // Get the path to store the updates.
+ path = paths::getTaskUpdatesPath(
+ paths::getMetaRootDir(flags.work_dir),
+ id,
+ frameworkId,
+ executor->id,
+ executor->uuid,
+ taskId);
+ }
+ }
+
+ stats.tasks[update.status().state()]++;
+ stats.validStatusUpdates++;
+
+ statusUpdateManager->update(update, checkpoint, path)
+ .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));;
+}
+
+
+void Slave::_forwardUpdate(
+ const Future<Try<Nothing> >& future,
+ const StatusUpdate& update,
+ const Option<UPID>& pid)
+{
+ if (!future.isReady()) {
+ LOG(FATAL) << "Failed to handle status update " << update
+ << (future.isFailed() ? future.failure() : "future discarded");
+ return;
+ }
+
+ if (future.get().isError()) {
+ LOG(ERROR)
+ << "Failed to handle the status update " << update
+ << ": " << future.get().error();
+ return;
+ }
+
+ // Status update manager successfully handled the status update.
+ // Acknowledge the executor, if necessary.
+ if (pid.isSome()) {
+ LOG(INFO) << "Sending ACK for status update " << update
+ << " to executor " << pid.get();
+ StatusUpdateAcknowledgementMessage message;
+ message.mutable_framework_id()->MergeFrom(update.framework_id());
+ message.mutable_slave_id()->MergeFrom(update.slave_id());
+ message.mutable_task_id()->MergeFrom(update.status().task_id());
+ send(pid.get(), message);
+ }
}
@@ -956,34 +1036,6 @@ void Slave::ping(const UPID& from, const
}
-void Slave::statusUpdateTimeout(
- const FrameworkID& frameworkId,
- const UUID& uuid)
-{
- // Check and see if we still need to send this update.
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- if (framework->updates.contains(uuid)) {
- const StatusUpdate& update = framework->updates[uuid];
-
- LOG(INFO) << "Resending status update"
- << " for task " << update.status().task_id()
- << " of framework " << update.framework_id();
-
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(self());
- send(master, message);
-
- // Send us a message to try and resend after some delay.
- delay(STATUS_UPDATE_RETRY_INTERVAL,
- self(), &Slave::statusUpdateTimeout,
- framework->id, uuid);
- }
- }
-}
-
-
void Slave::exited(const UPID& pid)
{
LOG(INFO) << "Process exited: " << from;
@@ -1059,21 +1111,6 @@ void _watch(
}
-void Slave::sendStatusUpdate(
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const TaskID& taskId,
- TaskState taskState,
- const string& message)
-{
- const StatusUpdate& update = protobuf::createStatusUpdate(
- frameworkId, id, taskId, taskState, message, executorId);
-
- // Handle the status update as though it came from the executor.
- statusUpdate(update);
-}
-
-
void _unwatch(
const Future<Nothing>& watch,
const FrameworkID& frameworkId,
@@ -1124,18 +1161,20 @@ void Slave::executorTerminated(
// or if this is a command executor, we send TASK_FAILED status updates
// instead of TASK_LOST.
+ StatusUpdate update;
+
// Transition all live launched tasks.
foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
if (!protobuf::isTerminalState(task->state())) {
isCommandExecutor = !task->has_executor_id();
-
if (destroyed || isCommandExecutor) {
- sendStatusUpdate(
- frameworkId, executorId, task->task_id(), TASK_FAILED, message);
+ update = protobuf::createStatusUpdate(
+ frameworkId, id, task->task_id(), TASK_FAILED, message, executorId);
} else {
- sendStatusUpdate(
- frameworkId, executorId, task->task_id(), TASK_LOST, message);
+ update = protobuf::createStatusUpdate(
+ frameworkId, id, task->task_id(), TASK_LOST, message, executorId);
}
+ statusUpdate(update); // Handle the status update.
}
}
@@ -1144,12 +1183,13 @@ void Slave::executorTerminated(
isCommandExecutor = task.has_command();
if (destroyed || isCommandExecutor) {
- sendStatusUpdate(
- frameworkId, executorId, task.task_id(), TASK_FAILED, message);
+ update = protobuf::createStatusUpdate(
+ frameworkId, id, task.task_id(), TASK_FAILED, message, executorId);
} else {
- sendStatusUpdate(
- frameworkId, executorId, task.task_id(), TASK_LOST, message);
+ update = protobuf::createStatusUpdate(
+ frameworkId, id, task.task_id(), TASK_LOST, message, executorId);
}
+ statusUpdate(update); // Handle the status update.
}
if (!isCommandExecutor) {
@@ -1169,7 +1209,7 @@ void Slave::executorTerminated(
framework->destroyExecutor(executor->id);
// Cleanup if this framework has no executors running.
- if (framework->executors.size() == 0) {
+ if (framework->executors.empty()) {
frameworks.erase(framework->id);
// Pass ownership of the framework pointer.
@@ -1257,7 +1297,8 @@ void Slave::checkDiskUsage()
void Slave::_checkDiskUsage(const Future<Try<double> >& usage)
{
if (!usage.isReady()) {
- LOG(WARNING) << "Error getting disk usage";
+ LOG(ERROR) << "Failed to get disk usage: "
+ << (usage.isFailed() ? usage.failure() : "future discarded");
} else {
Try<double> result = usage.get();
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Mar 13 06:23:08 2013
@@ -60,6 +60,7 @@ namespace slave {
using namespace process;
// Some forward declarations.
+class StatusUpdateManager;
struct Executor;
struct Framework;
@@ -106,18 +107,10 @@ public:
void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
- void statusUpdateAcknowledgement(
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const TaskID& taskId,
- const std::string& uuid);
-
void registerExecutor(
const FrameworkID& frameworkId,
const ExecutorID& executorId);
- void statusUpdate(const StatusUpdate& update);
-
void executorMessage(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
@@ -126,14 +119,33 @@ public:
void ping(const UPID& from, const std::string& body);
- void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
+ // Handles the status update.
+ void statusUpdate(const StatusUpdate& update);
+
+ // Forwards the update to the status update manager.
+ // NOTE: Executor could be 'NULL' when we want to forward the update
+ // despite not knowing about the framework/executor.
+ void forwardUpdate(const StatusUpdate& update, Executor* executor = NULL);
+
+ // This callback is called when the status update manager finishes
+ // handling the update. If the handling is successful, an acknowledgment
+ // is sent to the executor.
+ void _forwardUpdate(
+ const Future<Try<Nothing> >& future,
+ const StatusUpdate& update,
+ const Option<UPID>& pid);
- void sendStatusUpdate(
+ void statusUpdateAcknowledgement(
+ const SlaveID& slaveId,
const FrameworkID& frameworkId,
- const ExecutorID& executorId,
const TaskID& taskId,
- TaskState taskState,
- const std::string& message);
+ const std::string& uuid);
+
+ void _statusUpdateAcknowledgement(
+ const Future<Try<Nothing> >& future,
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const std::string& uuid);
void executorStarted(
const FrameworkID& frameworkId,
@@ -175,9 +187,10 @@ protected:
// Handle the second phase of shutting down an executor for those
// executors that have not properly shutdown within a timeout.
- void shutdownExecutorTimeout(const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const UUID& uuid);
+ void shutdownExecutorTimeout(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const UUID& uuid);
// This function returns the max age of executor/slave directories allowed,
// given a disk usage. This value could be used to tune gc.
@@ -242,6 +255,8 @@ private:
ResourceMonitor monitor;
state::SlaveState state;
+
+ StatusUpdateManager* statusUpdateManager;
};
@@ -467,7 +482,6 @@ struct Framework
return executor;
}
}
-
return NULL;
}
Modified: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Wed Mar 13 06:23:08 2013
@@ -35,6 +35,7 @@
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Wed Mar 13 06:23:08 2013
@@ -377,11 +377,6 @@ TEST_F(MasterTest, StatusUpdateAck)
MockExecutor exec;
- trigger statusUpdateAckMsg;
- EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
- .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
- Return(false)));
-
trigger shutdownCall;
EXPECT_CALL(exec, registered(_, _, _, _))
@@ -403,6 +398,14 @@ TEST_F(MasterTest, StatusUpdateAck)
BasicMasterDetector detector(master, slave, true);
+ trigger statusUpdateAckMsg;
+ EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()),
+ _,
+ Eq(slave))
+ .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
+ Return(false)));
+
+
MockScheduler sched;
MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
@@ -445,7 +448,7 @@ TEST_F(MasterTest, StatusUpdateAck)
EXPECT_EQ(TASK_RUNNING, status.state());
- // Ensure we get a status update ACK.
+ // Ensure the slave gets a status update ACK.
WAIT_UNTIL(statusUpdateAckMsg);
driver.stop();
Added: incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp?rev=1455811&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp Wed Mar 13 06:23:08 2013
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <map>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/pid.hpp>
+
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+#include <stout/os.hpp>
+
+#include "detector/detector.hpp"
+
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/process_based_isolation_module.hpp"
+#include "slave/slave.hpp"
+#include "slave/paths.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/filter.hpp"
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+using namespace mesos::internal::slave::paths;
+
+using namespace process;
+
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::Eq;
+using testing::Return;
+using testing::SaveArg;
+
+
+// Test fixture shared by the status update manager tests below.
+// SetUp() spawns a master and a slave (backed by a testing isolation
+// module that hands out the mock executor) and prepares a FrameworkInfo
+// with checkpointing enabled. TearDown() terminates both processes and
+// removes the slave's work directory.
+class StatusUpdateManagerTest: public ::testing::Test
+{
+protected:
+  // The owned pointers start out as NULL because gtest still invokes
+  // TearDown() when SetUp() returns early on a fatal failure; without
+  // this, TearDown() would delete indeterminate pointers.
+  StatusUpdateManagerTest()
+    : a(NULL), m(NULL), isolationModule(NULL), s(NULL), detector(NULL) {}
+
+  // Points the slave at a fixed work directory and wipes whatever is
+  // left there before the first test of this test case runs.
+  static void SetUpTestCase()
+  {
+    flags.work_dir = "/tmp/mesos-tests";
+    os::rmdir(flags.work_dir);
+  }
+
+  virtual void SetUp()
+  {
+    ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+    a = new Allocator(&allocator);
+    m = new Master(a, &files);
+    master = process::spawn(m);
+
+    execs[DEFAULT_EXECUTOR_ID] = &exec;
+
+    isolationModule = new TestingIsolationModule(execs);
+
+    s = new Slave(flags, true, isolationModule, &files);
+    slave = process::spawn(s);
+
+    detector = new BasicMasterDetector(master, slave, true);
+
+    frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+    frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+  }
+
+  // Tears everything down in the same order as before, but only
+  // terminates/waits on processes that SetUp() actually spawned.
+  virtual void TearDown()
+  {
+    delete detector;
+    detector = NULL;
+
+    if (s != NULL) {
+      process::terminate(slave);
+      process::wait(slave);
+      delete s;
+      s = NULL;
+    }
+
+    delete isolationModule;
+    isolationModule = NULL;
+
+    if (m != NULL) {
+      process::terminate(master);
+      process::wait(master);
+      delete m;
+      m = NULL;
+    }
+
+    delete a;
+    a = NULL;
+
+    os::rmdir(flags.work_dir);
+  }
+
+  // Builds a single task ("test-task", id "1") that consumes all the
+  // resources of the given offer and runs on the default executor.
+  vector<TaskInfo> createTasks(const Offer& offer)
+  {
+    TaskInfo task;
+    task.set_name("test-task");
+    task.mutable_task_id()->set_value("1");
+    task.mutable_slave_id()->MergeFrom(offer.slave_id());
+    task.mutable_resources()->MergeFrom(offer.resources());
+    task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+    vector<TaskInfo> tasks;
+    tasks.push_back(task);
+
+    return tasks;
+  }
+
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator *a;
+  Master* m;
+  TestingIsolationModule* isolationModule;
+  Slave* s;
+  Files files;
+  BasicMasterDetector* detector;
+  FrameworkInfo frameworkInfo;
+  MockExecutor exec;
+  map<ExecutorID, Executor*> execs;
+  MockScheduler sched;
+  TaskStatus status;
+  PID<Master> master;
+  PID<Slave> slave;
+  static flags::Flags<logging::Flags, slave::Flags> flags;
+};
+
+// Initialize static members here.
+flags::Flags<logging::Flags, slave::Flags> StatusUpdateManagerTest::flags;
+
+
+// Launches a task for a framework that has checkpointing enabled and
+// verifies that both the TASK_RUNNING status update and its
+// acknowledgement are found in the task's updates file under the
+// slave's work directory.
+TEST_F(StatusUpdateManagerTest, CheckpointStatusUpdate)
+{
+  // Message expectations.
+  trigger statusUpdateAckMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()),
+                 _,
+                 slave)
+    .WillRepeatedly(DoAll(Trigger(&statusUpdateAckMsg), Return(false)));
+
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  trigger shutdownCall;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Trigger(&shutdownCall));
+
+  // Scheduler expectations.
+  EXPECT_CALL(sched, registered(_, _, _))
+    .Times(1);
+
+  trigger resourceOffersCall;
+  vector<Offer> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  trigger statusUpdateCall;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+    .WillRepeatedly(Return());
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  // This has to be a fatal assertion: 'offers[0]' is indexed right
+  // below and must not be evaluated on an empty vector.
+  ASSERT_NE(0u, offers.size());
+  driver.launchTasks(offers[0].id(), createTasks(offers[0]));
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  // Check if the status update was properly checkpointed.
+  WAIT_UNTIL(statusUpdateAckMsg);
+
+  sleep(1); // To make sure the status update manager acted on the ACK.
+
+  // Ensure that both the status update and its acknowledgement
+  // are correctly checkpointed.
+  Try<list<string> > found = os::find(flags.work_dir, TASK_UPDATES_FILE);
+  ASSERT_SOME(found);
+  ASSERT_EQ(1u, found.get().size());
+
+  Try<int> fd = os::open(found.get().front(), O_RDONLY);
+  ASSERT_SOME(fd);
+
+  // Walk through every record in the file. An UPDATE record remembers
+  // its uuid; any other record is treated as the acknowledgement and
+  // must carry the uuid of the update seen before it.
+  int updates = 0;
+  int acks = 0;
+  string uuid;
+  Result<StatusUpdateRecord> record = Result<StatusUpdateRecord>::none();
+  while (true) {
+    record = protobuf::read<StatusUpdateRecord>(fd.get());
+
+    if (!record.isSome()) {
+      break;
+    }
+
+    if (record.get().type() == StatusUpdateRecord::UPDATE) {
+      EXPECT_EQ(TASK_RUNNING, record.get().update().status().state());
+      uuid = record.get().update().uuid();
+      updates++;
+    } else {
+      EXPECT_EQ(uuid, record.get().uuid());
+      acks++;
+    }
+  }
+
+  // The descriptor is no longer needed. Close it before the fatal
+  // assertions below get a chance to return from the test.
+  os::close(fd.get());
+
+  // The loop must have stopped on a "none" result (as opposed to a
+  // read error) after exactly one update and one acknowledgement.
+  ASSERT_TRUE(record.isNone());
+  ASSERT_EQ(1, updates);
+  ASSERT_EQ(1, acks);
+
+  driver.stop();
+  driver.join();
+
+  WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+}
+
+
+// Drops the first status update message sent to the master and
+// verifies that, once the clock is advanced by the status update retry
+// interval, the scheduler still receives the TASK_RUNNING update.
+TEST_F(StatusUpdateManagerTest, RetryStatusUpdate)
+{
+  // Message expectations.
+
+  // Drop the first status update message.
+  trigger statusUpdateMsgDrop;
+  EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), master, _)
+    .WillOnce(DoAll(Trigger(&statusUpdateMsgDrop), Return(true)))
+    .WillRepeatedly(Return(false));
+
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  trigger shutdownCall;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Trigger(&shutdownCall));
+
+  // Scheduler expectations.
+  EXPECT_CALL(sched, registered(_, _, _))
+    .Times(1);
+
+  trigger resourceOffersCall;
+  vector<Offer> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  trigger statusUpdateCall;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+    .WillRepeatedly(Return());
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  // This has to be a fatal assertion since 'offers[0]' is indexed
+  // below. It is checked before pausing the clock so that bailing out
+  // here does not leave the clock paused.
+  ASSERT_NE(0u, offers.size());
+
+  Clock::pause();
+
+  driver.launchTasks(offers[0].id(), createTasks(offers[0]));
+
+  WAIT_UNTIL(statusUpdateMsgDrop);
+
+  // Move time forward by the retry interval so that the dropped update
+  // gets sent again.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL.secs());
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+}