You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/13 07:23:00 UTC

svn commit: r1455810 - in /incubator/mesos/trunk: src/ src/common/ src/local/ src/messages/ src/slave/ third_party/libprocess/include/process/

Author: vinodkone
Date: Wed Mar 13 06:23:00 2013
New Revision: 1455810

URL: http://svn.apache.org/r1455810
Log:
Added status update manager.

Review: https://reviews.apache.org/r/7212

Added:
    incubator/mesos/trunk/src/slave/status_update_manager.cpp
    incubator/mesos/trunk/src/slave/status_update_manager.hpp
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/common/protobuf_utils.hpp
    incubator/mesos/trunk/src/common/type_utils.hpp
    incubator/mesos/trunk/src/local/local.cpp
    incubator/mesos/trunk/src/messages/messages.proto
    incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Mar 13 06:23:00 2013
@@ -161,7 +161,8 @@ libmesos_no_third_party_la_SOURCES =				
 	master/constants.cpp						\
 	master/drf_sorter.cpp						\
 	master/frameworks_manager.cpp					\
-	master/http.cpp master/master.cpp				\
+	master/http.cpp							\
+	master/master.cpp						\
 	master/slaves_manager.cpp					\
 	slave/constants.cpp						\
 	slave/gc.cpp							\
@@ -172,6 +173,7 @@ libmesos_no_third_party_la_SOURCES =				
 	slave/isolation_module.cpp					\
 	slave/process_based_isolation_module.cpp			\
 	slave/reaper.cpp						\
+	slave/status_update_manager.cpp					\
 	launcher/launcher.cpp						\
 	exec/exec.cpp							\
 	common/lock.cpp							\
@@ -233,6 +235,7 @@ libmesos_no_third_party_la_SOURCES += co
 	slave/cgroups_isolation_module.hpp				\
 	slave/lxc_isolation_module.hpp					\
 	slave/paths.hpp slave/state.hpp					\
+	slave/status_update_manager.hpp					\
 	slave/process_based_isolation_module.hpp slave/reaper.hpp	\
 	slave/slave.hpp slave/solaris_project_isolation_module.hpp	\
 	tests/environment.hpp tests/script.hpp				\

Modified: incubator/mesos/trunk/src/common/protobuf_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/protobuf_utils.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/protobuf_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/protobuf_utils.hpp Wed Mar 13 06:23:00 2013
@@ -22,6 +22,7 @@
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
+#include <stout/none.hpp>
 #include <stout/uuid.hpp>
 
 #include "common/type_utils.hpp"
@@ -46,26 +47,25 @@ inline StatusUpdate createStatusUpdate(
     const SlaveID& slaveId,
     const TaskID& taskId,
     const TaskState& state,
-    const std::string& message,
-    const ExecutorID& executorId = ExecutorID())
+    const std::string& message = "",
+    const Option<ExecutorID>& executorId = None())
 {
   StatusUpdate update;
 
+  update.set_timestamp(process::Clock::now());
+  update.set_uuid(UUID::random().toBytes());
   update.mutable_framework_id()->MergeFrom(frameworkId);
+  update.mutable_slave_id()->MergeFrom(slaveId);
 
-  if (!(executorId == "")) {
-    update.mutable_executor_id()->MergeFrom(executorId);
+  if (executorId.isSome()) {
+    update.mutable_executor_id()->MergeFrom(executorId.get());
   }
 
-  update.mutable_slave_id()->MergeFrom(slaveId);
   TaskStatus* status = update.mutable_status();
   status->mutable_task_id()->MergeFrom(taskId);
   status->set_state(state);
   status->set_message(message);
 
-  update.set_timestamp(::process::Clock::now());
-  update.set_uuid(UUID::random().toBytes());
-
   return update;
 }
 

Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Wed Mar 13 06:23:00 2013
@@ -34,52 +34,49 @@
 
 namespace mesos {
 
-inline std::ostream& operator << (std::ostream& stream, const FrameworkID& frameworkId)
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const FrameworkID& frameworkId)
 {
-  stream << frameworkId.value();
-  return stream;
+  return stream << frameworkId.value();
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const OfferID& offerId)
 {
-  stream << offerId.value();
-  return stream;
+  return stream << offerId.value();
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const SlaveID& slaveId)
 {
-  stream << slaveId.value();
-  return stream;
+  return stream << slaveId.value();
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const TaskID& taskId)
 {
-  stream << taskId.value();
-  return stream;
+  return stream << taskId.value();
 }
 
 
-inline std::ostream& operator << (std::ostream& stream, const ExecutorID& executorId)
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const ExecutorID& executorId)
 {
-  stream << executorId.value();
-  return stream;
+  return stream << executorId.value();
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const TaskState& state)
 {
-  stream << TaskState_descriptor()->FindValueByNumber(state)->name();
-  return stream;
+  return stream << TaskState_descriptor()->FindValueByNumber(state)->name();
 }
 
 
 inline std::ostream& operator << (std::ostream& stream, const TaskInfo& task)
 {
-  stream << task.DebugString();
-  return stream;
+  return stream << task.DebugString();
 }
 
 
@@ -205,8 +202,9 @@ inline bool operator == (const Environme
 }
 
 
-inline bool operator == (const CommandInfo::URI& left,
-                         const CommandInfo::URI& right)
+inline bool operator == (
+    const CommandInfo::URI& left,
+    const CommandInfo::URI& right)
 {
   return left.has_executable() == right.has_executable() &&
     (!left.has_executable() || (left.executable() == right.executable())) &&
@@ -299,10 +297,23 @@ inline std::size_t hash_value(const Exec
 
 namespace internal {
 
-inline std::ostream& operator << (std::ostream& stream, const Task* task)
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const StatusUpdate& update)
 {
-  stream << "task " << task->framework_id() << ":" << task->task_id();
-  return stream;
+  return stream
+    << update.status().state()
+    << " from task " << update.status().task_id()
+    << " of framework " << update.framework_id();
+}
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const StatusUpdateRecord::Type& type)
+{
+  return stream
+    << StatusUpdateRecord::Type_descriptor()->FindValueByNumber(type)->name();
 }
 
 }} // namespace mesos { namespace internal {

Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Wed Mar 13 06:23:00 2013
@@ -132,6 +132,10 @@ PID<Master> launch(const Configuration& 
     // TODO(benh): Create a local isolation module?
     ProcessBasedIsolationModule* isolationModule =
       new ProcessBasedIsolationModule();
+
+    // Use a different work directory for each slave.
+    flags.work_dir = path::join(flags.work_dir, stringify(i));
+
     Slave* slave = new Slave(flags, true, isolationModule, files);
     slaves[isolationModule] = slave;
     pids.push_back(process::spawn(slave));

Modified: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (original)
+++ incubator/mesos/trunk/src/messages/messages.proto Wed Mar 13 06:23:00 2013
@@ -59,6 +59,20 @@ message StatusUpdate {
 }
 
 
+// This message encapsulates how we checkpoint a status update to disk.
+// NOTE: If type == UPDATE, the 'update' field is required.
+// NOTE: If type == ACK, the 'uuid' field is required.
+message StatusUpdateRecord {
+  enum Type {
+    UPDATE = 0;
+    ACK = 1;
+  }
+  required Type type = 1;
+  optional StatusUpdate update = 2;
+  optional bytes uuid = 3;
+}
+
+
 message Slaves
 {
   repeated SlaveInfo infos = 1;

Added: incubator/mesos/trunk/src/slave/status_update_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.cpp?rev=1455810&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.cpp (added)
+++ incubator/mesos/trunk/src/slave/status_update_manager.cpp Wed Mar 13 06:23:00 2013
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/delay.hpp>
+#include <process/process.hpp>
+#include <process/timer.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/protobuf.hpp"
+#include "stout/utils.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/slave.hpp"
+#include "slave/status_update_manager.hpp"
+
+using std::string;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class StatusUpdateManagerProcess
+  : public ProtobufProcess<StatusUpdateManagerProcess>
+{
+public:
+  StatusUpdateManagerProcess() {}
+  virtual ~StatusUpdateManagerProcess();
+
+  void initialize(const PID<Slave>& slave);
+
+  Try<Nothing> update(
+      const StatusUpdate& update,
+      bool checkpoint,
+      const Option<std::string>& path);
+
+  Try<Nothing> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const string& uuid);
+
+  void newMasterDetected(const UPID& pid);
+
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  // Status update timeout.
+  void timeout();
+
+  // Forwards the status update to the master and starts a timer to check
+  // for ACK from the scheduler.
+  // NOTE: This should only be used for those messages that expect an
+  // ACK (e.g., updates from the executor).
+  Timeout forward(const StatusUpdate& update);
+
+  // Helper functions.
+
+  // Creates a new status update stream (opening the updates file, if path is
+  // present) and adds it to streams.
+  StatusUpdateStream* createStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const Option<std::string>& path);
+
+  StatusUpdateStream* getStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+
+  void cleanupStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+
+  UPID master;
+  PID<Slave> slave;
+  hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
+};
+
+
+StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
+{
+  foreachkey (const FrameworkID& frameworkId, streams) {
+    foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
+      delete stream;
+    }
+  }
+  streams.clear();
+}
+
+
+void StatusUpdateManagerProcess::initialize(const process::PID<Slave>& _slave)
+{
+  slave = _slave;
+}
+
+
+void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
+{
+  LOG(INFO) << "New master detected at " << pid;
+  master = pid;
+}
+
+
+void StatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Closing status update streams for framework " << frameworkId;
+
+  if (streams.contains(frameworkId)) {
+    foreachkey (const TaskID& taskId, utils::copy(streams[frameworkId])) {
+      cleanupStatusUpdateStream(taskId, frameworkId);
+    }
+  }
+}
+
+
+Try<Nothing> StatusUpdateManagerProcess::update(
+    const StatusUpdate& update,
+    bool checkpoint,
+    const Option<string>& path)
+{
+  CHECK(!checkpoint || path.isSome())
+    << "Asked to checkpoint update " << update << " without providing a path";
+
+  const TaskID& taskId = update.status().task_id();
+  const FrameworkID& frameworkId = update.framework_id();
+
+  LOG(INFO) << "Received status update " << update;
+
+  // Write the status update to disk and enqueue it to send it to the master.
+  // Create/Get the status update stream for this task.
+  StatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+  if (stream == NULL) {
+    stream = createStatusUpdateStream(taskId, frameworkId, path);
+  }
+
+  // Handle the status update.
+  Try<Nothing> result = stream->update(update);
+  if (result.isError()) {
+    return result;
+  }
+
+  // Forward the update only when it is the sole pending update and nothing is
+  // in flight yet; queued updates get sent from 'acknowledgement()'. A timeout
+  // that is already set means the update above was an ignored duplicate.
+  if (stream->pending.size() == 1 && stream->timeout.isNone()) {
+    const Result<StatusUpdate>& next = stream->next();
+    if (next.isError()) {
+      return Error(next.error());
+    }
+
+    CHECK_SOME(next);
+    stream->timeout = forward(next.get());
+  }
+
+  return Nothing();
+}
+
+
+Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
+{
+  LOG(INFO) << "Forwarding status update " << update
+            << " to the master at " << master;
+
+  StatusUpdateMessage message;
+  message.mutable_update()->MergeFrom(update);
+  message.set_pid(slave); // The ACK will be first received by the slave.
+
+  send(master, message);
+
+  // Send a message to self to resend after some delay if no ACK is received.
+  return delay(STATUS_UPDATE_RETRY_INTERVAL,
+               self(),
+               &StatusUpdateManagerProcess::timeout).timeout();
+}
+
+
+Try<Nothing> StatusUpdateManagerProcess::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const string& uuid)
+{
+  LOG(INFO) << "Received status update acknowledgement"
+            << " for task " << taskId
+            << " of framework " << frameworkId;
+
+  StatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+
+  // This might happen if we haven't completed recovery yet or if the
+  // acknowledgement is for a stream that has been cleaned up.
+  if (stream == NULL) {
+    return Error(
+        "Cannot find the status update stream for task " + stringify(taskId) +
+        " of framework " + stringify(frameworkId));
+  }
+
+  // Get the corresponding update for this ACK.
+  const Result<StatusUpdate>& update = stream->next();
+  if (update.isError()) {
+    return Error(update.error());
+  }
+
+  // This might happen if we retried a status update and got back
+  // acknowledgments for both the original and the retried update.
+  if (update.isNone()) {
+    LOG(WARNING) << "Ignoring unexpected status update acknowledgment"
+                 << " for task " << taskId
+                 << " of framework " << frameworkId;
+    return Nothing();
+  }
+
+  // Handle the acknowledgement.
+  Try<Nothing> result =
+    stream->acknowledgement(taskId, frameworkId, uuid, update.get());
+
+  if (result.isError()) {
+    return result;
+  }
+
+  // Reset the timeout.
+  stream->timeout = None();
+
+  // Get the next update in the queue.
+  const Result<StatusUpdate>& next = stream->next();
+  if (next.isError()) {
+    return Error(next.error());
+  }
+
+  if (protobuf::isTerminalState(update.get().status().state())) {
+    if (next.isSome()) {
+      LOG(WARNING) << "Acknowledged a terminal"
+                   << " status update " << update.get()
+                   << " but updates are still pending";
+    }
+    cleanupStatusUpdateStream(taskId, frameworkId);
+  } else if (next.isSome()) {
+    // Forward the next queued status update.
+    stream->timeout = forward(next.get());
+  }
+
+  return Nothing();
+}
+
+
+// TODO(vinod): There should be a limit on the retries.
+void StatusUpdateManagerProcess::timeout()
+{
+  LOG(INFO) << "Checking for unacknowledged status updates";
+  // Check and see if we should resend any status updates.
+  foreachkey (const FrameworkID& frameworkId, streams) {
+    foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
+      CHECK_NOTNULL(stream);
+      if (!stream->pending.empty()) {
+        CHECK(stream->timeout.isSome());
+        if (stream->timeout.get().expired()) {
+          const StatusUpdate& update = stream->pending.front();
+          LOG(WARNING) << "Resending status update " << update;
+          stream->timeout = forward(update);
+        }
+      }
+    }
+  }
+}
+
+
+StatusUpdateStream* StatusUpdateManagerProcess::createStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const Option<string>& path)
+{
+  LOG(INFO) << "Creating StatusUpdate stream for task " << taskId
+            << " of framework " << frameworkId;
+
+  StatusUpdateStream* stream =
+    new StatusUpdateStream(taskId, frameworkId, path);
+
+  streams[frameworkId][taskId] = stream;
+  return stream;
+}
+
+
+StatusUpdateStream* StatusUpdateManagerProcess::getStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  if (!streams.contains(frameworkId)) {
+    return NULL;
+  }
+
+  if (!streams[frameworkId].contains(taskId)) {
+    return NULL;
+  }
+
+  return streams[frameworkId][taskId];
+}
+
+
+void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Cleaning up status update stream"
+            << " for task " << taskId
+            << " of framework " << frameworkId;
+
+  CHECK(streams.contains(frameworkId))
+    << "Cannot find the status update streams for framework " << frameworkId;
+
+  CHECK(streams[frameworkId].contains(taskId))
+    << "Cannot find the status update streams for task " << taskId;
+
+  StatusUpdateStream* stream = streams[frameworkId][taskId];
+
+  streams[frameworkId].erase(taskId);
+  if (streams[frameworkId].empty()) {
+    streams.erase(frameworkId);
+  }
+
+  delete stream;
+}
+
+
+StatusUpdateManager::StatusUpdateManager()
+{
+  process = new StatusUpdateManagerProcess();
+  spawn(process);
+}
+
+
+StatusUpdateManager::~StatusUpdateManager()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+void StatusUpdateManager::initialize(const PID<Slave>& slave)
+{
+  dispatch(process, &StatusUpdateManagerProcess::initialize, slave);
+}
+
+
+Future<Try<Nothing> > StatusUpdateManager::update(
+    const StatusUpdate& update,
+    bool checkpoint,
+    const Option<std::string>& path)
+{
+  return dispatch(
+      process, &StatusUpdateManagerProcess::update, update, checkpoint, path);
+}
+
+
+Future<Try<Nothing> > StatusUpdateManager::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const std::string& uuid)
+{
+  return dispatch(
+      process,
+      &StatusUpdateManagerProcess::acknowledgement,
+      taskId,
+      frameworkId,
+      uuid);
+}
+
+
+void StatusUpdateManager::newMasterDetected(const UPID& pid)
+{
+  dispatch(process, &StatusUpdateManagerProcess::newMasterDetected, pid);
+}
+
+
+void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
+{
+  dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

Added: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1455810&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (added)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Wed Mar 13 06:23:00 2013
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATUS_UPDATE_MANAGER_HPP__
+#define __STATUS_UPDATE_MANAGER_HPP__
+
+#include <ostream>
+#include <queue>
+#include <string>
+#include <utility>
+
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/utils.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declarations.
+class StatusUpdateManagerProcess;
+struct StatusUpdateStream;
+
+
+// StatusUpdateManager is responsible for
+// 1) Reliably sending status updates to the master (and hence, the scheduler).
+// 2) Checkpointing the update to disk (optional).
+// 3) Sending ACKs to the executor (optional).
+// 4) Receiving ACKs from the scheduler.
+class StatusUpdateManager
+{
+public:
+  StatusUpdateManager();
+  virtual ~StatusUpdateManager();
+
+  void initialize(const PID<Slave>& slave);
+
+  // Enqueues the status update to reliably send the update to the master.
+  // If 'path' is provided, the update is also checkpointed to the given path.
+  // @return A future indicating whether the update is handled
+  //         successfully (e.g. checkpointed).
+  process::Future<Try<Nothing> > update(
+      const StatusUpdate& update,
+      bool checkpoint,
+      const Option<std::string>& path);
+
+  // Receives the ACK from the scheduler and checkpoints it to disk if
+  // necessary. Also, sends the next pending status update, if any.
+  // @return A future indicating whether the acknowledgement
+  //         is handled successfully (e.g. checkpointed).
+  process::Future<Try<Nothing> > acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const std::string& uuid);
+
+  // TODO(vinod): Remove this hack once the new leader detector code is merged.
+  void newMasterDetected(const UPID& pid);
+
+  // Closes all the status update streams corresponding to this framework.
+  // NOTE: This stops retrying any pending status updates for this framework.
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  StatusUpdateManagerProcess* process;
+};
+
+
+// StatusUpdateStream handles the status updates and acknowledgements
+// of a task, checkpointing them if necessary. It also holds the information
+// about received, acknowledged and pending status updates.
+// NOTE: A task is expected to have a globally unique ID across the lifetime
+// of a framework. In other words the tuple (taskId, frameworkId) should be
+// always unique.
+struct StatusUpdateStream
+{
+  StatusUpdateStream(const TaskID& _taskId,
+                     const FrameworkID& _frameworkId,
+                     const Option<std::string>& _path,
+                     int oflag = O_CREAT | O_RDWR)
+    : taskId(_taskId), frameworkId(_frameworkId), path(_path), error(None())
+  {
+    if (path.isSome()) {
+      // Create the base updates directory, if it doesn't exist.
+      Try<Nothing> mkdir = os::mkdir(os::dirname(path.get()).get());
+      if (mkdir.isError()) {
+        error = "Failed to create " + os::dirname(path.get()).get();
+        return;
+      }
+
+      // Open the updates file: owner read/write, group and others read-only.
+      Try<int> result = os::open(
+          path.get(), oflag | O_SYNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+      if (result.isError()) {
+        error = "Failed to open '" + path.get() + "' for status updates";
+        return;
+      }
+
+      // We keep the file open through the lifetime of the task, because it
+      // makes it easy to append status update records to the file.
+      fd = result.get();
+    }
+  }
+
+  ~StatusUpdateStream()
+  {
+    if (fd.isSome()) {
+      Try<Nothing> close = os::close(fd.get());
+      if (close.isError()) {
+        LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+                   << close.error();
+      }
+    }
+  }
+
+  Try<Nothing> update(const StatusUpdate& update)
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    // Check that this status update has not already been acknowledged.
+    // This could happen in the rare case when the slave received the ACK
+    // from the framework, died, but slave's ACK to the executor never made it!
+    if (acknowledged.contains(update.uuid())) {
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " that has already been acknowledged by the framework!";
+      return Nothing();
+    }
+
+    // Check that this update hasn't already been received.
+    // This could happen if the slave receives a status update from an executor,
+    // then crashes after it writes it to disk but before it sends an ack.
+    if (received.contains(update.uuid())) {
+      LOG(WARNING) << "Ignoring duplicate status update " << update;
+      return Nothing();
+    }
+
+    // Handle the update, checkpointing if necessary.
+    return handle(update, StatusUpdateRecord::UPDATE);
+  }
+
+  Try<Nothing> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const std::string& uuid,
+      const StatusUpdate& update)
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    // Reject a stale or mismatched ACK instead of aborting the slave.
+    if (uuid.compare(update.uuid()) != 0) {
+      return Error("Unexpected UUID mismatch for update " + stringify(update));
+    }
+
+    // Handle the ACK, checkpointing if necessary.
+    return handle(update, StatusUpdateRecord::ACK);
+  }
+
+  // Returns the next update (or none, if empty) in the queue.
+  Result<StatusUpdate> next()
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    if (!pending.empty()) {
+      return pending.front();
+    }
+
+    return None();
+  }
+
+  // TODO(vinod): Explore semantics to make 'timeout' and 'pending' private.
+  Option<Timeout> timeout; // Timeout for resending status update.
+  std::queue<StatusUpdate> pending;
+
+private:
+  // Handles the status update and writes it to disk, if necessary.
+  // TODO(vinod): The write has to be asynchronous to avoid status updates that
+  // are being checkpointed, blocking the processing of other updates.
+  // One solution is to wrap the protobuf::write inside async, but it's probably
+  // too much of an overhead to spin up a new libprocess per status update?
+  // A better solution might be to have async write capability for file I/O.
+  Try<Nothing> handle(
+      const StatusUpdate& update,
+      const StatusUpdateRecord::Type& type)
+  {
+    CHECK(error.isNone());
+
+    LOG(INFO) << "Handling " << type << " for status update " << update;
+
+    // Checkpoint the update if necessary.
+    if (path.isSome()) {
+      LOG(INFO) << "Checkpointing " << type << " for status update " << update;
+
+      CHECK_SOME(fd);
+
+      StatusUpdateRecord record;
+      record.set_type(type);
+
+      if (type == StatusUpdateRecord::UPDATE) {
+        record.mutable_update()->CopyFrom(update);
+      } else {
+        record.set_uuid(update.uuid());
+      }
+
+      Try<Nothing> write = ::protobuf::write(fd.get(), record);
+      if (write.isError()) {
+        error = "Failed to write status update " + stringify(update) +
+                " to '" + path.get() + "': " + write.error();
+        return Error(error.get());
+      }
+    }
+
+    if (type == StatusUpdateRecord::UPDATE) {
+      // Record this update.
+      received.insert(update.uuid());
+
+      // Add it to the pending updates queue.
+      pending.push(update);
+    } else {
+      // Record this ACK.
+      acknowledged.insert(update.uuid());
+
+      // Remove the corresponding update from the pending queue.
+      pending.pop();
+    }
+
+    return Nothing();
+  }
+
+  const TaskID taskId;
+  const FrameworkID frameworkId;
+
+  hashset<std::string> received;
+  hashset<std::string> acknowledged;
+
+  const Option<std::string> path; // File path of the update stream.
+  Option<int> fd; // File descriptor to the update stream.
+
+  Option<std::string> error; // Potential non-retryable error.
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __STATUS_UPDATE_MANAGER_HPP__

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp Wed Mar 13 06:23:00 2013
@@ -69,6 +69,12 @@ public:
     return Seconds(seconds > 0 ? seconds : 0);
   }
 
+  // Returns true if the timeout expired.
+  bool expired() const
+  {
+    return (timeout - Clock::now()) <= 0;
+  }
+
 private:
   double timeout;
 };