You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/03/13 07:23:00 UTC
svn commit: r1455810 - in /incubator/mesos/trunk: src/ src/common/
src/local/ src/messages/ src/slave/ third_party/libprocess/include/process/
Author: vinodkone
Date: Wed Mar 13 06:23:00 2013
New Revision: 1455810
URL: http://svn.apache.org/r1455810
Log:
Added status update manager.
Review: https://reviews.apache.org/r/7212
Added:
incubator/mesos/trunk/src/slave/status_update_manager.cpp
incubator/mesos/trunk/src/slave/status_update_manager.hpp
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/common/protobuf_utils.hpp
incubator/mesos/trunk/src/common/type_utils.hpp
incubator/mesos/trunk/src/local/local.cpp
incubator/mesos/trunk/src/messages/messages.proto
incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Mar 13 06:23:00 2013
@@ -161,7 +161,8 @@ libmesos_no_third_party_la_SOURCES =
master/constants.cpp \
master/drf_sorter.cpp \
master/frameworks_manager.cpp \
- master/http.cpp master/master.cpp \
+ master/http.cpp \
+ master/master.cpp \
master/slaves_manager.cpp \
slave/constants.cpp \
slave/gc.cpp \
@@ -172,6 +173,7 @@ libmesos_no_third_party_la_SOURCES =
slave/isolation_module.cpp \
slave/process_based_isolation_module.cpp \
slave/reaper.cpp \
+ slave/status_update_manager.cpp \
launcher/launcher.cpp \
exec/exec.cpp \
common/lock.cpp \
@@ -233,6 +235,7 @@ libmesos_no_third_party_la_SOURCES += co
slave/cgroups_isolation_module.hpp \
slave/lxc_isolation_module.hpp \
slave/paths.hpp slave/state.hpp \
+ slave/status_update_manager.hpp \
slave/process_based_isolation_module.hpp slave/reaper.hpp \
slave/slave.hpp slave/solaris_project_isolation_module.hpp \
tests/environment.hpp tests/script.hpp \
Modified: incubator/mesos/trunk/src/common/protobuf_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/protobuf_utils.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/protobuf_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/protobuf_utils.hpp Wed Mar 13 06:23:00 2013
@@ -22,6 +22,7 @@
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <stout/none.hpp>
#include <stout/uuid.hpp>
#include "common/type_utils.hpp"
@@ -46,26 +47,25 @@ inline StatusUpdate createStatusUpdate(
const SlaveID& slaveId,
const TaskID& taskId,
const TaskState& state,
- const std::string& message,
- const ExecutorID& executorId = ExecutorID())
+ const std::string& message = "",
+ const Option<ExecutorID>& executorId = None())
{
StatusUpdate update;
+ update.set_timestamp(process::Clock::now());
+ update.set_uuid(UUID::random().toBytes());
update.mutable_framework_id()->MergeFrom(frameworkId);
+ update.mutable_slave_id()->MergeFrom(slaveId);
- if (!(executorId == "")) {
- update.mutable_executor_id()->MergeFrom(executorId);
+ if (executorId.isSome()) {
+ update.mutable_executor_id()->MergeFrom(executorId.get());
}
- update.mutable_slave_id()->MergeFrom(slaveId);
TaskStatus* status = update.mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
status->set_state(state);
status->set_message(message);
- update.set_timestamp(::process::Clock::now());
- update.set_uuid(UUID::random().toBytes());
-
return update;
}
Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Wed Mar 13 06:23:00 2013
@@ -34,52 +34,49 @@
namespace mesos {
-inline std::ostream& operator << (std::ostream& stream, const FrameworkID& frameworkId)
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const FrameworkID& frameworkId)
{
- stream << frameworkId.value();
- return stream;
+ return stream << frameworkId.value();
}
inline std::ostream& operator << (std::ostream& stream, const OfferID& offerId)
{
- stream << offerId.value();
- return stream;
+ return stream << offerId.value();
}
inline std::ostream& operator << (std::ostream& stream, const SlaveID& slaveId)
{
- stream << slaveId.value();
- return stream;
+ return stream << slaveId.value();
}
inline std::ostream& operator << (std::ostream& stream, const TaskID& taskId)
{
- stream << taskId.value();
- return stream;
+ return stream << taskId.value();
}
-inline std::ostream& operator << (std::ostream& stream, const ExecutorID& executorId)
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const ExecutorID& executorId)
{
- stream << executorId.value();
- return stream;
+ return stream << executorId.value();
}
inline std::ostream& operator << (std::ostream& stream, const TaskState& state)
{
- stream << TaskState_descriptor()->FindValueByNumber(state)->name();
- return stream;
+ return stream << TaskState_descriptor()->FindValueByNumber(state)->name();
}
inline std::ostream& operator << (std::ostream& stream, const TaskInfo& task)
{
- stream << task.DebugString();
- return stream;
+ return stream << task.DebugString();
}
@@ -205,8 +202,9 @@ inline bool operator == (const Environme
}
-inline bool operator == (const CommandInfo::URI& left,
- const CommandInfo::URI& right)
+inline bool operator == (
+ const CommandInfo::URI& left,
+ const CommandInfo::URI& right)
{
return left.has_executable() == right.has_executable() &&
(!left.has_executable() || (left.executable() == right.executable())) &&
@@ -299,10 +297,23 @@ inline std::size_t hash_value(const Exec
namespace internal {
-inline std::ostream& operator << (std::ostream& stream, const Task* task)
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const StatusUpdate& update)
{
- stream << "task " << task->framework_id() << ":" << task->task_id();
- return stream;
+ return stream
+ << update.status().state()
+ << " from task " << update.status().task_id()
+ << " of framework " << update.framework_id();
+}
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const StatusUpdateRecord::Type& type)
+{
+ return stream
+ << StatusUpdateRecord::Type_descriptor()->FindValueByNumber(type)->name();
}
}} // namespace mesos { namespace internal {
Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Wed Mar 13 06:23:00 2013
@@ -132,6 +132,10 @@ PID<Master> launch(const Configuration&
// TODO(benh): Create a local isolation module?
ProcessBasedIsolationModule* isolationModule =
new ProcessBasedIsolationModule();
+
+ // Use a different work directory for each slave.
+ flags.work_dir = path::join(flags.work_dir, stringify(i));
+
Slave* slave = new Slave(flags, true, isolationModule, files);
slaves[isolationModule] = slave;
pids.push_back(process::spawn(slave));
Modified: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (original)
+++ incubator/mesos/trunk/src/messages/messages.proto Wed Mar 13 06:23:00 2013
@@ -59,6 +59,20 @@ message StatusUpdate {
}
+// This message encapsulates how we checkpoint a status update to disk.
+// NOTE: If type == UPDATE, the 'update' field is required.
+// NOTE: If type == ACK, the 'uuid' field is required.
+message StatusUpdateRecord {
+  enum Type {
+    UPDATE = 0; // A status update; the full message is stored in 'update'.
+    ACK = 1; // An acknowledgement; 'uuid' is the UUID of the acked update.
+  }
+  required Type type = 1; // Tells which of the two optional fields is set.
+  optional StatusUpdate update = 2; // Only written for UPDATE records.
+  optional bytes uuid = 3; // Only written for ACK records.
+}
+
+
message Slaves
{
repeated SlaveInfo infos = 1;
Added: incubator/mesos/trunk/src/slave/status_update_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.cpp?rev=1455810&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.cpp (added)
+++ incubator/mesos/trunk/src/slave/status_update_manager.cpp Wed Mar 13 06:23:00 2013
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/delay.hpp>
+#include <process/process.hpp>
+#include <process/timer.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/protobuf.hpp"
+#include "stout/utils.hpp"
+
+#include "slave/constants.hpp"
+#include "slave/slave.hpp"
+#include "slave/status_update_manager.hpp"
+
+using std::string;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+
+namespace mesos {
+namespace internal {
+namespace slave {
+// Process backing StatusUpdateManager, which drives it through dispatch().
+class StatusUpdateManagerProcess
+  : public ProtobufProcess<StatusUpdateManagerProcess>
+{
+public:
+  StatusUpdateManagerProcess() {}
+  virtual ~StatusUpdateManagerProcess(); // Deletes all remaining streams.
+  // Remembers the slave PID that forward() stamps on outgoing messages.
+  void initialize(const PID<Slave>& slave);
+  // Queues 'update' on its task's stream; forwards it if it is first in line.
+  Try<Nothing> update(
+      const StatusUpdate& update,
+      bool checkpoint,
+      const Option<std::string>& path);
+  // Applies an ACK to the task's stream and forwards the next update, if any.
+  Try<Nothing> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const string& uuid);
+  // Makes 'pid' the destination of subsequently forwarded updates.
+  void newMasterDetected(const UPID& pid);
+  // Destroys every stream belonging to 'frameworkId'.
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  // Timer callback: resends the head update of every expired stream.
+  void timeout();
+
+  // Forwards the status update to the master and starts a timer to check
+  // for ACK from the scheduler.
+  // NOTE: This should only be used for those messages that expect an
+  // ACK (e.g., updates from the executor).
+  Timeout forward(const StatusUpdate& update);
+
+  // Helper functions.
+
+  // Creates a new status update stream (opening the updates file, if path is
+  // present) and adds it to streams.
+  StatusUpdateStream* createStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const Option<std::string>& path);
+  // Returns the stream of (taskId, frameworkId), or NULL if there is none.
+  StatusUpdateStream* getStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+  // Unregisters and deletes the stream of (taskId, frameworkId); must exist.
+  void cleanupStatusUpdateStream(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId);
+
+  UPID master; // Destination of forwarded updates.
+  PID<Slave> slave; // Sent along with each forwarded update.
+  hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams; // Owned.
+};
+
+
+// Frees every stream the manager still owns, then empties the bookkeeping
+// map so that no dangling pointers remain in it.
+StatusUpdateManagerProcess::~StatusUpdateManagerProcess()
+{
+  foreachkey (const FrameworkID& frameworkId, streams) {
+    hashmap<TaskID, StatusUpdateStream*>& tasks = streams[frameworkId];
+    foreachvalue (StatusUpdateStream* stream, tasks) {
+      delete stream;
+    }
+  }
+
+  streams.clear();
+}
+
+// Stores the slave PID; forward() later embeds it in each outgoing message.
+void StatusUpdateManagerProcess::initialize(const process::PID<Slave>& _slave)
+{
+  slave = _slave;
+}
+
+// Makes 'pid' the destination of all subsequently forwarded updates.
+void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
+{
+  LOG(INFO) << "New master detected at " << pid;
+  master = pid; // Nothing is resent here; pending updates wait for timeout().
+}
+
+
+// Tears down every status update stream that belongs to 'frameworkId'.
+// Apart from logging, this is a no-op when the framework has no streams.
+void StatusUpdateManagerProcess::cleanup(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Closing status update streams for framework " << frameworkId;
+
+  if (!streams.contains(frameworkId)) {
+    return;
+  }
+
+  // Walk a snapshot of the task map: cleanupStatusUpdateStream() erases
+  // entries from 'streams' (eventually the framework's entry itself).
+  const hashmap<TaskID, StatusUpdateStream*> snapshot =
+    utils::copy(streams[frameworkId]);
+
+  foreachkey (const TaskID& taskId, snapshot) {
+    cleanupStatusUpdateStream(taskId, frameworkId);
+  }
+}
+
+
+// Handles a status update for a task: appends it to the task's stream
+// (creating the stream on first use and checkpointing to 'path' when one is
+// given) and forwards it to the master unless an earlier update of the same
+// stream is still awaiting its ACK.
+//
+// 'checkpoint' set without a 'path' is a programming error and aborts.
+//
+// Returns an Error if the stream is in a failed state (e.g. the update could
+// not be checkpointed) and Nothing otherwise, including when the stream
+// ignored the update as a duplicate.
+Try<Nothing> StatusUpdateManagerProcess::update(
+    const StatusUpdate& update,
+    bool checkpoint,
+    const Option<string>& path)
+{
+  CHECK(!checkpoint || path.isSome())
+    << "Asked to checkpoint update " << update << " without providing a path";
+
+  const TaskID& taskId = update.status().task_id();
+  const FrameworkID& frameworkId = update.framework_id();
+
+  LOG(INFO) << "Received status update " << update;
+
+  // Create/Get the status update stream for this task.
+  StatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+  if (stream == NULL) {
+    stream = createStatusUpdateStream(taskId, frameworkId, path);
+  }
+
+  // Write the status update to disk (if checkpointing) and enqueue it.
+  Try<Nothing> result = stream->update(update);
+  if (result.isError()) {
+    return result;
+  }
+
+  // Forward the update at the head of the queue, but only if nothing is in
+  // flight yet. Subsequent status updates get sent in 'acknowledgement()'.
+  // NOTE: The queue length alone is not a reliable signal: the stream
+  // silently ignores duplicate updates, so the queue can still hold exactly
+  // one (already forwarded) update after a duplicate arrives.
+  if (stream->timeout.isNone() && !stream->pending.empty()) {
+    const Result<StatusUpdate>& next = stream->next();
+    if (next.isError()) {
+      return Error(next.error());
+    }
+
+    CHECK_SOME(next);
+    stream->timeout = forward(next.get());
+  }
+
+  return Nothing();
+}
+
+// Sends 'update' to the current master and schedules a timeout() callback.
+Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
+{
+  LOG(INFO) << "Forwarding status update " << update
+            << " to the master at " << master;
+
+  StatusUpdateMessage message;
+  message.mutable_update()->MergeFrom(update);
+  message.set_pid(slave); // The ACK will be first received by the slave.
+
+  send(master, message);
+  // The returned Timeout is what callers keep in the stream's 'timeout'.
+  // Send a message to self to resend after some delay if no ACK is received.
+  return delay(STATUS_UPDATE_RETRY_INTERVAL,
+               self(),
+               &StatusUpdateManagerProcess::timeout).timeout();
+}
+
+// Processes an ACK for the update at the head of the given task's stream.
+Try<Nothing> StatusUpdateManagerProcess::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const string& uuid)
+{
+  LOG(INFO) << "Received status update acknowledgement"
+            << " for task " << taskId
+            << " of framework " << frameworkId;
+
+  StatusUpdateStream* stream = getStatusUpdateStream(taskId, frameworkId);
+
+  // This might happen if we haven't completed recovery yet or if the
+  // acknowledgement is for a stream that has been cleaned up.
+  if (stream == NULL) {
+    return Error(
+        "Cannot find the status update stream for task " + stringify(taskId) +
+        " of framework " + stringify(frameworkId));
+  }
+
+  // Get the corresponding update for this ACK.
+  const Result<StatusUpdate>& update = stream->next();
+  if (update.isError()) {
+    return Error(update.error());
+  }
+
+  // This might happen if we retried a status update and got back
+  // acknowledgments for both the original and the retried update.
+  if (update.isNone()) {
+    LOG(WARNING) << "Ignoring unexpected status update acknowledgment"
+                 << " for task " << taskId
+                 << " of framework " << frameworkId;
+    return Nothing();
+  }
+
+  // Handle the acknowledgement; on success 'update' is popped off the queue.
+  Try<Nothing> result =
+    stream->acknowledgement(taskId, frameworkId, uuid, update.get());
+
+  if (result.isError()) {
+    return result;
+  }
+
+  // Reset the timeout: nothing is in flight until the next forward().
+  stream->timeout = None();
+
+  // Get the next update in the queue.
+  const Result<StatusUpdate>& next = stream->next();
+  if (next.isError()) {
+    return Error(next.error());
+  }
+  // An ACK for a terminal update closes the stream, whatever is still queued.
+  if (protobuf::isTerminalState(update.get().status().state())) {
+    if (next.isSome()) {
+      LOG(WARNING) << "Acknowledged a terminal"
+                   << " status update " << update.get()
+                   << " but updates are still pending";
+    }
+    cleanupStatusUpdateStream(taskId, frameworkId); // Deletes 'stream'.
+  } else if (next.isSome()) {
+    // Forward the next queued status update.
+    stream->timeout = forward(next.get());
+  }
+
+  return Nothing();
+}
+
+// Timer callback scheduled by forward(); resends updates whose timer expired.
+// TODO(vinod): There should be a limit on the retries.
+void StatusUpdateManagerProcess::timeout()
+{
+  LOG(INFO) << "Checking for unacknowledged status updates";
+  // Scan all streams (not only the one that armed this timer) for resends.
+  foreachkey (const FrameworkID& frameworkId, streams) {
+    foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
+      CHECK_NOTNULL(stream);
+      if (!stream->pending.empty()) {
+        CHECK(stream->timeout.isSome()); // A queued head must be in flight.
+        if (stream->timeout.get().expired()) {
+          const StatusUpdate& update = stream->pending.front();
+          LOG(WARNING) << "Resending status update " << update;
+          stream->timeout = forward(update); // Also schedules the next check.
+        }
+      }
+    }
+  }
+}
+
+// Allocates the stream for a task and registers it in 'streams' (the owner).
+StatusUpdateStream* StatusUpdateManagerProcess::createStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const Option<string>& path)
+{
+  LOG(INFO) << "Creating StatusUpdate stream for task " << taskId
+            << " of framework " << frameworkId;
+
+  StatusUpdateStream* stream =
+    new StatusUpdateStream(taskId, frameworkId, path);
+  // Registered even if opening 'path' failed; the stream then reports errors.
+  streams[frameworkId][taskId] = stream;
+  return stream;
+}
+
+
+// Looks up the stream registered for (taskId, frameworkId).
+// Returns NULL when either the framework or the task is unknown.
+StatusUpdateStream* StatusUpdateManagerProcess::getStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  // Membership is tested before indexing (the '&&' short-circuits), so an
+  // unknown framework or task is never passed to operator[].
+  if (streams.contains(frameworkId) && streams[frameworkId].contains(taskId)) {
+    return streams[frameworkId][taskId];
+  }
+
+  return NULL;
+}
+
+// Unregisters and deletes the stream of (taskId, frameworkId); it must exist.
+void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Cleaning up status update stream"
+            << " for task " << taskId
+            << " of framework " << frameworkId;
+
+  CHECK(streams.contains(frameworkId))
+    << "Cannot find the status update streams for framework " << frameworkId;
+
+  CHECK(streams[frameworkId].contains(taskId))
+    << "Cannot find the status update streams for task " << taskId;
+
+  StatusUpdateStream* stream = streams[frameworkId][taskId];
+
+  streams[frameworkId].erase(taskId);
+  if (streams[frameworkId].empty()) {
+    streams.erase(frameworkId); // Last stream gone: drop the framework too.
+  }
+
+  delete stream; // Also closes the stream's checkpoint file, if it has one.
+}
+
+// Creates the backing process and spawns it.
+StatusUpdateManager::StatusUpdateManager()
+{
+  process = new StatusUpdateManagerProcess();
+  spawn(process);
+}
+
+// Terminates the backing process and waits for it before freeing it.
+StatusUpdateManager::~StatusUpdateManager()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+// Forwards to StatusUpdateManagerProcess::initialize() via dispatch().
+void StatusUpdateManager::initialize(const PID<Slave>& slave)
+{
+  dispatch(process, &StatusUpdateManagerProcess::initialize, slave);
+}
+
+// Forwards to StatusUpdateManagerProcess::update() via dispatch().
+Future<Try<Nothing> > StatusUpdateManager::update(
+    const StatusUpdate& update,
+    bool checkpoint,
+    const Option<std::string>& path)
+{
+  return dispatch(
+      process, &StatusUpdateManagerProcess::update, update, checkpoint, path);
+}
+
+// Forwards to StatusUpdateManagerProcess::acknowledgement() via dispatch().
+Future<Try<Nothing> > StatusUpdateManager::acknowledgement(
+    const TaskID& taskId,
+    const FrameworkID& frameworkId,
+    const std::string& uuid)
+{
+  return dispatch(
+      process,
+      &StatusUpdateManagerProcess::acknowledgement,
+      taskId,
+      frameworkId,
+      uuid);
+}
+
+// Forwards to StatusUpdateManagerProcess::newMasterDetected() via dispatch().
+void StatusUpdateManager::newMasterDetected(const UPID& pid)
+{
+  dispatch(process, &StatusUpdateManagerProcess::newMasterDetected, pid);
+}
+
+// Forwards to StatusUpdateManagerProcess::cleanup() via dispatch().
+void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
+{
+  dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
Added: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1455810&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (added)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Wed Mar 13 06:23:00 2013
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATUS_UPDATE_MANAGER_HPP__
+#define __STATUS_UPDATE_MANAGER_HPP__
+
+#include <ostream>
+#include <queue>
+#include <string>
+#include <utility>
+
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/utils.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declarations.
+class StatusUpdateManagerProcess;
+struct StatusUpdateStream;
+
+
+// StatusUpdateManager is responsible for
+// 1) Reliably sending status updates to the master (and hence, the scheduler).
+// 2) Checkpointing the update to disk (optional).
+// 3) Sending ACKs to the executor (optional).
+// 4) Receiving ACKs from the scheduler.
+class StatusUpdateManager
+{
+public:
+  StatusUpdateManager(); // Creates and spawns the backing process.
+  virtual ~StatusUpdateManager(); // Terminates, awaits and frees it.
+  // Hands the slave's PID to the backing process.
+  void initialize(const PID<Slave>& slave);
+
+  // Enqueues the status update to reliably send the update to the master.
+  // If 'path' is provided, the update is also checkpointed to the given path;
+  // 'path' must be set when 'checkpoint' is true. @return A future indicating
+  // whether the update is handled successfully (e.g. checkpointed).
+  process::Future<Try<Nothing> > update(
+      const StatusUpdate& update,
+      bool checkpoint,
+      const Option<std::string>& path);
+
+  // Receives the ACK from the scheduler and checkpoints it to disk if
+  // necessary. Also, sends the next pending status update, if any.
+  // @return A future indicating whether the acknowledgement
+  // is handled successfully (e.g. checkpointed).
+  process::Future<Try<Nothing> > acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const std::string& uuid);
+
+  // TODO(vinod): Remove this hack once the new leader detector code is merged.
+  void newMasterDetected(const UPID& pid);
+
+  // Closes all the status update streams corresponding to this framework.
+  // NOTE: This stops retrying any pending status updates for this framework.
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  StatusUpdateManagerProcess* process; // Owned; all calls are dispatched to it.
+};
+
+
+// StatusUpdateStream handles the status updates and acknowledgements
+// of a task, checkpointing them if necessary. It also holds the information
+// about received, acknowledged and pending status updates.
+// NOTE: A task is expected to have a globally unique ID across the lifetime
+// of a framework. In other words the tuple (taskId, frameworkId) should be
+// always unique.
+// NOTE: Once setting up or writing the checkpoint file fails, the stream
+// stays in an error state: update(), acknowledgement() and next() all
+// return that error from then on.
+struct StatusUpdateStream
+{
+  // Opens (creating it if needed) the checkpoint file at '_path', if a path
+  // is given. Failures are not reported here; they are remembered and
+  // returned by every subsequent operation on the stream.
+  StatusUpdateStream(const TaskID& _taskId,
+                     const FrameworkID& _frameworkId,
+                     const Option<std::string>& _path,
+                     int oflag = O_CREAT | O_RDWR)
+    : taskId(_taskId), frameworkId(_frameworkId), path(_path), error(None())
+  {
+    if (path.isSome()) {
+      // Create the base updates directory, if it doesn't exist.
+      const std::string directory = os::dirname(path.get()).get();
+
+      Try<Nothing> mkdir = os::mkdir(directory);
+      if (mkdir.isError()) {
+        error = "Failed to create " + directory + ": " + mkdir.error();
+        return;
+      }
+
+      // Open the updates file. Group and others only get read access: the
+      // file holds checkpointed state that nobody but the owner should
+      // modify (let alone execute).
+      Try<int> result = os::open(
+          path.get(), oflag | O_SYNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+      if (result.isError()) {
+        error = "Failed to open '" + path.get() + "' for status updates: " +
+                result.error();
+        return;
+      }
+
+      // We keep the file open through the lifetime of the task, because it
+      // makes it easy to append status update records to the file.
+      fd = result.get();
+    }
+  }
+
+  // Closes the checkpoint file, if one was opened.
+  ~StatusUpdateStream()
+  {
+    if (fd.isSome()) {
+      Try<Nothing> close = os::close(fd.get());
+      if (close.isError()) {
+        LOG(ERROR) << "Failed to close file '" << path.get() << "': "
+                   << close.error();
+      }
+    }
+  }
+
+  // Records 'update' (checkpointing it if the stream has a file) and appends
+  // it to 'pending'. Updates whose UUID was already received or acknowledged
+  // are ignored; Nothing is returned for them as well.
+  Try<Nothing> update(const StatusUpdate& update)
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    // Check that this status update has not already been acknowledged.
+    // This could happen in the rare case when the slave received the ACK
+    // from the framework, died, but slave's ACK to the executor never made it!
+    if (acknowledged.contains(update.uuid())) {
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " that has already been acknowledged by the framework!";
+      return Nothing();
+    }
+
+    // Check that this update hasn't already been received.
+    // This could happen if the slave receives a status update from an executor,
+    // then crashes after it writes it to disk but before it sends an ack.
+    if (received.contains(update.uuid())) {
+      LOG(WARNING) << "Ignoring duplicate status update " << update;
+      return Nothing();
+    }
+
+    // Handle the update, checkpointing if necessary.
+    return handle(update, StatusUpdateRecord::UPDATE);
+  }
+
+  // Records the ACK for 'update', which must be the update at the head of
+  // 'pending', and removes it from the queue.
+  // Returns an Error, leaving the stream untouched and still usable, when
+  // 'uuid' does not belong to 'update'. ACKs arrive over the network, so a
+  // late duplicate ACK of an earlier (retried) update is to be expected and
+  // must not bring the process down.
+  Try<Nothing> acknowledgement(
+      const TaskID& taskId,
+      const FrameworkID& frameworkId,
+      const std::string& uuid,
+      const StatusUpdate& update)
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    if (uuid != update.uuid()) {
+      return Error(
+          std::string(acknowledged.contains(uuid) ? "Duplicate" : "Unexpected") +
+          " status update acknowledgement; expecting one for update " +
+          stringify(update));
+    }
+
+    // Handle the ACK, checkpointing if necessary.
+    return handle(update, StatusUpdateRecord::ACK);
+  }
+
+  // Returns the next update (or none, if empty) in the queue.
+  Result<StatusUpdate> next()
+  {
+    if (error.isSome()) {
+      return Error(error.get());
+    }
+
+    if (!pending.empty()) {
+      return pending.front();
+    }
+
+    return None();
+  }
+
+  // TODO(vinod): Explore semantics to make 'timeout' and 'pending' private.
+  Option<Timeout> timeout; // Timeout for resending status update.
+  std::queue<StatusUpdate> pending;
+
+private:
+  // Handles the status update and writes it to disk, if necessary.
+  // For type UPDATE the update is queued in 'pending'; for type ACK the head
+  // of 'pending' is removed (callers pass that head as 'update').
+  // TODO(vinod): The write has to be asynchronous to avoid status updates that
+  // are being checkpointed, blocking the processing of other updates.
+  // One solution is to wrap the protobuf::write inside async, but it's probably
+  // too much of an overhead to spin up a new libprocess per status update?
+  // A better solution might be to have async write capability for file io.
+  Try<Nothing> handle(
+      const StatusUpdate& update,
+      const StatusUpdateRecord::Type& type)
+  {
+    CHECK(error.isNone());
+
+    LOG(INFO) << "Handling " << type << " for status update " << update;
+
+    // Checkpoint the update if necessary.
+    if (path.isSome()) {
+      LOG(INFO) << "Checkpointing " << type << " for status update " << update;
+
+      CHECK_SOME(fd);
+
+      StatusUpdateRecord record;
+      record.set_type(type);
+
+      // An UPDATE record carries the whole update, an ACK record its UUID.
+      if (type == StatusUpdateRecord::UPDATE) {
+        record.mutable_update()->CopyFrom(update);
+      } else {
+        record.set_uuid(update.uuid());
+      }
+
+      Try<Nothing> write = ::protobuf::write(fd.get(), record);
+      if (write.isError()) {
+        // Remember the failure: the in-memory state below is only changed
+        // after a successful write.
+        error = "Failed to write status update " + stringify(update) +
+                " to '" + path.get() + "': " + write.error();
+        return Error(error.get());
+      }
+    }
+
+    if (type == StatusUpdateRecord::UPDATE) {
+      // Record this update.
+      received.insert(update.uuid());
+
+      // Add it to the pending updates queue.
+      pending.push(update);
+    } else {
+      // Record this ACK.
+      acknowledged.insert(update.uuid());
+
+      // Remove the corresponding update from the pending queue.
+      pending.pop();
+    }
+
+    return Nothing();
+  }
+
+  const TaskID taskId;
+  const FrameworkID frameworkId;
+
+  hashset<std::string> received; // UUIDs of all updates accepted so far.
+  hashset<std::string> acknowledged; // UUIDs of all updates ACKed so far.
+
+  const Option<std::string> path; // File path of the update stream.
+  Option<int> fd; // File descriptor to the update stream.
+
+  Option<std::string> error; // Potential non-retryable error.
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __STATUS_UPDATE_MANAGER_HPP__
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp?rev=1455810&r1=1455809&r2=1455810&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/timeout.hpp Wed Mar 13 06:23:00 2013
@@ -69,6 +69,12 @@ public:
return Seconds(seconds > 0 ? seconds : 0);
}
+  // Returns true once Clock::now() has reached or passed the stored deadline.
+  bool expired() const
+  {
+    return (timeout - Clock::now()) <= 0; // Zero remaining counts as expired.
+  }
+
private:
double timeout;
};