You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@mesos.apache.org by ti...@apache.org on 2015/06/04 16:00:20 UTC
mesos git commit: Moved implementation of StatusUpdateStream to a compilation unit.
Repository: mesos
Updated Branches:
refs/heads/master f8dc08206 -> 9f91651ba
Moved implementation of StatusUpdateStream to a compilation unit.
Review: https://reviews.apache.org/r/33358
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f91651b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f91651b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f91651b
Branch: refs/heads/master
Commit: 9f91651ba18d1584a0d4714078103cccc695de6d
Parents: f8dc082
Author: Alexander Rojas <al...@mesosphere.io>
Authored: Thu Jun 4 15:32:37 2015 +0200
Committer: Till Toenshoff <to...@me.com>
Committed: Thu Jun 4 15:59:15 2015 +0200
----------------------------------------------------------------------
src/slave/status_update_manager.cpp | 238 +++++++++++++++++++++++++++++++
src/slave/status_update_manager.hpp | 234 ++----------------------------
2 files changed, 253 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f91651b/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fab8c22..1d7c4d0 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -26,12 +26,16 @@
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
+#include <stout/os.hpp>
#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
#include <stout/utils.hpp>
#include <stout/uuid.hpp>
#include "common/protobuf_utils.hpp"
+#include "logging/logging.hpp"
+
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
@@ -633,6 +637,240 @@ void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);
}
+
+StatusUpdateStream::StatusUpdateStream(
+ const TaskID& _taskId,
+ const FrameworkID& _frameworkId,
+ const SlaveID& _slaveId,
+ const Flags& _flags,
+ bool _checkpoint,
+ const Option<ExecutorID>& executorId,
+ const Option<ContainerID>& containerId)
+ : checkpoint(_checkpoint),
+ terminated(false),
+ taskId(_taskId),
+ frameworkId(_frameworkId),
+ slaveId(_slaveId),
+ flags(_flags),
+ error(None())
+{
+ if (checkpoint) {
+ CHECK_SOME(executorId);
+ CHECK_SOME(containerId);
+
+ path = paths::getTaskUpdatesPath(
+ paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ frameworkId,
+ executorId.get(),
+ containerId.get(),
+ taskId);
+
+ // Create the base updates directory, if it doesn't exist.
+ Try<Nothing> directory = os::mkdir(os::dirname(path.get()).get());
+ if (directory.isError()) {
+ error = "Failed to create " + os::dirname(path.get()).get();
+ return;
+ }
+
+ // Open the updates file.
+ Try<int> result = os::open(
+ path.get(),
+ O_CREAT | O_WRONLY | O_APPEND | O_SYNC | O_CLOEXEC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+ if (result.isError()) {
+ error = "Failed to open '" + path.get() + "' for status updates";
+ return;
+ }
+
+ // We keep the file open through the lifetime of the task, because it
+ // makes it easy to append status update records to the file.
+ fd = result.get();
+ }
+}
+
+
+StatusUpdateStream::~StatusUpdateStream()
+{
+ if (fd.isSome()) {
+ Try<Nothing> close = os::close(fd.get());
+ if (close.isError()) {
+ CHECK_SOME(path);
+ LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+ << close.error();
+ }
+ }
+}
+
+
+Try<bool> StatusUpdateStream::update(const StatusUpdate& update)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ // Check that this status update has not already been acknowledged.
+ // This could happen in the rare case when the slave received the ACK
+ // from the framework, died, but slave's ACK to the executor never made it!
+ if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
+ LOG(WARNING) << "Ignoring status update " << update
+ << " that has already been acknowledged by the framework!";
+ return false;
+ }
+
+ // Check that this update hasn't already been received.
+ // This could happen if the slave receives a status update from an executor,
+ // then crashes after it writes it to disk but before it sends an ack.
+ if (received.contains(UUID::fromBytes(update.uuid()))) {
+ LOG(WARNING) << "Ignoring duplicate status update " << update;
+ return false;
+ }
+
+ // Handle the update, checkpointing if necessary.
+ Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
+}
+
+
+Try<bool> StatusUpdateStream::acknowledgement(
+ const TaskID& taskId,
+ const FrameworkID& frameworkId,
+ const UUID& uuid,
+ const StatusUpdate& update)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ if (acknowledged.contains(uuid)) {
+ LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+ << uuid << ") for update " << update;
+ return false;
+ }
+
+ // This might happen if we retried a status update and got back
+ // acknowledgments for both the original and the retried update.
+ if (uuid != UUID::fromBytes(update.uuid())) {
+ LOG(WARNING) << "Unexpected status update acknowledgement (received "
+ << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+ << ") for update " << update;
+ return false;
+ }
+
+ // Handle the ACK, checkpointing if necessary.
+ Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
+}
+
+
+Result<StatusUpdate> StatusUpdateStream::next()
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ if (!pending.empty()) {
+ return pending.front();
+ }
+
+ return None();
+}
+
+
+Try<Nothing> StatusUpdateStream::replay(
+ const std::vector<StatusUpdate>& updates,
+ const hashset<UUID>& acks)
+{
+ if (error.isSome()) {
+ return Error(error.get());
+ }
+
+ VLOG(1) << "Replaying status update stream for task " << taskId;
+
+ foreach (const StatusUpdate& update, updates) {
+ // Handle the update.
+ _handle(update, StatusUpdateRecord::UPDATE);
+
+ // Check if the update has an ACK too.
+ if (acks.contains(UUID::fromBytes(update.uuid()))) {
+ _handle(update, StatusUpdateRecord::ACK);
+ }
+ }
+
+ return Nothing();
+}
+
+
+Try<Nothing> StatusUpdateStream::handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type)
+{
+ CHECK(error.isNone());
+
+ // Checkpoint the update if necessary.
+ if (checkpoint) {
+ LOG(INFO) << "Checkpointing " << type << " for status update " << update;
+
+ CHECK_SOME(fd);
+
+ StatusUpdateRecord record;
+ record.set_type(type);
+
+ if (type == StatusUpdateRecord::UPDATE) {
+ record.mutable_update()->CopyFrom(update);
+ } else {
+ record.set_uuid(update.uuid());
+ }
+
+ Try<Nothing> write = ::protobuf::write(fd.get(), record);
+ if (write.isError()) {
+ error = "Failed to write status update " + stringify(update) +
+ " to '" + path.get() + "': " + write.error();
+ return Error(error.get());
+ }
+ }
+
+ // Now actually handle the update.
+ _handle(update, type);
+
+ return Nothing();
+}
+
+
+void StatusUpdateStream::_handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type)
+{
+ CHECK(error.isNone());
+
+ if (type == StatusUpdateRecord::UPDATE) {
+ // Record this update.
+ received.insert(UUID::fromBytes(update.uuid()));
+
+ // Add it to the pending updates queue.
+ pending.push(update);
+ } else {
+ // Record this ACK.
+ acknowledged.insert(UUID::fromBytes(update.uuid()));
+
+ // Remove the corresponding update from the pending queue.
+ pending.pop();
+
+ if (!terminated) {
+ terminated = protobuf::isTerminalState(update.status().state());
+ }
+ }
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f91651b/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index b4d91b2..88eba8c 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -19,32 +19,21 @@
#ifndef __STATUS_UPDATE_MANAGER_HPP__
#define __STATUS_UPDATE_MANAGER_HPP__
-#include <ostream>
#include <queue>
#include <string>
-#include <utility>
-#include <mesos/type_utils.hpp>
+#include <mesos/mesos.hpp>
+#include <process/future.hpp>
#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
#include <process/timeout.hpp>
-#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/nothing.hpp>
#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/stringify.hpp>
-#include <stout/utils.hpp>
+#include <stout/try.hpp>
#include <stout/uuid.hpp>
-#include "logging/logging.hpp"
-
#include "messages/messages.hpp"
#include "slave/flags.hpp"
@@ -56,9 +45,7 @@ namespace slave {
// Forward declarations.
namespace state {
-
struct SlaveState;
-
}
class StatusUpdateManagerProcess;
@@ -152,98 +139,15 @@ struct StatusUpdateStream
const Flags& _flags,
bool _checkpoint,
const Option<ExecutorID>& executorId,
- const Option<ContainerID>& containerId)
- : checkpoint(_checkpoint),
- terminated(false),
- taskId(_taskId),
- frameworkId(_frameworkId),
- slaveId(_slaveId),
- flags(_flags),
- error(None())
- {
- if (checkpoint) {
- CHECK_SOME(executorId);
- CHECK_SOME(containerId);
-
- path = paths::getTaskUpdatesPath(
- paths::getMetaRootDir(flags.work_dir),
- slaveId,
- frameworkId,
- executorId.get(),
- containerId.get(),
- taskId);
-
- // Create the base updates directory, if it doesn't exist.
- Try<Nothing> directory = os::mkdir(os::dirname(path.get()).get());
- if (directory.isError()) {
- error = "Failed to create " + os::dirname(path.get()).get();
- return;
- }
-
- // Open the updates file.
- Try<int> result = os::open(
- path.get(),
- O_CREAT | O_WRONLY | O_APPEND | O_SYNC | O_CLOEXEC,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
- if (result.isError()) {
- error = "Failed to open '" + path.get() + "' for status updates";
- return;
- }
-
- // We keep the file open through the lifetime of the task, because it
- // makes it easy to append status update records to the file.
- fd = result.get();
- }
- }
-
- ~StatusUpdateStream()
- {
- if (fd.isSome()) {
- Try<Nothing> close = os::close(fd.get());
- if (close.isError()) {
- CHECK_SOME(path);
- LOG(ERROR) << "Failed to close file '" << path.get() << "': "
- << close.error();
- }
- }
- }
+ const Option<ContainerID>& containerId);
+
+ ~StatusUpdateStream();
// This function handles the update, checkpointing if necessary.
// @return True if the update is successfully handled.
// False if the update is a duplicate.
// Error Any errors (e.g., checkpointing).
- Try<bool> update(const StatusUpdate& update)
- {
- if (error.isSome()) {
- return Error(error.get());
- }
-
- // Check that this status update has not already been acknowledged.
- // This could happen in the rare case when the slave received the ACK
- // from the framework, died, but slave's ACK to the executor never made it!
- if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
- LOG(WARNING) << "Ignoring status update " << update
- << " that has already been acknowledged by the framework!";
- return false;
- }
-
- // Check that this update hasn't already been received.
- // This could happen if the slave receives a status update from an executor,
- // then crashes after it writes it to disk but before it sends an ack.
- if (received.contains(UUID::fromBytes(update.uuid()))) {
- LOG(WARNING) << "Ignoring duplicate status update " << update;
- return false;
- }
-
- // Handle the update, checkpointing if necessary.
- Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
- if (result.isError()) {
- return Error(result.error());
- }
-
- return true;
- }
+ Try<bool> update(const StatusUpdate& update);
// This function handles the ACK, checkpointing if necessary.
// @return True if the acknowledgement is successfully handled.
@@ -253,74 +157,16 @@ struct StatusUpdateStream
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid,
- const StatusUpdate& update)
- {
- if (error.isSome()) {
- return Error(error.get());
- }
-
- if (acknowledged.contains(uuid)) {
- LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
- << uuid << ") for update " << update;
- return false;
- }
-
- // This might happen if we retried a status update and got back
- // acknowledgments for both the original and the retried update.
- if (uuid != UUID::fromBytes(update.uuid())) {
- LOG(WARNING) << "Unexpected status update acknowledgement (received "
- << uuid << ", expecting " << UUID::fromBytes(update.uuid())
- << ") for update " << update;
- return false;
- }
-
- // Handle the ACK, checkpointing if necessary.
- Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
- if (result.isError()) {
- return Error(result.error());
- }
-
- return true;
- }
+ const StatusUpdate& update);
// Returns the next update (or none, if empty) in the queue.
- Result<StatusUpdate> next()
- {
- if (error.isSome()) {
- return Error(error.get());
- }
-
- if (!pending.empty()) {
- return pending.front();
- }
-
- return None();
- }
+ Result<StatusUpdate> next();
// Replays the stream by sequentially handling an update and its
// corresponding ACK, if present.
Try<Nothing> replay(
const std::vector<StatusUpdate>& updates,
- const hashset<UUID>& acks)
- {
- if (error.isSome()) {
- return Error(error.get());
- }
-
- VLOG(1) << "Replaying status update stream for task " << taskId;
-
- foreach (const StatusUpdate& update, updates) {
- // Handle the update.
- _handle(update, StatusUpdateRecord::UPDATE);
-
- // Check if the update has an ACK too.
- if (acks.contains(UUID::fromBytes(update.uuid()))) {
- _handle(update, StatusUpdateRecord::ACK);
- }
- }
-
- return Nothing();
- }
+ const hashset<UUID>& acks);
// TODO(vinod): Explore semantics to make these private.
const bool checkpoint;
@@ -337,61 +183,11 @@ private:
// A better solution might be to be have async write capability for file io.
Try<Nothing> handle(
const StatusUpdate& update,
- const StatusUpdateRecord::Type& type)
- {
- CHECK(error.isNone());
-
- // Checkpoint the update if necessary.
- if (checkpoint) {
- LOG(INFO) << "Checkpointing " << type << " for status update " << update;
-
- CHECK_SOME(fd);
-
- StatusUpdateRecord record;
- record.set_type(type);
-
- if (type == StatusUpdateRecord::UPDATE) {
- record.mutable_update()->CopyFrom(update);
- } else {
- record.set_uuid(update.uuid());
- }
-
- Try<Nothing> write = ::protobuf::write(fd.get(), record);
- if (write.isError()) {
- error = "Failed to write status update " + stringify(update) +
- " to '" + path.get() + "': " + write.error();
- return Error(error.get());
- }
- }
-
- // Now actually handle the update.
- _handle(update, type);
-
- return Nothing();
- }
-
- void _handle(const StatusUpdate& update, const StatusUpdateRecord::Type& type)
- {
- CHECK(error.isNone());
-
- if (type == StatusUpdateRecord::UPDATE) {
- // Record this update.
- received.insert(UUID::fromBytes(update.uuid()));
-
- // Add it to the pending updates queue.
- pending.push(update);
- } else {
- // Record this ACK.
- acknowledged.insert(UUID::fromBytes(update.uuid()));
-
- // Remove the corresponding update from the pending queue.
- pending.pop();
-
- if (!terminated) {
- terminated = protobuf::isTerminalState(update.status().state());
- }
- }
- }
+ const StatusUpdateRecord::Type& type);
+
+ void _handle(
+ const StatusUpdate& update,
+ const StatusUpdateRecord::Type& type);
const TaskID taskId;
const FrameworkID frameworkId;