You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/11/21 00:54:21 UTC
[6/6] mesos git commit: Added the "task" prefix to the name of the
status update manager files.
Added the "task" prefix to the name of the status update manager files.
Review: https://reviews.apache.org/r/63853/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62d11733
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62d11733
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62d11733
Branch: refs/heads/master
Commit: 62d11733446cf54beb02295e7cd94d419541f152
Parents: 2377cd8
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon Nov 20 16:43:12 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon Nov 20 16:53:05 2017 -0800
----------------------------------------------------------------------
src/CMakeLists.txt | 2 +-
src/Makefile.am | 6 +-
src/local/local.cpp | 2 +-
src/slave/main.cpp | 2 +-
src/slave/slave.cpp | 2 +-
src/slave/status_update_manager.cpp | 897 -------------------
src/slave/status_update_manager.hpp | 210 -----
src/slave/task_status_update_manager.cpp | 898 ++++++++++++++++++++
src/slave/task_status_update_manager.hpp | 210 +++++
src/tests/CMakeLists.txt | 2 +-
src/tests/cluster.cpp | 2 +-
src/tests/cluster.hpp | 2 +-
src/tests/mock_slave.cpp | 2 +-
src/tests/status_update_manager_tests.cpp | 852 -------------------
src/tests/task_status_update_manager_tests.cpp | 852 +++++++++++++++++++
15 files changed, 1971 insertions(+), 1970 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4f11418..be212d9 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -257,7 +257,7 @@ set(AGENT_SRC
slave/resource_estimator.cpp
slave/slave.cpp
slave/state.cpp
- slave/status_update_manager.cpp
+ slave/task_status_update_manager.cpp
slave/validation.cpp
slave/container_loggers/sandbox.cpp
slave/containerizer/composing.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 49dec55..9641ad4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1022,7 +1022,7 @@ libmesos_no_3rdparty_la_SOURCES += \
slave/resource_estimator.cpp \
slave/slave.cpp \
slave/state.cpp \
- slave/status_update_manager.cpp \
+ slave/task_status_update_manager.cpp \
slave/validation.cpp \
slave/container_loggers/sandbox.cpp \
slave/containerizer/composing.cpp \
@@ -1167,7 +1167,7 @@ libmesos_no_3rdparty_la_SOURCES += \
slave/posix_signalhandler.hpp \
slave/slave.hpp \
slave/state.hpp \
- slave/status_update_manager.hpp \
+ slave/task_status_update_manager.hpp \
slave/validation.hpp \
slave/windows_ctrlhandler.hpp \
slave/container_loggers/sandbox.hpp \
@@ -2469,7 +2469,7 @@ mesos_tests_SOURCES = \
tests/slave_tests.cpp \
tests/sorter_tests.cpp \
tests/state_tests.cpp \
- tests/status_update_manager_tests.cpp \
+ tests/task_status_update_manager_tests.cpp \
tests/teardown_tests.cpp \
tests/upgrade_tests.cpp \
tests/uri_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 63d9822..2e141c6 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -75,7 +75,7 @@
#include "slave/gc.hpp"
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index de87553..f0716fb 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -69,7 +69,7 @@
#include "slave/gc.hpp"
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#include "version/version.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c3a4088..6e9adc6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -98,7 +98,7 @@
#include "slave/flags.hpp"
#include "slave/paths.hpp"
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#ifdef __WINDOWS__
// Used to install a Windows console ctrl handler.
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
deleted file mode 100644
index fed0903..0000000
--- a/src/slave/status_update_manager.cpp
+++ /dev/null
@@ -1,897 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <process/delay.hpp>
-#include <process/id.hpp>
-#include <process/process.hpp>
-#include <process/timer.hpp>
-
-#include <stout/check.hpp>
-#include <stout/foreach.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/stringify.hpp>
-#include <stout/utils.hpp>
-#include <stout/uuid.hpp>
-
-#include "common/protobuf_utils.hpp"
-
-#include "logging/logging.hpp"
-
-#include "slave/constants.hpp"
-#include "slave/flags.hpp"
-#include "slave/slave.hpp"
-#include "slave/state.hpp"
-#include "slave/status_update_manager.hpp"
-
-using lambda::function;
-
-using std::string;
-
-using process::wait; // Necessary on some OS's to disambiguate.
-using process::Failure;
-using process::Future;
-using process::PID;
-using process::Timeout;
-using process::UPID;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-using state::TaskState;
-
-
-class TaskStatusUpdateManagerProcess
- : public ProtobufProcess<TaskStatusUpdateManagerProcess>
-{
-public:
- TaskStatusUpdateManagerProcess(const Flags& flags);
- virtual ~TaskStatusUpdateManagerProcess();
-
- // Explicitly use 'initialize' since we're overloading below.
- using process::ProcessBase::initialize;
-
- // TaskStatusUpdateManager implementation.
- void initialize(const function<void(StatusUpdate)>& forward);
-
- Future<Nothing> update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- const ExecutorID& executorId,
- const ContainerID& containerId);
-
- Future<Nothing> update(
- const StatusUpdate& update,
- const SlaveID& slaveId);
-
- Future<bool> acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid);
-
- Future<Nothing> recover(
- const string& rootDir,
- const Option<SlaveState>& state);
-
- void pause();
- void resume();
-
- void cleanup(const FrameworkID& frameworkId);
-
-private:
- // Helper function to handle update.
- Future<Nothing> _update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- bool checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId);
-
- // Status update timeout.
- void timeout(const Duration& duration);
-
- // Forwards the status update to the master and starts a timer based
- // on the 'duration' to check for ACK from the scheduler.
- // NOTE: This should only be used for those messages that expect an
- // ACK (e.g updates from the executor).
- Timeout forward(const StatusUpdate& update, const Duration& duration);
-
- // Helper functions.
-
- // Creates a new status update stream (opening the updates file, if path is
- // present) and adds it to streams.
- TaskStatusUpdateStream* createStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- bool checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId);
-
- TaskStatusUpdateStream* getStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId);
-
- void cleanupStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId);
-
- const Flags flags;
- bool paused;
-
- function<void(StatusUpdate)> forward_;
-
- hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams;
-};
-
-
-TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess(
- const Flags& _flags)
- : ProcessBase(process::ID::generate("task-status-update-manager")),
- flags(_flags),
- paused(false)
-{
-}
-
-
-TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess()
-{
- foreachkey (const FrameworkID& frameworkId, streams) {
- foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
- delete stream;
- }
- }
- streams.clear();
-}
-
-
-void TaskStatusUpdateManagerProcess::initialize(
- const function<void(StatusUpdate)>& forward)
-{
- forward_ = forward;
-}
-
-
-void TaskStatusUpdateManagerProcess::pause()
-{
- LOG(INFO) << "Pausing sending task status updates";
- paused = true;
-}
-
-
-void TaskStatusUpdateManagerProcess::resume()
-{
- LOG(INFO) << "Resuming sending task status updates";
- paused = false;
-
- foreachkey (const FrameworkID& frameworkId, streams) {
- foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
- if (!stream->pending.empty()) {
- const StatusUpdate& update = stream->pending.front();
- LOG(WARNING) << "Resending task status update " << update;
- stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
- }
- }
- }
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::recover(
- const string& rootDir,
- const Option<SlaveState>& state)
-{
- LOG(INFO) << "Recovering task status update manager";
-
- if (state.isNone()) {
- return Nothing();
- }
-
- foreachvalue (const FrameworkState& framework, state->frameworks) {
- foreachvalue (const ExecutorState& executor, framework.executors) {
- LOG(INFO) << "Recovering executor '" << executor.id
- << "' of framework " << framework.id;
-
- if (executor.info.isNone()) {
- LOG(WARNING) << "Skipping recovering task status updates of"
- << " executor '" << executor.id
- << "' of framework " << framework.id
- << " because its info cannot be recovered";
- continue;
- }
-
- if (executor.latest.isNone()) {
- LOG(WARNING) << "Skipping recovering task status updates of"
- << " executor '" << executor.id
- << "' of framework " << framework.id
- << " because its latest run cannot be recovered";
- continue;
- }
-
- // We are only interested in the latest run of the executor!
- const ContainerID& latest = executor.latest.get();
- Option<RunState> run = executor.runs.get(latest);
- CHECK_SOME(run);
-
- if (run->completed) {
- VLOG(1) << "Skipping recovering task status updates of"
- << " executor '" << executor.id
- << "' of framework " << framework.id
- << " because its latest run " << latest.value()
- << " is completed";
- continue;
- }
-
- foreachvalue (const TaskState& task, run->tasks) {
- // No updates were ever received for this task!
- // This means either:
- // 1) the executor never received this task or
- // 2) executor launched it but the slave died before it got any updates.
- if (task.updates.empty()) {
- LOG(WARNING) << "No status updates found for task " << task.id
- << " of framework " << framework.id;
- continue;
- }
-
- // Create a new status update stream.
- TaskStatusUpdateStream* stream = createStatusUpdateStream(
- task.id, framework.id, state->id, true, executor.id, latest);
-
- // Replay the stream.
- Try<Nothing> replay = stream->replay(task.updates, task.acks);
- if (replay.isError()) {
- return Failure(
- "Failed to replay status updates for task " + stringify(task.id) +
- " of framework " + stringify(framework.id) +
- ": " + replay.error());
- }
-
- // At the end of the replay, the stream is either terminated or
- // contains only unacknowledged, if any, pending updates. The
- // pending updates will be flushed after the slave
- // re-registers with the master.
- if (stream->terminated) {
- cleanupStatusUpdateStream(task.id, framework.id);
- }
- }
- }
- }
-
- return Nothing();
-}
-
-
-void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
-{
- LOG(INFO) << "Closing task status update streams for framework "
- << frameworkId;
-
- if (streams.contains(frameworkId)) {
- foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
- cleanupStatusUpdateStream(taskId, frameworkId);
- }
- }
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- const ExecutorID& executorId,
- const ContainerID& containerId)
-{
- return _update(update, slaveId, true, executorId, containerId);
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::update(
- const StatusUpdate& update,
- const SlaveID& slaveId)
-{
- return _update(update, slaveId, false, None(), None());
-}
-
-
-Future<Nothing> TaskStatusUpdateManagerProcess::_update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- bool checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId)
-{
- const TaskID& taskId = update.status().task_id();
- const FrameworkID& frameworkId = update.framework_id();
-
- LOG(INFO) << "Received task status update " << update;
-
- // Write the status update to disk and enqueue it to send it to the master.
- // Create/Get the status update stream for this task.
- TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
- if (stream == nullptr) {
- stream = createStatusUpdateStream(
- taskId, frameworkId, slaveId, checkpoint, executorId, containerId);
- }
-
- // Verify that we didn't get a non-checkpointable update for a
- // stream that is checkpointable, and vice-versa.
- if (stream->checkpoint != checkpoint) {
- return Failure(
- "Mismatched checkpoint value for task status update " +
- stringify(update) + " (expected checkpoint=" +
- stringify(stream->checkpoint) + " actual checkpoint=" +
- stringify(checkpoint) + ")");
- }
-
- // Handle the status update.
- Try<bool> result = stream->update(update);
- if (result.isError()) {
- return Failure(result.error());
- }
-
- // We don't return a failed future here so that the slave can re-ack
- // the duplicate update.
- if (!result.get()) {
- return Nothing();
- }
-
- // Forward the status update to the master if this is the first in the stream.
- // Subsequent status updates will get sent in 'acknowledgement()'.
- if (!paused && stream->pending.size() == 1) {
- CHECK_NONE(stream->timeout);
- const Result<StatusUpdate>& next = stream->next();
- if (next.isError()) {
- return Failure(next.error());
- }
-
- CHECK_SOME(next);
- stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
- }
-
- return Nothing();
-}
-
-
-Timeout TaskStatusUpdateManagerProcess::forward(
- const StatusUpdate& update,
- const Duration& duration)
-{
- CHECK(!paused);
-
- VLOG(1) << "Forwarding task status update " << update << " to the agent";
-
- // Forward the update.
- forward_(update);
-
- // Send a message to self to resend after some delay if no ACK is received.
- return delay(duration,
- self(),
- &TaskStatusUpdateManagerProcess::timeout,
- duration).timeout();
-}
-
-
-Future<bool> TaskStatusUpdateManagerProcess::acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid)
-{
- LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid
- << ") for task " << taskId
- << " of framework " << frameworkId;
-
- TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
-
- // This might happen if we haven't completed recovery yet or if the
- // acknowledgement is for a stream that has been cleaned up.
- if (stream == nullptr) {
- return Failure(
- "Cannot find the task status update stream for task " +
- stringify(taskId) + " of framework " + stringify(frameworkId));
- }
-
- // Get the corresponding update for this ACK.
- const Result<StatusUpdate>& update = stream->next();
- if (update.isError()) {
- return Failure(update.error());
- }
-
- // This might happen if we retried a status update and got back
- // acknowledgments for both the original and the retried update.
- if (update.isNone()) {
- return Failure(
- "Unexpected task status update acknowledgment (UUID: " +
- uuid.toString() + ") for task " + stringify(taskId) + " of framework " +
- stringify(frameworkId));
- }
-
- // Handle the acknowledgement.
- Try<bool> result =
- stream->acknowledgement(taskId, frameworkId, uuid, update.get());
-
- if (result.isError()) {
- return Failure(result.error());
- }
-
- if (!result.get()) {
- return Failure("Duplicate task status acknowledgement");
- }
-
- // Reset the timeout.
- stream->timeout = None();
-
- // Get the next update in the queue.
- const Result<StatusUpdate>& next = stream->next();
- if (next.isError()) {
- return Failure(next.error());
- }
-
- bool terminated = stream->terminated;
-
- if (terminated) {
- if (next.isSome()) {
- LOG(WARNING) << "Acknowledged a terminal"
- << " task status update " << update.get()
- << " but updates are still pending";
- }
- cleanupStatusUpdateStream(taskId, frameworkId);
- } else if (!paused && next.isSome()) {
- // Forward the next queued status update.
- stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
- }
-
- return !terminated;
-}
-
-
-// TODO(vinod): There should be a limit on the retries.
-void TaskStatusUpdateManagerProcess::timeout(const Duration& duration)
-{
- if (paused) {
- return;
- }
-
- // Check and see if we should resend any status updates.
- foreachkey (const FrameworkID& frameworkId, streams) {
- foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
- CHECK_NOTNULL(stream);
- if (!stream->pending.empty()) {
- CHECK_SOME(stream->timeout);
- if (stream->timeout->expired()) {
- const StatusUpdate& update = stream->pending.front();
- LOG(WARNING) << "Resending task status update " << update;
-
- // Bounded exponential backoff.
- Duration duration_ =
- std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
-
- stream->timeout = forward(update, duration_);
- }
- }
- }
- }
-}
-
-
-TaskStatusUpdateStream*
-TaskStatusUpdateManagerProcess::createStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- bool checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId)
-{
- VLOG(1) << "Creating StatusUpdate stream for task " << taskId
- << " of framework " << frameworkId;
-
- TaskStatusUpdateStream* stream = new TaskStatusUpdateStream(
- taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId);
-
- streams[frameworkId][taskId] = stream;
- return stream;
-}
-
-
-TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId)
-{
- if (!streams.contains(frameworkId)) {
- return nullptr;
- }
-
- if (!streams[frameworkId].contains(taskId)) {
- return nullptr;
- }
-
- return streams[frameworkId][taskId];
-}
-
-
-void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream(
- const TaskID& taskId,
- const FrameworkID& frameworkId)
-{
- VLOG(1) << "Cleaning up status update stream"
- << " for task " << taskId
- << " of framework " << frameworkId;
-
- CHECK(streams.contains(frameworkId))
- << "Cannot find the task status update streams for framework "
- << frameworkId;
-
- CHECK(streams[frameworkId].contains(taskId))
- << "Cannot find the status update streams for task " << taskId;
-
- TaskStatusUpdateStream* stream = streams[frameworkId][taskId];
-
- streams[frameworkId].erase(taskId);
- if (streams[frameworkId].empty()) {
- streams.erase(frameworkId);
- }
-
- delete stream;
-}
-
-
-TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags)
-{
- process = new TaskStatusUpdateManagerProcess(flags);
- spawn(process);
-}
-
-
-TaskStatusUpdateManager::~TaskStatusUpdateManager()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-void TaskStatusUpdateManager::initialize(
- const function<void(StatusUpdate)>& forward)
-{
- dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- const ExecutorID& executorId,
- const ContainerID& containerId)
-{
- return dispatch(
- process,
- &TaskStatusUpdateManagerProcess::update,
- update,
- slaveId,
- executorId,
- containerId);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::update(
- const StatusUpdate& update,
- const SlaveID& slaveId)
-{
- return dispatch(
- process,
- &TaskStatusUpdateManagerProcess::update,
- update,
- slaveId);
-}
-
-
-Future<bool> TaskStatusUpdateManager::acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid)
-{
- return dispatch(
- process,
- &TaskStatusUpdateManagerProcess::acknowledgement,
- taskId,
- frameworkId,
- uuid);
-}
-
-
-Future<Nothing> TaskStatusUpdateManager::recover(
- const string& rootDir,
- const Option<SlaveState>& state)
-{
- return dispatch(
- process, &TaskStatusUpdateManagerProcess::recover, rootDir, state);
-}
-
-
-void TaskStatusUpdateManager::pause()
-{
- dispatch(process, &TaskStatusUpdateManagerProcess::pause);
-}
-
-
-void TaskStatusUpdateManager::resume()
-{
- dispatch(process, &TaskStatusUpdateManagerProcess::resume);
-}
-
-
-void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
-{
- dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId);
-}
-
-
-TaskStatusUpdateStream::TaskStatusUpdateStream(
- const TaskID& _taskId,
- const FrameworkID& _frameworkId,
- const SlaveID& _slaveId,
- const Flags& _flags,
- bool _checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId)
- : checkpoint(_checkpoint),
- terminated(false),
- taskId(_taskId),
- frameworkId(_frameworkId),
- slaveId(_slaveId),
- flags(_flags),
- error(None())
-{
- if (checkpoint) {
- CHECK_SOME(executorId);
- CHECK_SOME(containerId);
-
- path = paths::getTaskUpdatesPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- frameworkId,
- executorId.get(),
- containerId.get(),
- taskId);
-
- // Create the base updates directory, if it doesn't exist.
- const string& dirName = Path(path.get()).dirname();
- Try<Nothing> directory = os::mkdir(dirName);
- if (directory.isError()) {
- error = "Failed to create '" + dirName + "': " + directory.error();
- return;
- }
-
- // Open the updates file.
- // NOTE: We don't use `O_SYNC` here because we only read this file
- // if the host did not crash. `os::write` success implies the kernel
- // will have flushed our data to the page cache. This is sufficient
- // for the recovery scenarios we use this data for.
- Try<int_fd> result = os::open(
- path.get(),
- O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
- if (result.isError()) {
- error = "Failed to open '" + path.get() +
- "' for status updates: " + result.error();
- return;
- }
-
- // We keep the file open through the lifetime of the task, because it
- // makes it easy to append status update records to the file.
- fd = result.get();
- }
-}
-
-
-TaskStatusUpdateStream::~TaskStatusUpdateStream()
-{
- if (fd.isSome()) {
- Try<Nothing> close = os::close(fd.get());
- if (close.isError()) {
- CHECK_SOME(path);
- LOG(ERROR) << "Failed to close file '" << path.get() << "': "
- << close.error();
- }
- }
-}
-
-
-Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update)
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- if (!update.has_uuid()) {
- return Error("Task status update is missing 'uuid'");
- }
-
- // Check that this status update has not already been acknowledged.
- // This could happen in the rare case when the slave received the ACK
- // from the framework, died, but slave's ACK to the executor never made it!
- if (acknowledged.contains(UUID::fromBytes(update.uuid()).get())) {
- LOG(WARNING) << "Ignoring task status update " << update
- << " that has already been acknowledged by the framework!";
- return false;
- }
-
- // Check that this update hasn't already been received.
- // This could happen if the slave receives a status update from an executor,
- // then crashes after it writes it to disk but before it sends an ack.
- if (received.contains(UUID::fromBytes(update.uuid()).get())) {
- LOG(WARNING) << "Ignoring duplicate task status update " << update;
- return false;
- }
-
- // Handle the update, checkpointing if necessary.
- Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
- if (result.isError()) {
- return Error(result.error());
- }
-
- return true;
-}
-
-
-Try<bool> TaskStatusUpdateStream::acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid,
- const StatusUpdate& update)
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- if (acknowledged.contains(uuid)) {
- LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: "
- << uuid << ") for update " << update;
- return false;
- }
-
- // This might happen if we retried a status update and got back
- // acknowledgments for both the original and the retried update.
- if (uuid != UUID::fromBytes(update.uuid()).get()) {
- LOG(WARNING) << "Unexpected task status update acknowledgement (received "
- << uuid << ", expecting "
- << UUID::fromBytes(update.uuid()).get()
- << ") for update " << update;
- return false;
- }
-
- // Handle the ACK, checkpointing if necessary.
- Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
- if (result.isError()) {
- return Error(result.error());
- }
-
- return true;
-}
-
-
-Result<StatusUpdate> TaskStatusUpdateStream::next()
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- if (!pending.empty()) {
- return pending.front();
- }
-
- return None();
-}
-
-
-Try<Nothing> TaskStatusUpdateStream::replay(
- const std::vector<StatusUpdate>& updates,
- const hashset<UUID>& acks)
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- VLOG(1) << "Replaying task status update stream for task " << taskId;
-
- foreach (const StatusUpdate& update, updates) {
- // Handle the update.
- _handle(update, StatusUpdateRecord::UPDATE);
-
- // Check if the update has an ACK too.
- if (acks.contains(UUID::fromBytes(update.uuid()).get())) {
- _handle(update, StatusUpdateRecord::ACK);
- }
- }
-
- return Nothing();
-}
-
-
-Try<Nothing> TaskStatusUpdateStream::handle(
- const StatusUpdate& update,
- const StatusUpdateRecord::Type& type)
-{
- CHECK_NONE(error);
-
- // Checkpoint the update if necessary.
- if (checkpoint) {
- LOG(INFO) << "Checkpointing " << type << " for task status update "
- << update;
-
- CHECK_SOME(fd);
-
- StatusUpdateRecord record;
- record.set_type(type);
-
- if (type == StatusUpdateRecord::UPDATE) {
- record.mutable_update()->CopyFrom(update);
- } else {
- record.set_uuid(update.uuid());
- }
-
- Try<Nothing> write = ::protobuf::write(fd.get(), record);
- if (write.isError()) {
- error = "Failed to write task status update " + stringify(update) +
- " to '" + path.get() + "': " + write.error();
- return Error(error.get());
- }
- }
-
- // Now actually handle the update.
- _handle(update, type);
-
- return Nothing();
-}
-
-
-void TaskStatusUpdateStream::_handle(
- const StatusUpdate& update,
- const StatusUpdateRecord::Type& type)
-{
- CHECK_NONE(error);
-
- if (type == StatusUpdateRecord::UPDATE) {
- // Record this update.
- received.insert(UUID::fromBytes(update.uuid()).get());
-
- // Add it to the pending updates queue.
- pending.push(update);
- } else {
- // Record this ACK.
- acknowledged.insert(UUID::fromBytes(update.uuid()).get());
-
- // Remove the corresponding update from the pending queue.
- pending.pop();
-
- if (!terminated) {
- terminated = protobuf::isTerminalState(update.status().state());
- }
- }
-}
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
deleted file mode 100644
index 4f7d45d..0000000
--- a/src/slave/status_update_manager.hpp
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __STATUS_UPDATE_MANAGER_HPP__
-#define __STATUS_UPDATE_MANAGER_HPP__
-
-#include <queue>
-#include <string>
-
-#include <mesos/mesos.hpp>
-
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/timeout.hpp>
-
-#include <stout/hashset.hpp>
-#include <stout/lambda.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-#include <stout/uuid.hpp>
-
-#include "messages/messages.hpp"
-
-#include "slave/flags.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declarations.
-
-namespace state {
-struct SlaveState;
-}
-
-class TaskStatusUpdateManagerProcess;
-struct TaskStatusUpdateStream;
-
-
-// TaskStatusUpdateManager is responsible for
-// 1) Reliably sending status updates to the master.
-// 2) Checkpointing the update to disk (optional).
-// 3) Sending ACKs to the executor (optional).
-// 4) Receiving ACKs from the scheduler.
-class TaskStatusUpdateManager
-{
-public:
- TaskStatusUpdateManager(const Flags& flags);
- virtual ~TaskStatusUpdateManager();
-
- // Expects a callback 'forward' which gets called whenever there is
- // a new status update that needs to be forwarded to the master.
- void initialize(const lambda::function<void(StatusUpdate)>& forward);
-
- // TODO(vinod): Come up with better names/signatures for the
- // checkpointing and non-checkpointing 'update()' functions.
- // Currently, it is not obvious that one version of 'update()'
- // does checkpointing while the other doesn't.
-
- // Checkpoints the status update and reliably sends the
- // update to the master (and hence the scheduler).
- // @return Whether the update is handled successfully
- // (e.g. checkpointed).
- process::Future<Nothing> update(
- const StatusUpdate& update,
- const SlaveID& slaveId,
- const ExecutorID& executorId,
- const ContainerID& containerId);
-
- // Retries the update to the master (as long as the slave is
- // alive), but does not checkpoint the update.
- // @return Whether the update is handled successfully.
- process::Future<Nothing> update(
- const StatusUpdate& update,
- const SlaveID& slaveId);
-
- // Checkpoints the status update to disk if necessary.
- // Also, sends the next pending status update, if any.
- // @return True if the ACK is handled successfully (e.g., checkpointed)
- // and the task's status update stream is not terminated.
- // False same as above except the status update stream is terminated.
- // Failed if there are any errors (e.g., duplicate, checkpointing).
- process::Future<bool> acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid);
-
- // Recover status updates.
- process::Future<Nothing> recover(
- const std::string& rootDir,
- const Option<state::SlaveState>& state);
-
-
- // Pause sending updates.
- // This is useful when the slave is disconnected because a
- // disconnected slave will drop the updates.
- void pause();
-
- // Unpause and resend all the pending updates right away.
- // This is useful when the updates were pending because there was
- // no master elected (e.g., during recovery) or framework failed over.
- void resume();
-
- // Closes all the status update streams corresponding to this framework.
- // NOTE: This stops retrying any pending status updates for this framework.
- void cleanup(const FrameworkID& frameworkId);
-
-private:
- TaskStatusUpdateManagerProcess* process;
-};
-
-
-// TaskStatusUpdateStream handles the status updates and acknowledgements
-// of a task, checkpointing them if necessary. It also holds the information
-// about received, acknowledged and pending status updates.
-// NOTE: A task is expected to have a globally unique ID across the lifetime
-// of a framework. In other words the tuple (taskId, frameworkId) should be
-// always unique.
-struct TaskStatusUpdateStream
-{
- TaskStatusUpdateStream(const TaskID& _taskId,
- const FrameworkID& _frameworkId,
- const SlaveID& _slaveId,
- const Flags& _flags,
- bool _checkpoint,
- const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId);
-
- ~TaskStatusUpdateStream();
-
- // This function handles the update, checkpointing if necessary.
- // @return True if the update is successfully handled.
- // False if the update is a duplicate.
- // Error Any errors (e.g., checkpointing).
- Try<bool> update(const StatusUpdate& update);
-
- // This function handles the ACK, checkpointing if necessary.
- // @return True if the acknowledgement is successfully handled.
- // False if the acknowledgement is a duplicate.
- // Error Any errors (e.g., checkpointing).
- Try<bool> acknowledgement(
- const TaskID& taskId,
- const FrameworkID& frameworkId,
- const UUID& uuid,
- const StatusUpdate& update);
-
- // Returns the next update (or none, if empty) in the queue.
- Result<StatusUpdate> next();
-
- // Replays the stream by sequentially handling an update and its
- // corresponding ACK, if present.
- Try<Nothing> replay(
- const std::vector<StatusUpdate>& updates,
- const hashset<UUID>& acks);
-
- // TODO(vinod): Explore semantics to make these private.
- const bool checkpoint;
- bool terminated;
- Option<process::Timeout> timeout; // Timeout for resending status update.
- std::queue<StatusUpdate> pending;
-
-private:
- // Handles the status update and writes it to disk, if necessary.
- // TODO(vinod): The write has to be asynchronous to avoid status updates that
- // are being checkpointed, blocking the processing of other updates.
- // One solution is to wrap the protobuf::write inside async, but its probably
- // too much of an overhead to spin up a new libprocess per status update?
- // A better solution might be to be have async write capability for file io.
- Try<Nothing> handle(
- const StatusUpdate& update,
- const StatusUpdateRecord::Type& type);
-
- void _handle(
- const StatusUpdate& update,
- const StatusUpdateRecord::Type& type);
-
- const TaskID taskId;
- const FrameworkID frameworkId;
- const SlaveID slaveId;
-
- const Flags flags;
-
- hashset<UUID> received;
- hashset<UUID> acknowledged;
-
- Option<std::string> path; // File path of the update stream.
- Option<int_fd> fd; // File descriptor to the update stream.
-
- Option<std::string> error; // Potential non-retryable error.
-};
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-
-#endif // __STATUS_UPDATE_MANAGER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/task_status_update_manager.cpp b/src/slave/task_status_update_manager.cpp
new file mode 100644
index 0000000..1ec6be7
--- /dev/null
+++ b/src/slave/task_status_update_manager.cpp
@@ -0,0 +1,898 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/task_status_update_manager.hpp"
+
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+#include <process/timer.hpp>
+
+#include <stout/check.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/utils.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "logging/logging.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/flags.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
+using lambda::function;
+
+using std::string;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+using process::Failure;
+using process::Future;
+using process::PID;
+using process::Timeout;
+using process::UPID;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+using state::TaskState;
+
+
+class TaskStatusUpdateManagerProcess
+ : public ProtobufProcess<TaskStatusUpdateManagerProcess>
+{
+public:
+ TaskStatusUpdateManagerProcess(const Flags& flags);
+ virtual ~TaskStatusUpdateManagerProcess();
+
+ // Explicitly use 'initialize' since we're overloading below.
+ using process::ProcessBase::initialize;
+
+ // TaskStatusUpdateManager implementation.
+ void initialize(const function<void(StatusUpdate)>& forward);
+
+ Future<Nothing> update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId);
+
+ Future<Nothing> update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId);
+
+ Future<bool> acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid);
+
+ Future<Nothing> recover(
+ const string& rootDir,
+ const Option<SlaveState>& state);
+
+ void pause();
+ void resume();
+
+ void cleanup(const FrameworkID& frameworkId);
+
+private:
+ // Helper function to handle update.
+ Future<Nothing> _update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ bool checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId);
+
+ // Status update timeout.
+ void timeout(const Duration& duration);
+
+ // Forwards the status update to the master and starts a timer based
+ // on the 'duration' to check for ACK from the scheduler.
+ // NOTE: This should only be used for those messages that expect an
+ // ACK (e.g., updates from the executor).
+ Timeout forward(const StatusUpdate& update, const Duration& duration);
+
+ // Helper functions.
+
+ // Creates a new status update stream (opening the updates file, if path is
+ // present) and adds it to streams.
+ TaskStatusUpdateStream* createStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ bool checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId);
+
+ TaskStatusUpdateStream* getStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId);
+
+ void cleanupStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId);
+
+ const Flags flags;
+ bool paused;
+
+ function<void(StatusUpdate)> forward_;
+
+ hashmap<FrameworkID, hashmap<TaskID, TaskStatusUpdateStream*>> streams;
+};
+
+
+TaskStatusUpdateManagerProcess::TaskStatusUpdateManagerProcess(
+ const Flags& _flags)
+ : ProcessBase(process::ID::generate("task-status-update-manager")),
+ flags(_flags),
+ paused(false)
+{
+}
+
+
+TaskStatusUpdateManagerProcess::~TaskStatusUpdateManagerProcess()
+{
+ foreachkey (const FrameworkID& frameworkId, streams) {
+ foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
+ delete stream;
+ }
+ }
+ streams.clear();
+}
+
+
+void TaskStatusUpdateManagerProcess::initialize(
+ const function<void(StatusUpdate)>& forward)
+{
+ forward_ = forward;
+}
+
+
+void TaskStatusUpdateManagerProcess::pause()
+{
+ LOG(INFO) << "Pausing sending task status updates";
+ paused = true;
+}
+
+
+void TaskStatusUpdateManagerProcess::resume()
+{
+ LOG(INFO) << "Resuming sending task status updates";
+ paused = false;
+
+ foreachkey (const FrameworkID& frameworkId, streams) {
+ foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
+ if (!stream->pending.empty()) {
+ const StatusUpdate& update = stream->pending.front();
+ LOG(WARNING) << "Resending task status update " << update;
+ stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ }
+ }
+ }
+}
+
+
+Future<Nothing> TaskStatusUpdateManagerProcess::recover(
+ const string& rootDir,
+ const Option<SlaveState>& state)
+{
+ LOG(INFO) << "Recovering task status update manager";
+
+ if (state.isNone()) {
+ return Nothing();
+ }
+
+ foreachvalue (const FrameworkState& framework, state->frameworks) {
+ foreachvalue (const ExecutorState& executor, framework.executors) {
+ LOG(INFO) << "Recovering executor '" << executor.id
+ << "' of framework " << framework.id;
+
+ if (executor.info.isNone()) {
+ LOG(WARNING) << "Skipping recovering task status updates of"
+ << " executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its info cannot be recovered";
+ continue;
+ }
+
+ if (executor.latest.isNone()) {
+ LOG(WARNING) << "Skipping recovering task status updates of"
+ << " executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its latest run cannot be recovered";
+ continue;
+ }
+
+ // We are only interested in the latest run of the executor!
+ const ContainerID& latest = executor.latest.get();
+ Option<RunState> run = executor.runs.get(latest);
+ CHECK_SOME(run);
+
+ if (run->completed) {
+ VLOG(1) << "Skipping recovering task status updates of"
+ << " executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its latest run " << latest.value()
+ << " is completed";
+ continue;
+ }
+
+ foreachvalue (const TaskState& task, run->tasks) {
+ // No updates were ever received for this task!
+ // This means either:
+ // 1) the executor never received this task or
+ // 2) the executor launched it but the slave died before it got any updates.
+ if (task.updates.empty()) {
+ LOG(WARNING) << "No status updates found for task " << task.id
+ << " of framework " << framework.id;
+ continue;
+ }
+
+ // Create a new status update stream.
+ TaskStatusUpdateStream* stream = createStatusUpdateStream(
+ task.id, framework.id, state->id, true, executor.id, latest);
+
+ // Replay the stream.
+ Try<Nothing> replay = stream->replay(task.updates, task.acks);
+ if (replay.isError()) {
+ return Failure(
+ "Failed to replay status updates for task " + stringify(task.id) +
+ " of framework " + stringify(framework.id) +
+ ": " + replay.error());
+ }
+
+ // At the end of the replay, the stream is either terminated or
+ // contains only unacknowledged pending updates, if any. The
+ // pending updates will be flushed after the slave
+ // re-registers with the master.
+ if (stream->terminated) {
+ cleanupStatusUpdateStream(task.id, framework.id);
+ }
+ }
+ }
+ }
+
+ return Nothing();
+}
+
+
+void TaskStatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
+{
+ LOG(INFO) << "Closing task status update streams for framework "
+ << frameworkId;
+
+ if (streams.contains(frameworkId)) {
+ foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
+ cleanupStatusUpdateStream(taskId, frameworkId);
+ }
+ }
+}
+
+
+Future<Nothing> TaskStatusUpdateManagerProcess::update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId)
+{
+ return _update(update, slaveId, true, executorId, containerId);
+}
+
+
+Future<Nothing> TaskStatusUpdateManagerProcess::update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId)
+{
+ return _update(update, slaveId, false, None(), None());
+}
+
+
+Future<Nothing> TaskStatusUpdateManagerProcess::_update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ bool checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId)
+{
+ const TaskID& taskId = update.status().task_id();
+ const FrameworkID& frameworkId = update.framework_id();
+
+ LOG(INFO) << "Received task status update " << update;
+
+ // Write the status update to disk and enqueue it to send it to the master.
+ // Create/Get the status update stream for this task.
+ TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+ if (stream == nullptr) {
+ stream = createStatusUpdateStream(
+ taskId, frameworkId, slaveId, checkpoint, executorId, containerId);
+ }
+
+ // Verify that we didn't get a non-checkpointable update for a
+ // stream that is checkpointable, and vice-versa.
+ if (stream->checkpoint != checkpoint) {
+ return Failure(
+ "Mismatched checkpoint value for task status update " +
+ stringify(update) + " (expected checkpoint=" +
+ stringify(stream->checkpoint) + " actual checkpoint=" +
+ stringify(checkpoint) + ")");
+ }
+
+ // Handle the status update.
+ Try<bool> result = stream->update(update);
+ if (result.isError()) {
+ return Failure(result.error());
+ }
+
+ // We don't return a failed future here so that the slave can re-ack
+ // the duplicate update.
+ if (!result.get()) {
+ return Nothing();
+ }
+
+ // Forward the status update to the master if this is the first in the stream.
+ // Subsequent status updates will get sent in 'acknowledgement()'.
+ if (!paused && stream->pending.size() == 1) {
+ CHECK_NONE(stream->timeout);
+ const Result<StatusUpdate>& next = stream->next();
+ if (next.isError()) {
+ return Failure(next.error());
+ }
+
+ CHECK_SOME(next);
+ stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ }
+
+ return Nothing();
+}
+
+
+Timeout TaskStatusUpdateManagerProcess::forward(
+ const StatusUpdate& update,
+ const Duration& duration)
+{
+ CHECK(!paused);
+
+ VLOG(1) << "Forwarding task status update " << update << " to the agent";
+
+ // Forward the update.
+ forward_(update);
+
+ // Send a message to self to resend after some delay if no ACK is received.
+ return delay(duration,
+ self(),
+ &TaskStatusUpdateManagerProcess::timeout,
+ duration).timeout();
+}
+
+
+Future<bool> TaskStatusUpdateManagerProcess::acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid)
+{
+ LOG(INFO) << "Received task status update acknowledgement (UUID: " << uuid
+ << ") for task " << taskId
+ << " of framework " << frameworkId;
+
+ TaskStatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+
+ // This might happen if we haven't completed recovery yet or if the
+ // acknowledgement is for a stream that has been cleaned up.
+ if (stream == nullptr) {
+ return Failure(
+ "Cannot find the task status update stream for task " +
+ stringify(taskId) + " of framework " + stringify(frameworkId));
+ }
+
+ // Get the corresponding update for this ACK.
+ const Result<StatusUpdate>& update = stream->next();
+ if (update.isError()) {
+ return Failure(update.error());
+ }
+
+ // This might happen if we retried a status update and got back
+ // acknowledgments for both the original and the retried update.
+ if (update.isNone()) {
+ return Failure(
+ "Unexpected task status update acknowledgment (UUID: " +
+ uuid.toString() + ") for task " + stringify(taskId) + " of framework " +
+ stringify(frameworkId));
+ }
+
+ // Handle the acknowledgement.
+ Try<bool> result =
+ stream->acknowledgement(taskId, frameworkId, uuid, update.get());
+
+ if (result.isError()) {
+ return Failure(result.error());
+ }
+
+ if (!result.get()) {
+ return Failure("Duplicate task status acknowledgement");
+ }
+
+ // Reset the timeout.
+ stream->timeout = None();
+
+ // Get the next update in the queue.
+ const Result<StatusUpdate>& next = stream->next();
+ if (next.isError()) {
+ return Failure(next.error());
+ }
+
+ bool terminated = stream->terminated;
+
+ if (terminated) {
+ if (next.isSome()) {
+ LOG(WARNING) << "Acknowledged a terminal"
+ << " task status update " << update.get()
+ << " but updates are still pending";
+ }
+ cleanupStatusUpdateStream(taskId, frameworkId);
+ } else if (!paused && next.isSome()) {
+ // Forward the next queued status update.
+ stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ }
+
+ return !terminated;
+}
+
+
+// TODO(vinod): There should be a limit on the retries.
+void TaskStatusUpdateManagerProcess::timeout(const Duration& duration)
+{
+ if (paused) {
+ return;
+ }
+
+ // Check and see if we should resend any status updates.
+ foreachkey (const FrameworkID& frameworkId, streams) {
+ foreachvalue (TaskStatusUpdateStream* stream, streams[frameworkId]) {
+ CHECK_NOTNULL(stream);
+ if (!stream->pending.empty()) {
+ CHECK_SOME(stream->timeout);
+ if (stream->timeout->expired()) {
+ const StatusUpdate& update = stream->pending.front();
+ LOG(WARNING) << "Resending task status update " << update;
+
+ // Bounded exponential backoff.
+ Duration duration_ =
+ std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
+
+ stream->timeout = forward(update, duration_);
+ }
+ }
+ }
+ }
+}
+
+
+TaskStatusUpdateStream*
+TaskStatusUpdateManagerProcess::createStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ bool checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId)
+{
+ VLOG(1) << "Creating StatusUpdate stream for task " << taskId
+ << " of framework " << frameworkId;
+
+ TaskStatusUpdateStream* stream = new TaskStatusUpdateStream(
+ taskId, frameworkId, slaveId, flags, checkpoint, executorId, containerId);
+
+ streams[frameworkId][taskId] = stream;
+ return stream;
+}
+
+
+TaskStatusUpdateStream* TaskStatusUpdateManagerProcess::getStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId)
+{
+ if (!streams.contains(frameworkId)) {
+ return nullptr;
+ }
+
+ if (!streams[frameworkId].contains(taskId)) {
+ return nullptr;
+ }
+
+ return streams[frameworkId][taskId];
+}
+
+
+void TaskStatusUpdateManagerProcess::cleanupStatusUpdateStream(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId)
+{
+ VLOG(1) << "Cleaning up status update stream"
+ << " for task " << taskId
+ << " of framework " << frameworkId;
+
+ CHECK(streams.contains(frameworkId))
+ << "Cannot find the task status update streams for framework "
+ << frameworkId;
+
+ CHECK(streams[frameworkId].contains(taskId))
+ << "Cannot find the status update streams for task " << taskId;
+
+ TaskStatusUpdateStream* stream = streams[frameworkId][taskId];
+
+ streams[frameworkId].erase(taskId);
+ if (streams[frameworkId].empty()) {
+ streams.erase(frameworkId);
+ }
+
+ delete stream;
+}
+
+
+TaskStatusUpdateManager::TaskStatusUpdateManager(const Flags& flags)
+{
+ process = new TaskStatusUpdateManagerProcess(flags);
+ spawn(process);
+}
+
+
+TaskStatusUpdateManager::~TaskStatusUpdateManager()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+void TaskStatusUpdateManager::initialize(
+ const function<void(StatusUpdate)>& forward)
+{
+ dispatch(process, &TaskStatusUpdateManagerProcess::initialize, forward);
+}
+
+
+Future<Nothing> TaskStatusUpdateManager::update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId)
+{
+ return dispatch(
+ process,
+ &TaskStatusUpdateManagerProcess::update,
+ update,
+ slaveId,
+ executorId,
+ containerId);
+}
+
+
+Future<Nothing> TaskStatusUpdateManager::update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId)
+{
+ return dispatch(
+ process,
+ &TaskStatusUpdateManagerProcess::update,
+ update,
+ slaveId);
+}
+
+
+Future<bool> TaskStatusUpdateManager::acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid)
+{
+ return dispatch(
+ process,
+ &TaskStatusUpdateManagerProcess::acknowledgement,
+ taskId,
+ frameworkId,
+ uuid);
+}
+
+
+Future<Nothing> TaskStatusUpdateManager::recover(
+ const string& rootDir,
+ const Option<SlaveState>& state)
+{
+ return dispatch(
+ process, &TaskStatusUpdateManagerProcess::recover, rootDir, state);
+}
+
+
+void TaskStatusUpdateManager::pause()
+{
+ dispatch(process, &TaskStatusUpdateManagerProcess::pause);
+}
+
+
+void TaskStatusUpdateManager::resume()
+{
+ dispatch(process, &TaskStatusUpdateManagerProcess::resume);
+}
+
+
+void TaskStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
+{
+ dispatch(process, &TaskStatusUpdateManagerProcess::cleanup, frameworkId);
+}
+
+
+TaskStatusUpdateStream::TaskStatusUpdateStream(
+ const TaskID& _taskId,
+ const FrameworkID& _frameworkId,
+ const SlaveID& _slaveId,
+ const Flags& _flags,
+ bool _checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId)
+ : checkpoint(_checkpoint),
+ terminated(false),
+ taskId(_taskId),
+ frameworkId(_frameworkId),
+ slaveId(_slaveId),
+ flags(_flags),
+ error(None())
+{
+ if (checkpoint) {
+ CHECK_SOME(executorId);
+ CHECK_SOME(containerId);
+
+ path = paths::getTaskUpdatesPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ executorId.get(),
+ containerId.get(),
+ taskId);
+
+ // Create the base updates directory, if it doesn't exist.
+ const string& dirName = Path(path.get()).dirname();
+ Try<Nothing> directory = os::mkdir(dirName);
+ if (directory.isError()) {
+ error = "Failed to create '" + dirName + "': " + directory.error();
+ return;
+ }
+
+ // Open the updates file.
+ // NOTE: We don't use `O_SYNC` here because we only read this file
+ // if the host did not crash. `os::write` success implies the kernel
+ // will have flushed our data to the page cache. This is sufficient
+ // for the recovery scenarios we use this data for.
+ Try<int_fd> result = os::open(
+ path.get(),
+ O_CREAT | O_WRONLY | O_APPEND | O_CLOEXEC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+ if (result.isError()) {
+ error = "Failed to open '" + path.get() +
+ "' for status updates: " + result.error();
+ return;
+ }
+
+ // We keep the file open through the lifetime of the task, because it
+ // makes it easy to append status update records to the file.
+ fd = result.get();
+ }
+}
+
+
+TaskStatusUpdateStream::~TaskStatusUpdateStream()
+{
+ if (fd.isSome()) {
+ Try<Nothing> close = os::close(fd.get());
+ if (close.isError()) {
+ CHECK_SOME(path);
+ LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+ << close.error();
+ }
+ }
+}
+
+
+Try<bool> TaskStatusUpdateStream::update(const StatusUpdate& update)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ if (!update.has_uuid()) {
+ return Error("Task status update is missing 'uuid'");
+ }
+
+ // Check that this status update has not already been acknowledged.
+ // This could happen in the rare case when the slave received the ACK
+ // from the framework, then died, and the slave's ACK to the executor never made it!
+ if (acknowledged.contains(UUID::fromBytes(update.uuid()).get())) {
+ LOG(WARNING) << "Ignoring task status update " << update
+ << " that has already been acknowledged by the framework!";
+ return false;
+ }
+
+ // Check that this update hasn't already been received.
+ // This could happen if the slave receives a status update from an executor,
+ // then crashes after it writes it to disk but before it sends an ack.
+ if (received.contains(UUID::fromBytes(update.uuid()).get())) {
+ LOG(WARNING) << "Ignoring duplicate task status update " << update;
+ return false;
+ }
+
+ // Handle the update, checkpointing if necessary.
+ Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
+}
+
+
+Try<bool> TaskStatusUpdateStream::acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid,
+ const StatusUpdate& update)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ if (acknowledged.contains(uuid)) {
+ LOG(WARNING) << "Duplicate task status update acknowledgment (UUID: "
+ << uuid << ") for update " << update;
+ return false;
+ }
+
+ // This might happen if we retried a status update and got back
+ // acknowledgments for both the original and the retried update.
+ if (uuid != UUID::fromBytes(update.uuid()).get()) {
+ LOG(WARNING) << "Unexpected task status update acknowledgement (received "
+ << uuid << ", expecting "
+ << UUID::fromBytes(update.uuid()).get()
+ << ") for update " << update;
+ return false;
+ }
+
+ // Handle the ACK, checkpointing if necessary.
+ Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
+}
+
+
+Result<StatusUpdate> TaskStatusUpdateStream::next()
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ if (!pending.empty()) {
+ return pending.front();
+ }
+
+ return None();
+}
+
+
+Try<Nothing> TaskStatusUpdateStream::replay(
+ const std::vector<StatusUpdate>& updates,
+ const hashset<UUID>& acks)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ VLOG(1) << "Replaying task status update stream for task " << taskId;
+
+ foreach (const StatusUpdate& update, updates) {
+ // Handle the update.
+ _handle(update, StatusUpdateRecord::UPDATE);
+
+ // Check if the update has an ACK too.
+ if (acks.contains(UUID::fromBytes(update.uuid()).get())) {
+ _handle(update, StatusUpdateRecord::ACK);
+ }
+ }
+
+ return Nothing();
+}
+
+
+Try<Nothing> TaskStatusUpdateStream::handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type)
+{
+ CHECK_NONE(error);
+
+ // Checkpoint the update if necessary.
+ if (checkpoint) {
+ LOG(INFO) << "Checkpointing " << type << " for task status update "
+ << update;
+
+ CHECK_SOME(fd);
+
+ StatusUpdateRecord record;
+ record.set_type(type);
+
+ if (type == StatusUpdateRecord::UPDATE) {
+ record.mutable_update()->CopyFrom(update);
+ } else {
+ record.set_uuid(update.uuid());
+ }
+
+ Try<Nothing> write = ::protobuf::write(fd.get(), record);
+ if (write.isError()) {
+ error = "Failed to write task status update " + stringify(update) +
+ " to '" + path.get() + "': " + write.error();
+ return Error(error.get());
+ }
+ }
+
+ // Now actually handle the update.
+ _handle(update, type);
+
+ return Nothing();
+}
+
+
+void TaskStatusUpdateStream::_handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type)
+{
+ CHECK_NONE(error);
+
+ if (type == StatusUpdateRecord::UPDATE) {
+ // Record this update.
+ received.insert(UUID::fromBytes(update.uuid()).get());
+
+ // Add it to the pending updates queue.
+ pending.push(update);
+ } else {
+ // Record this ACK.
+ acknowledged.insert(UUID::fromBytes(update.uuid()).get());
+
+ // Remove the corresponding update from the pending queue.
+ pending.pop();
+
+ if (!terminated) {
+ terminated = protobuf::isTerminalState(update.status().state());
+ }
+ }
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/slave/task_status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/task_status_update_manager.hpp b/src/slave/task_status_update_manager.hpp
new file mode 100644
index 0000000..6bdb468
--- /dev/null
+++ b/src/slave/task_status_update_manager.hpp
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __TASK_STATUS_UPDATE_MANAGER_HPP__
+#define __TASK_STATUS_UPDATE_MANAGER_HPP__
+
+#include <queue>
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/hashset.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/messages.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declarations.
+
+namespace state {
+struct SlaveState;
+}
+
+class TaskStatusUpdateManagerProcess;
+struct TaskStatusUpdateStream;
+
+
+// TaskStatusUpdateManager is responsible for
+// 1) Reliably sending status updates to the master.
+// 2) Checkpointing the update to disk (optional).
+// 3) Sending ACKs to the executor (optional).
+// 4) Receiving ACKs from the scheduler.
+class TaskStatusUpdateManager
+{
+public:
+ TaskStatusUpdateManager(const Flags& flags);
+ virtual ~TaskStatusUpdateManager();
+
+ // Expects a callback 'forward' which gets called whenever there is
+ // a new status update that needs to be forwarded to the master.
+ void initialize(const lambda::function<void(StatusUpdate)>& forward);
+
+ // TODO(vinod): Come up with better names/signatures for the
+ // checkpointing and non-checkpointing 'update()' functions.
+ // Currently, it is not obvious that one version of 'update()'
+ // does checkpointing while the other doesn't.
+
+ // Checkpoints the status update and reliably sends the
+ // update to the master (and hence the scheduler).
+ // @return Whether the update is handled successfully
+ // (e.g. checkpointed).
+ process::Future<Nothing> update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId,
+ const ExecutorID& executorId,
+ const ContainerID& containerId);
+
+ // Retries the update to the master (as long as the slave is
+ // alive), but does not checkpoint the update.
+ // @return Whether the update is handled successfully.
+ process::Future<Nothing> update(
+ const StatusUpdate& update,
+ const SlaveID& slaveId);
+
+ // Checkpoints the status update acknowledgement to disk if necessary.
+ // Also, sends the next pending status update, if any.
+ // @return True if the ACK is handled successfully (e.g., checkpointed)
+ // and the task's status update stream is not terminated.
+ // False if the ACK is handled successfully and the stream is terminated.
+ // Failed if there are any errors (e.g., duplicate, checkpointing).
+ process::Future<bool> acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid);
+
+ // Recover status updates.
+ process::Future<Nothing> recover(
+ const std::string& rootDir,
+ const Option<state::SlaveState>& state);
+
+
+ // Pause sending updates.
+ // This is useful when the slave is disconnected because a
+ // disconnected slave will drop the updates.
+ void pause();
+
+ // Unpause and resend all the pending updates right away.
+ // This is useful when the updates were pending because there was
+ // no master elected (e.g., during recovery) or framework failed over.
+ void resume();
+
+ // Closes all the status update streams corresponding to this framework.
+ // NOTE: This stops retrying any pending status updates for this framework.
+ void cleanup(const FrameworkID& frameworkId);
+
+private:
+ TaskStatusUpdateManagerProcess* process;
+};
+
+
+// TaskStatusUpdateStream handles the status updates and acknowledgements
+// of a task, checkpointing them if necessary. It also holds the information
+// about received, acknowledged and pending status updates.
+// NOTE: A task is expected to have a globally unique ID across the lifetime
+// of a framework. In other words the tuple (taskId, frameworkId) should be
+// always unique.
+struct TaskStatusUpdateStream
+{
+ TaskStatusUpdateStream(const TaskID& _taskId,
+ const FrameworkID& _frameworkId,
+ const SlaveID& _slaveId,
+ const Flags& _flags,
+ bool _checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId);
+
+ ~TaskStatusUpdateStream();
+
+ // This function handles the update, checkpointing if necessary.
+ // @return True if the update is successfully handled.
+ // False if the update is a duplicate.
+ // Error Any errors (e.g., checkpointing).
+ Try<bool> update(const StatusUpdate& update);
+
+ // This function handles the ACK, checkpointing if necessary.
+ // @return True if the acknowledgement is successfully handled.
+ // False if the acknowledgement is a duplicate.
+ // Error Any errors (e.g., checkpointing).
+ Try<bool> acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid,
+ const StatusUpdate& update);
+
+ // Returns the next update (or none, if empty) in the queue.
+ Result<StatusUpdate> next();
+
+ // Replays the stream by sequentially handling an update and its
+ // corresponding ACK, if present.
+ Try<Nothing> replay(
+ const std::vector<StatusUpdate>& updates, // NOTE(review): no <vector> include.
+ const hashset<UUID>& acks);
+
+ // TODO(vinod): Explore semantics to make these private.
+ const bool checkpoint; // Whether records are written to disk.
+ bool terminated; // Set once an ACK for a terminal-state update is handled.
+ Option<process::Timeout> timeout; // Timeout for resending status update.
+ std::queue<StatusUpdate> pending; // Updates not yet ACKed, oldest first.
+
+private:
+ // Handles the status update and writes it to disk, if necessary.
+ // TODO(vinod): The write has to be asynchronous to avoid status updates that
+ // are being checkpointed, blocking the processing of other updates.
+ // One solution is to wrap the protobuf::write inside async, but it's probably
+ // too much of an overhead to spin up a new libprocess per status update?
+ // A better solution might be to have async write capability for file io.
+ Try<Nothing> handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type);
+
+ void _handle( // Updates the in-memory state only (no checkpointing).
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type);
+
+ const TaskID taskId;
+ const FrameworkID frameworkId;
+ const SlaveID slaveId;
+
+ const Flags flags;
+
+ hashset<UUID> received; // UUIDs of the updates handled so far.
+ hashset<UUID> acknowledged; // UUIDs of the ACKs handled so far.
+
+ Option<std::string> path; // File path of the update stream.
+ Option<int_fd> fd; // File descriptor to the update stream.
+
+ Option<std::string> error; // Potential non-retryable error.
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __TASK_STATUS_UPDATE_MANAGER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index db5e531..8997cc0 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -117,7 +117,7 @@ set(MESOS_TESTS_SRC
slave_tests.cpp
slave_validation_tests.cpp
sorter_tests.cpp
- status_update_manager_tests.cpp
+ task_status_update_manager_tests.cpp
uri_tests.cpp
uri_fetcher_tests.cpp
values_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 6111be4..b854904 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -84,7 +84,7 @@
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index b3b1348..d572a09 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -58,7 +58,7 @@
#include "slave/flags.hpp"
#include "slave/gc.hpp"
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/62d11733/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 5dd2188..90c4369 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -29,7 +29,7 @@
#include <stout/option.hpp>
#include "slave/slave.hpp"
-#include "slave/status_update_manager.hpp"
+#include "slave/task_status_update_manager.hpp"
#include "tests/mock_slave.hpp"