You are viewing a plain-text version of this content. The link to the canonical (original) version was not preserved in this copy.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/11/21 00:54:21 UTC

[6/6] mesos git commit: Added the "task" prefix to the name of the status update manager files.

Added the "task" prefix to the name of the status update manager files.

Review: https://reviews.apache.org/r/63853/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62d11733
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62d11733
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62d11733

Branch: refs/heads/master
Commit: 62d11733446cf54beb02295e7cd94d419541f152
Parents: 2377cd8
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Nov 20 16:43:12 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Nov 20 16:53:05 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt                             |   2 +-
 src/Makefile.am                                |   6 +-
 src/local/local.cpp                            |   2 +-
 src/slave/main.cpp                             |   2 +-
 src/slave/slave.cpp                            |   2 +-
 src/slave/status_update_manager.cpp            | 897 -------------------
 src/slave/status_update_manager.hpp            | 210 -----
 src/slave/task_status_update_manager.cpp       | 898 ++++++++++++++++++++
 src/slave/task_status_update_manager.hpp       | 210 +++++
 src/tests/CMakeLists.txt                       |   2 +-
 src/tests/cluster.cpp                          |   2 +-
 src/tests/cluster.hpp                          |   2 +-
 src/tests/mock_slave.cpp                       |   2 +-
 src/tests/status_update_manager_tests.cpp      | 852 -------------------
 src/tests/task_status_update_manager_tests.cpp | 852 +++++++++++++++++++
 15 files changed, 1971 insertions(+), 1970 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4f11418..be212d9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -257,7 +257,7 @@ set(AGENT_SRC
   slave/resource_estimator.cpp
   slave/slave.cpp
   slave/state.cpp
-  slave/status_update_manager.cpp
+  slave/task_status_update_manager.cpp
   slave/validation.cpp
   slave/container_loggers/sandbox.cpp
   slave/containerizer/composing.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 49dec55..9641ad4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1022,7 +1022,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/resource_estimator.cpp						\
   slave/slave.cpp							\
   slave/state.cpp							\
-  slave/status_update_manager.cpp					\
+  slave/task_status_update_manager.cpp					\
   slave/validation.cpp							\
   slave/container_loggers/sandbox.cpp					\
   slave/containerizer/composing.cpp					\
@@ -1167,7 +1167,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/posix_signalhandler.hpp						\
   slave/slave.hpp							\
   slave/state.hpp							\
-  slave/status_update_manager.hpp					\
+  slave/task_status_update_manager.hpp					\
   slave/validation.hpp							\
   slave/windows_ctrlhandler.hpp						\
   slave/container_loggers/sandbox.hpp					\
@@ -2469,7 +2469,7 @@ mesos_tests_SOURCES =						\
   tests/slave_tests.cpp						\
   tests/sorter_tests.cpp					\
   tests/state_tests.cpp						\
-  tests/status_update_manager_tests.cpp				\
+  tests/task_status_update_manager_tests.cpp			\
   tests/teardown_tests.cpp					\
   tests/upgrade_tests.cpp					\
   tests/uri_tests.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 63d9822..2e141c6 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -75,7 +75,7 @@
 
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/fetcher.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index de87553..f0716fb 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -69,7 +69,7 @@
 
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #include "version/version.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c3a4088..6e9adc6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -98,7 +98,7 @@
 #include "slave/flags.hpp"
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #ifdef __WINDOWS__
 // Used to install a Windows console ctrl handler.

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
deleted file mode 100644
index fed0903..0000000
--- a/src/slave/status_update_manager.cpp
+++ /dev/null
@@ -1,897 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <process/delay.hpp>
-#include <process/id.hpp>
-#include <process/process.hpp>
-#include <process/timer.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/stringify.hpp>
-#include <stout/utils.hpp>
-#include <stout/uuid.hpp>
-
-#include "common/protobuf_utils.hpp"
-
-#include "logging/logging.hpp"
-
-#include "slave/constants.hpp"
-#include "slave/flags.hpp"
-#include "slave/slave.hpp"
-#include "slave/state.hpp"
-#include "slave/status_update_manager.hpp"
-
-using lambda::function;
-
-using std::string;
-
-using process::wait; // Necessary on some OS's to disambiguate.
-using process::Failure;
-using process::Future;
-using process::PID;
-using process::Timeout;
-using process::UPID;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-using state::TaskState;
-
-
-class TaskStatusUpdateManagerProcess
-  : public ProtobufProcess<TaskStatusUpdateManagerProcess>
-{
-public:
-  TaskStatusUpdateManagerProcess(const Flags& flags);
-  virtual ~TaskStatusUpdateManagerProcess();
-
-  // Explicitly use 'initialize' since we're overloading below.
-  using process::ProcessBase::initialize;
-
-  // TaskStatusUpdateManager implementation.
-  void initialize(const function<void(StatusUpdate)>& forward);
-
-  Future<Nothing> update(
-      const StatusUpdate& update,
-      const SlaveID& slaveId,
-      const ExecutorID& executorId,
-      const ContainerID& containerId);
-
-  Future<Nothing> update(
-      const StatusUpdate& update,
-      const SlaveID& slaveId);
-
-  Future<bool> acknowledgement(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId,
-      const UUID& uuid);
-
-  Future<Nothing> recover(
-      const string& rootDir,
-      const Option<SlaveState>& state);
-
-  void pause();
-  void resume();
-
-  void cleanup(const FrameworkID& frameworkId);
-
-private:
-  // Helper function to handle update.
-  Future<Nothing> _update(
-      const StatusUpdate& update,
-      const SlaveID& slaveId,
-      bool checkpoint,
-      const Option<ExecutorID>& executorId,
-      const Option<ContainerID>& containerId);
-
-  // Status update timeout.
-  void timeout(const Duration& duration);
-
-  // Forwards the status update to the master and starts a timer based
-  // on the 'duration' to check for ACK from the scheduler.
-  // NOTE: This should only be used for those messages that expect an
-  // ACK (e.g updates from the executor).
-  Timeout forward(const StatusUpdate& update, const Duration& duration);
-
-  // Helper functions.
-
-  // Creates a new status update stream (opening the updates file, if path is
-  // present) and adds it to streams.
-  TaskStatusUpdateStream* createStatusUpdateStream(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId,
-      const SlaveID& slaveId,
-      bool checkpoint,
-      const Option<ExecutorID>& executorId,
-      const Option<ContainerID>& containerId);
-
-  TaskStatusUpdateStream* getStatusUpdateStream(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId);
-
-  void cleanupStatusUpdateStream(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId);
-
-  const Flags flags;
-  bool paused;
-
-  function<void(StatusUpdate)> forward_;
-
-  hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams;
-};
-
-
-TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess(
-    const Flags& _flags)
-  : ProcessBase(process::ID::generate("task-status-update-manager")),
-    flags(_flags),
-    paused(false)
-{
-}
-
-
-TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess()
-{
-  foreachkey (const FrameworkID& frameworkId, streams) {
-    foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
-      delete stream;
-    }
-  }
-  streams.clear();
-}
-
-
-void TaskStatusUpdateManagerProcess::initialize(
-    const function<void(StatusUpdate)>& forward)
-{
-  forward_ = forward;
-}
-
-
-void TaskStatusUpdateManagerProcess::pause()
-{
-  LOG(INFO) << "Pausing sending task status updates";
-  paused = true;
-}
-
-
-void TaskStatusUpdateManagerProcess::resume()
-{
-  LOG(INFO) << "Resuming sending task status updates";
-  paused = false;
-
-  foreachkey (const FrameworkID& frameworkId, streams) {
-    foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
-      if (!stream->pending.empty()) {
-        const StatusUpdate& update = stream->pending.front();
-        LOG(WARNING) << "Resending task status update " << update;
-        stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
-      }
-    }
-  }
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::recover(
-    const string& rootDir,
-    const Option<SlaveState>& state)
-{
-  LOG(INFO) << "Recovering task status update manager";
-
-  if (state.isNone()) {
-    return Nothing();
-  }
-
-  foreachvalue (const FrameworkState& framework, state->frameworks) {
-    foreachvalue (const ExecutorState& executor, framework.executors) {
-      LOG(INFO) << "Recovering executor '" << executor.id
-                << "' of framework " << framework.id;
-
-      if (executor.info.isNone()) {
-        LOG(WARNING) << "Skipping recovering task status updates of"
-                     << " executor '" << executor.id
-                     << "' of framework " << framework.id
-                     << " because its info cannot be recovered";
-        continue;
-      }
-
-      if (executor.latest.isNone()) {
-        LOG(WARNING) << "Skipping recovering task status updates of"
-                     << " executor '" << executor.id
-                     << "' of framework " << framework.id
-                     << " because its latest run cannot be recovered";
-        continue;
-      }
-
-      // We are only interested in the latest run of the executor!
-      const ContainerID& latest = executor.latest.get();
-      Option<RunState> run = executor.runs.get(latest);
-      CHECK_SOME(run);
-
-      if (run->completed) {
-        VLOG(1) << "Skipping recovering task status updates of"
-                << " executor '" << executor.id
-                << "' of framework " << framework.id
-                << " because its latest run " << latest.value()
-                << " is completed";
-        continue;
-      }
-
-      foreachvalue (const TaskState& task, run->tasks) {
-        // No updates were ever received for this task!
-        // This means either:
-        // 1) the executor never received this task or
-        // 2) executor launched it but the slave died before it got any updates.
-        if (task.updates.empty()) {
-          LOG(WARNING) << "No status updates found for task " << task.id
-                       << " of framework " << framework.id;
-          continue;
-        }
-
-        // Create a new status update stream.
-        TaskStatusUpdateStream* stream = createStatusUpdateStream(
-            task.id, framework.id, state->id, true, executor.id, latest);
-
-        // Replay the stream.
-        Try<Nothing> replay = stream->replay(task.updates, task.acks);
-        if (replay.isError()) {
-          return Failure(
-              "Failed to replay status updates for task " + stringify(task.id) +
-              " of framework " + stringify(framework.id) +
-              ": " + replay.error());
-        }
-
-        // At the end of the replay, the stream is either terminated or
-        // contains only unacknowledged, if any, pending updates. The
-        // pending updates will be flushed after the slave
-        // re-registers with the master.
-        if (stream->terminated) {
-          cleanupStatusUpdateStream(task.id, framework.id);
-        }
-      }
-    }
-  }
-
-  return Nothing();
-}
-
-
-void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
-{
-  LOG(INFO) << "Closing task status update streams for framework "
-            << frameworkId;
-
-  if (streams.contains(frameworkId)) {
-    foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
-      cleanupStatusUpdateStream(taskId, frameworkId);
-    }
-  }
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::update(
-    const StatusUpdate& update,
-    const SlaveID& slaveId,
-    const ExecutorID& executorId,
-    const ContainerID& containerId)
-{
-  return _update(update, slaveId, true, executorId, containerId);
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::update(
-    const StatusUpdate& update,
-    const SlaveID& slaveId)
-{
-  return _update(update, slaveId, false, None(), None());
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::_update(
-    const StatusUpdate& update,
-    const SlaveID& slaveId,
-    bool checkpoint,
-    const Option<ExecutorID>& executorId,
-    const Option<ContainerID>& containerId)
-{
-  const TaskID& taskId = update.status().task_id();
-  const FrameworkID& frameworkId = update.framework_id();
-
-  LOG(INFO) << "Received task status update " << update;
-
-  // Write the status update to disk and enqueue it to send it to the master.
-  // Create/Get the status update stream for this task.
-  TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
-  if (stream == nullptr) {
-    stream = createStatusUpdateStream(
-        taskId, frameworkId, slaveId, checkpoint, executorId, containerId);
-  }
-
-  // Verify that we didn't get a non-checkpointable update for a
-  // stream that is checkpointable, and vice-versa.
-  if (stream->checkpoint != checkpoint) {
-    return Failure(
-        "Mismatched checkpoint value for task status update " +
-        stringify(update) + " (expected checkpoint=" +
-        stringify(stream->checkpoint) + " actual checkpoint=" +
-        stringify(checkpoint) + ")");
-  }
-
-  // Handle the status update.
-  Try<bool> result = stream->update(update);
-  if (result.isError()) {
-    return Failure(result.error());
-  }
-
-  // We don't return a failed future here so that the slave can re-ack
-  // the duplicate update.
-  if (!result.get()) {
-    return Nothing();
-  }
-
-  // Forward the status update to the master if this is the first in the stream.
-  // Subsequent status updates will get sent in 'acknowledgement()'.
-  if (!paused && stream->pending.size() == 1) {
-    CHECK_NONE(stream->timeout);
-    const Result<StatusUpdate>& next = stream->next();
-    if (next.isError()) {
-      return Failure(next.error());
-    }
-
-    CHECK_SOME(next);
-    stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  }
-
-  return Nothing();
-}
-
-
-Timeout TaskStatusUpdateManagerProcess::forward(
-    const StatusUpdate& update,
-    const Duration& duration)
-{
-  CHECK(!paused);
-
-  VLOG(1) << "Forwarding task status update " << update << " to the agent";
-
-  // Forward the update.
-  forward_(update);
-
-  // Send a message to self to resend after some delay if no ACK is received.
-  return delay(duration,
-               self(),
-               &TaskStatusUpdateManagerProcess::timeout,
-               duration).timeout();
-}
-
-
-Future<bool> TaskStatusUpdateManagerProcess::acknowledgement(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId,
-    const UUID& uuid)
-{
-  LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid
-            << ") for task " << taskId
-            << " of framework " << frameworkId;
-
-  TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
-
-  // This might happen if we haven't completed recovery yet or if the
-  // acknowledgement is for a stream that has been cleaned up.
-  if (stream == nullptr) {
-    return Failure(
-        "Cannot find the task status update stream for task " +
-        stringify(taskId) + " of framework " + stringify(frameworkId));
-  }
-
-  // Get the corresponding update for this ACK.
-  const Result<StatusUpdate>& update = stream->next();
-  if (update.isError()) {
-    return Failure(update.error());
-  }
-
-  // This might happen if we retried a status update and got back
-  // acknowledgments for both the original and the retried update.
-  if (update.isNone()) {
-    return Failure(
-        "Unexpected task status update acknowledgment (UUID: " +
-        uuid.toString() + ") for task " + stringify(taskId) + " of framework " +
-        stringify(frameworkId));
-  }
-
-  // Handle the acknowledgement.
-  Try<bool> result =
-    stream->acknowledgement(taskId, frameworkId, uuid, update.get());
-
-  if (result.isError()) {
-    return Failure(result.error());
-  }
-
-  if (!result.get()) {
-    return Failure("Duplicate task status acknowledgement");
-  }
-
-  // Reset the timeout.
-  stream->timeout = None();
-
-  // Get the next update in the queue.
-  const Result<StatusUpdate>& next = stream->next();
-  if (next.isError()) {
-    return Failure(next.error());
-  }
-
-  bool terminated = stream->terminated;
-
-  if (terminated) {
-    if (next.isSome()) {
-      LOG(WARNING) << "Acknowledged a terminal"
-                   << " task status update " << update.get()
-                   << " but updates are still pending";
-    }
-    cleanupStatusUpdateStream(taskId, frameworkId);
-  } else if (!paused && next.isSome()) {
-    // Forward the next queued status update.
-    stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
-  }
-
-  return !terminated;
-}
-
-
-// TODO(vinod): There should be a limit on the retries.
-void TaskStatusUpdateManagerProcess::timeout(const Duration& duration)
-{
-  if (paused) {
-    return;
-  }
-
-  // Check and see if we should resend any status updates.
-  foreachkey (const FrameworkID& frameworkId, streams) {
-    foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
-      CHECK_NOTNULL(stream);
-      if (!stream->pending.empty()) {
-        CHECK_SOME(stream->timeout);
-        if (stream->timeout->expired()) {
-          const StatusUpdate& update = stream->pending.front();
-          LOG(WARNING) << "Resending task status update " << update;
-
-          // Bounded exponential backoff.
-          Duration duration_ =
-            std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
-
-          stream->timeout = forward(update, duration_);
-        }
-      }
-    }
-  }
-}
-
-
-TaskStatusUpdateStream*
-TaskStatusUpdateManagerProcess::createStatusUpdateStream(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    bool checkpoint,
-    const Option<ExecutorID>& executorId,
-    const Option<ContainerID>& containerId)
-{
-  VLOG(1) << "Creating StatusUpdate stream for task " << taskId
-          << " of framework " << frameworkId;
-
-  TaskStatusUpdateStream* stream = new TaskStatusUpdateStream(
-      taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId);
-
-  streams[frameworkId][taskId] = stream;
-  return stream;
-}
-
-
-TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId)
-{
-  if (!streams.contains(frameworkId)) {
-    return nullptr;
-  }
-
-  if (!streams[frameworkId].contains(taskId)) {
-    return nullptr;
-  }
-
-  return streams[frameworkId][taskId];
-}
-
-
-void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId)
-{
-  VLOG(1) << "Cleaning up status update stream"
-          << " for task " << taskId
-          << " of framework " << frameworkId;
-
-  CHECK(streams.contains(frameworkId))
-    << "Cannot find the task status update streams for framework "
-    << frameworkId;
-
-  CHECK(streams[frameworkId].contains(taskId))
-    << "Cannot find the status update streams for task " << taskId;
-
-  TaskStatusUpdateStream* stream = streams[frameworkId][taskId];
-
-  streams[frameworkId].erase(taskId);
-  if (streams[frameworkId].empty()) {
-    streams.erase(frameworkId);
-  }
-
-  delete stream;
-}
-
-
-TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags)
-{
-  process = new TaskStatusUpdateManagerProcess(flags);
-  spawn(process);
-}
-
-
-TaskStatusUpdateManager::~TaskStatusUpdateManager()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-void TaskStatusUpdateManager::initialize(
-    const function<void(StatusUpdate)>& forward)
-{
-  dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::update(
-    const StatusUpdate& update,
-    const SlaveID& slaveId,
-    const ExecutorID& executorId,
-    const ContainerID& containerId)
-{
-  return dispatch(
-      process,
-      &TaskStatusUpdateManagerProcess::update,
-      update,
-      slaveId,
-      executorId,
-      containerId);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::update(
-    const StatusUpdate& update,
-    const SlaveID& slaveId)
-{
-  return dispatch(
-      process,
-      &TaskStatusUpdateManagerProcess::update,
-      update,
-      slaveId);
-}
-
-
-Future<bool> TaskStatusUpdateManager::acknowledgement(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId,
-    const UUID& uuid)
-{
-  return dispatch(
-      process,
-      &TaskStatusUpdateManagerProcess::acknowledgement,
-      taskId,
-      frameworkId,
-      uuid);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::recover(
-    const string& rootDir,
-    const Option<SlaveState>& state)
-{
-  return dispatch(
-      process, &TaskStatusUpdateManagerProcess::recover, rootDir, state);
-}
-
-
-void TaskStatusUpdateManager::pause()
-{
-  dispatch(process, &TaskStatusUpdateManagerProcess::pause);
-}
-
-
-void TaskStatusUpdateManager::resume()
-{
-  dispatch(process, &TaskStatusUpdateManagerProcess::resume);
-}
-
-
-void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
-{
-  dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId);
-}
-
-
-TaskStatusUpdateStream::TaskStatusUpdateStream(
-    const TaskID& _taskId,
-    const FrameworkID& _frameworkId,
-    const SlaveID& _slaveId,
-    const Flags& _flags,
-    bool _checkpoint,
-    const Option<ExecutorID>& executorId,
-    const Option<ContainerID>& containerId)
-    : checkpoint(_checkpoint),
-      terminated(false),
-      taskId(_taskId),
-      frameworkId(_frameworkId),
-      slaveId(_slaveId),
-      flags(_flags),
-      error(None())
-{
-  if (checkpoint) {
-    CHECK_SOME(executorId);
-    CHECK_SOME(containerId);
-
-    path = paths::getTaskUpdatesPath(
-        paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        frameworkId,
-        executorId.get(),
-        containerId.get(),
-        taskId);
-
-    // Create the base updates directory, if it doesn't exist.
-    const string& dirName = Path(path.get()).dirname();
-    Try<Nothing> directory = os::mkdir(dirName);
-    if (directory.isError()) {
-      error = "Failed to create '" + dirName + "': " + directory.error();
-      return;
-    }
-
-    // Open the updates file.
-    // NOTE: We don't use `O_SYNC` here because we only read this file
-    // if the host did not crash. `os::write` success implies the kernel
-    // will have flushed our data to the page cache. This is sufficient
-    // for the recovery scenarios we use this data for.
-    Try<int_fd> result = os::open(
-        path.get(),
-        O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
-    if (result.isError()) {
-      error = "Failed to open '" + path.get() +
-              "' for status updates: " + result.error();
-      return;
-    }
-
-    // We keep the file open through the lifetime of the task, because it
-    // makes it easy to append status update records to the file.
-    fd = result.get();
-  }
-}
-
-
-TaskStatusUpdateStream::~TaskStatusUpdateStream()
-{
-  if (fd.isSome()) {
-    Try<Nothing> close = os::close(fd.get());
-    if (close.isError()) {
-      CHECK_SOME(path);
-      LOG(ERROR) << "Failed to close file '" << path.get() << "': "
-                 << close.error();
-    }
-  }
-}
-
-
-Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update)
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  if (!update.has_uuid()) {
-    return Error("Task status update is missing 'uuid'");
-  }
-
-  // Check that this status update has not already been acknowledged.
-  // This could happen in the rare case when the slave received the ACK
-  // from the framework, died, but slave's ACK to the executor never made it!
-  if (acknowledged.contains(UUID::fromBytes(update.uuid()).get())) {
-    LOG(WARNING) << "Ignoring task status update " << update
-                 << " that has already been acknowledged by the framework!";
-    return false;
-  }
-
-  // Check that this update hasn't already been received.
-  // This could happen if the slave receives a status update from an executor,
-  // then crashes after it writes it to disk but before it sends an ack.
-  if (received.contains(UUID::fromBytes(update.uuid()).get())) {
-    LOG(WARNING) << "Ignoring duplicate task status update " << update;
-    return false;
-  }
-
-  // Handle the update, checkpointing if necessary.
-  Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
-  if (result.isError()) {
-    return Error(result.error());
-  }
-
-  return true;
-}
-
-
-Try<bool> TaskStatusUpdateStream::acknowledgement(
-    const TaskID& taskId,
-    const FrameworkID& frameworkId,
-    const UUID& uuid,
-    const StatusUpdate& update)
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  if (acknowledged.contains(uuid)) {
-    LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: "
-                  << uuid << ") for update " << update;
-    return false;
-  }
-
-  // This might happen if we retried a status update and got back
-  // acknowledgments for both the original and the retried update.
-  if (uuid != UUID::fromBytes(update.uuid()).get()) {
-    LOG(WARNING) << "Unexpected task status update acknowledgement (received "
-                 << uuid << ", expecting "
-                 << UUID::fromBytes(update.uuid()).get()
-                 << ") for update " << update;
-    return false;
-  }
-
-  // Handle the ACK, checkpointing if necessary.
-  Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
-  if (result.isError()) {
-    return Error(result.error());
-  }
-
-  return true;
-}
-
-
-Result<StatusUpdate> TaskStatusUpdateStream::next()
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  if (!pending.empty()) {
-    return pending.front();
-  }
-
-  return None();
-}
-
-
-Try<Nothing> TaskStatusUpdateStream::replay(
-    const std::vector<StatusUpdate>& updates,
-    const hashset<UUID>& acks)
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  VLOG(1) << "Replaying task status update stream for task " << taskId;
-
-  foreach (const StatusUpdate& update, updates) {
-    // Handle the update.
-    _handle(update, StatusUpdateRecord::UPDATE);
-
-    // Check if the update has an ACK too.
-    if (acks.contains(UUID::fromBytes(update.uuid()).get())) {
-      _handle(update, StatusUpdateRecord::ACK);
-    }
-  }
-
-  return Nothing();
-}
-
-
-Try<Nothing> TaskStatusUpdateStream::handle(
-    const StatusUpdate& update,
-    const StatusUpdateRecord::Type& type)
-{
-  CHECK_NONE(error);
-
-  // Checkpoint the update if necessary.
-  if (checkpoint) {
-    LOG(INFO) << "Checkpointing " << type << " for task status update "
-              << update;
-
-    CHECK_SOME(fd);
-
-    StatusUpdateRecord record;
-    record.set_type(type);
-
-    if (type == StatusUpdateRecord::UPDATE) {
-      record.mutable_update()->CopyFrom(update);
-    } else {
-      record.set_uuid(update.uuid());
-    }
-
-    Try<Nothing> write = ::protobuf::write(fd.get(), record);
-    if (write.isError()) {
-      error = "Failed to write task status update " + stringify(update) +
-              " to '" + path.get() + "': " + write.error();
-      return Error(error.get());
-    }
-  }
-
-  // Now actually handle the update.
-  _handle(update, type);
-
-  return Nothing();
-}
-
-
-void TaskStatusUpdateStream::_handle(
-    const StatusUpdate& update,
-    const StatusUpdateRecord::Type& type)
-{
-  CHECK_NONE(error);
-
-  if (type == StatusUpdateRecord::UPDATE) {
-    // Record this update.
-    received.insert(UUID::fromBytes(update.uuid()).get());
-
-    // Add it to the pending updates queue.
-    pending.push(update);
-  } else {
-    // Record this ACK.
-    acknowledged.insert(UUID::fromBytes(update.uuid()).get());
-
-    // Remove the corresponding update from the pending queue.
-    pending.pop();
-
-    if (!terminated) {
-      terminated = protobuf::isTerminalState(update.status().state());
-    }
-  }
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
deleted file mode 100644
index 4f7d45d..0000000
--- a/src/slave/status_update_manager.hpp
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __STATUS_UPDATE_MANAGER_HPP__
-#define __STATUS_UPDATE_MANAGER_HPP__
-
-#include <queue>
-#include <string>
-
-#include <mesos/mesos.hpp>
-
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/timeout.hpp>
-
-#include <stout/hashset.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-#include <stout/uuid.hpp>
-
-#include "messages/messages.hpp"
-
-#include "slave/flags.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declarations.
-
-namespace state {
-struct SlaveState;
-}
-
-class TaskStatusUpdateManagerProcess;
-struct TaskStatusUpdateStream;
-
-
-// TaskStatusUpdateManager is responsible for
-// 1) Reliably sending status updates to the master.
-// 2) Checkpointing the update to disk (optional).
-// 3) Sending ACKs to the executor (optional).
-// 4) Receiving ACKs from the scheduler.
-//
-// The public methods are a facade over TaskStatusUpdateManagerProcess
-// ('process' below); presumably they dispatch to it asynchronously —
-// confirm in the implementation file.
-class TaskStatusUpdateManager
-{
-public:
-  // NOTE(review): single-argument constructor is not marked 'explicit'.
-  TaskStatusUpdateManager(const Flags& flags);
-  virtual ~TaskStatusUpdateManager();
-
-  // Expects a callback 'forward' which gets called whenever there is
-  // a new status update that needs to be forwarded to the master.
-  void initialize(const lambda::function<void(StatusUpdate)>& forward);
-
-  // TODO(vinod): Come up with better names/signatures for the
-  // checkpointing and non-checkpointing 'update()' functions.
-  // Currently, it is not obvious that one version of 'update()'
-  // does checkpointing while the other doesn't.
-
-  // Checkpoints the status update and reliably sends the
-  // update to the master (and hence the scheduler).
-  // @return Whether the update is handled successfully
-  // (e.g. checkpointed).
-  process::Future<Nothing> update(
-      const StatusUpdate& update,
-      const SlaveID& slaveId,
-      const ExecutorID& executorId,
-      const ContainerID& containerId);
-
-  // Retries the update to the master (as long as the slave is
-  // alive), but does not checkpoint the update.
-  // @return Whether the update is handled successfully.
-  process::Future<Nothing> update(
-      const StatusUpdate& update,
-      const SlaveID& slaveId);
-
-  // Checkpoints the status update to disk if necessary.
-  // Also, sends the next pending status update, if any.
-  // @return True if the ACK is handled successfully (e.g., checkpointed)
-  //              and the task's status update stream is not terminated.
-  //         False same as above except the status update stream is terminated.
-  //         Failed if there are any errors (e.g., duplicate, checkpointing).
-  process::Future<bool> acknowledgement(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId,
-      const UUID& uuid);
-
-  // Recover status updates.
-  process::Future<Nothing> recover(
-      const std::string& rootDir,
-      const Option<state::SlaveState>& state);
-
-
-  // Pause sending updates.
-  // This is useful when the slave is disconnected because a
-  // disconnected slave will drop the updates.
-  void pause();
-
-  // Unpause and resend all the pending updates right away.
-  // This is useful when the updates were pending because there was
-  // no master elected (e.g., during recovery) or framework failed over.
-  void resume();
-
-  // Closes all the status update streams corresponding to this framework.
-  // NOTE: This stops retrying any pending status updates for this framework.
-  void cleanup(const FrameworkID& frameworkId);
-
-private:
-  // The actor doing the actual work; presumably created in the constructor
-  // and destroyed in the destructor — confirm in the implementation.
-  TaskStatusUpdateManagerProcess* process;
-};
-
-
-// TaskStatusUpdateStream handles the status updates and acknowledgements
-// of a task, checkpointing them if necessary. It also holds the information
-// about received, acknowledged and pending status updates.
-// NOTE: A task is expected to have a globally unique ID across the lifetime
-// of a framework. In other words the tuple (taskId, frameworkId) should be
-// always unique.
-struct TaskStatusUpdateStream
-{
-  TaskStatusUpdateStream(const TaskID& _taskId,
-                     const FrameworkID& _frameworkId,
-                     const SlaveID& _slaveId,
-                     const Flags& _flags,
-                     bool _checkpoint,
-                     const Option<ExecutorID>& executorId,
-                     const Option<ContainerID>& containerId);
-
-  ~TaskStatusUpdateStream();
-
-  // This function handles the update, checkpointing if necessary.
-  // @return   True if the update is successfully handled.
-  //           False if the update is a duplicate.
-  //           Error Any errors (e.g., checkpointing).
-  Try<bool> update(const StatusUpdate& update);
-
-  // This function handles the ACK, checkpointing if necessary.
-  // @return   True if the acknowledgement is successfully handled.
-  //           False if the acknowledgement is a duplicate.
-  //           Error Any errors (e.g., checkpointing).
-  Try<bool> acknowledgement(
-      const TaskID& taskId,
-      const FrameworkID& frameworkId,
-      const UUID& uuid,
-      const StatusUpdate& update);
-
-  // Returns the next update (or none, if empty) in the queue.
-  // NOTE(review): <stout/result.hpp> is not included directly by this
-  // header; 'Result' presumably arrives via a transitive include.
-  Result<StatusUpdate> next();
-
-  // Replays the stream by sequentially handling an update and its
-  // corresponding ACK, if present.
-  // NOTE(review): <vector> is not included by this header.
-  Try<Nothing> replay(
-      const std::vector<StatusUpdate>& updates,
-      const hashset<UUID>& acks);
-
-  // TODO(vinod): Explore semantics to make these private.
-  // Whether updates and ACKs are written to disk.
-  const bool checkpoint;
-  // Set once a terminal status update has been acknowledged.
-  bool terminated;
-  Option<process::Timeout> timeout; // Timeout for resending status update.
-  // Updates received but not yet acknowledged, oldest first.
-  std::queue<StatusUpdate> pending;
-
-private:
-  // Handles the status update and writes it to disk, if necessary.
-  // TODO(vinod): The write has to be asynchronous to avoid status updates that
-  // are being checkpointed, blocking the processing of other updates.
-  // One solution is to wrap the protobuf::write inside async, but its probably
-  // too much of an overhead to spin up a new libprocess per status update?
-  // A better solution might be to be have async write capability for file io.
-  Try<Nothing> handle(
-      const StatusUpdate& update,
-      const StatusUpdateRecord::Type& type);
-
-  // Applies an UPDATE/ACK to the in-memory state without any I/O.
-  void _handle(
-      const StatusUpdate& update,
-      const StatusUpdateRecord::Type& type);
-
-  const TaskID taskId;
-  const FrameworkID frameworkId;
-  const SlaveID slaveId;
-
-  const Flags flags;
-
-  // UUIDs of every update / ACK handled so far (used to detect duplicates).
-  hashset<UUID> received;
-  hashset<UUID> acknowledged;
-
-  Option<std::string> path; // File path of the update stream.
-  Option<int_fd> fd; // File descriptor to the update stream.
-
-  Option<std::string> error; // Potential non-retryable error.
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-
-#endif // __STATUS_UPDATE_MANAGER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/task_status_update_manager.cpp b/src/slave/task_status_update_manager.cpp
new file mode 100644
index 0000000..1ec6be7
--- /dev/null
+++ b/src/slave/task_status_update_manager.cpp
@@ -0,0 +1,898 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/task_status_update_manager.hpp"
+
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+#include <process/timer.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/utils.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "logging/logging.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
+using lambda::function;
+
+using std::string;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+using process::Failure;
+using process::Future;
+using process::PID;
+using process::Timeout;
+using process::UPID;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+using state::TaskState;
+
+
+// Actor implementing TaskStatusUpdateManager. The public facade forwards
+// every call to this process via 'dispatch()'.
+class TaskStatusUpdateManagerProcess
+  : public ProtobufProcess<TaskStatusUpdateManagerProcess>
+{
+public:
+  explicit TaskStatusUpdateManagerProcess(const Flags& flags);
+  virtual ~TaskStatusUpdateManagerProcess();
+
+  // Explicitly use 'initialize' since we're overloading below.
+  using process::ProcessBase::initialize;
+
+  // TaskStatusUpdateManager implementation.
+  void initialize(const function<void(StatusUpdate)>& forward);
+
+  Future<Nothing> update(
+      const StatusUpdate& update,
+      const SlaveID& slaveId,
+      const ExecutorID& executorId,
+      const ContainerID& containerId);
+
+  Future<Nothing> update(
+      const StatusUpdate& update,
+      const SlaveID& slaveId);
+
+  Future<bool> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const UUID& uuid);
+
+  Future<Nothing> recover(
+      const string& rootDir,
+      const Option<SlaveState>& state);
+
+  void pause();
+  void resume();
+
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  // Helper function to handle update.
+  Future<Nothing> _update(
+      const StatusUpdate& update,
+      const SlaveID& slaveId,
+      bool checkpoint,
+      const Option<ExecutorID>& executorId,
+      const Option<ContainerID>& containerId);
+
+  // Status update timeout.
+  void timeout(const Duration& duration);
+
+  // Forwards the status update to the master and starts a timer based
+  // on the 'duration' to check for ACK from the scheduler.
+  // NOTE: This should only be used for those messages that expect an
+  // ACK (e.g updates from the executor).
+  Timeout forward(const StatusUpdate& update, const Duration& duration);
+
+  // Helper functions.
+
+  // Creates a new status update stream (opening the updates file, if path is
+  // present) and adds it to streams.
+  TaskStatusUpdateStream* createStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      bool checkpoint,
+      const Option<ExecutorID>& executorId,
+      const Option<ContainerID>& containerId);
+
+  // Returns the stream of the given task, or nullptr if there is none.
+  TaskStatusUpdateStream* getStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+
+  // Removes the task's stream from 'streams' (dropping the framework entry
+  // once it becomes empty) and deletes it. The stream must exist.
+  void cleanupStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+
+  const Flags flags;
+
+  // While true, no status updates are forwarded.
+  bool paused;
+
+  // Callback registered via 'initialize()'; receives every update to send.
+  function<void(StatusUpdate)> forward_;
+
+  // Streams keyed by framework, then task. The stream objects are
+  // heap-allocated and deleted by this process.
+  hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams;
+};
+
+
+// Starts unpaused; the process ID is generated from a fixed prefix.
+TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess(
+    const Flags& _flags)
+  : ProcessBase(process::ID::generate("task-status-update-manager")),
+    flags(_flags),
+    paused(false) {}
+
+
+TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess()
+{
+  // Every stream was allocated by 'createStatusUpdateStream()'; free them.
+  for (auto& frameworkStreams : streams) {
+    for (auto& taskStream : frameworkStreams.second) {
+      delete taskStream.second;
+    }
+  }
+
+  streams.clear();
+}
+
+
+// Stores the callback used by 'forward()' to hand updates to the agent.
+void TaskStatusUpdateManagerProcess::initialize(
+    const function<void(StatusUpdate)>& forward)
+{
+  this->forward_ = forward;
+}
+
+
+// Stops forwarding updates until 'resume()' is called.
+void TaskStatusUpdateManagerProcess::pause()
+{
+  paused = true;
+  LOG(INFO) << "Pausing sending task status updates";
+}
+
+
+// Re-enables forwarding and immediately resends the head of every non-empty
+// stream, restarting its retry timer at the minimum interval.
+void TaskStatusUpdateManagerProcess::resume()
+{
+  LOG(INFO) << "Resuming sending task status updates";
+  paused = false;
+
+  for (auto& frameworkStreams : streams) {
+    for (auto& taskStream : frameworkStreams.second) {
+      TaskStatusUpdateStream* stream = taskStream.second;
+      if (stream->pending.empty()) {
+        continue;
+      }
+
+      const StatusUpdate& head = stream->pending.front();
+      LOG(WARNING) << "Resending task status update " << head;
+      stream->timeout = forward(head, STATUS_UPDATE_RETRY_INTERVAL_MIN);
+    }
+  }
+}
+
+
+// Rebuilds the streams of the latest, not yet completed run of every
+// recoverable executor from the checkpointed agent state. Streams that turn
+// out to be terminated after replay are cleaned up right away.
+Future<Nothing> TaskStatusUpdateManagerProcess::recover(
+    const string& rootDir,
+    const Option<SlaveState>& state)
+{
+  LOG(INFO) << "Recovering task status update manager";
+
+  // No checkpointed state: nothing to rebuild.
+  if (state.isNone()) {
+    return Nothing();
+  }
+
+  const SlaveState& slaveState = state.get();
+
+  foreachvalue (const FrameworkState& framework, slaveState.frameworks) {
+    foreachvalue (const ExecutorState& executor, framework.executors) {
+      LOG(INFO) << "Recovering executor '" << executor.id
+                << "' of framework " << framework.id;
+
+      if (executor.info.isNone()) {
+        LOG(WARNING) << "Skipping recovering task status updates of"
+                     << " executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its info cannot be recovered";
+        continue;
+      }
+
+      if (executor.latest.isNone()) {
+        LOG(WARNING) << "Skipping recovering task status updates of"
+                     << " executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its latest run cannot be recovered";
+        continue;
+      }
+
+      // Only the most recent run of the executor matters.
+      const ContainerID& latest = executor.latest.get();
+      const Option<RunState> latestRun = executor.runs.get(latest);
+      CHECK_SOME(latestRun);
+
+      if (latestRun->completed) {
+        VLOG(1) << "Skipping recovering task status updates of"
+                << " executor '" << executor.id
+                << "' of framework " << framework.id
+                << " because its latest run " << latest.value()
+                << " is completed";
+        continue;
+      }
+
+      foreachvalue (const TaskState& task, latestRun->tasks) {
+        // No updates at all means either the executor never received the
+        // task, or it launched it but the agent died before receiving any
+        // status update for it.
+        if (task.updates.empty()) {
+          LOG(WARNING) << "No status updates found for task " << task.id
+                       << " of framework " << framework.id;
+          continue;
+        }
+
+        // Recovered streams are always checkpointed.
+        TaskStatusUpdateStream* stream = createStatusUpdateStream(
+            task.id,
+            framework.id,
+            slaveState.id,
+            true,
+            executor.id,
+            latest);
+
+        Try<Nothing> replayed = stream->replay(task.updates, task.acks);
+        if (replayed.isError()) {
+          return Failure(
+              "Failed to replay status updates for task " + stringify(task.id) +
+              " of framework " + stringify(framework.id) +
+              ": " + replayed.error());
+        }
+
+        // After replay the stream is either terminated (and can go) or holds
+        // only unacknowledged updates, which are flushed once the agent
+        // re-registers with the master.
+        if (stream->terminated) {
+          cleanupStatusUpdateStream(task.id, framework.id);
+        }
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Drops every stream of the framework, which also stops any retries.
+void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Closing task status update streams for framework "
+            << frameworkId;
+
+  if (!streams.contains(frameworkId)) {
+    return;
+  }
+
+  // Iterate over a copy: 'cleanupStatusUpdateStream()' erases from
+  // 'streams[frameworkId]' and may erase the framework entry itself.
+  foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
+    cleanupStatusUpdateStream(taskId, frameworkId);
+  }
+}
+
+
+// Checkpointing variant: the stream is backed by an updates file.
+Future<Nothing> TaskStatusUpdateManagerProcess::update(
+    const StatusUpdate& update,
+    const SlaveID& slaveId,
+    const ExecutorID& executorId,
+    const ContainerID& containerId)
+{
+  const bool checkpoint = true;
+  return _update(update, slaveId, checkpoint, executorId, containerId);
+}
+
+
+// Non-checkpointing variant: the stream lives only in memory.
+Future<Nothing> TaskStatusUpdateManagerProcess::update(
+    const StatusUpdate& update,
+    const SlaveID& slaveId)
+{
+  const bool checkpoint = false;
+  return _update(update, slaveId, checkpoint, None(), None());
+}
+
+
+// Shared implementation of both 'update()' overloads. Finds or creates the
+// task's stream, rejects a checkpoint mismatch, records the update and, when
+// it is the only pending update and sending is not paused, forwards it.
+Future<Nothing> TaskStatusUpdateManagerProcess::_update(
+    const StatusUpdate& update,
+    const SlaveID& slaveId,
+    bool checkpoint,
+    const Option<ExecutorID>& executorId,
+    const Option<ContainerID>& containerId)
+{
+  const TaskID& taskId = update.status().task_id();
+  const FrameworkID& frameworkId = update.framework_id();
+
+  LOG(INFO) << "Received task status update " << update;
+
+  TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+  if (stream == nullptr) {
+    stream = createStatusUpdateStream(
+        taskId, frameworkId, slaveId, checkpoint, executorId, containerId);
+  }
+
+  // A stream accepts either only checkpointed or only non-checkpointed
+  // updates, never a mix.
+  if (checkpoint != stream->checkpoint) {
+    return Failure(
+        "Mismatched checkpoint value for task status update " +
+        stringify(update) + " (expected checkpoint=" +
+        stringify(stream->checkpoint) + " actual checkpoint=" +
+        stringify(checkpoint) + ")");
+  }
+
+  Try<bool> handled = stream->update(update);
+  if (handled.isError()) {
+    return Failure(handled.error());
+  }
+
+  // 'false' means the update was already seen; succeed anyway so the agent
+  // can re-ack the duplicate.
+  if (!handled.get()) {
+    return Nothing();
+  }
+
+  // Subsequent updates are sent from 'acknowledgement()', so only forward
+  // here when this update is the sole pending one.
+  const bool isOnlyPending = stream->pending.size() == 1;
+  if (paused || !isOnlyPending) {
+    return Nothing();
+  }
+
+  CHECK_NONE(stream->timeout);
+
+  const Result<StatusUpdate>& head = stream->next();
+  if (head.isError()) {
+    return Failure(head.error());
+  }
+
+  CHECK_SOME(head);
+  stream->timeout = forward(head.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  return Nothing();
+}
+
+
+// Hands 'update' to the registered callback and schedules 'timeout()' to
+// run after 'duration'. Returns the deadline of that retry timer.
+Timeout TaskStatusUpdateManagerProcess::forward(
+    const StatusUpdate& update,
+    const Duration& duration)
+{
+  CHECK(!paused);
+
+  VLOG(1) << "Forwarding task status update " << update << " to the agent";
+
+  forward_(update);
+
+  // If no ACK arrives before 'duration' elapses, 'timeout()' resends.
+  auto retryTimer = delay(
+      duration, self(), &TaskStatusUpdateManagerProcess::timeout, duration);
+
+  return retryTimer.timeout();
+}
+
+
+// Handles an acknowledgement for the update at the head of the task's
+// stream. On success it either cleans up a terminated stream or forwards the
+// next pending update (unless paused).
+// Returns true while the stream is still active and false once it has
+// terminated. The future fails if the stream is missing, nothing is pending,
+// the ACK is a duplicate or does not match the pending update, or
+// checkpointing fails.
+Future<bool> TaskStatusUpdateManagerProcess::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const UUID& uuid)
+{
+  LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid
+            << ") for task " << taskId
+            << " of framework " << frameworkId;
+
+  TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+
+  // This might happen if we haven't completed recovery yet or if the
+  // acknowledgement is for a stream that has been cleaned up.
+  if (stream == nullptr) {
+    return Failure(
+        "Cannot find the task status update stream for task " +
+        stringify(taskId) + " of framework " + stringify(frameworkId));
+  }
+
+  // Get the corresponding update for this ACK.
+  const Result<StatusUpdate>& update = stream->next();
+  if (update.isError()) {
+    return Failure(update.error());
+  }
+
+  // This might happen if we retried a status update and got back
+  // acknowledgments for both the original and the retried update.
+  if (update.isNone()) {
+    return Failure(
+        "Unexpected task status update acknowledgment (UUID: " +
+        uuid.toString() + ") for task " + stringify(taskId) + " of framework " +
+        stringify(frameworkId));
+  }
+
+  // Handle the acknowledgement.
+  Try<bool> result =
+    stream->acknowledgement(taskId, frameworkId, uuid, update.get());
+
+  if (result.isError()) {
+    return Failure(result.error());
+  }
+
+  if (!result.get()) {
+    // The stream returns false both for an already-acknowledged UUID and for
+    // a UUID that does not match the pending update, so name both cases.
+    return Failure(
+        "Duplicate or unexpected task status update acknowledgement (UUID: " +
+        uuid.toString() + ") for task " + stringify(taskId) +
+        " of framework " + stringify(frameworkId));
+  }
+
+  // Reset the timeout.
+  stream->timeout = None();
+
+  // Get the next update in the queue.
+  const Result<StatusUpdate>& next = stream->next();
+  if (next.isError()) {
+    return Failure(next.error());
+  }
+
+  // Read before a possible cleanup below deletes 'stream'.
+  bool terminated = stream->terminated;
+
+  if (terminated) {
+    if (next.isSome()) {
+      LOG(WARNING) << "Acknowledged a terminal"
+                   << " task status update " << update.get()
+                   << " but updates are still pending";
+    }
+    cleanupStatusUpdateStream(taskId, frameworkId);
+  } else if (!paused && next.isSome()) {
+    // Forward the next queued status update.
+    stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  }
+
+  return !terminated;
+}
+
+
+// TODO(vinod): There should be a limit on the retries.
+//
+// Runs 'duration' after a forward (see 'forward()'). While not paused, it
+// resends the head of every stream whose retry deadline has expired, with
+// the retry interval doubled but capped at STATUS_UPDATE_RETRY_INTERVAL_MAX.
+void TaskStatusUpdateManagerProcess::timeout(const Duration& duration)
+{
+  if (paused) {
+    return;
+  }
+
+  // Bounded exponential backoff for anything resent in this round.
+  const Duration backoff =
+    std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
+
+  for (auto& frameworkStreams : streams) {
+    for (auto& taskStream : frameworkStreams.second) {
+      TaskStatusUpdateStream* stream = CHECK_NOTNULL(taskStream.second);
+      if (stream->pending.empty()) {
+        continue;
+      }
+
+      CHECK_SOME(stream->timeout);
+      if (!stream->timeout->expired()) {
+        continue;
+      }
+
+      const StatusUpdate& head = stream->pending.front();
+      LOG(WARNING) << "Resending task status update " << head;
+      stream->timeout = forward(head, backoff);
+    }
+  }
+}
+
+
+// Allocates a stream for the task and registers it in 'streams'. The
+// returned pointer is owned by this process.
+// NOTE(review): an existing entry for the same task would be overwritten
+// without being deleted; callers look the stream up first.
+TaskStatusUpdateStream*
+TaskStatusUpdateManagerProcess::createStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    bool checkpoint,
+    const Option<ExecutorID>& executorId,
+    const Option<ContainerID>& containerId)
+{
+  VLOG(1) << "Creating StatusUpdate stream for task " << taskId
+          << " of framework " << frameworkId;
+
+  TaskStatusUpdateStream* const created = new TaskStatusUpdateStream(
+      taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId);
+
+  hashmap<TaskID, TaskStatusUpdateStream*>& taskStreams = streams[frameworkId];
+  taskStreams[taskId] = created;
+
+  return created;
+}
+
+
+// Looks up the stream of the given task; returns nullptr if there is none.
+TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  auto framework = streams.find(frameworkId);
+  if (framework == streams.end()) {
+    return nullptr;
+  }
+
+  auto task = framework->second.find(taskId);
+  return task == framework->second.end() ? nullptr : task->second;
+}
+
+
+// Unregisters and deletes the task's stream. The framework entry is removed
+// once it no longer has any streams. Aborts if the stream does not exist.
+void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  VLOG(1) << "Cleaning up status update stream"
+          << " for task " << taskId
+          << " of framework " << frameworkId;
+
+  CHECK(streams.contains(frameworkId))
+    << "Cannot find the task status update streams for framework "
+    << frameworkId;
+
+  hashmap<TaskID, TaskStatusUpdateStream*>& taskStreams = streams[frameworkId];
+
+  CHECK(taskStreams.contains(taskId))
+    << "Cannot find the status update streams for task " << taskId;
+
+  TaskStatusUpdateStream* doomed = taskStreams[taskId];
+  taskStreams.erase(taskId);
+
+  // 'taskStreams' must not be used after this erase.
+  if (taskStreams.empty()) {
+    streams.erase(frameworkId);
+  }
+
+  delete doomed;
+}
+
+
+// Creates and spawns the backing actor. The destructor terminates, waits
+// for and deletes it.
+TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags)
+  : process(new TaskStatusUpdateManagerProcess(flags))
+{
+  spawn(process);
+}
+
+
+TaskStatusUpdateManager::~TaskStatusUpdateManager()
+{
+  // Stop the actor and block until it has exited before freeing it.
+  terminate(process);
+  wait(process);
+
+  delete process;
+}
+
+
+// Registers the forwarding callback with the actor.
+void TaskStatusUpdateManager::initialize(
+    const function<void(StatusUpdate)>& forward)
+{
+  dispatch(
+      process,
+      &TaskStatusUpdateManagerProcess::initialize,
+      forward);
+}
+
+
+// Checkpointing variant, executed inside the actor.
+Future<Nothing> TaskStatusUpdateManager::update(
+    const StatusUpdate& update,
+    const SlaveID& slaveId,
+    const ExecutorID& executorId,
+    const ContainerID& containerId)
+{
+  return dispatch(process, &TaskStatusUpdateManagerProcess::update,
+                  update, slaveId, executorId, containerId);
+}
+
+
+// Non-checkpointing variant, executed inside the actor.
+Future<Nothing> TaskStatusUpdateManager::update(
+    const StatusUpdate& update,
+    const SlaveID& slaveId)
+{
+  return dispatch(
+      process, &TaskStatusUpdateManagerProcess::update, update, slaveId);
+}
+
+
+// Forwards the acknowledgement to the actor; see the header for the meaning
+// of the returned value.
+Future<bool> TaskStatusUpdateManager::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const UUID& uuid)
+{
+  return dispatch(process, &TaskStatusUpdateManagerProcess::acknowledgement,
+                  taskId, frameworkId, uuid);
+}
+
+
+// Rebuilds streams from checkpointed agent state inside the actor.
+Future<Nothing> TaskStatusUpdateManager::recover(
+    const string& rootDir,
+    const Option<SlaveState>& state)
+{
+  return dispatch(
+      process,
+      &TaskStatusUpdateManagerProcess::recover,
+      rootDir,
+      state);
+}
+
+
+// Asks the actor to stop forwarding updates.
+void TaskStatusUpdateManager::pause()
+{
+  dispatch(process,
+           &TaskStatusUpdateManagerProcess::pause);
+}
+
+
+// Asks the actor to resume forwarding and resend pending updates.
+void TaskStatusUpdateManager::resume()
+{
+  dispatch(process,
+           &TaskStatusUpdateManagerProcess::resume);
+}
+
+
+// Asks the actor to drop every stream of the given framework.
+void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
+{
+  dispatch(process,
+           &TaskStatusUpdateManagerProcess::cleanup,
+           frameworkId);
+}
+
+
+// Creates the stream of one task. For checkpointed streams it also creates
+// the updates directory and opens the updates file. A failure is stored in
+// 'error' instead of being raised.
+TaskStatusUpdateStream::TaskStatusUpdateStream(
+    const TaskID& _taskId,
+    const FrameworkID& _frameworkId,
+    const SlaveID& _slaveId,
+    const Flags& _flags,
+    bool _checkpoint,
+    const Option<ExecutorID>& executorId,
+    const Option<ContainerID>& containerId)
+    : checkpoint(_checkpoint),
+      terminated(false),
+      taskId(_taskId),
+      frameworkId(_frameworkId),
+      slaveId(_slaveId),
+      flags(_flags)
+{
+  // Non-checkpointed streams exist purely in memory.
+  if (!checkpoint) {
+    return;
+  }
+
+  CHECK_SOME(executorId);
+  CHECK_SOME(containerId);
+
+  path = paths::getTaskUpdatesPath(
+      paths::getMetaRootDir(flags.work_dir),
+      slaveId,
+      frameworkId,
+      executorId.get(),
+      containerId.get(),
+      taskId);
+
+  // Make sure the directory holding the updates file exists.
+  const string updatesDir = Path(path.get()).dirname();
+  Try<Nothing> created = os::mkdir(updatesDir);
+  if (created.isError()) {
+    error = "Failed to create '" + updatesDir + "': " + created.error();
+    return;
+  }
+
+  // NOTE: We don't use `O_SYNC` here because we only read this file
+  // if the host did not crash. `os::write` success implies the kernel
+  // will have flushed our data to the page cache. This is sufficient
+  // for the recovery scenarios we use this data for.
+  Try<int_fd> opened = os::open(
+      path.get(),
+      O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  if (opened.isError()) {
+    error = "Failed to open '" + path.get() +
+            "' for status updates: " + opened.error();
+    return;
+  }
+
+  // The file stays open for the stream's lifetime so records can simply be
+  // appended to it. The destructor closes it.
+  fd = opened.get();
+}
+
+
+// Closes the updates file, if one was opened. A close failure is only
+// logged.
+TaskStatusUpdateStream::~TaskStatusUpdateStream()
+{
+  if (fd.isNone()) {
+    return;
+  }
+
+  Try<Nothing> closed = os::close(fd.get());
+  if (closed.isError()) {
+    CHECK_SOME(path);
+    LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+               << closed.error();
+  }
+}
+
+
+// Validates and records a new status update, checkpointing it if needed.
+// Returns true if the update was handled, false if it was already received
+// or acknowledged, and an Error if the stream has failed, the update has a
+// missing or malformed 'uuid', or checkpointing fails.
+Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  if (!update.has_uuid()) {
+    return Error("Task status update is missing 'uuid'");
+  }
+
+  // Parse the UUID once. A malformed value is rejected here instead of
+  // aborting in 'Try::get()' (both below and later in '_handle()').
+  Try<UUID> uuid = UUID::fromBytes(update.uuid());
+  if (uuid.isError()) {
+    return Error("Task status update has an invalid 'uuid': " + uuid.error());
+  }
+
+  // Check that this status update has not already been acknowledged.
+  // This could happen in the rare case when the slave received the ACK
+  // from the framework, died, but slave's ACK to the executor never made it!
+  if (acknowledged.contains(uuid.get())) {
+    LOG(WARNING) << "Ignoring task status update " << update
+                 << " that has already been acknowledged by the framework!";
+    return false;
+  }
+
+  // Check that this update hasn't already been received.
+  // This could happen if the slave receives a status update from an executor,
+  // then crashes after it writes it to disk but before it sends an ack.
+  if (received.contains(uuid.get())) {
+    LOG(WARNING) << "Ignoring duplicate task status update " << update;
+    return false;
+  }
+
+  // Handle the update, checkpointing if necessary.
+  Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+  if (result.isError()) {
+    return Error(result.error());
+  }
+
+  return true;
+}
+
+
+// Validates and records an ACK for 'update'. The only caller in this file
+// passes the head of 'pending'. Returns false for an already-acknowledged
+// UUID or one that does not match 'update', true once the ACK is handled,
+// and an Error if the stream has failed or checkpointing fails.
+Try<bool> TaskStatusUpdateStream::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const UUID& uuid,
+    const StatusUpdate& update)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  if (acknowledged.contains(uuid)) {
+    LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: "
+                 << uuid << ") for update " << update;
+    return false;
+  }
+
+  // The ACK must be for 'update'. A mismatch can happen if a status update
+  // was retried and acknowledgements came back for both the original and
+  // the retry.
+  const UUID expected = UUID::fromBytes(update.uuid()).get();
+  if (uuid != expected) {
+    LOG(WARNING) << "Unexpected task status update acknowledgement (received "
+                 << uuid << ", expecting " << expected
+                 << ") for update " << update;
+    return false;
+  }
+
+  Try<Nothing> handled = handle(update, StatusUpdateRecord::ACK);
+  if (handled.isError()) {
+    return Error(handled.error());
+  }
+
+  return true;
+}
+
+
+// Returns the oldest unacknowledged update, None if nothing is pending, or
+// the stream's stored error.
+Result<StatusUpdate> TaskStatusUpdateStream::next()
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  if (pending.empty()) {
+    return None();
+  }
+
+  return pending.front();
+}
+
+
+// Rebuilds the in-memory state from recovered data without writing to disk.
+// Each update is re-applied, followed by its ACK when 'acks' contains its
+// UUID.
+Try<Nothing> TaskStatusUpdateStream::replay(
+    const std::vector<StatusUpdate>& updates,
+    const hashset<UUID>& acks)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  VLOG(1) << "Replaying task status update stream for task " << taskId;
+
+  for (const StatusUpdate& recovered : updates) {
+    _handle(recovered, StatusUpdateRecord::UPDATE);
+
+    const bool wasAcknowledged =
+      acks.contains(UUID::fromBytes(recovered.uuid()).get());
+
+    if (wasAcknowledged) {
+      _handle(recovered, StatusUpdateRecord::ACK);
+    }
+  }
+
+  return Nothing();
+}
+
+
+// For checkpointed streams, writes a StatusUpdateRecord of 'type' to 'fd'
+// and then applies the record in memory via '_handle()'. A write failure is
+// stored in 'error' and returned, and '_handle()' is skipped.
+Try<Nothing> TaskStatusUpdateStream::handle(
+    const StatusUpdate& update,
+    const StatusUpdateRecord::Type& type)
+{
+  CHECK_NONE(error);
+
+  if (checkpoint) {
+    LOG(INFO) << "Checkpointing " << type << " for task status update "
+              << update;
+
+    CHECK_SOME(fd);
+
+    StatusUpdateRecord record;
+    record.set_type(type);
+
+    if (type != StatusUpdateRecord::UPDATE) {
+      // ACK records carry only the UUID of the acknowledged update.
+      record.set_uuid(update.uuid());
+    } else {
+      record.mutable_update()->CopyFrom(update);
+    }
+
+    Try<Nothing> written = ::protobuf::write(fd.get(), record);
+    if (written.isError()) {
+      error = "Failed to write task status update " + stringify(update) +
+              " to '" + path.get() + "': " + written.error();
+      return Error(error.get());
+    }
+  }
+
+  _handle(update, type);
+
+  return Nothing();
+}
+
+
+// Applies an UPDATE or ACK to the in-memory state (no I/O).
+// NOTE(review): the ACK path pops 'pending' without checking that it is
+// non-empty or that its head matches 'update'.
+void TaskStatusUpdateStream::_handle(
+    const StatusUpdate& update,
+    const StatusUpdateRecord::Type& type)
+{
+  CHECK_NONE(error);
+
+  const UUID uuid = UUID::fromBytes(update.uuid()).get();
+
+  if (type == StatusUpdateRecord::UPDATE) {
+    // Remember the update and queue it for sending.
+    received.insert(uuid);
+    pending.push(update);
+    return;
+  }
+
+  // ACK: remember it and drop the head of the pending queue.
+  acknowledged.insert(uuid);
+  pending.pop();
+
+  // Once terminated, the stream stays terminated.
+  terminated =
+    terminated || protobuf::isTerminalState(update.status().state());
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/task_status_update_manager.hpp b/src/slave/task_status_update_manager.hpp
new file mode 100644
index 0000000..6bdb468
--- /dev/null
+++ b/src/slave/task_status_update_manager.hpp
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __TASK_STATUS_UPDATE_MANAGER_HPP__
+#define __TASK_STATUS_UPDATE_MANAGER_HPP__
+
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/messages.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declarations. `state::SlaveState` and
+// `TaskStatusUpdateManagerProcess` are defined outside this header;
+// `TaskStatusUpdateStream` is defined further down in this header.
+namespace state { struct SlaveState; }
+
+struct TaskStatusUpdateStream;
+class TaskStatusUpdateManagerProcess;
+
+
+// TaskStatusUpdateManager is responsible for
+// 1) Reliably sending status updates to the master.
+// 2) Checkpointing the update to disk (optional).
+// 3) Sending ACKs to the executor (optional).
+// 4) Receiving ACKs from the scheduler.
+class TaskStatusUpdateManager
+{
+public:
+  explicit TaskStatusUpdateManager(const Flags& flags);
+  virtual ~TaskStatusUpdateManager();
+
+  // Not copyable: a copy would share the raw `process` pointer with the
+  // original. NOTE(review): the destructor is defined out of line; if it
+  // releases `process`, an implicit copy would release it twice.
+  TaskStatusUpdateManager(const TaskStatusUpdateManager&) = delete;
+  TaskStatusUpdateManager& operator=(const TaskStatusUpdateManager&) = delete;
+
+  // Expects a callback 'forward' which gets called whenever there is
+  // a new status update that needs to be forwarded to the master.
+  void initialize(const lambda::function<void(StatusUpdate)>& forward);
+
+  // TODO(vinod): Come up with better names/signatures for the
+  // checkpointing and non-checkpointing 'update()' functions.
+  // Currently, it is not obvious that one version of 'update()'
+  // does checkpointing while the other doesn't.
+
+  // Checkpoints the status update and reliably sends the
+  // update to the master (and hence the scheduler).
+  // @return Whether the update is handled successfully
+  // (e.g., checkpointed).
+  process::Future<Nothing> update(
+      const StatusUpdate& update,
+      const SlaveID& slaveId,
+      const ExecutorID& executorId,
+      const ContainerID& containerId);
+
+  // Retries the update to the master (as long as the slave is
+  // alive), but does not checkpoint the update.
+  // @return Whether the update is handled successfully.
+  process::Future<Nothing> update(
+      const StatusUpdate& update,
+      const SlaveID& slaveId);
+
+  // Checkpoints the acknowledgement to disk if necessary.
+  // Also, sends the next pending status update, if any.
+  // @return True if the ACK is handled successfully (e.g., checkpointed)
+  //              and the task's status update stream is not terminated.
+  //         False same as above except the status update stream is terminated.
+  //         Failed if there are any errors (e.g., duplicate, checkpointing).
+  process::Future<bool> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const UUID& uuid);
+
+  // Recover status updates.
+  process::Future<Nothing> recover(
+      const std::string& rootDir,
+      const Option<state::SlaveState>& state);
+
+  // Pause sending updates.
+  // This is useful when the slave is disconnected because a
+  // disconnected slave will drop the updates.
+  void pause();
+
+  // Unpause and resend all the pending updates right away.
+  // This is useful when the updates were pending because there was
+  // no master elected (e.g., during recovery) or framework failed over.
+  void resume();
+
+  // Closes all the status update streams corresponding to this framework.
+  // NOTE: This stops retrying any pending status updates for this framework.
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  TaskStatusUpdateManagerProcess* process;
+};
+
+
+// TaskStatusUpdateStream handles the status updates and acknowledgements
+// of a task, checkpointing them if necessary. It also holds the information
+// about received, acknowledged and pending status updates.
+// NOTE: A task is expected to have a globally unique ID across the lifetime
+// of a framework. In other words the tuple (taskId, frameworkId) should be
+// always unique.
+struct TaskStatusUpdateStream
+{
+  TaskStatusUpdateStream(
+      const TaskID& _taskId,
+      const FrameworkID& _frameworkId,
+      const SlaveID& _slaveId,
+      const Flags& _flags,
+      bool _checkpoint,
+      const Option<ExecutorID>& executorId,
+      const Option<ContainerID>& containerId);
+
+  ~TaskStatusUpdateStream();
+
+  // Not copyable: a copy would duplicate `fd`, leaving two streams that
+  // write checkpoint records through the same descriptor.
+  TaskStatusUpdateStream(const TaskStatusUpdateStream&) = delete;
+  TaskStatusUpdateStream& operator=(const TaskStatusUpdateStream&) = delete;
+
+  // This function handles the update, checkpointing if necessary.
+  // @return   True if the update is successfully handled.
+  //           False if the update is a duplicate.
+  //           Error Any errors (e.g., checkpointing).
+  Try<bool> update(const StatusUpdate& update);
+
+  // This function handles the ACK, checkpointing if necessary.
+  // @return   True if the acknowledgement is successfully handled.
+  //           False if the acknowledgement is a duplicate.
+  //           Error Any errors (e.g., checkpointing).
+  Try<bool> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const UUID& uuid,
+      const StatusUpdate& update);
+
+  // Returns the next update (or none, if empty) in the queue.
+  Result<StatusUpdate> next();
+
+  // Replays the stream by sequentially handling an update and its
+  // corresponding ACK, if present.
+  Try<Nothing> replay(
+      const std::vector<StatusUpdate>& updates,
+      const hashset<UUID>& acks);
+
+  // TODO(vinod): Explore semantics to make these private.
+
+  // Whether `handle()` writes updates and ACKs to the checkpoint file.
+  const bool checkpoint;
+
+  // Set once an ACK for an update in a terminal state has been handled.
+  bool terminated;
+
+  Option<process::Timeout> timeout; // Timeout for resending status update.
+
+  // Updates that have been handled but not yet acknowledged, oldest first.
+  std::queue<StatusUpdate> pending;
+
+private:
+  // Handles the status update and writes it to disk, if necessary.
+  // TODO(vinod): The write has to be asynchronous to avoid status updates that
+  // are being checkpointed, blocking the processing of other updates.
+  // One solution is to wrap the protobuf::write inside async, but it's
+  // probably too much of an overhead to spin up a new libprocess per status
+  // update? A better solution might be to have async write capability for
+  // file I/O.
+  Try<Nothing> handle(
+      const StatusUpdate& update,
+      const StatusUpdateRecord::Type& type);
+
+  // Applies an update or ACK to the in-memory state only (no disk I/O).
+  void _handle(
+      const StatusUpdate& update,
+      const StatusUpdateRecord::Type& type);
+
+  const TaskID taskId;
+  const FrameworkID frameworkId;
+  const SlaveID slaveId;
+
+  const Flags flags;
+
+  hashset<UUID> received;     // UUIDs of the updates handled so far.
+  hashset<UUID> acknowledged; // UUIDs of the ACKs handled so far.
+
+  Option<std::string> path; // File path of the update stream.
+  Option<int_fd> fd; // File descriptor to the update stream.
+
+  Option<std::string> error; // Potential non-retryable error.
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __TASK_STATUS_UPDATE_MANAGER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index db5e531..8997cc0 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -117,7 +117,7 @@ set(MESOS_TESTS_SRC
   slave_tests.cpp
   slave_validation_tests.cpp
   sorter_tests.cpp
-  status_update_manager_tests.cpp
+  task_status_update_manager_tests.cpp
   uri_tests.cpp
   uri_fetcher_tests.cpp
   values_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 6111be4..b854904 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -84,7 +84,7 @@
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/fetcher.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index b3b1348..d572a09 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -58,7 +58,7 @@
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/fetcher.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 5dd2188..90c4369 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -29,7 +29,7 @@
 #include <stout/option.hpp>
 
 #include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
 
 #include "tests/mock_slave.hpp"