You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/13 07:23:09 UTC

svn commit: r1455811 - in /incubator/mesos/trunk: include/mesos/ src/ src/slave/ src/tests/

Author: vinodkone
Date: Wed Mar 13 06:23:08 2013
New Revision: 1455811

URL: http://svn.apache.org/r1455811
Log:
Integrated the status update manager into the slave.

Review: https://reviews.apache.org/r/7655

Added:
    incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
Modified:
    incubator/mesos/trunk/include/mesos/mesos.proto
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/slave/gc.cpp
    incubator/mesos/trunk/src/slave/paths.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/slave/status_update_manager.hpp
    incubator/mesos/trunk/src/tests/master_tests.cpp

Modified: incubator/mesos/trunk/include/mesos/mesos.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/mesos/mesos.proto?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/include/mesos/mesos.proto (original)
+++ incubator/mesos/trunk/include/mesos/mesos.proto Wed Mar 13 06:23:08 2013
@@ -91,12 +91,17 @@ message ExecutorID {
  * performing failover). The amount of time that the master will wait
  * for the scheduler to failover before removing the framework is
  * specified by failover_timeout.
+ * If checkpoint is set, the framework pid, executor pids and status
+ * updates are checkpointed to disk.
+ * Checkpointing allows a restarted slave to reconnect with old executors
+ * and recover status updates, at the cost of disk I/O.
  */
 message FrameworkInfo {
   required string user = 1;
   required string name = 2;
   optional FrameworkID id = 3;
   optional double failover_timeout = 4 [default = 0.0];
+  optional bool checkpoint = 5 [default = false];
 }
 
 

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Mar 13 06:23:08 2013
@@ -781,10 +781,11 @@ balloon_executor_LDADD = libmesos.la
 
 check_PROGRAMS += mesos-tests
 
-mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp tests/filter.cpp  	\
+mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp tests/filter.cpp	\
 	              tests/environment.cpp				\
 	              tests/master_tests.cpp tests/state_tests.cpp	\
 	              tests/slave_state_tests.cpp			\
+	              tests/status_update_manager_tests.cpp		\
 	              tests/gc_tests.cpp				\
 	              tests/resource_offers_tests.cpp			\
 	              tests/fault_tolerance_tests.cpp			\

Modified: incubator/mesos/trunk/src/slave/gc.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/gc.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/gc.cpp (original)
+++ incubator/mesos/trunk/src/slave/gc.cpp Wed Mar 13 06:23:08 2013
@@ -196,7 +196,7 @@ Future<Nothing> GarbageCollector::schedu
 
 void GarbageCollector::prune(const Duration& d)
 {
-  return dispatch(process, &GarbageCollectorProcess::prune, d);
+  dispatch(process, &GarbageCollectorProcess::prune, d);
 }
 
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/slave/paths.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/paths.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/paths.hpp (original)
+++ incubator/mesos/trunk/src/slave/paths.hpp Wed Mar 13 06:23:08 2013
@@ -39,53 +39,42 @@ namespace paths {
 
 const std::string EXECUTOR_LATEST_SYMLINK = "latest";
 
-// Path layout templates.
-const std::string ROOT_PATH =
-  "%s";
-
-const std::string SLAVEID_PATH =
-  ROOT_PATH + "/slaves/slave.id";
-
-const std::string SLAVE_PATH =
-  ROOT_PATH + "/slaves/%s";
 
-const std::string FRAMEWORK_PATH =
-  SLAVE_PATH + "/frameworks/%s";
+// File names.
+const std::string SLAVEID_FILE = "slave.id";
+const std::string FRAMEWORK_PID_FILE = "framework.pid";
+const std::string LIBPROCESS_PID_FILE = "libprocess.pid";
+const std::string EXECED_PID_FILE = "execed.pid";
+const std::string FORKED_PID_FILE = "forked.pid";
+const std::string TASK_INFO_FILE = "info";
+const std::string TASK_UPDATES_FILE = "updates";
 
+// Path layout templates.
+const std::string ROOT_PATH = "%s";
+const std::string SLAVEID_PATH = ROOT_PATH + "/slaves/" + SLAVEID_FILE;
+const std::string SLAVE_PATH = ROOT_PATH + "/slaves/%s";
+const std::string FRAMEWORK_PATH = SLAVE_PATH + "/frameworks/%s";
 const std::string FRAMEWORK_PID_PATH =
-  FRAMEWORK_PATH + "/framework.pid";
-
-const std::string EXECUTOR_PATH =
-  FRAMEWORK_PATH + "/executors/%s";
-
-const std::string EXECUTOR_RUN_PATH =
-  EXECUTOR_PATH + "/runs/%s";
-
+    FRAMEWORK_PATH + "/" + FRAMEWORK_PID_FILE;
+const std::string EXECUTOR_PATH = FRAMEWORK_PATH + "/executors/%s";
+const std::string EXECUTOR_RUN_PATH = EXECUTOR_PATH + "/runs/%s";
 const std::string EXECUTOR_LATEST_RUN_PATH =
   EXECUTOR_PATH + "/runs/" + EXECUTOR_LATEST_SYMLINK;
+const std::string PIDS_PATH = EXECUTOR_RUN_PATH + "/pids";
+const std::string LIBPROCESS_PID_PATH = PIDS_PATH + "/" + LIBPROCESS_PID_FILE;
+const std::string EXECED_PID_PATH = PIDS_PATH + "/" + EXECED_PID_FILE;
+const std::string FORKED_PID_PATH = PIDS_PATH + "/" + FORKED_PID_FILE;
+const std::string TASK_PATH = EXECUTOR_RUN_PATH + "/tasks/%s";
+const std::string TASK_INFO_PATH = TASK_PATH + "/" + TASK_INFO_FILE;
+const std::string TASK_UPDATES_PATH = TASK_PATH + "/" + TASK_UPDATES_FILE;
 
-const std::string PIDS_PATH =
-  EXECUTOR_RUN_PATH + "/pids";
 
-const std::string LIBPROCESS_PID_PATH =
-  PIDS_PATH + "/libprocess.pid";
-
-const std::string EXECED_PID_PATH =
-  PIDS_PATH + "/execed.pid";
-
-const std::string FORKED_PID_PATH =
-  PIDS_PATH + "/forked.pid";
-
-const std::string TASK_PATH =
-  EXECUTOR_RUN_PATH + "/tasks/%s";
-
-const std::string TASK_INFO_PATH =
-  TASK_PATH + "/info";
-
-const std::string TASK_UPDATES_PATH =
-  TASK_PATH + "/updates";
+// Helper functions to generate paths.
+inline std::string getMetaRootDir(const std::string& rootDir)
+{
+  return path::join(rootDir, "meta");
+}
 
-// Helper functions to generate paths.
 
 inline std::string getSlaveIDPath(const std::string& rootDir)
 {
@@ -93,128 +82,182 @@ inline std::string getSlaveIDPath(const 
 }
 
 
-inline std::string getSlavePath(const std::string& rootDir,
-                                const SlaveID& slaveId)
+inline std::string getSlavePath(
+    const std::string& rootDir,
+    const SlaveID& slaveId)
 {
   return strings::format(SLAVE_PATH, rootDir, slaveId).get();
 }
 
 
-inline std::string getFrameworkPath(const std::string& rootDir,
-                                    const SlaveID& slaveId,
-                                    const FrameworkID& frameworkId)
+inline std::string getFrameworkPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId)
 {
-  return strings::format(FRAMEWORK_PATH, rootDir, slaveId,
-                         frameworkId).get();
+  return strings::format(
+      FRAMEWORK_PATH, rootDir, slaveId, frameworkId).get();
 }
 
 
-inline std::string getFrameworkPIDPath(const std::string& rootDir,
-                                       const SlaveID& slaveId,
-                                       const FrameworkID& frameworkId)
+inline std::string getFrameworkPIDPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId)
 {
-  return strings::format(FRAMEWORK_PID_PATH, rootDir, slaveId,
-                              frameworkId).get();
+  return strings::format(
+      FRAMEWORK_PID_PATH, rootDir, slaveId, frameworkId).get();
 }
 
 
-inline std::string getExecutorPath(const std::string& rootDir,
-                                   const SlaveID& slaveId,
-                                   const FrameworkID& frameworkId,
-                                   const ExecutorID& executorId)
+inline std::string getExecutorPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
 {
-  return strings::format(EXECUTOR_PATH, rootDir, slaveId, frameworkId,
-                         executorId).get();
+  return strings::format(
+      EXECUTOR_PATH, rootDir, slaveId, frameworkId, executorId).get();
 }
 
 
-inline std::string getExecutorRunPath(const std::string& rootDir,
-                                      const SlaveID& slaveId,
-                                      const FrameworkID& frameworkId,
-                                      const ExecutorID& executorId,
-                                      const UUID& executorUUID)
+inline std::string getExecutorRunPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID)
 {
-  return strings::format(EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString()).get();
+  return strings::format(
+      EXECUTOR_RUN_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString()).get();
 }
 
 
-inline std::string getExecutorLatestRunPath(const std::string& rootDir,
-                                            const SlaveID& slaveId,
-                                            const FrameworkID& frameworkId,
-                                            const ExecutorID& executorId)
+inline std::string getExecutorLatestRunPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
 {
-  return strings::format(EXECUTOR_LATEST_RUN_PATH, rootDir, slaveId,
-                         frameworkId, executorId).get();
+  return strings::format(
+      EXECUTOR_LATEST_RUN_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId).get();
 }
 
 
-inline std::string getLibprocessPIDPath(const std::string& rootDir,
-                                        const SlaveID& slaveId,
-                                        const FrameworkID& frameworkId,
-                                        const ExecutorID& executorId,
-                                        const UUID& executorUUID)
+inline std::string getLibprocessPIDPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID)
 {
-  return strings::format(LIBPROCESS_PID_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString()).get();
+  return strings::format(
+      LIBPROCESS_PID_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString()).get();
 }
 
 
-inline std::string getExecedPIDPath(const std::string& rootDir,
-                                    const SlaveID& slaveId,
-                                    const FrameworkID& frameworkId,
-                                    const ExecutorID& executorId,
-                                    const UUID& executorUUID)
+inline std::string getExecedPIDPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID)
 {
-  return strings::format(EXECED_PID_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString()).get();
+  return strings::format(
+      EXECED_PID_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString()).get();
 }
 
 
-inline std::string getForkedPIDPath(const std::string& rootDir,
-                                    const SlaveID& slaveId,
-                                    const FrameworkID& frameworkId,
-                                    const ExecutorID& executorId,
-                                    const UUID& executorUUID)
+inline std::string getForkedPIDPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID)
 {
-  return strings::format(FORKED_PID_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString()).get();
+  return strings::format(
+      FORKED_PID_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString()).get();
 }
 
 
-inline std::string getTaskPath(const std::string& rootDir,
-                               const SlaveID& slaveId,
-                               const FrameworkID& frameworkId,
-                               const ExecutorID& executorId,
-                               const UUID& executorUUID,
-                               const TaskID& taskId)
+inline std::string getTaskPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID,
+    const TaskID& taskId)
 {
-  return strings::format(TASK_PATH, rootDir, slaveId, frameworkId, executorId,
-                         executorUUID.toString(), taskId).get();
+  return strings::format(
+      TASK_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString(),
+      taskId).get();
 }
 
 
-inline std::string getTaskInfoPath(const std::string& rootDir,
-                                   const SlaveID& slaveId,
-                                   const FrameworkID& frameworkId,
-                                   const ExecutorID& executorId,
-                                   const UUID& executorUUID,
-                                   const TaskID& taskId)
+inline std::string getTaskInfoPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID,
+    const TaskID& taskId)
 {
-  return strings::format(TASK_INFO_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString(), taskId).get();
+  return strings::format(
+      TASK_INFO_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString(),
+      taskId).get();
 }
 
 
-inline std::string getTaskUpdatesPath(const std::string& rootDir,
-                                      const SlaveID& slaveId,
-                                      const FrameworkID& frameworkId,
-                                      const ExecutorID& executorId,
-                                      const UUID& executorUUID,
-                                      const TaskID& taskId)
+inline std::string getTaskUpdatesPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const UUID& executorUUID,
+    const TaskID& taskId)
 {
-  return strings::format(TASK_UPDATES_PATH, rootDir, slaveId, frameworkId,
-                         executorId, executorUUID.toString(), taskId).get();
+  return strings::format(
+      TASK_UPDATES_PATH,
+      rootDir,
+      slaveId,
+      frameworkId,
+      executorId,
+      executorUUID.toString(),
+      taskId).get();
 }
 
 
@@ -237,7 +280,10 @@ inline std::string createExecutorDirecto
   // Remove the previous "latest" symlink.
   std::string latest =
     getExecutorLatestRunPath(rootDir, slaveId, frameworkId, executorId);
-  os::rm(latest);
+
+  if (os::exists(latest)) {
+    CHECK_SOME(os::rm(latest)) << "Failed to remove latest symlink " << latest;
+  }
 
   // Symlink the new executor directory to "latest".
   Try<Nothing> symlink = fs::symlink(directory, latest);
@@ -248,7 +294,6 @@ inline std::string createExecutorDirecto
   return directory;
 }
 
-
 } // namespace paths {
 } // namespace slave {
 } // namespace internal {

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Mar 13 06:23:08 2013
@@ -49,6 +49,7 @@
 #include "slave/flags.hpp"
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
+#include "slave/status_update_manager.hpp"
 
 namespace params = std::tr1::placeholders;
 
@@ -75,7 +76,8 @@ Slave::Slave(const Resources& _resources
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
     isolationModule(_isolationModule),
     files(_files),
-    monitor(isolationModule) {}
+    monitor(isolationModule),
+    statusUpdateManager(new StatusUpdateManager()) {}
 
 
 Slave::Slave(const flags::Flags<logging::Flags, slave::Flags>& _flags,
@@ -88,7 +90,8 @@ Slave::Slave(const flags::Flags<logging:
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
     isolationModule(_isolationModule),
     files(_files),
-    monitor(isolationModule)
+    monitor(isolationModule),
+    statusUpdateManager(new StatusUpdateManager())
 {
   // TODO(benh): Move this computation into Flags as the "default".
 
@@ -184,6 +187,8 @@ Slave::~Slave()
   foreachvalue (Framework* framework, frameworks) {
     delete framework;
   }
+
+  delete statusUpdateManager;
 }
 
 
@@ -227,6 +232,8 @@ void Slave::initialize()
            local,
            self());
 
+  statusUpdateManager->initialize(self());
+
   // Start disk monitoring.
   // NOTE: We send a delayed message here instead of directly calling
   // checkDiskUsage, to make disabling this feature easy (e.g by specifying
@@ -353,6 +360,7 @@ void Slave::finalize()
     // immediately). Of course, this still isn't sufficient
     // because those status updates might get lost and we won't
     // resend them unless we build that into the system.
+    // TODO(vinod): Kill this shutdown when slave recovery is in place.
     shutdownFramework(frameworkId);
   }
 
@@ -406,6 +414,9 @@ void Slave::newMasterDetected(const UPID
 
   connected = false;
   doReliableRegistration();
+
+  // Inform the status update manager about the new master.
+  statusUpdateManager->newMasterDetected(master);
 }
 
 
@@ -532,16 +543,10 @@ void Slave::runTask(
                    << " with executor '" << executorId
                    << "' which is being shut down";
 
-      StatusUpdateMessage message;
-      StatusUpdate* update = message.mutable_update();
-      update->mutable_framework_id()->MergeFrom(frameworkId);
-      update->mutable_slave_id()->MergeFrom(id);
-      TaskStatus* status = update->mutable_status();
-      status->mutable_task_id()->MergeFrom(task.task_id());
-      status->set_state(TASK_LOST);
-      update->set_timestamp(Clock::now());
-      update->set_uuid(UUID::random().toBytes());
-      send(master, message);
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          frameworkId, id, task.task_id(), TASK_LOST, "Executor shutting down");
+
+      statusUpdate(update);
     } else if (!executor->pid) {
       // Queue task until the executor starts up.
       LOG(INFO) << "Queuing task '" << task.task_id()
@@ -559,7 +564,9 @@ void Slave::runTask(
       // the resources before the executor acts on its RunTaskMessage.
       dispatch(isolationModule,
                &IsolationModule::resourcesChanged,
-               framework->id, executor->id, executor->resources);
+               framework->id,
+               executor->id,
+               executor->resources);
 
       LOG(INFO) << "Sending task '" << task.task_id()
                 << "' to executor '" << executorId
@@ -585,13 +592,16 @@ void Slave::runTask(
     // Queue task until the executor starts up.
     executor->queuedTasks[task.task_id()] = task;
 
-    // Tell the isolation module to launch the executor. (TODO(benh):
-    // Make the isolation module a process so that it can block while
-    // trying to launch the executor.)
+    // Tell the isolation module to launch the executor.
+    // TODO(benh): Make the isolation module a process so that it
+    // can block while trying to launch the executor.
     dispatch(isolationModule,
              &IsolationModule::launchExecutor,
-             framework->id, framework->info, executor->info,
-             executor->directory, executor->resources);
+             framework->id,
+             framework->info,
+             executor->info,
+             executor->directory,
+             executor->resources);
   }
 }
 
@@ -607,59 +617,37 @@ void Slave::killTask(const FrameworkID& 
                  << " of framework " << frameworkId
                  << " because no such framework is running";
 
-    StatusUpdateMessage message;
-    StatusUpdate* update = message.mutable_update();
-    update->mutable_framework_id()->MergeFrom(frameworkId);
-    update->mutable_slave_id()->MergeFrom(id);
-    TaskStatus* status = update->mutable_status();
-    status->mutable_task_id()->MergeFrom(taskId);
-    status->set_state(TASK_LOST);
-    update->set_timestamp(Clock::now());
-    update->set_uuid(UUID::random().toBytes());
-    send(master, message);
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId, id, taskId, TASK_LOST, "Cannot find framework");
 
+    statusUpdate(update);
     return;
   }
 
-
   // Tell the executor to kill the task if it is up and
   // running, otherwise, consider the task lost.
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
     LOG(WARNING) << "WARNING! Cannot kill task " << taskId
                  << " of framework " << frameworkId
-                 << " because no such task is running";
+                 << " because no corresponding executor is running";
 
-    StatusUpdateMessage message;
-    StatusUpdate* update = message.mutable_update();
-    update->mutable_framework_id()->MergeFrom(framework->id);
-    update->mutable_slave_id()->MergeFrom(id);
-    TaskStatus* status = update->mutable_status();
-    status->mutable_task_id()->MergeFrom(taskId);
-    status->set_state(TASK_LOST);
-    update->set_timestamp(Clock::now());
-    update->set_uuid(UUID::random().toBytes());
-    send(master, message);
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId, id, taskId, TASK_LOST, "Cannot find executor");
+
+    statusUpdate(update);
   } else if (!executor->pid) {
-    // Remove the task.
-    executor->removeTask(taskId);
+    // We get here if the executor hasn't registered with the slave yet.
 
-    // Tell the isolation module to update the resources.
-    dispatch(isolationModule,
-             &IsolationModule::resourcesChanged,
-             framework->id, executor->id, executor->resources);
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        id,
+        taskId,
+        TASK_KILLED,
+        "Unregistered executor",
+        executor->id);
 
-    StatusUpdateMessage message;
-    StatusUpdate* update = message.mutable_update();
-    update->mutable_framework_id()->MergeFrom(framework->id);
-    update->mutable_executor_id()->MergeFrom(executor->id);
-    update->mutable_slave_id()->MergeFrom(id);
-    TaskStatus* status = update->mutable_status();
-    status->mutable_task_id()->MergeFrom(taskId);
-    status->set_state(TASK_KILLED);
-    update->set_timestamp(Clock::now());
-    update->set_uuid(UUID::random().toBytes());
-    send(master, message);
+    statusUpdate(update);
   } else {
     // Otherwise, send a message to the executor and wait for
     // it to send us a status update.
@@ -678,24 +666,33 @@ void Slave::killTask(const FrameworkID& 
 // therefore never processed.
 void Slave::shutdownFramework(const FrameworkID& frameworkId)
 {
-  if (from != master) {
+  // Allow shutdownFramework() only if it's called directly
+  // (e.g. from Slave::finalize(), where 'from' is empty) or if
+  // it's a message from the currently registered master.
+  if (from && from != master) {
     LOG(WARNING) << "Ignoring shutdown framework message from " << from
-                 << "because it is not from the registered master ("
+                 << " because it is not from the registered master ("
                  << master << ")";
     return;
   }
 
-  LOG(INFO) << "Asked to shut down framework " << frameworkId;
+  LOG(INFO) << "Asked to shut down framework " << frameworkId
+            << " by " << from;
 
   Framework* framework = getFramework(frameworkId);
   if (framework != NULL) {
     LOG(INFO) << "Shutting down framework " << framework->id;
 
     // Shut down all executors of this framework.
+    // Note that the framework and its corresponding executors are removed from
+    // the frameworks map by shutdownExecutorTimeout() or executorTerminated().
     foreachvalue (Executor* executor, framework->executors) {
       shutdownExecutor(framework, executor);
     }
   }
+
+  // Close all status update streams for this framework.
+  statusUpdateManager->cleanup(frameworkId);
 }
 
 
@@ -757,25 +754,42 @@ void Slave::statusUpdateAcknowledgement(
     const TaskID& taskId,
     const string& uuid)
 {
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    if (framework->updates.contains(UUID::fromBytes(uuid))) {
-      LOG(INFO) << "Got acknowledgement of status update"
-                << " for task " << taskId
-                << " of framework " << frameworkId;
-
-      framework->updates.erase(UUID::fromBytes(uuid));
-
-      // Cleanup if this framework has no executors running and no pending updates.
-      if (framework->executors.size() == 0 && framework->updates.empty()) {
-        frameworks.erase(framework->id);
-
-        // Pass ownership of the framework pointer.
-        completedFrameworks.push_back(
-            std::tr1::shared_ptr<Framework>(framework));
-      }
-    }
+  LOG(INFO) << "Got acknowledgement of status update"
+            << " for task " << taskId
+            << " of framework " << frameworkId;
+
+  statusUpdateManager->acknowledgement(taskId, frameworkId, uuid)
+    .onAny(defer(self(),
+                 &Slave::_statusUpdateAcknowledgement,
+                 params::_1,
+                 taskId,
+                 frameworkId,
+                 uuid));
+}
+
+
+void Slave::_statusUpdateAcknowledgement(
+    const Future<Try<Nothing> >& future,
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const string& uuid)
+{
+  if (!future.isReady()) {
+    LOG(FATAL) << "Failed to handle status update acknowledgement " << uuid
+               << " for task " << taskId
+               << " of framework " << frameworkId << ": "
+               << (future.isFailed() ? future.failure() : "future discarded");
+  }
+
+  if (future.get().isError()) {
+    LOG(ERROR) << "Failed to handle the status update acknowledgement " << uuid
+               << " for task " << taskId
+               << " of framework " << frameworkId << ": "
+               << future.get().error();
+    return;
   }
+
+  // TODO(vinod): Garbage collect the task meta directory.
 }
 
 
@@ -860,41 +874,21 @@ void Slave::registerExecutor(
   }
 }
 
-
+// This can be called in two ways:
+// 1) When a status update from the executor is received.
+// 2) When the slave itself generates task updates (e.g. LOST/KILLED/FAILED).
 void Slave::statusUpdate(const StatusUpdate& update)
 {
   const TaskStatus& status = update.status();
 
-  LOG(INFO) << "Status update: task " << status.task_id()
-            << " of framework " << update.framework_id()
-            << " is now in state " << status.state();
+  LOG(INFO) << "Handling status update " << update;
 
   Framework* framework = getFramework(update.framework_id());
-  if (framework != NULL) {
-    // Send message and record the status for possible resending.
-    // TODO(vinod): Revisit the strategy of always sending a status update
-    // upstream, when we have persistent state at the master and slave.
-    StatusUpdateMessage message;
-    message.mutable_update()->MergeFrom(update);
-    message.set_pid(self());
-    send(master, message);
-
-    UUID uuid = UUID::fromBytes(update.uuid());
-
-    // Send us a message to try and resend after some delay.
-    delay(STATUS_UPDATE_RETRY_INTERVAL,
-          self(),
-          &Slave::statusUpdateTimeout,
-          framework->id,
-          uuid);
-
-    framework->updates[uuid] = update;
-
-    stats.tasks[status.state()]++;
+  Executor* executor = NULL;
 
-    stats.validStatusUpdates++;
+  if (framework != NULL) {
+    executor = framework->getExecutor(status.task_id());
 
-    Executor* executor = framework->getExecutor(status.task_id());
     if (executor != NULL) {
       executor->updateTaskState(status.task_id(), status.state());
 
@@ -902,22 +896,108 @@ void Slave::statusUpdate(const StatusUpd
       if (protobuf::isTerminalState(status.state())) {
         executor->removeTask(status.task_id());
 
-        dispatch(isolationModule,
-                 &IsolationModule::resourcesChanged,
-                 framework->id,
-                 executor->id,
-                 executor->resources);
+        dispatch(
+            isolationModule,
+            &IsolationModule::resourcesChanged,
+            framework->id,
+            executor->id,
+            executor->resources);
       }
     } else {
-      LOG(WARNING) << "Status update error: couldn't lookup "
-                   << "executor for framework " << update.framework_id();
+      LOG(WARNING) << "Could not find executor for task " << status.task_id()
+                   << " of framework " << update.framework_id();
+
       stats.invalidStatusUpdates++;
     }
   } else {
-    LOG(WARNING) << "Status update error: couldn't lookup "
-                 << "framework " << update.framework_id();
+    LOG(WARNING) << "Could not find framework " << update.framework_id()
+                 << " for task " << status.task_id();
+
     stats.invalidStatusUpdates++;
   }
+
+  // Forward the update to the status update manager.
+  // NOTE: We forward the update even if the framework/executor is unknown
+  // because currently there is no persistent state in the master.
+  // The lack of persistence might lead frameworks to use out-of-band means
+  // to figure out the task state mismatch and use status updates to reconcile.
+  // We need to revisit this issue once master has persistent state.
+  forwardUpdate(update, executor);
+}
+
+
+void Slave::forwardUpdate(const StatusUpdate& update, Executor* executor)
+{
+  LOG(INFO) << "Forwarding status update " << update
+            << " to the status update manager";
+
+  const FrameworkID& frameworkId = update.framework_id();
+  const TaskID& taskId = update.status().task_id();
+
+  Option<UPID> pid;
+  Option<string> path;
+  bool checkpoint = false;
+
+  if (executor != NULL) {
+    // statusUpdate() counts updates without a known executor as invalid.
+    stats.validStatusUpdates++;
+
+    if (executor->pid) {
+      pid = executor->pid;
+    }
+
+    // Check whether we need to do checkpointing.
+    Framework* framework = getFramework(frameworkId);
+    CHECK_NOTNULL(framework);
+    checkpoint = framework->info.checkpoint();
+
+    if (checkpoint) {
+      // Get the path to store the updates.
+      path = paths::getTaskUpdatesPath(
+          paths::getMetaRootDir(flags.work_dir),
+          id,
+          frameworkId,
+          executor->id,
+          executor->uuid,
+          taskId);
+    }
+  }
+
+  stats.tasks[update.status().state()]++;
+  statusUpdateManager->update(update, checkpoint, path)
+    .onAny(defer(self(), &Slave::_forwardUpdate, params::_1, update, pid));
+}
+
+
+void Slave::_forwardUpdate(
+    const Future<Try<Nothing> >& future,
+    const StatusUpdate& update,
+    const Option<UPID>& pid)
+{
+  if (!future.isReady()) {
+    LOG(FATAL) << "Failed to handle status update " << update << ": "
+               << (future.isFailed() ? future.failure() : "future discarded");
+  }
+
+  if (future.get().isError()) {
+    LOG(ERROR)
+      << "Failed to handle the status update " << update
+      << ": " << future.get().error();
+    return;
+  }
+
+  // Status update manager successfully handled the status update.
+  // Acknowledge the executor, if necessary.
+  if (pid.isSome()) {
+    LOG(INFO) << "Sending ACK for status update " << update
+              << " to executor " << pid.get();
+    StatusUpdateAcknowledgementMessage message;
+    message.mutable_framework_id()->MergeFrom(update.framework_id());
+    message.mutable_slave_id()->MergeFrom(update.slave_id());
+    message.mutable_task_id()->MergeFrom(update.status().task_id());
+    message.set_uuid(update.uuid());
+    send(pid.get(), message);
+  }
 }
 
 
@@ -956,34 +1036,6 @@ void Slave::ping(const UPID& from, const
 }
 
 
-void Slave::statusUpdateTimeout(
-    const FrameworkID& frameworkId,
-    const UUID& uuid)
-{
-  // Check and see if we still need to send this update.
-  Framework* framework = getFramework(frameworkId);
-  if (framework != NULL) {
-    if (framework->updates.contains(uuid)) {
-      const StatusUpdate& update = framework->updates[uuid];
-
-      LOG(INFO) << "Resending status update"
-                << " for task " << update.status().task_id()
-                << " of framework " << update.framework_id();
-
-      StatusUpdateMessage message;
-      message.mutable_update()->MergeFrom(update);
-      message.set_pid(self());
-      send(master, message);
-
-      // Send us a message to try and resend after some delay.
-      delay(STATUS_UPDATE_RETRY_INTERVAL,
-            self(), &Slave::statusUpdateTimeout,
-            framework->id, uuid);
-    }
-  }
-}
-
-
 void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << "Process exited: " << from;
@@ -1059,21 +1111,6 @@ void _watch(
 }
 
 
-void Slave::sendStatusUpdate(
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
-    const TaskID& taskId,
-    TaskState taskState,
-    const string& message)
-{
-  const StatusUpdate& update = protobuf::createStatusUpdate(
-      frameworkId, id, taskId, taskState, message, executorId);
-
-  // Handle the status update as though it came from the executor.
-  statusUpdate(update);
-}
-
-
 void _unwatch(
     const Future<Nothing>& watch,
     const FrameworkID& frameworkId,
@@ -1124,18 +1161,20 @@ void Slave::executorTerminated(
   // or if this is a command executor, we send TASK_FAILED status updates
   // instead of TASK_LOST.
 
+  StatusUpdate update;
+
   // Transition all live launched tasks.
   foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
     if (!protobuf::isTerminalState(task->state())) {
       isCommandExecutor = !task->has_executor_id();
-
       if (destroyed || isCommandExecutor) {
-        sendStatusUpdate(
-            frameworkId, executorId, task->task_id(), TASK_FAILED, message);
+        update = protobuf::createStatusUpdate(
+            frameworkId, id, task->task_id(), TASK_FAILED, message, executorId);
       } else {
-        sendStatusUpdate(
-            frameworkId, executorId, task->task_id(), TASK_LOST, message);
+        update = protobuf::createStatusUpdate(
+            frameworkId, id, task->task_id(), TASK_LOST, message, executorId);
       }
+      statusUpdate(update); // Handle the status update.
     }
   }
 
@@ -1144,12 +1183,13 @@ void Slave::executorTerminated(
     isCommandExecutor = task.has_command();
 
     if (destroyed || isCommandExecutor) {
-      sendStatusUpdate(
-          frameworkId, executorId, task.task_id(), TASK_FAILED, message);
+      update = protobuf::createStatusUpdate(
+          frameworkId, id, task.task_id(), TASK_FAILED, message, executorId);
     } else {
-      sendStatusUpdate(
-          frameworkId, executorId, task.task_id(), TASK_LOST, message);
+      update = protobuf::createStatusUpdate(
+          frameworkId, id, task.task_id(), TASK_LOST, message, executorId);
     }
+    statusUpdate(update); // Handle the status update.
   }
 
   if (!isCommandExecutor) {
@@ -1169,7 +1209,7 @@ void Slave::executorTerminated(
   framework->destroyExecutor(executor->id);
 
   // Cleanup if this framework has no executors running.
-  if (framework->executors.size() == 0) {
+  if (framework->executors.empty()) {
     frameworks.erase(framework->id);
 
     // Pass ownership of the framework pointer.
@@ -1257,7 +1297,8 @@ void Slave::checkDiskUsage()
 void Slave::_checkDiskUsage(const Future<Try<double> >& usage)
 {
   if (!usage.isReady()) {
-    LOG(WARNING) << "Error getting disk usage";
+    LOG(ERROR) << "Failed to get disk usage: "
+               << (usage.isFailed() ? usage.failure() : "future discarded");
   } else {
     Try<double> result = usage.get();
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Mar 13 06:23:08 2013
@@ -60,6 +60,7 @@ namespace slave {
 using namespace process;
 
 // Some forward declarations.
+class StatusUpdateManager;
 struct Executor;
 struct Framework;
 
@@ -106,18 +107,10 @@ public:
 
   void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
 
-  void statusUpdateAcknowledgement(
-      const SlaveID& slaveId,
-      const FrameworkID& frameworkId,
-      const TaskID& taskId,
-      const std::string& uuid);
-
   void registerExecutor(
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
-  void statusUpdate(const StatusUpdate& update);
-
   void executorMessage(
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
@@ -126,14 +119,33 @@ public:
 
   void ping(const UPID& from, const std::string& body);
 
-  void statusUpdateTimeout(const FrameworkID& frameworkId, const UUID& uuid);
+  // Handles the status update.
+  void statusUpdate(const StatusUpdate& update);
+
+  // Forwards the update to the status update manager.
+  // NOTE: 'executor' can be NULL when we want to forward the update
+  // despite not knowing about the framework/executor.
+  void forwardUpdate(const StatusUpdate& update, Executor* executor = NULL);
+
+  // This callback is called when the status update manager finishes
+  // handling the update. If the handling is successful, an acknowledgment
+  // is sent to the executor.
+  void _forwardUpdate(
+      const Future<Try<Nothing> >& future,
+      const StatusUpdate& update,
+      const Option<UPID>& pid);
 
-  void sendStatusUpdate(
+  void statusUpdateAcknowledgement(
+      const SlaveID& slaveId,
       const FrameworkID& frameworkId,
-      const ExecutorID& executorId,
       const TaskID& taskId,
-      TaskState taskState,
-      const std::string& message);
+      const std::string& uuid);
+
+  void _statusUpdateAcknowledgement(
+      const Future<Try<Nothing> >& future,
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const std::string& uuid);
 
   void executorStarted(
       const FrameworkID& frameworkId,
@@ -175,9 +187,10 @@ protected:
 
   // Handle the second phase of shutting down an executor for those
   // executors that have not properly shutdown within a timeout.
-  void shutdownExecutorTimeout(const FrameworkID& frameworkId,
-                               const ExecutorID& executorId,
-                               const UUID& uuid);
+  void shutdownExecutorTimeout(
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const UUID& uuid);
 
   // This function returns the max age of executor/slave directories allowed,
   // given a disk usage. This value could be used to tune gc.
@@ -242,6 +255,8 @@ private:
   ResourceMonitor monitor;
 
   state::SlaveState state;
+
+  StatusUpdateManager* statusUpdateManager;
 };
 
 
@@ -467,7 +482,6 @@ struct Framework
         return executor;
       }
     }
-
     return NULL;
   }
 

Modified: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Wed Mar 13 06:23:08 2013
@@ -35,6 +35,7 @@
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/protobuf.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
 

Modified: incubator/mesos/trunk/src/tests/master_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_tests.cpp?rev=1455811&r1=1455810&r2=1455811&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_tests.cpp Wed Mar 13 06:23:08 2013
@@ -377,11 +377,6 @@ TEST_F(MasterTest, StatusUpdateAck)
 
   MockExecutor exec;
 
-  trigger statusUpdateAckMsg;
-  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
-                    Return(false)));
-
   trigger shutdownCall;
 
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -403,6 +398,14 @@ TEST_F(MasterTest, StatusUpdateAck)
 
   BasicMasterDetector detector(master, slave, true);
 
+  trigger statusUpdateAckMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()),
+                 _,
+                 Eq(slave))
+    .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
+                    Return(false)));
+
+
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master);
 
@@ -445,7 +448,7 @@ TEST_F(MasterTest, StatusUpdateAck)
 
   EXPECT_EQ(TASK_RUNNING, status.state());
 
-  // Ensure we get a status update ACK.
+  // Ensure the slave gets a status update ACK.
   WAIT_UNTIL(statusUpdateAckMsg);
 
   driver.stop();

Added: incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp?rev=1455811&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp Wed Mar 13 06:23:08 2013
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <map>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/pid.hpp>
+
+#include <stout/protobuf.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+#include <stout/os.hpp>
+
+#include "detector/detector.hpp"
+
+#include "master/allocator.hpp"
+#include "master/hierarchical_allocator_process.hpp"
+#include "master/master.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/process_based_isolation_module.hpp"
+#include "slave/slave.hpp"
+#include "slave/paths.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/filter.hpp"
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+using namespace mesos::internal::slave::paths;
+
+using namespace process;
+
+using mesos::internal::master::Allocator;
+using mesos::internal::master::HierarchicalDRFAllocatorProcess;
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::Eq;
+using testing::Return;
+using testing::SaveArg;
+
+// Fixture: in-process master and slave, plus a checkpointing FrameworkInfo.
+class StatusUpdateManagerTest: public ::testing::Test
+{
+protected:
+  static void SetUpTestCase() // Shared setup for all tests in this case.
+  {
+    flags.work_dir = "/tmp/mesos-tests";
+    os::rmdir(flags.work_dir); // Remove any existing work directory.
+  }
+
+  virtual void SetUp() // Fresh master, slave and detector per test.
+  {
+    ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+    a = new Allocator(&allocator); // Wraps the 'allocator' member.
+    m = new Master(a, &files);
+    master = process::spawn(m);
+
+    execs[DEFAULT_EXECUTOR_ID] = &exec; // Handed to the isolation module.
+
+    isolationModule = new TestingIsolationModule(execs);
+
+    s = new Slave(flags, true, isolationModule, &files);
+    slave = process::spawn(s);
+
+    detector = new BasicMasterDetector(master, slave, true);
+
+    frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+    frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+  }
+
+  virtual void TearDown() // Undoes SetUp() in reverse order.
+  {
+    delete detector;
+
+    process::terminate(slave);
+    process::wait(slave); // Let the slave process finish before deleting it.
+    delete s;
+
+    delete isolationModule;
+    process::terminate(master);
+    process::wait(master);
+    delete m;
+    delete a;
+
+    os::rmdir(flags.work_dir); // Remove everything under the work directory.
+  }
+
+  vector<TaskInfo> createTasks(const Offer& offer) // One task, whole offer.
+  {
+    TaskInfo task;
+    task.set_name("test-task");
+    task.mutable_task_id()->set_value("1");
+    task.mutable_slave_id()->MergeFrom(offer.slave_id());
+    task.mutable_resources()->MergeFrom(offer.resources());
+    task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+    vector<TaskInfo> tasks;
+    tasks.push_back(task);
+
+    return tasks;
+  }
+
+  HierarchicalDRFAllocatorProcess allocator;
+  Allocator *a; // Created in SetUp(), deleted in TearDown().
+  Master* m; // Created in SetUp(), deleted in TearDown().
+  TestingIsolationModule* isolationModule; // Deleted in TearDown().
+  Slave* s; // Created in SetUp(), deleted in TearDown().
+  Files files;
+  BasicMasterDetector* detector; // Deleted first in TearDown().
+  FrameworkInfo frameworkInfo;
+  MockExecutor exec;
+  map<ExecutorID, Executor*> execs;
+  MockScheduler sched;
+  TaskStatus status; // Saved by the scheduler's statusUpdate mock.
+  PID<Master> master;
+  PID<Slave> slave;
+  static flags::Flags<logging::Flags, slave::Flags> flags; // Shared.
+};
+
+// Definition of the static 'flags' member.
+flags::Flags<logging::Flags, slave::Flags> StatusUpdateManagerTest::flags;
+
+// Verifies that both a status update and its ACK are checkpointed to disk.
+TEST_F(StatusUpdateManagerTest, CheckpointStatusUpdate)
+{
+  // Observe ACK messages delivered to the slave.
+  trigger statusUpdateAckMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()),
+                 _,
+                 slave)
+    .WillRepeatedly(DoAll(Trigger(&statusUpdateAckMsg), Return(false)));
+
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  trigger shutdownCall;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Trigger(&shutdownCall));
+
+  // Scheduler expectations.
+  EXPECT_CALL(sched, registered(_, _, _))
+    .Times(1);
+
+  trigger resourceOffersCall;
+  vector<Offer> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  trigger statusUpdateCall;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+    .WillRepeatedly(Return());
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  ASSERT_NE(0u, offers.size()); // offers[0] is used below.
+  driver.launchTasks(offers[0].id(), createTasks(offers[0]));
+
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  // Wait until the slave has received the ACK for the update.
+  WAIT_UNTIL(statusUpdateAckMsg);
+
+  sleep(1); // To make sure the status update manager acted on the ACK.
+
+  // Ensure that both the status update and its acknowledgement
+  // are correctly checkpointed.
+  Try<list<string> > found = os::find(flags.work_dir, TASK_UPDATES_FILE);
+  ASSERT_SOME(found);
+  ASSERT_EQ(1u, found.get().size());
+
+  Try<int> fd = os::open(found.get().front(), O_RDONLY);
+  ASSERT_SOME(fd);
+
+  int updates = 0;
+  int acks = 0;
+  string uuid;
+  Result<StatusUpdateRecord> record = Result<StatusUpdateRecord>::none();
+  while (true) {
+    record = protobuf::read<StatusUpdateRecord>(fd.get());
+
+    if (!record.isSome()) {
+      break; // Stop at end of file (none) or on a read error.
+    }
+
+    if (record.get().type() == StatusUpdateRecord::UPDATE) {
+      EXPECT_EQ(TASK_RUNNING, record.get().update().status().state());
+      uuid = record.get().update().uuid();
+      updates++;
+    } else {
+      EXPECT_EQ(uuid, record.get().uuid()); // ACK must match the update.
+      acks++;
+    }
+  }
+  os::close(fd.get()); // Release the descriptor before any ASSERT can return.
+  ASSERT_TRUE(record.isNone());
+  ASSERT_EQ(1, updates);
+  ASSERT_EQ(1, acks);
+
+  driver.stop();
+  driver.join();
+
+  WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+}
+
+// Verifies that a status update is resent after the first one is dropped.
+TEST_F(StatusUpdateManagerTest, RetryStatusUpdate)
+{
+  // Message expectations.
+
+  // Drop the first status update message sent by the master.
+  trigger statusUpdateMsgDrop;
+  EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), master, _)
+    .WillOnce(DoAll(Trigger(&statusUpdateMsgDrop), Return(true)))
+    .WillRepeatedly(Return(false));
+
+  // Executor expectations.
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillRepeatedly(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  trigger shutdownCall;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Trigger(&shutdownCall));
+
+  // Scheduler expectations.
+  EXPECT_CALL(sched, registered(_, _, _))
+    .Times(1);
+
+  trigger resourceOffersCall;
+  vector<Offer> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  trigger statusUpdateCall;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status), Trigger(&statusUpdateCall)))
+    .WillRepeatedly(Return());
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  // Check first: a failed ASSERT must not leave the clock paused.
+  ASSERT_NE(0u, offers.size());
+  Clock::pause();
+  driver.launchTasks(offers[0].id(), createTasks(offers[0]));
+
+  WAIT_UNTIL(statusUpdateMsgDrop);
+
+  // The dropped update is never ACKed, so one retry interval triggers a resend.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL.secs());
+  WAIT_UNTIL(statusUpdateCall);
+
+  EXPECT_EQ(TASK_RUNNING, status.state());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  WAIT_UNTIL(shutdownCall); // Ensures MockExecutor can be deallocated.
+}