You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2015/06/04 16:00:20 UTC

mesos git commit: Moved implementation of StatusUpdateStream to a compilation unit.

Repository: mesos
Updated Branches:
  refs/heads/master f8dc08206 -> 9f91651ba


Moved implementation of StatusUpdateStream to a compilation unit.

Review: https://reviews.apache.org/r/33358


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f91651b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f91651b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f91651b

Branch: refs/heads/master
Commit: 9f91651ba18d1584a0d4714078103cccc695de6d
Parents: f8dc082
Author: Alexander Rojas <al...@mesosphere.io>
Authored: Thu Jun 4 15:32:37 2015 +0200
Committer: Till Toenshoff <to...@me.com>
Committed: Thu Jun 4 15:59:15 2015 +0200

----------------------------------------------------------------------
 src/slave/status_update_manager.cpp | 238 +++++++++++++++++++++++++++++++
 src/slave/status_update_manager.hpp | 234 ++----------------------------
 2 files changed, 253 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f91651b/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fab8c22..1d7c4d0 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -26,12 +26,16 @@
 #include <stout/hashset.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/os.hpp>
 #include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
 
 #include "common/protobuf_utils.hpp"
 
+#include "logging/logging.hpp"
+
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
@@ -633,6 +637,240 @@ void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
   dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);
 }
 
+
+StatusUpdateStream::StatusUpdateStream(
+    const TaskID& _taskId,
+    const FrameworkID& _frameworkId,
+    const SlaveID& _slaveId,
+    const Flags& _flags,
+    bool _checkpoint,
+    const Option<ExecutorID>& executorId,
+    const Option<ContainerID>& containerId)
+    : checkpoint(_checkpoint),
+      terminated(false),
+      taskId(_taskId),
+      frameworkId(_frameworkId),
+      slaveId(_slaveId),
+      flags(_flags),
+      error(None())
+{
+  if (checkpoint) {
+    CHECK_SOME(executorId);
+    CHECK_SOME(containerId);
+
+    path = paths::getTaskUpdatesPath(
+        paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        frameworkId,
+        executorId.get(),
+        containerId.get(),
+        taskId);
+
+    // Create the base updates directory, if it doesn't exist.
+    Try<Nothing> directory = os::mkdir(os::dirname(path.get()).get());
+    if (directory.isError()) {
+      error = "Failed to create " + os::dirname(path.get()).get();
+      return;
+    }
+
+    // Open the updates file.
+    Try<int> result = os::open(
+        path.get(),
+        O_CREAT | O_WRONLY | O_APPEND | O_SYNC | O_CLOEXEC,
+        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+    if (result.isError()) {
+      error = "Failed to open '" + path.get() + "' for status updates";
+      return;
+    }
+
+    // We keep the file open through the lifetime of the task, because it
+    // makes it easy to append status update records to the file.
+    fd = result.get();
+  }
+}
+
+
+StatusUpdateStream::~StatusUpdateStream()
+{
+  if (fd.isSome()) {
+    Try<Nothing> close = os::close(fd.get());
+    if (close.isError()) {
+      CHECK_SOME(path);
+      LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+                 << close.error();
+    }
+  }
+}
+
+
+Try<bool> StatusUpdateStream::update(const StatusUpdate& update)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  // Check that this status update has not already been acknowledged.
+  // This can happen in the rare case where the slave received the ACK from
+  // the framework and then died before its own ACK reached the executor.
+  if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
+    LOG(WARNING) << "Ignoring status update " << update
+                 << " that has already been acknowledged by the framework!";
+    return false;
+  }
+
+  // Check that this update hasn't already been received.
+  // This could happen if the slave receives a status update from an executor,
+  // then crashes after it writes it to disk but before it sends an ack.
+  if (received.contains(UUID::fromBytes(update.uuid()))) {
+    LOG(WARNING) << "Ignoring duplicate status update " << update;
+    return false;
+  }
+
+  // Handle the update, checkpointing if necessary.
+  Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+  if (result.isError()) {
+    return Error(result.error());
+  }
+
+  return true;
+}
+
+
+Try<bool> StatusUpdateStream::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const UUID& uuid,
+    const StatusUpdate& update)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  if (acknowledged.contains(uuid)) {
+    LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+                  << uuid << ") for update " << update;
+    return false;
+  }
+
+  // This might happen if we retried a status update and got back
+  // acknowledgments for both the original and the retried update.
+  if (uuid != UUID::fromBytes(update.uuid())) {
+    LOG(WARNING) << "Unexpected status update acknowledgement (received "
+                 << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+                 << ") for update " << update;
+    return false;
+  }
+
+  // Handle the ACK, checkpointing if necessary.
+  Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
+  if (result.isError()) {
+    return Error(result.error());
+  }
+
+  return true;
+}
+
+
+Result<StatusUpdate> StatusUpdateStream::next()
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  if (!pending.empty()) {
+    return pending.front();
+  }
+
+  return None();
+}
+
+
+Try<Nothing> StatusUpdateStream::replay(
+    const std::vector<StatusUpdate>& updates,
+    const hashset<UUID>& acks)
+{
+  if (error.isSome()) {
+    return Error(error.get());
+  }
+
+  VLOG(1) << "Replaying status update stream for task " << taskId;
+
+  foreach (const StatusUpdate& update, updates) {
+    // Handle the update.
+    _handle(update, StatusUpdateRecord::UPDATE);
+
+    // Check if the update has an ACK too.
+    if (acks.contains(UUID::fromBytes(update.uuid()))) {
+      _handle(update, StatusUpdateRecord::ACK);
+    }
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing> StatusUpdateStream::handle(
+    const StatusUpdate& update,
+    const StatusUpdateRecord::Type& type)
+{
+  CHECK(error.isNone());
+
+  // Checkpoint the update if necessary.
+  if (checkpoint) {
+    LOG(INFO) << "Checkpointing " << type << " for status update " << update;
+
+    CHECK_SOME(fd);
+
+    StatusUpdateRecord record;
+    record.set_type(type);
+
+    if (type == StatusUpdateRecord::UPDATE) {
+      record.mutable_update()->CopyFrom(update);
+    } else {
+      record.set_uuid(update.uuid());
+    }
+
+    Try<Nothing> write = ::protobuf::write(fd.get(), record);
+    if (write.isError()) {
+      error = "Failed to write status update " + stringify(update) +
+              " to '" + path.get() + "': " + write.error();
+      return Error(error.get());
+    }
+  }
+
+  // Now actually handle the update.
+  _handle(update, type);
+
+  return Nothing();
+}
+
+
+void StatusUpdateStream::_handle(
+    const StatusUpdate& update,
+    const StatusUpdateRecord::Type& type)
+{
+  CHECK(error.isNone());
+
+  if (type == StatusUpdateRecord::UPDATE) {
+    // Record this update.
+    received.insert(UUID::fromBytes(update.uuid()));
+
+    // Add it to the pending updates queue.
+    pending.push(update);
+  } else {
+    // Record this ACK.
+    acknowledged.insert(UUID::fromBytes(update.uuid()));
+
+    // Remove the corresponding update from the pending queue.
+    pending.pop();
+
+    if (!terminated) {
+      terminated = protobuf::isTerminalState(update.status().state());
+    }
+  }
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f91651b/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index b4d91b2..88eba8c 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -19,32 +19,21 @@
 #ifndef __STATUS_UPDATE_MANAGER_HPP__
 #define __STATUS_UPDATE_MANAGER_HPP__
 
-#include <ostream>
 #include <queue>
 #include <string>
-#include <utility>
 
-#include <mesos/type_utils.hpp>
+#include <mesos/mesos.hpp>
 
+#include <process/future.hpp>
 #include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
 #include <process/timeout.hpp>
 
-#include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/nothing.hpp>
 #include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/stringify.hpp>
-#include <stout/utils.hpp>
+#include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
-#include "logging/logging.hpp"
-
 #include "messages/messages.hpp"
 
 #include "slave/flags.hpp"
@@ -56,9 +45,7 @@ namespace slave {
 // Forward declarations.
 
 namespace state {
-
 struct SlaveState;
-
 }
 
 class StatusUpdateManagerProcess;
@@ -152,98 +139,15 @@ struct StatusUpdateStream
                      const Flags& _flags,
                      bool _checkpoint,
                      const Option<ExecutorID>& executorId,
-                     const Option<ContainerID>& containerId)
-    : checkpoint(_checkpoint),
-      terminated(false),
-      taskId(_taskId),
-      frameworkId(_frameworkId),
-      slaveId(_slaveId),
-      flags(_flags),
-      error(None())
-  {
-    if (checkpoint) {
-      CHECK_SOME(executorId);
-      CHECK_SOME(containerId);
-
-      path = paths::getTaskUpdatesPath(
-          paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          frameworkId,
-          executorId.get(),
-          containerId.get(),
-          taskId);
-
-      // Create the base updates directory, if it doesn't exist.
-      Try<Nothing> directory = os::mkdir(os::dirname(path.get()).get());
-      if (directory.isError()) {
-        error = "Failed to create " + os::dirname(path.get()).get();
-        return;
-      }
-
-      // Open the updates file.
-      Try<int> result = os::open(
-          path.get(),
-          O_CREAT | O_WRONLY | O_APPEND | O_SYNC | O_CLOEXEC,
-          S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
-      if (result.isError()) {
-        error = "Failed to open '" + path.get() + "' for status updates";
-        return;
-      }
-
-      // We keep the file open through the lifetime of the task, because it
-      // makes it easy to append status update records to the file.
-      fd = result.get();
-    }
-  }
-
-  ~StatusUpdateStream()
-  {
-    if (fd.isSome()) {
-      Try<Nothing> close = os::close(fd.get());
-      if (close.isError()) {
-        CHECK_SOME(path);
-        LOG(ERROR) << "Failed to close file '" << path.get() << "': "
-                   << close.error();
-      }
-    }
-  }
+                     const Option<ContainerID>& containerId);
+
+  ~StatusUpdateStream();
 
   // This function handles the update, checkpointing if necessary.
   // @return   True if the update is successfully handled.
   //           False if the update is a duplicate.
   //           Error Any errors (e.g., checkpointing).
-  Try<bool> update(const StatusUpdate& update)
-  {
-    if (error.isSome()) {
-      return Error(error.get());
-    }
-
-    // Check that this status update has not already been acknowledged.
-    // This could happen in the rare case when the slave received the ACK
-    // from the framework, died, but slave's ACK to the executor never made it!
-    if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
-      LOG(WARNING) << "Ignoring status update " << update
-                   << " that has already been acknowledged by the framework!";
-      return false;
-    }
-
-    // Check that this update hasn't already been received.
-    // This could happen if the slave receives a status update from an executor,
-    // then crashes after it writes it to disk but before it sends an ack.
-    if (received.contains(UUID::fromBytes(update.uuid()))) {
-      LOG(WARNING) << "Ignoring duplicate status update " << update;
-      return false;
-    }
-
-    // Handle the update, checkpointing if necessary.
-    Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
-    if (result.isError()) {
-      return Error(result.error());
-    }
-
-    return true;
-  }
+  Try<bool> update(const StatusUpdate& update);
 
   // This function handles the ACK, checkpointing if necessary.
   // @return   True if the acknowledgement is successfully handled.
@@ -253,74 +157,16 @@ struct StatusUpdateStream
       const TaskID& taskId,
       const FrameworkID& frameworkId,
       const UUID& uuid,
-      const StatusUpdate& update)
-  {
-    if (error.isSome()) {
-      return Error(error.get());
-    }
-
-    if (acknowledged.contains(uuid)) {
-      LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
-                    << uuid << ") for update " << update;
-      return false;
-    }
-
-    // This might happen if we retried a status update and got back
-    // acknowledgments for both the original and the retried update.
-    if (uuid != UUID::fromBytes(update.uuid())) {
-      LOG(WARNING) << "Unexpected status update acknowledgement (received "
-                   << uuid << ", expecting " << UUID::fromBytes(update.uuid())
-                   << ") for update " << update;
-      return false;
-    }
-
-    // Handle the ACK, checkpointing if necessary.
-    Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
-    if (result.isError()) {
-      return Error(result.error());
-    }
-
-    return true;
-  }
+      const StatusUpdate& update);
 
   // Returns the next update (or none, if empty) in the queue.
-  Result<StatusUpdate> next()
-  {
-    if (error.isSome()) {
-      return Error(error.get());
-    }
-
-    if (!pending.empty()) {
-      return pending.front();
-    }
-
-    return None();
-  }
+  Result<StatusUpdate> next();
 
   // Replays the stream by sequentially handling an update and its
   // corresponding ACK, if present.
   Try<Nothing> replay(
       const std::vector<StatusUpdate>& updates,
-      const hashset<UUID>& acks)
-  {
-    if (error.isSome()) {
-      return Error(error.get());
-    }
-
-    VLOG(1) << "Replaying status update stream for task " << taskId;
-
-    foreach (const StatusUpdate& update, updates) {
-      // Handle the update.
-      _handle(update, StatusUpdateRecord::UPDATE);
-
-      // Check if the update has an ACK too.
-      if (acks.contains(UUID::fromBytes(update.uuid()))) {
-        _handle(update, StatusUpdateRecord::ACK);
-      }
-    }
-
-    return Nothing();
-  }
+      const hashset<UUID>& acks);
 
   // TODO(vinod): Explore semantics to make these private.
   const bool checkpoint;
@@ -337,61 +183,11 @@ private:
   // A better solution might be to be have async write capability for file io.
   Try<Nothing> handle(
       const StatusUpdate& update,
-      const StatusUpdateRecord::Type& type)
-  {
-    CHECK(error.isNone());
-
-    // Checkpoint the update if necessary.
-    if (checkpoint) {
-      LOG(INFO) << "Checkpointing " << type << " for status update " << update;
-
-      CHECK_SOME(fd);
-
-      StatusUpdateRecord record;
-      record.set_type(type);
-
-      if (type == StatusUpdateRecord::UPDATE) {
-        record.mutable_update()->CopyFrom(update);
-      } else {
-        record.set_uuid(update.uuid());
-      }
-
-      Try<Nothing> write = ::protobuf::write(fd.get(), record);
-      if (write.isError()) {
-        error = "Failed to write status update " + stringify(update) +
-                " to '" + path.get() + "': " + write.error();
-        return Error(error.get());
-      }
-    }
-
-    // Now actually handle the update.
-    _handle(update, type);
-
-    return Nothing();
-  }
-
-  void _handle(const StatusUpdate& update, const StatusUpdateRecord::Type& type)
-  {
-    CHECK(error.isNone());
-
-    if (type == StatusUpdateRecord::UPDATE) {
-      // Record this update.
-      received.insert(UUID::fromBytes(update.uuid()));
-
-      // Add it to the pending updates queue.
-      pending.push(update);
-    } else {
-      // Record this ACK.
-      acknowledged.insert(UUID::fromBytes(update.uuid()));
-
-      // Remove the corresponding update from the pending queue.
-      pending.pop();
-
-      if (!terminated) {
-        terminated = protobuf::isTerminalState(update.status().state());
-      }
-    }
-  }
+      const StatusUpdateRecord::Type& type);
+
+  void _handle(
+      const StatusUpdate& update,
+      const StatusUpdateRecord::Type& type);
 
   const TaskID taskId;
   const FrameworkID frameworkId;