You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/09/20 00:21:26 UTC
svn commit: r1387802 - in /incubator/mesos/trunk: src/ src/common/
src/slave/ src/tests/ third_party/libprocess/include/stout/
Author: benh
Date: Wed Sep 19 22:21:25 2012
New Revision: 1387802
URL: http://svn.apache.org/viewvc?rev=1387802&view=rev
Log:
Created slave "state" so that we can record information about running
executors and tasks _across_ slave restarts/upgrades (contributed by
Vinod Kone, https://reviews.apache.org/r/6842).
Added:
incubator/mesos/trunk/src/common/protobuf_utils.hpp
incubator/mesos/trunk/src/slave/paths.hpp
incubator/mesos/trunk/src/slave/state.cpp
incubator/mesos/trunk/src/slave/state.hpp
incubator/mesos/trunk/src/tests/slave_state_tests.cpp
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp
incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Sep 19 22:21:25 2012
@@ -154,7 +154,7 @@ nodist_libmesos_no_third_party_la_SOURCE
libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp \
master/allocator.cpp master/drf_sorter.cpp \
master/frameworks_manager.cpp master/http.cpp master/master.cpp \
- master/slaves_manager.cpp slave/gc.cpp \
+ master/slaves_manager.cpp slave/gc.cpp slave/state.cpp \
slave/slave.cpp slave/http.cpp slave/isolation_module.cpp \
slave/process_based_isolation_module.cpp slave/reaper.cpp \
launcher/launcher.cpp exec/exec.cpp common/lock.cpp \
@@ -207,6 +207,7 @@ libmesos_no_third_party_la_SOURCES += co
slave/isolation_module.hpp slave/isolation_module_factory.hpp \
slave/cgroups_isolation_module.hpp \
slave/lxc_isolation_module.hpp \
+ slave/paths.hpp slave/state.hpp \
slave/process_based_isolation_module.hpp slave/reaper.hpp \
slave/slave.hpp slave/solaris_project_isolation_module.hpp \
slave/webui.hpp tests/external_test.hpp \
@@ -589,7 +590,7 @@ nodist_libjava_la_SOURCES = \
BUILT_SOURCES += java/jni/org_apache_mesos_MesosSchedulerDriver.h \
java/jni/org_apache_mesos_MesosExecutorDriver.h \
java/jni/org_apache_mesos_Log.h \
- java/jni/org_apache_mesos_state_Variable.h \
+ java/jni/org_apache_mesos_state_Variable.h \
java/jni/org_apache_mesos_state_ZooKeeperState.h
@@ -765,6 +766,7 @@ check_PROGRAMS += mesos-tests
mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp \
tests/master_tests.cpp tests/state_tests.cpp \
+ tests/slave_state_tests.cpp \
tests/gc_tests.cpp \
tests/resource_offers_tests.cpp \
tests/fault_tolerance_tests.cpp \
Added: incubator/mesos/trunk/src/common/protobuf_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/protobuf_utils.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/protobuf_utils.hpp (added)
+++ incubator/mesos/trunk/src/common/protobuf_utils.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROTOBUF_UTILS_HPP__
+#define __PROTOBUF_UTILS_HPP__
+
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace internal {
+namespace protobuf {
+
+// Returns true when 'state' is one of the terminal task states
+// (TASK_FINISHED, TASK_FAILED, TASK_KILLED or TASK_LOST) and false for
+// every other TaskState value.
+inline bool isTerminalState(const TaskState& state)
+{
+  switch (state) {
+    case TASK_FINISHED:
+    case TASK_FAILED:
+    case TASK_KILLED:
+    case TASK_LOST:
+      return true;
+    default:
+      return false;
+  }
+}
+
+
+// Builds a StatusUpdate reporting that task 'taskId' of framework
+// 'frameworkId' on slave 'slaveId' is now in 'state'.
+//
+// The executor id is attached only when 'executorId' does not compare
+// equal to "" (the default argument is a default-constructed
+// ExecutorID). The update is stamped with the current libprocess clock
+// time and a freshly generated random UUID (as raw bytes).
+inline StatusUpdate createStatusUpdate(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const TaskID& taskId,
+    const TaskState& state,
+    const ExecutorID& executorId = ExecutorID())
+{
+  StatusUpdate result;
+  result.mutable_framework_id()->MergeFrom(frameworkId);
+
+  const bool hasExecutorId = !(executorId == "");
+  if (hasExecutorId) {
+    result.mutable_executor_id()->MergeFrom(executorId);
+  }
+
+  result.mutable_slave_id()->MergeFrom(slaveId);
+
+  // Fill in the embedded TaskStatus.
+  TaskStatus& taskStatus = *result.mutable_status();
+  taskStatus.mutable_task_id()->MergeFrom(taskId);
+  taskStatus.set_state(state);
+
+  result.set_timestamp(::process::Clock::now());
+  result.set_uuid(UUID::random().toBytes());
+
+  return result;
+}
+
+
+// Builds the Task record for 'task' (a TaskInfo) with the given 'state'
+// and owning 'frameworkId'. Name, task id, slave id and resources are
+// copied from 'task'. 'executorId' is recorded only when 'task' carries
+// no CommandInfo; tasks with a command are left without an executor_id
+// (presumably because they run under a command executor -- confirm).
+inline Task createTask(const TaskInfo& task,
+                       const TaskState& state,
+                       const ExecutorID& executorId,
+                       const FrameworkID& frameworkId)
+{
+  Task result;
+
+  result.mutable_framework_id()->MergeFrom(frameworkId);
+  result.set_state(state);
+
+  // Fields copied verbatim from the TaskInfo.
+  result.set_name(task.name());
+  result.mutable_task_id()->MergeFrom(task.task_id());
+  result.mutable_slave_id()->MergeFrom(task.slave_id());
+  result.mutable_resources()->MergeFrom(task.resources());
+
+  if (task.has_command()) {
+    return result;
+  }
+
+  result.mutable_executor_id()->MergeFrom(executorId);
+  return result;
+}
+
+} // namespace protobuf
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROTOBUF_UTILS_HPP__
Added: incubator/mesos/trunk/src/slave/paths.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/paths.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/paths.hpp (added)
+++ incubator/mesos/trunk/src/slave/paths.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_PATHS_HPP__
+#define __SLAVE_PATHS_HPP__
+
+#include <list>
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/numify.hpp"
+#include "stout/strings.hpp"
+#include "stout/try.hpp"
+#include "stout/utils.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "process/pid.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace paths {
+
+// Path layout templates consumed by the path helper functions below.
+// Each template is a format string whose "%s" placeholders are filled in
+// (via strings::format) in this order: root directory, slave id,
+// framework id, executor id, run number, task id. Every template after
+// ROOT_PATH is built by appending to an earlier one, so they must stay
+// declared in this order.
+//
+// Resulting layout:
+//   <root>/slaves/slave.id
+//   <root>/slaves/<slave>/frameworks/<framework>/framework.pid
+//   <root>/slaves/<slave>/frameworks/<framework>/executors/<executor>
+//       /runs/<run>/pids/{libprocess,execed,forked}.pid
+//       /runs/<run>/tasks/<task>/{info,updates}
+const std::string ROOT_PATH = "%s";
+const std::string SLAVEID_PATH = ROOT_PATH + "/slaves/slave.id";
+const std::string SLAVE_PATH = ROOT_PATH + "/slaves/%s";
+const std::string FRAMEWORK_PATH = SLAVE_PATH + "/frameworks/%s";
+const std::string FRAMEWORK_PID_PATH = FRAMEWORK_PATH + "/framework.pid";
+const std::string EXECUTOR_PATH = FRAMEWORK_PATH + "/executors/%s";
+const std::string EXECUTOR_RUN_PATH = EXECUTOR_PATH + "/runs/%s";
+const std::string PIDS_PATH = EXECUTOR_RUN_PATH + "/pids";
+const std::string LIBPROCESS_PID_PATH = PIDS_PATH + "/libprocess.pid";
+const std::string EXECED_PID_PATH = PIDS_PATH + "/execed.pid";
+const std::string FORKED_PID_PATH = PIDS_PATH + "/forked.pid";
+const std::string TASK_PATH = EXECUTOR_RUN_PATH + "/tasks/%s";
+const std::string TASK_INFO_PATH = TASK_PATH + "/info";
+const std::string TASK_UPDATES_PATH = TASK_PATH + "/updates";
+
+
+// Returns "<rootDir>/slaves/slave.id".
+inline std::string getSlaveIDPath(const std::string& rootDir)
+{
+  Try<std::string> formatted = strings::format(SLAVEID_PATH, rootDir);
+  return formatted.get();
+}
+
+
+// Returns "<rootDir>/slaves/<slaveId>".
+inline std::string getSlavePath(const std::string& rootDir,
+                                const SlaveID& slaveId)
+{
+  Try<std::string> formatted = strings::format(SLAVE_PATH, rootDir, slaveId);
+  return formatted.get();
+}
+
+
+// Returns "<rootDir>/slaves/<slaveId>/frameworks/<frameworkId>".
+inline std::string getFrameworkPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId)
+{
+  Try<std::string> formatted =
+    strings::format(FRAMEWORK_PATH, rootDir, slaveId, frameworkId);
+  return formatted.get();
+}
+
+
+// Returns
+// "<rootDir>/slaves/<slaveId>/frameworks/<frameworkId>/framework.pid".
+inline std::string getFrameworkPIDPath(const std::string& rootDir,
+                                       const SlaveID& slaveId,
+                                       const FrameworkID& frameworkId)
+{
+  Try<std::string> formatted =
+    strings::format(FRAMEWORK_PID_PATH, rootDir, slaveId, frameworkId);
+  return formatted.get();
+}
+
+
+// Returns "<rootDir>/slaves/<slaveId>/frameworks/<frameworkId>
+// /executors/<executorId>".
+inline std::string getExecutorPath(const std::string& rootDir,
+                                   const SlaveID& slaveId,
+                                   const FrameworkID& frameworkId,
+                                   const ExecutorID& executorId)
+{
+  Try<std::string> formatted = strings::format(
+      EXECUTOR_PATH, rootDir, slaveId, frameworkId, executorId);
+  return formatted.get();
+}
+
+
+// Returns the directory of run number 'run' of an executor:
+// "<rootDir>/slaves/<slaveId>/frameworks/<frameworkId>
+// /executors/<executorId>/runs/<run>".
+inline std::string getExecutorRunPath(const std::string& rootDir,
+                                      const SlaveID& slaveId,
+                                      const FrameworkID& frameworkId,
+                                      const ExecutorID& executorId,
+                                      int run)
+{
+  Try<std::string> formatted = strings::format(
+      EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId, executorId,
+      stringify(run));
+  return formatted.get();
+}
+
+
+// Returns "<executor run directory>/pids/libprocess.pid", where the run
+// directory is ".../executors/<executorId>/runs/<run>".
+inline std::string getLibprocessPIDPath(const std::string& rootDir,
+                                        const SlaveID& slaveId,
+                                        const FrameworkID& frameworkId,
+                                        const ExecutorID& executorId,
+                                        int run)
+{
+  Try<std::string> formatted = strings::format(
+      LIBPROCESS_PID_PATH, rootDir, slaveId, frameworkId, executorId,
+      stringify(run));
+  return formatted.get();
+}
+
+
+// Returns "<executor run directory>/pids/execed.pid", where the run
+// directory is ".../executors/<executorId>/runs/<run>".
+inline std::string getExecedPIDPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId,
+                                    const ExecutorID& executorId,
+                                    int run)
+{
+  Try<std::string> formatted = strings::format(
+      EXECED_PID_PATH, rootDir, slaveId, frameworkId, executorId,
+      stringify(run));
+  return formatted.get();
+}
+
+
+// Returns "<executor run directory>/pids/forked.pid", where the run
+// directory is ".../executors/<executorId>/runs/<run>".
+inline std::string getForkedPIDPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId,
+                                    const ExecutorID& executorId,
+                                    int run)
+{
+  Try<std::string> formatted = strings::format(
+      FORKED_PID_PATH, rootDir, slaveId, frameworkId, executorId,
+      stringify(run));
+  return formatted.get();
+}
+
+
+// Returns the directory of task 'taskId' within run 'run' of an
+// executor: "<executor run directory>/tasks/<taskId>".
+inline std::string getTaskPath(const std::string& rootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId,
+                               const ExecutorID& executorId,
+                               int run,
+                               const TaskID& taskId)
+{
+  Try<std::string> formatted = strings::format(
+      TASK_PATH, rootDir, slaveId, frameworkId, executorId,
+      stringify(run), taskId);
+  return formatted.get();
+}
+
+
+// Returns the path of the "info" file inside the directory of task
+// 'taskId' within run 'run' of an executor:
+// "<executor run directory>/tasks/<taskId>/info".
+// TASK_INFO_PATH has the same placeholders as TASK_PATH, so the
+// argument list matches getTaskPath().
+inline std::string getTaskInfoPath(const std::string& rootDir,
+                                   const SlaveID& slaveId,
+                                   const FrameworkID& frameworkId,
+                                   const ExecutorID& executorId,
+                                   int run,
+                                   const TaskID& taskId)
+{
+  return strings::format(TASK_INFO_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run), taskId).get();
+}
+
+
+// Returns the path of the "updates" file inside the directory of task
+// 'taskId' within run 'run' of an executor:
+// "<executor run directory>/tasks/<taskId>/updates".
+// TASK_UPDATES_PATH has the same placeholders as TASK_PATH, so the
+// argument list matches getTaskPath().
+inline std::string getTaskUpdatesPath(const std::string& rootDir,
+                                      const SlaveID& slaveId,
+                                      const FrameworkID& frameworkId,
+                                      const ExecutorID& executorId,
+                                      int run,
+                                      const TaskID& taskId)
+{
+  return strings::format(TASK_UPDATES_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run), taskId).get();
+}
+
+
+// Creates and returns a new run directory for executor 'executorId' of
+// framework 'frameworkId' on slave 'slaveId'.
+//
+// The same framework may launch an executor more than once on this
+// slave, so each launch gets its own numbered run directory. The new
+// run number is one greater than the largest numeric run directory
+// already present, or 0 if none exist. Entries whose basename does not
+// parse as an int are ignored.
+//
+// CHECK-fails (aborts) if the existing run directories cannot be listed
+// or the new directory cannot be created.
+inline std::string createUniqueExecutorWorkDirectory(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  LOG(INFO) << "Generating a unique work directory for executor '" << executorId
+            << "' of framework " << frameworkId << " on slave " << slaveId;
+
+  // Glob pattern matching every existing run directory of this executor.
+  const std::string pattern = strings::format(
+      EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId, executorId, "*").get();
+
+  Try<std::list<std::string> > runs = os::glob(pattern);
+  CHECK(!runs.isError()) << runs.error();
+
+  // Highest run number found so far. Starting at -1 makes the first
+  // run directory number 0.
+  int highest = -1;
+
+  if (runs.isSome()) {
+    foreach (const std::string& entry, runs.get()) {
+      Try<int> number = numify<int>(os::basename(entry));
+      if (!number.isError()) {
+        highest = std::max(highest, number.get());
+      }
+    }
+  }
+
+  const std::string directory =
+    getExecutorRunPath(rootDir, slaveId, frameworkId, executorId, highest + 1);
+
+  Try<Nothing> made = os::mkdir(directory);
+  CHECK(made.isSome())
+    << "Error creating directory '" << directory << "': " << made.error();
+
+  return directory;
+}
+
+} // namespace paths {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_PATHS_HPP__
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Sep 19 22:21:25 2012
@@ -38,9 +38,11 @@
#include <stout/utils.hpp>
#include "common/build.hpp"
+#include "common/protobuf_utils.hpp"
#include "common/type_utils.hpp"
#include "slave/flags.hpp"
+#include "slave/paths.hpp"
#include "slave/slave.hpp"
namespace params = std::tr1::placeholders;
@@ -57,42 +59,6 @@ namespace mesos {
namespace internal {
namespace slave {
-// // Represents a pending status update that has been sent and we are
-// // waiting for an acknowledgement. In pa
-
-// // stream of status updates for a framework/task. Note
-// // that these are stored in the slave rather than per Framework
-// // because a framework might go away before all of the status
-// // updates have been sent and acknowledged.
-// struct Slave::StatusUpdateStream
-// {
-// StatusUpdateStreamID streamId;
-// string directory;
-// FILE* updates;
-// FILE* acknowledged;
-// queue<StatusUpdate> pending;
-// double timeout;
-// };
-
-
-// StatusUpdateStreamID id;
-
-
-
-// queue<StatusUpdate> pending;
-// double timeout;
-// };
-
-// Helper function that returns true if the task state is terminal
-bool isTerminalTaskState(TaskState state)
-{
- return state == TASK_FINISHED ||
- state == TASK_FAILED ||
- state == TASK_KILLED ||
- state == TASK_LOST;
-}
-
-
Slave::Slave(const Resources& _resources,
bool _local,
IsolationModule* _isolationModule,
@@ -559,7 +525,8 @@ void Slave::runTask(const FrameworkInfo&
} else {
// Launch an executor for this task.
const string& directory =
- createUniqueWorkDirectory(framework->id, executorId);
+ paths::createUniqueExecutorWorkDirectory(flags.work_dir, id,
+ framework->id, executorId);
LOG(INFO) << "Using '" << directory
<< "' as work directory for executor '" << executorId
@@ -754,77 +721,6 @@ void Slave::statusUpdateAcknowledgement(
}
-// void Slave::statusUpdateAcknowledged(const SlaveID& slaveId,
-// const FrameworkID& frameworkId,
-// const TaskID& taskId,
-// uint32_t sequence)
-// {
-// StatusUpdateStreamID id(frameworkId, taskId);
-// StatusUpdateStream* stream = getStatusUpdateStream(id);
-
-// if (stream == NULL) {
-// LOG(WARNING) << "WARNING! Received unexpected status update"
-// << " acknowledgement for task " << taskId
-// << " of framework " << frameworkId;
-// return;
-// }
-
-// CHECK(!stream->pending.empty());
-
-// const StatusUpdate& update = stream->pending.front();
-
-// if (update->sequence() != sequence) {
-// LOG(WARNING) << "WARNING! Received status update acknowledgement"
-// << " with bad sequence number (received " << sequence
-// << ", expecting " << update->sequence()
-// << ") for task " << taskId
-// << " of framework " << frameworkId;
-// } else {
-// LOG(INFO) << "Received status update acknowledgement for task "
-// << taskId << " of framework " << frameworkId;
-
-// // Write the update out to disk.
-// CHECK(stream->acknowledged != NULL);
-
-// Try<bool> result =
-// utils::protobuf::write(stream->acknowledged, update);
-
-// if (result.isError()) {
-// // Failing here is rather dramatic, but so is not being able to
-// // write to disk ... seems like failing early and often might do
-// // more benefit than harm.
-// LOG(FATAL) << "Failed to write status update to "
-// << stream->directory << "/acknowledged: "
-// << result.message();
-// }
-
-// stream->pending.pop();
-
-// bool empty = stream->pending.empty();
-
-// bool terminal =
-// update.status().state() == TASK_FINISHED &&
-// update.status().state() == TASK_FAILED &&
-// update.status().state() == TASK_KILLED &&
-// update.status().state() == TASK_LOST;
-
-// if (empty && terminal) {
-// cleanupStatusUpdateStream(stream);
-// } else if (!empty && terminal) {
-// LOG(WARNING) << "WARNING! Acknowledged a \"terminal\""
-// << " task status but updates are still pending";
-// } else if (!empty) {
-// StatusUpdateMessage message;
-// message.mutable_update()->MergeFrom(stream->pending.front());
-// message.set_reliable(true);
-// send(master, message);
-
-// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-// }
-// }
-// }
-
-
void Slave::registerExecutor(const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
@@ -904,126 +800,6 @@ void Slave::registerExecutor(const Frame
}
-// void Slave::statusUpdate(const StatusUpdate& update)
-// {
-// LOG(INFO) << "Received update that task " << update.status().task_id()
-// << " of framework " << update.framework_id()
-// << " is now in state " << update.status().state();
-
-// Framework* framework = getFramework(update.framework_id());
-// if (framework == NULL) {
-// LOG(WARNING) << "WARNING! Failed to lookup"
-// << " framework " << update.framework_id()
-// << " of received status update";
-// stats.invalidStatusUpdates++;
-// return;
-// }
-
-// Executor* executor = framework->getExecutor(update.status().task_id());
-// if (executor == NULL) {
-// LOG(WARNING) << "WARNING! Failed to lookup executor"
-// << " for framework " << update.framework_id()
-// << " of received status update";
-// stats.invalidStatusUpdates++;
-// return;
-// }
-
-// // Create/Get the status update stream for this framework/task.
-// StatusUpdateStreamID id(update.framework_id(), update.status().task_id());
-
-// if (!statusUpdateStreams.contains(id)) {
-// StatusUpdateStream* stream =
-// createStatusUpdateStream(id, executor->directory);
-
-// if (stream == NULL) {
-// LOG(WARNING) << "WARNING! Failed to create status update"
-// << " stream for task " << update.status().task_id()
-// << " of framework " << update.framework_id()
-// << " ... removing executor!";
-// removeExecutor(framework, executor);
-// return;
-// }
-// }
-
-// StatusUpdateStream* stream = getStatusUpdateStream(id);
-
-// CHECK(stream != NULL);
-
-// // If we are already waiting on an acknowledgement, check that this
-// // update (coming from the executor), is the same one that we are
-// // waiting on being acknowledged.
-
-// // Check that this is status update has not already been
-// // acknowledged. this could happen because a slave writes the
-// // acknowledged message but then fails before it can pass the
-// // message on to the executor, so the executor tries again.
-
-// returnhere;
-
-// // TODO(benh): Check that this update hasn't already been received
-// // or acknowledged! This could happen if a slave receives a status
-// // update from an executor, then crashes after it writes it to disk
-// // but before it sends an ack back to
-
-// // Okay, record this update as received.
-// CHECK(stream->received != NULL);
-
-// Result<bool> result =
-// utils::protobuf::write(stream->received, &update);
-
-// if (result.isError()) {
-// // Failing here is rather dramatic, but so is not being able to
-// // write to disk ... seems like failing early and often might do
-// // more benefit than harm.
-// LOG(FATAL) << "Failed to write status update to "
-// << stream->directory << "/received: "
-// << result.message();
-// }
-
-// // Now acknowledge the executor.
-// StatusUpdateAcknowledgementMessage message;
-// message.mutable_framework_id()->MergeFrom(update.framework_id());
-// message.mutable_slave_id()->MergeFrom(update.slave_id());
-// message.mutable_task_id()->MergeFrom(update.status().task_id());
-// send(executor->pid, message);
-
-// executor->updateTaskState(
-// update.status().task_id(),
-// update.status().state());
-
-// // Remove the task if it's reached a terminal state.
-// bool terminal =
-// update.status().state() == TASK_FINISHED &&
-// update.status().state() == TASK_FAILED &&
-// update.status().state() == TASK_KILLED &&
-// update.status().state() == TASK_LOST;
-
-// if (terminal) {
-// executor->removeTask(update.status().task_id());
-// isolationModule->resourcesChanged(
-// framework->id, framework->info,
-// executor->info, executor->resources);
-// }
-
-// stream->pending.push(update);
-
-// // Send the status update if this is the first in the
-// // stream. Subsequent status updates will get sent in
-// // Slave::statusUpdateAcknowledged.
-// if (stream->pending.size() == 1) {
-// CHECK(stream->timeout == -1);
-// StatusUpdateMessage message;
-// message.mutable_update()->MergeFrom(update);
-// message.set_reliable(true);
-// send(master, message);
-
-// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-// }
-
-// stats.tasks[status.state()]++;
-// stats.validStatusUpdates++;
-// }
-
void Slave::statusUpdate(const StatusUpdate& update)
{
const TaskStatus& status = update.status();
@@ -1039,7 +815,7 @@ void Slave::statusUpdate(const StatusUpd
executor->updateTaskState(status.task_id(), status.state());
// Handle the task appropriately if it's terminated.
- if (isTerminalTaskState(status.state())) {
+ if (protobuf::isTerminalState(status.state())) {
executor->removeTask(status.task_id());
dispatch(isolationModule,
@@ -1140,32 +916,6 @@ void Slave::statusUpdateTimeout(
}
-// void Slave::timeout()
-// {
-// // Check and see if we should re-send any status updates.
-// double now = Clock::now();
-
-// foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
-// CHECK(stream->timeout > 0);
-// if (stream->timeout < now) {
-// CHECK(!stream->pending.empty());
-// const StatusUpdate& update = stream->pending.front();
-
-// LOG(WARNING) << "WARNING! Resending status update"
-// << " for task " << update.status().task_id()
-// << " of framework " << update.framework_id();
-
-// StatusUpdateMessage message;
-// message.mutable_update()->MergeFrom(update);
-// message.set_reliable(true);
-// send(master, message);
-
-// stream->timeout = now + STATUS_UPDATE_RETRY_INTERVAL;
-// }
-// }
-// }
-
-
void Slave::exited(const UPID& pid)
{
LOG(INFO) << "Process exited: " << from;
@@ -1188,174 +938,6 @@ Framework* Slave::getFramework(const Fra
}
-// StatusUpdates* Slave::getStatusUpdateStream(const StatusUpdateStreamID& id)
-// {
-// if (statusUpdateStreams.contains(id)) {
-// return statusUpdateStreams[id];
-// }
-
-// return NULL;
-// }
-
-
-// StatusUpdateStream* Slave::createStatusUpdateStream(
-// const FrameworkID& frameworkId,
-// const TaskID& taskId,
-// const string& directory)
-// {
-// StatusUpdateStream* stream = new StatusUpdates();
-// stream->id = id;
-// stream->directory = directory;
-// stream->received = NULL;
-// stream->acknowledged = NULL;
-// stream->timeout = -1;
-
-// streams[id] = stream;
-
-// // Open file descriptors for "updates" and "acknowledged".
-// string path;
-// Result<int> result;
-
-// path = stream->directory + "/received";
-// result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
-// if (result.isError() || result.isNone()) {
-// LOG(WARNING) << "Failed to open " << path
-// << " for storing received status updates";
-// cleanupStatusUpdateStream(stream);
-// return NULL;
-// }
-
-// stream->received = result.get();
-
-// path = updates->directory + "/acknowledged";
-// result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
-// if (result.isError() || result.isNone()) {
-// LOG(WARNING) << "Failed to open " << path <<
-// << " for storing acknowledged status updates";
-// cleanupStatusUpdateStream(stream);
-// return NULL;
-// }
-
-// stream->acknowledged = result.get();
-
-// // Replay the status updates. This is necessary because the slave
-// // might have crashed but was restarted before the executors
-// // died. Or another task with the same id as before got run again on
-// // the same executor.
-// bool replayed = replayStatusUpdateStream(stream);
-
-// if (!replayed) {
-// LOG(WARNING) << "Failed to correctly replay status updates"
-// << " for task " << taskId
-// << " of framework " << frameworkId
-// << " found at " << path;
-// cleanupStatusUpdateStream(stream);
-// return NULL;
-// }
-
-// // Start sending any pending status updates. In this case, the slave
-// // probably died after it sent the status update and never received
-// // the acknowledgement.
-// if (!stream->pending.empty()) {
-// StatusUpdate* update = stream->pending.front();
-// StatusUpdateMessage message;
-// message.mutable_update()->MergeFrom(*update);
-// message.set_reliable(true);
-// send(master, message);
-
-// stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-// }
-
-// return stream;
-// }
-
-
-// bool Slave::replayStatusUpdateStream(StatusUpdateStream* stream)
-// {
-// CHECK(stream->received != NULL);
-// CHECK(stream->acknowledged != NULL);
-
-// Result<StatusUpdate*> result;
-
-// // Okay, now read all the recevied status updates.
-// hashmap<uint32_t, StatusUpdate> pending;
-
-// result = utils::protobuf::read(stream->received);
-// while (result.isSome()) {
-// StatusUpdate* update = result.get();
-// CHECK(!pending.contains(update->sequence()));
-// pending[update->sequence()] = *update;
-// delete update;
-// result = utils::protobuf::read(stream->received);
-// }
-
-// if (result.isError()) {
-// return false;
-// }
-
-// CHECK(result.isNone());
-
-// LOG(INFO) << "Recovered " << pending.size()
-// << " TOTAL status updates for task "
-// << stream->id.second << " of framework "
-// << stream->id.first;
-
-// // Okay, now get all the acknowledged status updates.
-// result = utils::protobuf::read(stream->acknowledged);
-// while (result.isSome()) {
-// StatusUpdate* update = result.get();
-// stream->sequence = std::max(stream->sequence, update->sequence());
-// CHECK(pending.contains(update->sequence()));
-// pending.erase(update->sequence());
-// delete update;
-// result = utils::protobuf::read(stream->acknowledged);
-// }
-
-// if (result.isError()) {
-// return false;
-// }
-
-// CHECK(result.isNone());
-
-// LOG(INFO) << "Recovered " << pending.size()
-// << " PENDING status updates for task "
-// << stream->id.second << " of framework "
-// << stream->id.first;
-
-// // Add the pending status updates in sorted order.
-// uint32_t sequence = 0;
-
-// while (!pending.empty()) {
-// // Find the smallest sequence number.
-// foreachvalue (const StatusUpdate& update, pending) {
-// sequence = std::min(sequence, update.sequence());
-// }
-
-// // Push that update and remove it from pending.
-// stream->pending.push(pending[sequence]);
-// pending.erase(sequence);
-// }
-
-// return true;
-// }
-
-
-// void Slave::cleanupStatusUpdateStream(StatusUpdateStream* stream)
-// {
-// if (stream->received != NULL) {
-// fclose(stream->received);
-// }
-
-// if (stream->acknowledged != NULL) {
-// fclose(stream->acknowledged);
-// }
-
-// streams.erase(stream->id);
-
-// delete stream;
-// }
-
-
// N.B. When the slave is running in "local" mode then the pid is
// uninteresting (and possibly could cause bugs).
void Slave::executorStarted(const FrameworkID& frameworkId,
@@ -1451,7 +1033,7 @@ void Slave::executorExited(const Framewo
// Transition all live tasks to TASK_LOST/TASK_FAILED.
foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
- if (!isTerminalTaskState(task->state())) {
+ if (!protobuf::isTerminalState(task->state())) {
isCommandExecutor = !task->has_executor_id();
transitionLiveTask(task->task_id(),
@@ -1543,58 +1125,6 @@ void Slave::shutdownExecutorTimeout(cons
}
-// void Slave::recover()
-// {
-// // if we find an executor that is no longer running and it's last
-// // acknowledged task statuses are not terminal, create a
-// // statusupdatestream for each task and try and reliably send
-// // TASK_LOST updates.
-
-// // otherwise once we reconnect the executor will just start sending
-// // us status updates that we need to send, wait for ack, write to
-// // disk, and then respond.
-// }
-
-
-string Slave::createUniqueWorkDirectory(const FrameworkID& frameworkId,
- const ExecutorID& executorId)
-{
- LOG(INFO) << "Generating a unique work directory for executor '"
- << executorId << "' of framework " << frameworkId;
-
- std::ostringstream out(std::ios_base::app | std::ios_base::out);
- out << flags.work_dir
- << "/slaves/" << id
- << "/frameworks/" << frameworkId
- << "/executors/" << executorId;
-
- // Find a unique directory based on the path given by the slave
- // (this is because we might launch multiple executors from the same
- // framework on this slave).
- // NOTE: The run number of the new directory will be the highest of
- // all the existing run directories for this executor.
- out << "/runs/";
-
- int maxrun = 0;
- foreach (const string& runStr, os::ls(out.str())) {
- Try<int> run = numify<int>(runStr);
- if (run.isError()) {
- LOG(ERROR) << "Ignoring invalid run directory " << runStr;
- continue;
- }
-
- maxrun = std::max(maxrun, run.get());
- }
- out << maxrun;
-
- Try<Nothing> mkdir = os::mkdir(out.str());
- CHECK(mkdir.isSome()) << "Error creating work directory '"
- << out.str() << "': " << mkdir.error();
-
- return out.str();
-}
-
-
// TODO(vinod): Figure out a way to express this function via cmd line.
Duration Slave::age(double usage)
{
@@ -1606,7 +1136,7 @@ void Slave::checkDiskUsage()
{
VLOG(1) << "Checking disk usage";
- // TODO(vinod): We are making usage a Future, so that we can plug-in
+ // TODO(vinod): We are making usage a Future, so that we can plug in
// os::usage() into async.
Future<Try<double> > usage = os::usage();
Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Sep 19 22:21:25 2012
@@ -33,6 +33,7 @@
#include "slave/gc.hpp"
#include "slave/http.hpp"
#include "slave/isolation_module.hpp"
+#include "slave/state.hpp"
#include "common/attributes.hpp"
#include "common/resources.hpp"
@@ -155,19 +156,6 @@ protected:
const ExecutorID& executorId,
const UUID& uuid);
-// // Create a new status update stream.
-// StatusUpdates* createStatusUpdateStream(const StatusUpdateStreamID& streamId,
-// const string& directory);
-
-// StatusUpdates* getStatusUpdateStream(const StatusUpdateStreamID& streamId);
-
- // Helper function for generating a unique work directory for this
- // framework/executor pair (non-trivial since a framework/executor
- // pair may be launched more than once on the same slave).
- std::string createUniqueWorkDirectory(const FrameworkID& frameworkId,
- const ExecutorID& executorId);
-
-
// This function returns the max age of executor/slave directories allowed,
// given a disk usage. This value could be used to tune gc.
Duration age(double usage);
@@ -222,6 +210,8 @@ private:
bool connected; // Flag to indicate if slave is registered.
GarbageCollector gc;
+
+ state::SlaveState state;
};
@@ -350,7 +340,7 @@ struct Framework
// Now determine the path to the executor.
Try<std::string> path = os::realpath(
- path::join(flags.launcher_dir, "mesos-executor"));
+ ::path::join(flags.launcher_dir, "mesos-executor"));
if (path.isSome()) {
executor.mutable_command()->set_value(path.get());
Added: incubator/mesos/trunk/src/slave/state.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.cpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/state.cpp (added)
+++ incubator/mesos/trunk/src/slave/state.cpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,214 @@
+#include <glog/logging.h>
+
+#include "stout/foreach.hpp"
+#include "stout/format.hpp"
+#include "stout/numify.hpp"
+#include "stout/os.hpp"
+#include "stout/protobuf.hpp"
+#include "stout/try.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+using std::list;
+using std::string;
+using std::max;
+
+// Rebuilds the SlaveState for 'slaveId' by globbing the checkpointed
+// directory tree under 'rootDir' level by level (frameworks -> executors
+// -> numbered runs -> tasks), using the paths::*_PATH patterns.
+//
+// Scanning is best effort: a failure at any level is logged and only
+// that subtree is skipped. Framework and executor entries are created
+// only once a numeric run directory is found beneath them.
+SlaveState parse(const string& rootDir, const SlaveID& slaveId)
+{
+  SlaveState state;
+  state.slaveId = slaveId;
+  state.slaveMetaDir = paths::getSlavePath(rootDir, slaveId);
+
+  // Find the frameworks.
+  Try<list<string> > frameworks = os::glob(
+      strings::format(paths::FRAMEWORK_PATH, rootDir, slaveId, "*").get());
+
+  if (frameworks.isError()) {
+    LOG(ERROR) << "Error finding frameworks for slave " << slaveId << ": "
+               << frameworks.error();
+    return state;
+  }
+
+  foreach (const string& frameworkPath, frameworks.get()) {
+    FrameworkID frameworkId;
+    frameworkId.set_value(os::basename(frameworkPath));
+
+    // Find the executors.
+    Try<list<string> > executors =
+      os::glob(strings::format(paths::EXECUTOR_PATH, rootDir, slaveId,
+                               frameworkId, "*").get());
+
+    if (executors.isError()) {
+      LOG(ERROR) << "Error finding executors for framework " << frameworkId
+                 << ": " << executors.error();
+      continue;
+    }
+
+    foreach (const string& executorPath, executors.get()) {
+      ExecutorID executorId;
+      executorId.set_value(os::basename(executorPath));
+
+      // Find the runs.
+      Try<list<string> > runs =
+        os::glob(strings::format(paths::EXECUTOR_RUN_PATH, rootDir, slaveId,
+                                 frameworkId, executorId, "*").get());
+
+      if (runs.isError()) {
+        LOG(ERROR) << "Error finding runs for executor " << executorId
+                   << ": " << runs.error();
+        continue;
+      }
+
+      foreach (const string& runPath, runs.get()) {
+        Try<int> result = numify<int>(os::basename(runPath));
+        if (!result.isSome()) {
+          LOG(ERROR) << "Non-numeric run number in path " << runPath;
+          continue;
+        }
+
+        int run = result.get();
+
+        // NOTE: Despite its name, RunState is the per-executor entry (it
+        // holds all runs). It is created lazily here so that only
+        // executors with at least one valid run appear in the state.
+        SlaveState::FrameworkState::RunState& executor =
+          state.frameworks[frameworkId].executors[executorId];
+
+        // Update max run.
+        executor.latest = max(run, executor.latest);
+
+        // Find the tasks.
+        Try<list<string> > tasks =
+          os::glob(strings::format(paths::TASK_PATH, rootDir, slaveId,
+                                   frameworkId, executorId, stringify(run),
+                                   "*").get());
+
+        if (tasks.isError()) {
+          LOG(WARNING) << "Error finding tasks for run " << run
+                       << " of executor " << executorId << ": "
+                       << tasks.error();
+          continue;
+        }
+
+        foreach (const string& taskPath, tasks.get()) {
+          TaskID taskId;
+          taskId.set_value(os::basename(taskPath));
+          executor.runs[run].tasks.insert(taskId);
+        }
+      }
+    }
+  }
+
+  return state;
+}
+
+
+// Helper functions for check-pointing slave data.
+// Checkpoints 'task' by serializing it with protobuf::write() to the
+// file "taskDir/task". The parent directory is created first if needed.
+// A null task, a failed mkdir, or a failed write are all fatal.
+void writeTask(Task* task, const string& taskDir)
+{
+  CHECK(task != NULL) << "writeTask() requires a non-null task";
+
+  const string path = taskDir + "/task";
+  const string dir = os::dirname(path);
+
+  Try<Nothing> created = os::mkdir(dir);
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << dir << "': " << created.error();
+
+  LOG(INFO) << "Writing task description for task "
+            << task->task_id() << " to " << path;
+
+  Try<bool> result = protobuf::write(path, *task);
+
+  CHECK(!result.isError())
+    << "Failed to write task description to '" << path << "': "
+    << result.error();
+}
+
+
+// Checkpoints 'slaveId' (as stringify(slaveId)) to the file returned by
+// paths::getSlaveIDPath(rootDir). The parent directory is created first
+// if needed. Any failure is fatal.
+void writeSlaveID(const string& rootDir, const SlaveID& slaveId)
+{
+  const string path = paths::getSlaveIDPath(rootDir);
+  const string dir = os::dirname(path);
+
+  Try<Nothing> created = os::mkdir(dir);
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << dir << "': " << created.error();
+
+  LOG(INFO) << "Writing slave id " << slaveId << " to " << path;
+
+  Try<Nothing> result = os::write(path, stringify(slaveId));
+
+  // Report the error carried by the Try. errno is not guaranteed to
+  // describe this particular failure.
+  CHECK(result.isSome())
+    << "Error writing slave id to '" << path << "': " << result.error();
+}
+
+
+// Reads back the slave id checkpointed by writeSlaveID() from the file
+// returned by paths::getSlaveIDPath(rootDir).
+//
+// If os::read() fails, or returns none (logged as "empty"), a warning is
+// logged and a default-constructed SlaveID is returned.
+SlaveID readSlaveID(const string& rootDir)
+{
+  const string path = paths::getSlaveIDPath(rootDir);
+
+  Result<string> result = os::read(path);
+
+  SlaveID slaveId;
+
+  if (!result.isSome()) {
+    // The conditional must be parenthesized. Otherwise '<<' binds tighter
+    // than '?:', the stream itself becomes the condition, and the reason
+    // is never logged.
+    LOG(WARNING) << "Cannot read slave id from " << path << " because "
+                 << (result.isError() ? result.error() : string("empty"));
+    return slaveId;
+  }
+
+  LOG(INFO) << "Read slave id " << result.get() << " from " << path;
+
+  slaveId.set_value(result.get());
+  return slaveId;
+}
+
+
+// Checkpoints the framework 'pid' string to the file returned by
+// paths::getFrameworkPIDPath(metaRootDir, slaveId, frameworkId). The
+// parent directory is created first if needed. Any failure is fatal.
+void writeFrameworkPID(const string& metaRootDir,
+                       const SlaveID& slaveId,
+                       const FrameworkID& frameworkId,
+                       const string& pid)
+{
+  const string path =
+    paths::getFrameworkPIDPath(metaRootDir, slaveId, frameworkId);
+  const string dir = os::dirname(path);
+
+  Try<Nothing> created = os::mkdir(dir);
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << dir << "': " << created.error();
+
+  LOG(INFO) << "Writing framework pid " << pid << " to " << path;
+
+  Try<Nothing> result = os::write(path, pid);
+
+  // Report the error carried by the Try. errno is not guaranteed to
+  // describe this particular failure.
+  CHECK(result.isSome())
+    << "Error writing framework pid to '" << path << "': " << result.error();
+}
+
+
+// Reads back the framework pid checkpointed by writeFrameworkPID() from
+// the file returned by paths::getFrameworkPIDPath(metaRootDir, slaveId,
+// frameworkId).
+//
+// If os::read() fails, or returns none (logged as "empty"), a warning is
+// logged and a default-constructed process::UPID is returned.
+process::UPID readFrameworkPID(const string& metaRootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId)
+{
+  const string path =
+    paths::getFrameworkPIDPath(metaRootDir, slaveId, frameworkId);
+
+  Result<string> result = os::read(path);
+
+  if (!result.isSome()) {
+    // The conditional must be parenthesized. Otherwise '<<' binds tighter
+    // than '?:' and the reason is never logged.
+    LOG(WARNING) << "Cannot read framework pid from " << path << " because "
+                 << (result.isError() ? result.error() : string("empty"));
+    return process::UPID();
+  }
+
+  LOG(INFO) << "Read framework pid " << result.get() << " from " << path;
+
+  return process::UPID(result.get());
+}
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
Added: incubator/mesos/trunk/src/slave/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/state.hpp (added)
+++ incubator/mesos/trunk/src/slave/state.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_STATE_HPP__
+#define __SLAVE_STATE_HPP__
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/strings.hpp"
+#include "stout/utils.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "process/pid.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+// SlaveState stores the information about the frameworks, executors and
+// tasks running on this slave. It is rebuilt from the slave's work
+// directory by parse().
+//
+// NOTE: The nested type names do not match what they describe:
+//   - 'frameworks' maps FrameworkID -> FrameworkState.
+//   - FrameworkState::executors maps ExecutorID -> RunState. A RunState
+//     therefore holds the state of one *executor*, across all its runs.
+//   - RunState::runs maps a run number -> ExecutorState. An ExecutorState
+//     therefore holds the state of one *run*: the IDs of its tasks.
+struct SlaveState
+{
+  struct FrameworkState
+  {
+    struct RunState
+    {
+      // 'latest' is -1 until a run number is recorded. parse() keeps it at
+      // the maximum run number it finds.
+      RunState()
+        : latest(-1) {}
+
+      struct ExecutorState
+      {
+        // IDs of the tasks found under this run.
+        hashset<TaskID> tasks;
+      };
+
+      hashmap<int, ExecutorState> runs;
+      int latest; // Latest run number.
+    };
+
+    hashmap<ExecutorID, RunState> executors;
+  };
+
+  SlaveID slaveId;
+  // parse() sets this to paths::getSlavePath(rootDir, slaveId).
+  std::string slaveMetaDir;
+  hashmap<FrameworkID, FrameworkState> frameworks;
+};
+
+
+// Parses the slave's work directory rooted at 'rootDir' and rebuilds the
+// slave state for 'slaveId'. Errors hit while scanning are logged and the
+// affected part of the tree is skipped; they are not fatal.
+SlaveState parse(const std::string& rootDir, const SlaveID& slaveId);
+
+
+// TODO(vinod): Re-evaluate the need for these helpers (or genericize them)
+// after StatusUpdateManager is integrated.
+
+// Writes the task information to "taskDir + '/task'". Failures are fatal.
+void writeTask(Task* task, const std::string& taskDir);
+
+
+// Writes slaveId to the file path returned by getSlaveIDPath(metaRootDir).
+// Failures are fatal.
+void writeSlaveID(const std::string& metaRootDir, const SlaveID& slaveId);
+
+
+// Reads slaveId from the file path returned by getSlaveIDPath(metaRootDir).
+// If the file cannot be read or is empty, logs a warning and returns a
+// default-constructed SlaveID.
+SlaveID readSlaveID(const std::string& metaRootDir);
+
+
+// Writes 'pid' to the file path returned by getFrameworkPIDPath().
+// Failures are fatal.
+void writeFrameworkPID(const std::string& metaRootDir,
+                       const SlaveID& slaveId,
+                       const FrameworkID& frameworkId,
+                       const std::string& pid);
+
+
+// Reads the framework pid from the path returned by getFrameworkPIDPath().
+// If the file cannot be read or is empty, logs a warning and returns a
+// default-constructed process::UPID.
+process::UPID readFrameworkPID(const std::string& metaRootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId);
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_STATE_HPP__
Added: incubator/mesos/trunk/src/tests/slave_state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_state_tests.cpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_state_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/slave_state_tests.cpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "stout/os.hpp"
+#include "stout/strings.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+using std::string;
+using strings::format;
+
+// Gives every test a fixed set of IDs and a private scratch directory
+// (created with os::mkdtemp()). The directory is removed again when the
+// fixture is destroyed.
+class SlaveStateFixture: public ::testing::Test
+{
+protected:
+  SlaveStateFixture()
+    : run(0),
+      rootDir(os::mkdtemp().get())
+  {
+    slaveId.set_value("slave1");
+    frameworkId.set_value("framework1");
+    executorId.set_value("executor1");
+    taskId.set_value("task1");
+  }
+
+  virtual ~SlaveStateFixture()
+  {
+    // Best-effort cleanup; the result is intentionally ignored.
+    os::rmdir(rootDir);
+  }
+
+  SlaveID slaveId;
+  FrameworkID frameworkId;
+  ExecutorID executorId;
+  TaskID taskId;
+  int run;        // Run number used by the tests.
+  string rootDir; // Scratch work directory for the test.
+};
+
+
+TEST_F(SlaveStateFixture, CreateExecutorDirectory)
+{
+  // With nothing on disk yet, the executor's work directory should be
+  // run number 'run' under the executor's directory.
+  string expected = rootDir;
+  expected += "/slaves/" + slaveId.value();
+  expected += "/frameworks/" + frameworkId.value();
+  expected += "/executors/" + executorId.value();
+  expected += "/runs/" + stringify(run);
+
+  const string actual = paths::createUniqueExecutorWorkDirectory(
+      rootDir, slaveId, frameworkId, executorId);
+
+  ASSERT_EQ(expected, actual);
+}
+
+
+TEST_F(SlaveStateFixture, format)
+{
+  // Each paths:: helper should extend its parent's path with exactly one
+  // "/<kind>/<id>" component.
+  const string slaveDir = rootDir + "/slaves/" + slaveId.value();
+  const string frameworkDir = slaveDir + "/frameworks/" + frameworkId.value();
+  const string executorDir =
+    frameworkDir + "/executors/" + executorId.value();
+  const string runDir = executorDir + "/runs/" + stringify(run);
+  const string taskDir = runDir + "/tasks/" + taskId.value();
+
+  ASSERT_EQ(slaveDir, paths::getSlavePath(rootDir, slaveId));
+  ASSERT_EQ(frameworkDir,
+            paths::getFrameworkPath(rootDir, slaveId, frameworkId));
+  ASSERT_EQ(executorDir,
+            paths::getExecutorPath(rootDir, slaveId, frameworkId, executorId));
+  ASSERT_EQ(runDir,
+            paths::getExecutorRunPath(rootDir, slaveId, frameworkId,
+                                      executorId, run));
+  ASSERT_EQ(taskDir,
+            paths::getTaskPath(rootDir, slaveId, frameworkId, executorId,
+                               run, taskId));
+}
+
+
+TEST_F(SlaveStateFixture, parse)
+{
+  // Build a small checkpointed layout by hand: an executor run directory,
+  // framework and process pid files, and a task directory with its info
+  // and updates files. Then check that parse() recovers the framework,
+  // the executor, its latest run and the task.
+  const string executorDir = paths::createUniqueExecutorWorkDirectory(
+      rootDir, slaveId, frameworkId, executorId);
+
+  // Framework pid file.
+  os::touch(paths::getFrameworkPIDPath(rootDir, slaveId, frameworkId));
+
+  // Process pid files.
+  ASSERT_TRUE(os::mkdir(executorDir + "/pids").isSome());
+
+  os::touch(paths::getLibprocessPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run));
+  os::touch(paths::getForkedPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run));
+  os::touch(paths::getExecedPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run));
+
+  // Task directory with its info and updates files.
+  ASSERT_TRUE(os::mkdir(paths::getTaskPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId)).isSome());
+
+  os::touch(paths::getTaskInfoPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId));
+  os::touch(paths::getTaskUpdatesPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId));
+
+  SlaveState state = parse(rootDir, slaveId);
+
+  ASSERT_TRUE(state.frameworks.contains(frameworkId));
+
+  SlaveState::FrameworkState& framework = state.frameworks[frameworkId];
+  ASSERT_TRUE(framework.executors.contains(executorId));
+
+  SlaveState::FrameworkState::RunState& executor =
+    framework.executors[executorId];
+  ASSERT_EQ(run, executor.latest);
+  ASSERT_TRUE(executor.runs[run].tasks.contains(taskId));
+}
+
+
+TEST_F(SlaveStateFixture, CheckpointSlaveID)
+{
+  // Round trip: the id written by writeSlaveID() is the id read back.
+  writeSlaveID(rootDir, slaveId);
+
+  const SlaveID recovered = readSlaveID(rootDir);
+  ASSERT_EQ(slaveId, recovered);
+}
+
+
+TEST_F(SlaveStateFixture, CheckpointFrameworkPID)
+{
+  // Round trip: the pid written by writeFrameworkPID() is the pid read
+  // back.
+  //
+  // NOTE(review): A bare "random" has no "@host:port" part, so it
+  // presumably parses as an empty UPID. The test could then pass by
+  // comparing two empty pids. Use a well-formed pid and check that it
+  // actually parsed.
+  process::UPID upid("random@127.0.0.1:5050");
+  ASSERT_EQ("random", upid.id);
+
+  writeFrameworkPID(rootDir, slaveId, frameworkId, upid);
+
+  ASSERT_EQ(upid, readFrameworkPID(rootDir, slaveId, frameworkId));
+}
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp Wed Sep 19 22:21:25 2012
@@ -282,9 +282,13 @@ struct stringify<T, false>
template <typename T>
struct stringify<T, true>
{
- stringify(const T& t) : s(::stringify(t)) {}
+ stringify(const T& _t) : s(::stringify(_t)) {}
const char* get() { return s.c_str(); }
- const std::string& s;
+
+ // NOTE: 's' must be stored by value. A const reference member bound to the
+ // temporary from ::stringify() in the mem-initializer would dangle as soon
+ // as the constructor returns, i.e. before strings::format() calls get().
+ const std::string s;
};
Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp Wed Sep 19 22:21:25 2012
@@ -5,6 +5,7 @@
#include <errno.h>
#include <fcntl.h>
#include <fts.h>
+#include <glob.h>
#include <libgen.h>
#include <limits.h>
#include <netdb.h>
@@ -26,6 +27,8 @@
#include <sys/utsname.h>
#include <list>
+#include <set>
+#include <sstream>
#include <string>
#include "foreach.hpp"
@@ -183,6 +186,24 @@ inline Try<Nothing> touch(const std::str
}
+// Creates a new, uniquely named, empty temporary file under 'root' using
+// mkstemp(3), and returns its path. Because the file is actually created,
+// no one else can claim the name between generating it and using it.
+inline Try<std::string> mktemp(const std::string& root = "/tmp")
+{
+  const std::string path = root + "/XXXXXX";
+
+  // mkstemp(3) rewrites the trailing XXXXXX in place, so it needs a
+  // writable, NUL-terminated copy of the template.
+  char* temp = new char[path.size() + 1];
+  ::strcpy(temp, path.c_str());
+
+  int fd = ::mkstemp(temp);
+
+  if (fd < 0) {
+    const int error = errno; // Save errno before any further calls.
+    delete[] temp;
+    return Try<std::string>::error(
+        std::string("Cannot create temporary file: ") + strerror(error));
+  }
+
+  // Only the file's path is returned, so the descriptor is not needed.
+  ::close(fd);
+
+  std::string result(temp);
+  delete[] temp;
+  return result;
+}
+
+
// Write out the string to the file at the current fd position.
inline Try<Nothing> write(int fd, const std::string& message)
{
@@ -390,6 +411,22 @@ inline Try<Nothing> mkdir(const std::str
return Nothing();
}
+// Creates a new, uniquely named temporary directory under 'root' using
+// mkdtemp(3), which creates it with permissions 0700, and returns its path.
+inline Try<std::string> mkdtemp(const std::string& root = "/tmp")
+{
+  const std::string path = root + "/XXXXXX";
+
+  // mkdtemp(3) rewrites the trailing XXXXXX in place, so it needs a
+  // writable, NUL-terminated copy of the template.
+  char* temp = new char[path.size() + 1];
+  ::strcpy(temp, path.c_str());
+
+  if (::mkdtemp(temp) == NULL) {
+    const int error = errno; // Save errno before any further calls.
+    delete[] temp;
+    return Try<std::string>::error(
+        std::string("Cannot create temporary directory: ") + strerror(error));
+  }
+
+  std::string result(temp);
+  delete[] temp;
+  return result;
+}
// Recursively deletes a directory akin to: 'rm -r'. Note that this
// function expects an absolute path.
@@ -725,6 +762,33 @@ inline Try<int> shell(std::ostream* os,
}
+// Returns the list of paths that match the given shell wildcard
+// 'pattern' (see glob(3)). Results are unsorted (GLOB_NOSORT). A pattern
+// that matches nothing yields an empty list, not an error.
+inline Try<std::list<std::string> > glob(const std::string& pattern)
+{
+  glob_t g = glob_t(); // Zeroed so that globfree() is safe on every path.
+  const int status = ::glob(pattern.c_str(), GLOB_NOSORT, NULL, &g);
+
+  std::list<std::string> result;
+
+  if (status == 0) {
+    for (size_t i = 0; i < g.gl_pathc; ++i) {
+      result.push_back(g.gl_pathv[i]);
+    }
+  }
+
+  // glob(3) may have allocated memory in 'g' even when it fails.
+  globfree(&g);
+
+  if (status == 0 || status == GLOB_NOMATCH) {
+    return result; // Empty list on GLOB_NOMATCH.
+  }
+
+  // glob(3) reports failures through its return value, not errno.
+  const char* reason = "unknown error";
+  if (status == GLOB_NOSPACE) {
+    reason = "out of memory";
+  } else if (status == GLOB_ABORTED) {
+    reason = "read error";
+  }
+
+  return Try<std::list<std::string> >::error(
+      "Error globbing pattern '" + pattern + "': " + reason);
+}
+
+
inline int system(const std::string& command)
{
return ::system(command.c_str());