You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/09/20 00:21:26 UTC

svn commit: r1387802 - in /incubator/mesos/trunk: src/ src/common/ src/slave/ src/tests/ third_party/libprocess/include/stout/

Author: benh
Date: Wed Sep 19 22:21:25 2012
New Revision: 1387802

URL: http://svn.apache.org/viewvc?rev=1387802&view=rev
Log:
Created slave "state" so that we can record information about running
executors and tasks _across_ slave restarts/upgrades (contributed by
Vinod Kone, https://reviews.apache.org/r/6842).

Added:
    incubator/mesos/trunk/src/common/protobuf_utils.hpp
    incubator/mesos/trunk/src/slave/paths.hpp
    incubator/mesos/trunk/src/slave/state.cpp
    incubator/mesos/trunk/src/slave/state.hpp
    incubator/mesos/trunk/src/tests/slave_state_tests.cpp
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp
    incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Wed Sep 19 22:21:25 2012
@@ -154,7 +154,7 @@ nodist_libmesos_no_third_party_la_SOURCE
 libmesos_no_third_party_la_SOURCES = sched/sched.cpp local/local.cpp	\
 	master/allocator.cpp master/drf_sorter.cpp			\
 	master/frameworks_manager.cpp master/http.cpp master/master.cpp	\
-	master/slaves_manager.cpp slave/gc.cpp				\
+	master/slaves_manager.cpp slave/gc.cpp slave/state.cpp		\
 	slave/slave.cpp slave/http.cpp slave/isolation_module.cpp	\
 	slave/process_based_isolation_module.cpp slave/reaper.cpp	\
 	launcher/launcher.cpp exec/exec.cpp common/lock.cpp		\
@@ -207,6 +207,7 @@ libmesos_no_third_party_la_SOURCES += co
 	slave/isolation_module.hpp slave/isolation_module_factory.hpp	\
 	slave/cgroups_isolation_module.hpp				\
 	slave/lxc_isolation_module.hpp					\
+	slave/paths.hpp slave/state.hpp					\
 	slave/process_based_isolation_module.hpp slave/reaper.hpp	\
 	slave/slave.hpp slave/solaris_project_isolation_module.hpp	\
 	slave/webui.hpp tests/external_test.hpp				\
@@ -589,7 +590,7 @@ nodist_libjava_la_SOURCES =					\
 BUILT_SOURCES += java/jni/org_apache_mesos_MesosSchedulerDriver.h	\
 	java/jni/org_apache_mesos_MesosExecutorDriver.h			\
 	java/jni/org_apache_mesos_Log.h					\
-	java/jni/org_apache_mesos_state_Variable.h		\
+	java/jni/org_apache_mesos_state_Variable.h			\
 	java/jni/org_apache_mesos_state_ZooKeeperState.h
 
 
@@ -765,6 +766,7 @@ check_PROGRAMS += mesos-tests
 
 mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp			\
 	              tests/master_tests.cpp tests/state_tests.cpp	\
+	              tests/slave_state_tests.cpp			\
 	              tests/gc_tests.cpp				\
 	              tests/resource_offers_tests.cpp			\
 	              tests/fault_tolerance_tests.cpp			\

Added: incubator/mesos/trunk/src/common/protobuf_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/protobuf_utils.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/common/protobuf_utils.hpp (added)
+++ incubator/mesos/trunk/src/common/protobuf_utils.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROTOBUF_UTILS_HPP__
+#define __PROTOBUF_UTILS_HPP__
+
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/uuid.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace internal {
+namespace protobuf {
+
+inline bool isTerminalState(const TaskState& state)
+{
+  // Terminal states are: finished, failed, killed and lost.
+  const bool nonTerminal = state != TASK_FINISHED && state != TASK_FAILED &&
+                           state != TASK_KILLED && state != TASK_LOST;
+  return !nonTerminal;
+}
+
+
+// Builds a StatusUpdate reporting that 'taskId' is in 'state', stamped
+// with the current clock time and a freshly generated UUID. The executor
+// id field is left unset when 'executorId' compares equal to "".
+inline StatusUpdate createStatusUpdate(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const TaskID& taskId,
+    const TaskState& state,
+    const ExecutorID& executorId = ExecutorID())
+{
+  StatusUpdate update;
+  update.mutable_framework_id()->MergeFrom(frameworkId);
+  update.mutable_slave_id()->MergeFrom(slaveId);
+  if (!(executorId == "")) {
+    update.mutable_executor_id()->MergeFrom(executorId);
+  }
+  update.set_timestamp(::process::Clock::now());
+  update.set_uuid(UUID::random().toBytes());
+  TaskStatus* taskStatus = update.mutable_status();
+  taskStatus->set_state(state);
+  taskStatus->mutable_task_id()->MergeFrom(taskId);
+
+  return update;
+}
+
+
+// Builds a Task record for 'task' in the given 'state'. The executor id
+// is only recorded when 'task' does not carry a command.
+inline Task createTask(const TaskInfo& task,
+                       const TaskState& state,
+                       const ExecutorID& executorId,
+                       const FrameworkID& frameworkId)
+{
+  Task result;
+  result.set_name(task.name());
+  result.set_state(state);
+  result.mutable_task_id()->MergeFrom(task.task_id());
+  result.mutable_slave_id()->MergeFrom(task.slave_id());
+  result.mutable_framework_id()->MergeFrom(frameworkId);
+  result.mutable_resources()->MergeFrom(task.resources());
+  if (!task.has_command()) {
+    result.mutable_executor_id()->MergeFrom(executorId);
+  }
+  return result;
+}
+
+} // namespace protobuf {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROTOBUF_UTILS_HPP__

Added: incubator/mesos/trunk/src/slave/paths.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/paths.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/paths.hpp (added)
+++ incubator/mesos/trunk/src/slave/paths.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_PATHS_HPP__
+#define __SLAVE_PATHS_HPP__
+
+#include <list>
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/numify.hpp"
+#include "stout/strings.hpp"
+#include "stout/try.hpp"
+#include "stout/utils.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "process/pid.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace paths {
+
+// Path layout templates. Each '%s' is filled in, in order, by the
+// helper functions below (via strings::format).
+const std::string ROOT_PATH = "%s";
+const std::string SLAVEID_PATH = ROOT_PATH + "/slaves/slave.id";
+const std::string SLAVE_PATH = ROOT_PATH + "/slaves/%s";
+const std::string FRAMEWORK_PATH = SLAVE_PATH + "/frameworks/%s";
+const std::string FRAMEWORK_PID_PATH = FRAMEWORK_PATH + "/framework.pid";
+const std::string EXECUTOR_PATH = FRAMEWORK_PATH + "/executors/%s";
+const std::string EXECUTOR_RUN_PATH = EXECUTOR_PATH + "/runs/%s";
+const std::string PIDS_PATH = EXECUTOR_RUN_PATH + "/pids";
+const std::string LIBPROCESS_PID_PATH = PIDS_PATH + "/libprocess.pid";
+const std::string EXECED_PID_PATH = PIDS_PATH + "/execed.pid";
+const std::string FORKED_PID_PATH = PIDS_PATH + "/forked.pid";
+const std::string TASK_PATH = EXECUTOR_RUN_PATH + "/tasks/%s";
+const std::string TASK_INFO_PATH = TASK_PATH + "/info";
+const std::string TASK_UPDATES_PATH = TASK_PATH + "/updates";
+
+
+inline std::string getSlaveIDPath(const std::string& rootDir)
+{
+  return strings::format(SLAVEID_PATH, rootDir).get();
+}
+
+
+inline std::string getSlavePath(const std::string& rootDir,
+                                const SlaveID& slaveId)
+{
+  return strings::format(SLAVE_PATH, rootDir, slaveId).get();
+}
+
+
+inline std::string getFrameworkPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId)
+{
+  return strings::format(FRAMEWORK_PATH, rootDir, slaveId,
+                         frameworkId).get();
+}
+
+
+inline std::string getFrameworkPIDPath(const std::string& rootDir,
+                                       const SlaveID& slaveId,
+                                       const FrameworkID& frameworkId)
+{
+  return strings::format(FRAMEWORK_PID_PATH, rootDir, slaveId,
+                              frameworkId).get();
+}
+
+
+inline std::string getExecutorPath(const std::string& rootDir,
+                                   const SlaveID& slaveId,
+                                   const FrameworkID& frameworkId,
+                                   const ExecutorID& executorId)
+{
+  return strings::format(EXECUTOR_PATH, rootDir, slaveId, frameworkId,
+                         executorId).get();
+}
+
+
+inline std::string getExecutorRunPath(const std::string& rootDir,
+                                      const SlaveID& slaveId,
+                                      const FrameworkID& frameworkId,
+                                      const ExecutorID& executorId,
+                                      int run)
+{
+  return strings::format(EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run)).get();
+}
+
+
+inline std::string getLibprocessPIDPath(const std::string& rootDir,
+                                        const SlaveID& slaveId,
+                                        const FrameworkID& frameworkId,
+                                        const ExecutorID& executorId,
+                                        int run)
+{
+  return strings::format(LIBPROCESS_PID_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run)).get();
+}
+
+
+inline std::string getExecedPIDPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId,
+                                    const ExecutorID& executorId,
+                                    int run)
+{
+  return strings::format(EXECED_PID_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run)).get();
+}
+
+
+inline std::string getForkedPIDPath(const std::string& rootDir,
+                                    const SlaveID& slaveId,
+                                    const FrameworkID& frameworkId,
+                                    const ExecutorID& executorId,
+                                    int run)
+{
+  return strings::format(FORKED_PID_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run)).get();
+}
+
+
+inline std::string getTaskPath(const std::string& rootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId,
+                               const ExecutorID& executorId,
+                               int run,
+                               const TaskID& taskId)
+{
+  return strings::format(TASK_PATH, rootDir, slaveId, frameworkId, executorId,
+                         stringify(run), taskId).get();
+}
+
+
+// Path of the task's 'info' entry (TASK_INFO_PATH), not the task directory.
+inline std::string getTaskInfoPath(const std::string& rootDir,
+                                   const SlaveID& slaveId,
+                                   const FrameworkID& frameworkId,
+                                   const ExecutorID& executorId,
+                                   int run, const TaskID& taskId)
+{
+  return strings::format(TASK_INFO_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run), taskId).get();
+}
+
+
+// Path of the task's 'updates' entry (TASK_UPDATES_PATH), not its directory.
+inline std::string getTaskUpdatesPath(const std::string& rootDir,
+                                      const SlaveID& slaveId,
+                                      const FrameworkID& frameworkId,
+                                      const ExecutorID& executorId,
+                                      int run, const TaskID& taskId)
+{
+  return strings::format(TASK_UPDATES_PATH, rootDir, slaveId, frameworkId,
+                         executorId, stringify(run), taskId).get();
+}
+
+
+// Creates and returns a new run directory for the given executor. Its
+// run number is one past the highest numerically named directory that
+// already exists under the executor's 'runs' directory (0 if none).
+inline std::string createUniqueExecutorWorkDirectory(
+    const std::string& rootDir,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  LOG(INFO) << "Generating a unique work directory for executor '" << executorId
+            << "' of framework " << frameworkId << " on slave " << slaveId;
+
+  // Enumerate the existing run directories of this executor (the same
+  // executor might have been launched on this slave before).
+  const std::string pattern = strings::format(
+      EXECUTOR_RUN_PATH, rootDir, slaveId, frameworkId, executorId, "*").get();
+
+  Try<std::list<std::string> > paths = os::glob(pattern);
+  CHECK(!paths.isError()) << paths.error();
+
+  // Highest run number found so far; non-numeric names are skipped.
+  int maxRun = -1;
+  if (paths.isSome()) {
+    foreach (const std::string& entry, paths.get()) {
+      Try<int> number = numify<int>(os::basename(entry));
+      if (!number.isError()) {
+        maxRun = std::max(maxRun, number.get());
+      }
+    }
+  }
+
+  std::string path =
+    getExecutorRunPath(rootDir, slaveId, frameworkId, executorId, maxRun + 1);
+
+  Try<Nothing> created = os::mkdir(path);
+  CHECK(created.isSome())
+    << "Error creating directory '" << path << "': " << created.error();
+
+  return path;
+}
+
+} // namespace paths {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_PATHS_HPP__

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Sep 19 22:21:25 2012
@@ -38,9 +38,11 @@
 #include <stout/utils.hpp>
 
 #include "common/build.hpp"
+#include "common/protobuf_utils.hpp"
 #include "common/type_utils.hpp"
 
 #include "slave/flags.hpp"
+#include "slave/paths.hpp"
 #include "slave/slave.hpp"
 
 namespace params = std::tr1::placeholders;
@@ -57,42 +59,6 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-// // Represents a pending status update that has been sent and we are
-// // waiting for an acknowledgement. In pa
-
-// // stream of status updates for a framework/task. Note
-// // that these are stored in the slave rather than per Framework
-// // because a framework might go away before all of the status
-// // updates have been sent and acknowledged.
-// struct Slave::StatusUpdateStream
-// {
-//   StatusUpdateStreamID streamId;
-//   string directory;
-//   FILE* updates;
-//   FILE* acknowledged;
-//   queue<StatusUpdate> pending;
-//   double timeout;
-// };
-
-
-//   StatusUpdateStreamID id;
-
-
-
-//   queue<StatusUpdate> pending;
-//   double timeout;
-// };
-
-// Helper function that returns true if the task state is terminal
-bool isTerminalTaskState(TaskState state)
-{
-  return state == TASK_FINISHED ||
-    state == TASK_FAILED ||
-    state == TASK_KILLED ||
-    state == TASK_LOST;
-}
-
-
 Slave::Slave(const Resources& _resources,
              bool _local,
              IsolationModule* _isolationModule,
@@ -559,7 +525,8 @@ void Slave::runTask(const FrameworkInfo&
   } else {
     // Launch an executor for this task.
     const string& directory =
-      createUniqueWorkDirectory(framework->id, executorId);
+        paths::createUniqueExecutorWorkDirectory(flags.work_dir, id,
+                                                 framework->id, executorId);
 
     LOG(INFO) << "Using '" << directory
               << "' as work directory for executor '" << executorId
@@ -754,77 +721,6 @@ void Slave::statusUpdateAcknowledgement(
 }
 
 
-// void Slave::statusUpdateAcknowledged(const SlaveID& slaveId,
-//                                      const FrameworkID& frameworkId,
-//                                      const TaskID& taskId,
-//                                      uint32_t sequence)
-// {
-//   StatusUpdateStreamID id(frameworkId, taskId);
-//   StatusUpdateStream* stream = getStatusUpdateStream(id);
-
-//   if (stream == NULL) {
-//     LOG(WARNING) << "WARNING! Received unexpected status update"
-//                  << " acknowledgement for task " << taskId
-//                  << " of framework " << frameworkId;
-//     return;
-//   }
-
-//   CHECK(!stream->pending.empty());
-
-//   const StatusUpdate& update = stream->pending.front();
-
-//   if (update->sequence() != sequence) {
-//     LOG(WARNING) << "WARNING! Received status update acknowledgement"
-//                  << " with bad sequence number (received " << sequence
-//                  << ", expecting " << update->sequence()
-//                  << ") for task " << taskId
-//                  << " of framework " << frameworkId;
-//   } else {
-//     LOG(INFO) << "Received status update acknowledgement for task "
-//               << taskId << " of framework " << frameworkId;
-
-//     // Write the update out to disk.
-//     CHECK(stream->acknowledged != NULL);
-
-//     Try<bool> result =
-//       utils::protobuf::write(stream->acknowledged, update);
-
-//     if (result.isError()) {
-//       // Failing here is rather dramatic, but so is not being able to
-//       // write to disk ... seems like failing early and often might do
-//       // more benefit than harm.
-//       LOG(FATAL) << "Failed to write status update to "
-//                  << stream->directory << "/acknowledged: "
-//                  << result.message();
-//     }
-
-//     stream->pending.pop();
-
-//     bool empty = stream->pending.empty();
-
-//     bool terminal =
-//       update.status().state() == TASK_FINISHED &&
-//       update.status().state() == TASK_FAILED &&
-//       update.status().state() == TASK_KILLED &&
-//       update.status().state() == TASK_LOST;
-
-//     if (empty && terminal) {
-//       cleanupStatusUpdateStream(stream);
-//     } else if (!empty && terminal) {
-//       LOG(WARNING) << "WARNING! Acknowledged a \"terminal\""
-//                    << " task status but updates are still pending";
-//     } else if (!empty) {
-//       StatusUpdateMessage message;
-//       message.mutable_update()->MergeFrom(stream->pending.front());
-//       message.set_reliable(true);
-//       send(master, message);
-
-//       stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-//     }
-//   }
-// }
-
-
 void Slave::registerExecutor(const FrameworkID& frameworkId,
                              const ExecutorID& executorId)
 {
@@ -904,126 +800,6 @@ void Slave::registerExecutor(const Frame
 }
 
 
-// void Slave::statusUpdate(const StatusUpdate& update)
-// {
-//   LOG(INFO) << "Received update that task " << update.status().task_id()
-//             << " of framework " << update.framework_id()
-//             << " is now in state " << update.status().state();
-
-//   Framework* framework = getFramework(update.framework_id());
-//   if (framework == NULL) {
-//     LOG(WARNING) << "WARNING! Failed to lookup"
-//                  << " framework " << update.framework_id()
-//                  << " of received status update";
-//     stats.invalidStatusUpdates++;
-//     return;
-//   }
-
-//   Executor* executor = framework->getExecutor(update.status().task_id());
-//   if (executor == NULL) {
-//     LOG(WARNING) << "WARNING! Failed to lookup executor"
-//                  << " for framework " << update.framework_id()
-//                  << " of received status update";
-//     stats.invalidStatusUpdates++;
-//     return;
-//   }
-
-//   // Create/Get the status update stream for this framework/task.
-//   StatusUpdateStreamID id(update.framework_id(), update.status().task_id());
-
-//   if (!statusUpdateStreams.contains(id)) {
-//     StatusUpdateStream* stream =
-//       createStatusUpdateStream(id, executor->directory);
-
-//     if (stream == NULL) {
-//       LOG(WARNING) << "WARNING! Failed to create status update"
-//                    << " stream for task " << update.status().task_id()
-//                    << " of framework " << update.framework_id()
-//                    << " ... removing executor!";
-//       removeExecutor(framework, executor);
-//       return;
-//     }
-//   }
-
-//   StatusUpdateStream* stream = getStatusUpdateStream(id);
-
-//   CHECK(stream != NULL);
-
-//   // If we are already waiting on an acknowledgement, check that this
-//   // update (coming from the executor), is the same one that we are
-//   // waiting on being acknowledged.
-
-//   // Check that this is status update has not already been
-//   // acknowledged. this could happen because a slave writes the
-//   // acknowledged message but then fails before it can pass the
-//   // message on to the executor, so the executor tries again.
-
-//   returnhere;
-
-//   // TODO(benh): Check that this update hasn't already been received
-//   // or acknowledged! This could happen if a slave receives a status
-//   // update from an executor, then crashes after it writes it to disk
-//   // but before it sends an ack back to
-
-//   // Okay, record this update as received.
-//   CHECK(stream->received != NULL);
-
-//   Result<bool> result =
-//     utils::protobuf::write(stream->received, &update);
-
-//   if (result.isError()) {
-//     // Failing here is rather dramatic, but so is not being able to
-//     // write to disk ... seems like failing early and often might do
-//     // more benefit than harm.
-//     LOG(FATAL) << "Failed to write status update to "
-//                << stream->directory << "/received: "
-//                << result.message();
-//   }
-
-//   // Now acknowledge the executor.
-//   StatusUpdateAcknowledgementMessage message;
-//   message.mutable_framework_id()->MergeFrom(update.framework_id());
-//   message.mutable_slave_id()->MergeFrom(update.slave_id());
-//   message.mutable_task_id()->MergeFrom(update.status().task_id());
-//   send(executor->pid, message);
-
-//   executor->updateTaskState(
-//       update.status().task_id(),
-//       update.status().state());
-
-//   // Remove the task if it's reached a terminal state.
-//   bool terminal =
-//     update.status().state() == TASK_FINISHED &&
-//     update.status().state() == TASK_FAILED &&
-//     update.status().state() == TASK_KILLED &&
-//     update.status().state() == TASK_LOST;
-
-//   if (terminal) {
-//     executor->removeTask(update.status().task_id());
-//     isolationModule->resourcesChanged(
-//         framework->id, framework->info,
-//         executor->info, executor->resources);
-//   }
-
-//   stream->pending.push(update);
-
-//   // Send the status update if this is the first in the
-//   // stream. Subsequent status updates will get sent in
-//   // Slave::statusUpdateAcknowledged.
-//   if (stream->pending.size() == 1) {
-//     CHECK(stream->timeout == -1);
-//     StatusUpdateMessage message;
-//     message.mutable_update()->MergeFrom(update);
-//     message.set_reliable(true);
-//     send(master, message);
-
-//     stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-//   }
-
-//   stats.tasks[status.state()]++;
-//   stats.validStatusUpdates++;
-// }
-
 void Slave::statusUpdate(const StatusUpdate& update)
 {
   const TaskStatus& status = update.status();
@@ -1039,7 +815,7 @@ void Slave::statusUpdate(const StatusUpd
       executor->updateTaskState(status.task_id(), status.state());
 
       // Handle the task appropriately if it's terminated.
-      if (isTerminalTaskState(status.state())) {
+      if (protobuf::isTerminalState(status.state())) {
         executor->removeTask(status.task_id());
 
         dispatch(isolationModule,
@@ -1140,32 +916,6 @@ void Slave::statusUpdateTimeout(
 }
 
 
-// void Slave::timeout()
-// {
-//   // Check and see if we should re-send any status updates.
-//   double now = Clock::now();
-
-//   foreachvalue (StatusUpdateStream* stream, statusUpdateStreams) {
-//     CHECK(stream->timeout > 0);
-//     if (stream->timeout < now) {
-//       CHECK(!stream->pending.empty());
-//       const StatusUpdate& update = stream->pending.front();
-
-//       LOG(WARNING) << "WARNING! Resending status update"
-//                 << " for task " << update.status().task_id()
-//                 << " of framework " << update.framework_id();
-
-//       StatusUpdateMessage message;
-//       message.mutable_update()->MergeFrom(update);
-//       message.set_reliable(true);
-//       send(master, message);
-
-//       stream->timeout = now + STATUS_UPDATE_RETRY_INTERVAL;
-//     }
-//   }
-// }
-
-
 void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << "Process exited: " << from;
@@ -1188,174 +938,6 @@ Framework* Slave::getFramework(const Fra
 }
 
 
-// StatusUpdates* Slave::getStatusUpdateStream(const StatusUpdateStreamID& id)
-// {
-//   if (statusUpdateStreams.contains(id)) {
-//     return statusUpdateStreams[id];
-//   }
-
-//   return NULL;
-// }
-
-
-// StatusUpdateStream* Slave::createStatusUpdateStream(
-//     const FrameworkID& frameworkId,
-//     const TaskID& taskId,
-//     const string& directory)
-// {
-//   StatusUpdateStream* stream = new StatusUpdates();
-//   stream->id = id;
-//   stream->directory = directory;
-//   stream->received = NULL;
-//   stream->acknowledged = NULL;
-//   stream->timeout = -1;
-
-//   streams[id] = stream;
-
-//   // Open file descriptors for "updates" and "acknowledged".
-//   string path;
-//   Result<int> result;
-
-//   path = stream->directory + "/received";
-//   result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
-//   if (result.isError() || result.isNone()) {
-//     LOG(WARNING) << "Failed to open " << path
-//                  << " for storing received status updates";
-//     cleanupStatusUpdateStream(stream);
-//     return NULL;
-//   }
-
-//   stream->received = result.get();
-
-//   path = updates->directory + "/acknowledged";
-//   result = utils::os::open(path, O_CREAT | O_RDWR | O_SYNC);
-//   if (result.isError() || result.isNone()) {
-//     LOG(WARNING) << "Failed to open " << path <<
-//                  << " for storing acknowledged status updates";
-//     cleanupStatusUpdateStream(stream);
-//     return NULL;
-//   }
-
-//   stream->acknowledged = result.get();
-
-//   // Replay the status updates. This is necessary because the slave
-//   // might have crashed but was restarted before the executors
-//   // died. Or another task with the same id as before got run again on
-//   // the same executor.
-//   bool replayed = replayStatusUpdateStream(stream);
-
-//   if (!replayed) {
-//     LOG(WARNING) << "Failed to correctly replay status updates"
-//                  << " for task " << taskId
-//                  << " of framework " << frameworkId
-//                  << " found at " << path;
-//     cleanupStatusUpdateStream(stream);
-//     return NULL;
-//   }
-
-//   // Start sending any pending status updates. In this case, the slave
-//   // probably died after it sent the status update and never received
-//   // the acknowledgement.
-//   if (!stream->pending.empty()) {
-//     StatusUpdate* update = stream->pending.front();
-//     StatusUpdateMessage message;
-//     message.mutable_update()->MergeFrom(*update);
-//     message.set_reliable(true);
-//     send(master, message);
-
-//     stream->timeout = Clock::now() + STATUS_UPDATE_RETRY_INTERVAL;
-//   }
-
-//   return stream;
-// }
-
-
-// bool Slave::replayStatusUpdateStream(StatusUpdateStream* stream)
-// {
-//   CHECK(stream->received != NULL);
-//   CHECK(stream->acknowledged != NULL);
-
-//   Result<StatusUpdate*> result;
-
-//   // Okay, now read all the recevied status updates.
-//   hashmap<uint32_t, StatusUpdate> pending;
-
-//   result = utils::protobuf::read(stream->received);
-//   while (result.isSome()) {
-//     StatusUpdate* update = result.get();
-//     CHECK(!pending.contains(update->sequence()));
-//     pending[update->sequence()] = *update;
-//     delete update;
-//     result = utils::protobuf::read(stream->received);
-//   }
-
-//   if (result.isError()) {
-//     return false;
-//   }
-
-//   CHECK(result.isNone());
-
-//   LOG(INFO) << "Recovered " << pending.size()
-//             << " TOTAL status updates for task "
-//             << stream->id.second << " of framework "
-//             << stream->id.first;
-
-//   // Okay, now get all the acknowledged status updates.
-//   result = utils::protobuf::read(stream->acknowledged);
-//   while (result.isSome()) {
-//     StatusUpdate* update = result.get();
-//     stream->sequence = std::max(stream->sequence, update->sequence());
-//     CHECK(pending.contains(update->sequence()));
-//     pending.erase(update->sequence());
-//     delete update;
-//     result = utils::protobuf::read(stream->acknowledged);
-//   }
-
-//   if (result.isError()) {
-//     return false;
-//   }
-
-//   CHECK(result.isNone());
-
-//   LOG(INFO) << "Recovered " << pending.size()
-//             << " PENDING status updates for task "
-//             << stream->id.second << " of framework "
-//             << stream->id.first;
-
-//   // Add the pending status updates in sorted order.
-//   uint32_t sequence = 0;
-
-//   while (!pending.empty()) {
-//     // Find the smallest sequence number.
-//     foreachvalue (const StatusUpdate& update, pending) {
-//       sequence = std::min(sequence, update.sequence());
-//     }
-
-//     // Push that update and remove it from pending.
-//     stream->pending.push(pending[sequence]);
-//     pending.erase(sequence);
-//   }
-
-//   return true;
-// }
-
-
-// void Slave::cleanupStatusUpdateStream(StatusUpdateStream* stream)
-// {
-//   if (stream->received != NULL) {
-//     fclose(stream->received);
-//   }
-
-//   if (stream->acknowledged != NULL) {
-//     fclose(stream->acknowledged);
-//   }
-
-//   streams.erase(stream->id);
-
-//   delete stream;
-// }
-
-
 // N.B. When the slave is running in "local" mode then the pid is
 // uninteresting (and possibly could cause bugs).
 void Slave::executorStarted(const FrameworkID& frameworkId,
@@ -1451,7 +1033,7 @@ void Slave::executorExited(const Framewo
 
   // Transition all live tasks to TASK_LOST/TASK_FAILED.
   foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
-    if (!isTerminalTaskState(task->state())) {
+    if (!protobuf::isTerminalState(task->state())) {
       isCommandExecutor = !task->has_executor_id();
 
       transitionLiveTask(task->task_id(),
@@ -1543,58 +1125,6 @@ void Slave::shutdownExecutorTimeout(cons
 }
 
 
-// void Slave::recover()
-// {
-//   // if we find an executor that is no longer running and it's last
-//   // acknowledged task statuses are not terminal, create a
-//   // statusupdatestream for each task and try and reliably send
-//   // TASK_LOST updates.
-
-//   // otherwise once we reconnect the executor will just start sending
-//   // us status updates that we need to send, wait for ack, write to
-//   // disk, and then respond.
-// }
-
-
-string Slave::createUniqueWorkDirectory(const FrameworkID& frameworkId,
-                                        const ExecutorID& executorId)
-{
-  LOG(INFO) << "Generating a unique work directory for executor '"
-            << executorId << "' of framework " << frameworkId;
-
-  std::ostringstream out(std::ios_base::app | std::ios_base::out);
-  out << flags.work_dir
-      << "/slaves/" << id
-      << "/frameworks/" << frameworkId
-      << "/executors/" << executorId;
-
-  // Find a unique directory based on the path given by the slave
-  // (this is because we might launch multiple executors from the same
-  // framework on this slave).
-  // NOTE: The run number of the new directory will be the highest of
-  // all the existing run directories for this executor.
-  out << "/runs/";
-
-  int maxrun = 0;
-  foreach (const string& runStr, os::ls(out.str())) {
-    Try<int> run = numify<int>(runStr);
-    if (run.isError()) {
-      LOG(ERROR) << "Ignoring invalid run directory " << runStr;
-      continue;
-    }
-
-    maxrun = std::max(maxrun, run.get());
-  }
-  out << maxrun;
-
-  Try<Nothing> mkdir = os::mkdir(out.str());
-  CHECK(mkdir.isSome()) << "Error creating work directory '"
-                        << out.str() << "': " << mkdir.error();
-
-  return out.str();
-}
-
-
 // TODO(vinod): Figure out a way to express this function via cmd line.
 Duration Slave::age(double usage)
 {
@@ -1606,7 +1136,7 @@ void Slave::checkDiskUsage()
 {
   VLOG(1) << "Checking disk usage";
 
-  // TODO(vinod): We are making usage a Future, so that we can plug-in
+  // TODO(vinod): We are making usage a Future, so that we can plug in
   // os::usage() into async.
   Future<Try<double> > usage = os::usage();
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Sep 19 22:21:25 2012
@@ -33,6 +33,7 @@
 #include "slave/gc.hpp"
 #include "slave/http.hpp"
 #include "slave/isolation_module.hpp"
+#include "slave/state.hpp"
 
 #include "common/attributes.hpp"
 #include "common/resources.hpp"
@@ -155,19 +156,6 @@ protected:
                                const ExecutorID& executorId,
                                const UUID& uuid);
 
-//   // Create a new status update stream.
-//   StatusUpdates* createStatusUpdateStream(const StatusUpdateStreamID& streamId,
-//                                           const string& directory);
-
-//   StatusUpdates* getStatusUpdateStream(const StatusUpdateStreamID& streamId);
-
-  // Helper function for generating a unique work directory for this
-  // framework/executor pair (non-trivial since a framework/executor
-  // pair may be launched more than once on the same slave).
-  std::string createUniqueWorkDirectory(const FrameworkID& frameworkId,
-                                        const ExecutorID& executorId);
-
-
   // This function returns the max age of executor/slave directories allowed,
   // given a disk usage. This value could be used to tune gc.
   Duration age(double usage);
@@ -222,6 +210,8 @@ private:
   bool connected; // Flag to indicate if slave is registered.
 
   GarbageCollector gc;
+
+  state::SlaveState state;
 };
 
 
@@ -350,7 +340,7 @@ struct Framework
 
       // Now determine the path to the executor.
       Try<std::string> path = os::realpath(
-          path::join(flags.launcher_dir, "mesos-executor"));
+          ::path::join(flags.launcher_dir, "mesos-executor"));
 
       if (path.isSome()) {
         executor.mutable_command()->set_value(path.get());

Added: incubator/mesos/trunk/src/slave/state.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.cpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/state.cpp (added)
+++ incubator/mesos/trunk/src/slave/state.cpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,214 @@
+#include <glog/logging.h>
+
+#include "stout/foreach.hpp"
+#include "stout/format.hpp"
+#include "stout/numify.hpp"
+#include "stout/os.hpp"
+#include "stout/protobuf.hpp"
+#include "stout/try.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+using std::list;
+using std::string;
+using std::max;
+
+// Rebuilds the slave state by scanning the directory layout under
+// 'rootDir' for the given slave, level by level: frameworks, then
+// executors, then executor runs, then tasks. A failure to list one
+// level is logged and that subtree is skipped, so the returned state
+// is best-effort rather than all-or-nothing.
+SlaveState parse(const string& rootDir, const SlaveID& slaveId)
+{
+  SlaveState state;
+
+  // Record which slave this state belongs to.
+  state.slaveId = slaveId;
+  state.slaveMetaDir = paths::getSlavePath(rootDir, slaveId);
+
+  // Find the frameworks.
+  Try<list<string> > frameworks = os::glob(
+      strings::format(paths::FRAMEWORK_PATH, rootDir, slaveId, "*").get());
+
+  if (frameworks.isError()) {
+    LOG(ERROR) << "Error finding frameworks for slave " << slaveId
+               << ": " << frameworks.error();
+    return state;
+  }
+
+  foreach (const string& frameworkPath, frameworks.get()) {
+    // The last path component is the id at every level of the layout.
+    FrameworkID frameworkId;
+    frameworkId.set_value(os::basename(frameworkPath));
+
+    // Find the executors.
+    Try<list<string> > executors =
+        os::glob(strings::format(paths::EXECUTOR_PATH, rootDir, slaveId,
+                                 frameworkId, "*").get());
+
+    if (executors.isError()) {
+      LOG(ERROR) << "Error finding executors for framework " << frameworkId
+                 << ": " << executors.error();
+      continue;
+    }
+
+    foreach (const string& executorPath, executors.get()) {
+      ExecutorID executorId;
+      executorId.set_value(os::basename(executorPath));
+
+      // Find the runs.
+      Try<list<string> > runs =
+          os::glob(strings::format(paths::EXECUTOR_RUN_PATH, rootDir, slaveId,
+                                   frameworkId, executorId, "*").get());
+
+      if (runs.isError()) {
+        LOG(ERROR) << "Error finding runs for executor " << executorId
+                   << ": " << runs.error();
+        continue;
+      }
+
+      foreach (const string& runPath, runs.get()) {
+        Try<int> result = numify<int>(os::basename(runPath));
+        if (!result.isSome()) {
+          LOG(ERROR) << "Non-numeric run number in path " << runPath;
+          continue;
+        }
+
+        int run = result.get();
+
+        // Look the executor entry up once. It is only created here, i.e.
+        // after at least one valid run directory has been seen.
+        SlaveState::FrameworkState::RunState& executor =
+          state.frameworks[frameworkId].executors[executorId];
+
+        // Update max run.
+        executor.latest = max(run, executor.latest);
+
+        // Find the tasks.
+        Try<list<string> > tasks =
+            os::glob(strings::format(paths::TASK_PATH, rootDir, slaveId,
+                                     frameworkId, executorId, stringify(run),
+                                     "*").get());
+
+        if (tasks.isError()) {
+          LOG(WARNING) << "Error finding tasks for run " << run
+                       << " of executor " << executorId
+                       << ": " << tasks.error();
+          continue;
+        }
+
+        foreach (const string& taskPath, tasks.get()) {
+          TaskID taskId;
+          taskId.set_value(os::basename(taskPath));
+
+          executor.runs[run].tasks.insert(taskId);
+        }
+      }
+    }
+  }
+  return state;
+}
+
+
+// Helper functions for check-pointing slave data.
+// Checkpoints the description of 'task' into the file "task" inside
+// 'taskDir'. The containing directory is created first; the process is
+// aborted if either the directory or the file cannot be written.
+void writeTask(Task* task, const string& taskDir)
+{
+  const string file(taskDir + "/task");
+
+  // Make sure the directory that will hold the file exists.
+  Try<Nothing> created = os::mkdir(os::dirname(file));
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << os::dirname(file)
+    << "': " << created.error();
+
+  LOG(INFO) << "Writing task description for task "
+            << task->task_id() << " to " << file;
+
+  Try<bool> written = protobuf::write(file, *task);
+
+  if (written.isError()) {
+    LOG(FATAL) << "Failed to write task description to disk "
+               << written.error();
+  }
+}
+
+
+// Checkpoints 'slaveId' (in its stringified form) to the file at
+// paths::getSlaveIDPath(rootDir), creating the containing directory
+// first. Aborts the process if the directory or the file cannot be
+// written.
+void writeSlaveID(const string& rootDir, const SlaveID& slaveId)
+{
+  const string& path = paths::getSlaveIDPath(rootDir);
+
+  Try<Nothing> created = os::mkdir(os::dirname(path));
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << os::dirname(path)
+    << "': " << created.error();
+
+  LOG(INFO) << "Writing slave id " << slaveId << " to " << path;
+
+  Try<Nothing> result = os::write(path, stringify(slaveId));
+
+  // Report the error carried by the Try rather than strerror(errno):
+  // errno may have been overwritten by the time this is evaluated.
+  CHECK(result.isSome())
+    << "Error writing slave id to disk " << result.error();
+}
+
+
+// Reads the checkpointed slave id from the file at
+// paths::getSlaveIDPath(rootDir). If the read fails or yields no
+// value, a warning is logged and a default-constructed SlaveID is
+// returned.
+SlaveID readSlaveID(const string& rootDir)
+{
+  const string& path = paths::getSlaveIDPath(rootDir);
+
+  Result<string> result = os::read(path);
+
+  SlaveID slaveId;
+
+  if (!result.isSome()) {
+    // NOTE: The conditional has to be parenthesized. '<<' binds tighter
+    // than '?:', so without the parentheses only the bool is streamed
+    // and the actual reason is silently dropped.
+    LOG(WARNING) << "Cannot read slave id from " << path << " because "
+                 << (result.isError() ? result.error() : "empty");
+    return slaveId;
+  }
+
+  LOG(INFO) << "Read slave id " << result.get() << " from " << path;
+
+  slaveId.set_value(result.get());
+  return slaveId;
+}
+
+
+// Checkpoints the framework's 'pid' to the file at
+// paths::getFrameworkPIDPath(metaRootDir, slaveId, frameworkId),
+// creating the containing directory first. Aborts the process if the
+// directory or the file cannot be written.
+void writeFrameworkPID(const string& metaRootDir,
+                       const SlaveID& slaveId,
+                       const FrameworkID& frameworkId,
+                       const string& pid)
+{
+  const string& path = paths::getFrameworkPIDPath(metaRootDir, slaveId,
+                                                  frameworkId);
+
+  Try<Nothing> created = os::mkdir(os::dirname(path));
+
+  CHECK(created.isSome())
+    << "Error creating directory '" << os::dirname(path)
+    << "': " << created.error();
+
+  LOG(INFO) << "Writing framework pid " << pid << " to " << path;
+
+  Try<Nothing> result = os::write(path, pid);
+
+  // Report the error carried by the Try rather than strerror(errno):
+  // errno may have been overwritten by the time this is evaluated.
+  CHECK(result.isSome())
+    << "Error writing framework pid to disk " << result.error();
+}
+
+
+// Reads the checkpointed framework pid from the file at
+// paths::getFrameworkPIDPath(metaRootDir, slaveId, frameworkId). If
+// the read fails or yields no value, a warning is logged and a
+// default-constructed UPID is returned.
+process::UPID readFrameworkPID(const string& metaRootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId)
+{
+  const string& path = paths::getFrameworkPIDPath(metaRootDir, slaveId,
+                                                  frameworkId);
+
+  Result<string> result = os::read(path);
+
+  if (!result.isSome()) {
+    // NOTE: The conditional has to be parenthesized. '<<' binds tighter
+    // than '?:', so without the parentheses only the bool is streamed
+    // and the actual reason is silently dropped.
+    LOG(WARNING) << "Cannot read framework pid from " << path << " because "
+                 << (result.isError() ? result.error() : "empty");
+    return process::UPID();
+  }
+
+  LOG(INFO) << "Read framework pid " << result.get() << " from " << path;
+
+  return process::UPID(result.get());
+}
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

Added: incubator/mesos/trunk/src/slave/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.hpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/slave/state.hpp (added)
+++ incubator/mesos/trunk/src/slave/state.hpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SLAVE_STATE_HPP__
+#define __SLAVE_STATE_HPP__
+
+#include "stout/foreach.hpp"
+#include "stout/hashmap.hpp"
+#include "stout/hashset.hpp"
+#include "stout/strings.hpp"
+#include "stout/utils.hpp"
+
+#include "common/type_utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "process/pid.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+// SlaveState stores the information about the frameworks, executors and
+// tasks running on this slave.
+//
+// The nesting is: framework id -> executor id -> run number -> task ids.
+//
+// NOTE(review): the nested type names look swapped relative to what they
+// hold ('RunState' is the per-executor entry, 'ExecutorState' is the
+// per-run entry) -- confirm before renaming, since callers use these
+// names.
+struct SlaveState
+{
+  struct FrameworkState
+  {
+    // Per-executor entry: all known runs of one executor.
+    struct RunState
+    {
+      // -1 is the value of 'latest' until a run has been recorded.
+      RunState()
+        : latest(-1) {}
+
+      // Per-run entry: the ids of the tasks found for one run.
+      struct ExecutorState
+      {
+        hashset<TaskID> tasks;
+      };
+
+      // Keyed by run number.
+      hashmap<int, ExecutorState> runs;
+      int latest; // Latest run number.
+    };
+
+    hashmap<ExecutorID, RunState> executors;
+  };
+
+  SlaveID slaveId;
+  std::string slaveMetaDir;
+  hashmap<FrameworkID, FrameworkState> frameworks;
+};
+
+
+// Parses the slave's work directory rooted at 'rootDir' and re-builds the
+// slave state.
+SlaveState parse(const std::string& rootDir, const SlaveID& slaveId);
+
+
+// TODO(vinod): Re-evaluate the need for these helpers (or genericize them)
+// after StatusUpdateManager is integrated.
+
+// Writes the task information to "taskDir + '/task'".
+void writeTask(Task* task, const std::string& taskDir);
+
+
+// Writes slaveId to the file path returned by getSlaveIDPath(metaRootDir).
+void writeSlaveID(const std::string& metaRootDir, const SlaveID& slaveId);
+
+
+// Reads slaveId from the file path returned by getSlaveIDPath().
+SlaveID readSlaveID(const std::string& metaRootDir);
+
+
+// Writes frameworkPID to the path returned by getFrameworkPIDPath().
+void writeFrameworkPID(const std::string& metaRootDir,
+                       const SlaveID& slaveId,
+                       const FrameworkID& frameworkId,
+                       const std::string& pid);
+
+
+// Reads frameworkPID from the path returned by getFrameworkPIDPath().
+process::UPID readFrameworkPID(const std::string& metaRootDir,
+                               const SlaveID& slaveId,
+                               const FrameworkID& frameworkId);
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_STATE_HPP__

Added: incubator/mesos/trunk/src/tests/slave_state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_state_tests.cpp?rev=1387802&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_state_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/slave_state_tests.cpp Wed Sep 19 22:21:25 2012
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "stout/os.hpp"
+#include "stout/strings.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace state {
+
+using std::string;
+using strings::format;
+
+// Test fixture providing a fixed set of ids, run number 0 and a fresh
+// temporary root directory that is removed again on destruction.
+class SlaveStateFixture: public ::testing::Test
+{
+protected:
+  SlaveStateFixture()
+    : run(0),
+      rootDir(os::mkdtemp().get())
+  {
+    slaveId.set_value("slave1");
+    frameworkId.set_value("framework1");
+    executorId.set_value("executor1");
+    taskId.set_value("task1");
+  }
+
+  virtual ~SlaveStateFixture()
+  {
+    // Clean up everything the test created under the temporary root.
+    os::rmdir(rootDir);
+  }
+
+  SlaveID slaveId;
+  FrameworkID frameworkId;
+  ExecutorID executorId;
+  TaskID taskId;
+  int run;
+  string rootDir;
+};
+
+
+// Verifies that the first work directory created for an executor is
+// the one for the fixture's run number (0) in the expected layout.
+TEST_F(SlaveStateFixture, CreateExecutorDirectory)
+{
+  // Expected directory layout, built up one level at a time.
+  string expected = rootDir;
+  expected += "/slaves/" + slaveId.value();
+  expected += "/frameworks/" + frameworkId.value();
+  expected += "/executors/" + executorId.value();
+  expected += "/runs/" + stringify(run);
+
+  const string actual = paths::createUniqueExecutorWorkDirectory(
+      rootDir, slaveId, frameworkId, executorId);
+
+  ASSERT_EQ(expected, actual);
+}
+
+
+// Verifies the path helpers level by level: each expected path is the
+// previous level's path plus one more "/<kind>/<id>" component.
+TEST_F(SlaveStateFixture, format)
+{
+  const string slavePath = rootDir + "/slaves/" + slaveId.value();
+  ASSERT_EQ(slavePath, paths::getSlavePath(rootDir, slaveId));
+
+  const string frameworkPath =
+    slavePath + "/frameworks/" + frameworkId.value();
+  ASSERT_EQ(frameworkPath,
+            paths::getFrameworkPath(rootDir, slaveId, frameworkId));
+
+  const string executorPath =
+    frameworkPath + "/executors/" + executorId.value();
+  ASSERT_EQ(executorPath,
+            paths::getExecutorPath(rootDir, slaveId, frameworkId, executorId));
+
+  const string runPath = executorPath + "/runs/" + stringify(run);
+  ASSERT_EQ(runPath,
+            paths::getExecutorRunPath(rootDir, slaveId, frameworkId,
+                                      executorId, run));
+
+  const string taskPath = runPath + "/tasks/" + taskId.value();
+  ASSERT_EQ(taskPath,
+            paths::getTaskPath(rootDir, slaveId, frameworkId, executorId,
+                               run, taskId));
+}
+
+
+// Lays out a single framework/executor/run/task on disk and verifies
+// that parse() discovers all of it.
+TEST_F(SlaveStateFixture, parse)
+{
+  const string runDir = paths::createUniqueExecutorWorkDirectory(
+      rootDir, slaveId, frameworkId, executorId);
+
+  // Framework pid file.
+  os::touch(paths::getFrameworkPIDPath(rootDir, slaveId, frameworkId));
+
+  // Process pid files; the "pids" directory is created first.
+  ASSERT_TRUE(os::mkdir(runDir + "/pids").isSome());
+
+  const string libprocessPid = paths::getLibprocessPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run);
+
+  const string forkedPid = paths::getForkedPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run);
+
+  const string execedPid = paths::getExecedPIDPath(
+      rootDir, slaveId, frameworkId, executorId, run);
+
+  os::touch(libprocessPid);
+  os::touch(forkedPid);
+  os::touch(execedPid);
+
+  // Task directory with its info and updates files.
+  const string taskDirectory = paths::getTaskPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId);
+
+  ASSERT_TRUE(os::mkdir(taskDirectory).isSome());
+
+  const string taskInfo = paths::getTaskInfoPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId);
+
+  const string taskUpdates = paths::getTaskUpdatesPath(
+      rootDir, slaveId, frameworkId, executorId, run, taskId);
+
+  os::touch(taskInfo);
+  os::touch(taskUpdates);
+
+  // Parse the layout back and walk the result one level at a time.
+  SlaveState state = parse(rootDir, slaveId);
+
+  ASSERT_TRUE(state.frameworks.contains(frameworkId));
+
+  SlaveState::FrameworkState& framework = state.frameworks[frameworkId];
+  ASSERT_TRUE(framework.executors.contains(executorId));
+
+  SlaveState::FrameworkState::RunState& executor =
+    framework.executors[executorId];
+  ASSERT_EQ(0, executor.latest);
+  ASSERT_TRUE(executor.runs[0].tasks.contains(taskId));
+}
+
+
+// A slave id written with writeSlaveID() must be read back unchanged.
+TEST_F(SlaveStateFixture, CheckpointSlaveID)
+{
+  writeSlaveID(rootDir, slaveId);
+
+  const SlaveID recovered = readSlaveID(rootDir);
+
+  ASSERT_EQ(slaveId, recovered);
+}
+
+
+// A framework pid written with writeFrameworkPID() must be read back
+// unchanged.
+TEST_F(SlaveStateFixture, CheckpointFrameworkPID)
+{
+  process::UPID upid("random");
+
+  writeFrameworkPID(rootDir, slaveId, frameworkId, upid);
+
+  const process::UPID recovered =
+    readFrameworkPID(rootDir, slaveId, frameworkId);
+
+  ASSERT_EQ(upid, recovered);
+}
+
+} // namespace state {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/format.hpp Wed Sep 19 22:21:25 2012
@@ -282,9 +282,13 @@ struct stringify<T, false>
 template <typename T>
 struct stringify<T, true>
 {
-  stringify(const T& t) : s(::stringify(t)) {}
+  stringify(const T& _t) : s(::stringify(_t)) {}
   const char* get() { return s.c_str(); }
-  const std::string& s;
+
+  // NOTE: We need to do the copy here, because the temporary returned by
+  // ::stringify() doesn't outlive the get() call inside strings::format().
+  // TODO(vinod): Figure out a fix for using const ref here.
+  const std::string s;
 };
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp?rev=1387802&r1=1387801&r2=1387802&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/stout/os.hpp Wed Sep 19 22:21:25 2012
@@ -5,6 +5,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <fts.h>
+#include <glob.h>
 #include <libgen.h>
 #include <limits.h>
 #include <netdb.h>
@@ -26,6 +27,8 @@
 #include <sys/utsname.h>
 
 #include <list>
+#include <set>
+#include <sstream>
 #include <string>
 
 #include "foreach.hpp"
@@ -183,6 +186,24 @@ inline Try<Nothing> touch(const std::str
 }
 
 
+// Generates a unique temporary file name under 'root' directory and
+// returns it. NOTE: ::mktemp() only produces a name; it does not create
+// the file, so another process may claim the name before the caller
+// does.
+inline Try<std::string> mktemp(const std::string& root = "/tmp")
+{
+  const std::string path = root + "/XXXXXX";
+  char* temp = new char[path.size() + 1];
+
+  // Copy 'size + 1' bytes so that the terminating NUL is included;
+  // ::mktemp() needs a properly terminated template.
+  ::strncpy(temp, path.c_str(), path.size() + 1);
+
+  // Depending on the platform, failure is reported either through a
+  // NULL return value or by turning the template into an empty string.
+  if (::mktemp(temp) != NULL && temp[0] != '\0') {
+    std::string result(temp);
+    delete[] temp;
+    return result;
+  } else {
+    // Build the message before freeing so that errno is still intact.
+    const std::string message =
+      std::string("Cannot create temporary file: ") + strerror(errno);
+    delete[] temp;
+    return Try<std::string>::error(message);
+  }
+}
+
+
 // Write out the string to the file at the current fd position.
 inline Try<Nothing> write(int fd, const std::string& message)
 {
@@ -390,6 +411,22 @@ inline Try<Nothing> mkdir(const std::str
   return Nothing();
 }
 
+// Creates a temporary directory under 'root' directory and returns its path.
+// Returns an error (including the strerror() text) if the directory could
+// not be created.
+inline Try<std::string> mkdtemp(const std::string& root = "/tmp")
+{
+  const std::string path = root + "/XXXXXX";
+  char* temp = new char[path.size() + 1];
+
+  // Copy 'size + 1' bytes so that the terminating NUL is included;
+  // ::mkdtemp() needs a properly terminated template.
+  ::strncpy(temp, path.c_str(), path.size() + 1);
+
+  if (::mkdtemp(temp) != NULL) {
+    std::string result(temp);
+    delete[] temp;
+    return result;
+  } else {
+    // Build the message before freeing so that errno is still intact.
+    const std::string message =
+      std::string("Cannot create temporary directory: ") + strerror(errno);
+    delete[] temp;
+    return Try<std::string>::error(message);
+  }
+}
 
 // Recursively deletes a directory akin to: 'rm -r'. Note that this
 // function expects an absolute path.
@@ -725,6 +762,33 @@ inline Try<int> shell(std::ostream* os, 
 }
 
 
+// Returns the list of files that match the given (shell) pattern, in
+// unsorted order (GLOB_NOSORT). A pattern without any match yields an
+// empty list rather than an error.
+inline Try<std::list<std::string> > glob(const std::string& pattern)
+{
+  glob_t g;
+  int status = ::glob(pattern.c_str(), GLOB_NOSORT, NULL, &g);
+
+  std::list<std::string> result;
+
+  if (status != 0) {
+    // Build the message first so that errno is read before any
+    // further library call.
+    const std::string message =
+      "Error globbing pattern '" + pattern + "':" + strerror(errno);
+
+    // Release whatever glob() may have allocated before it failed.
+    globfree(&g);
+
+    if (status == GLOB_NOMATCH) {
+      return result; // Empty list.
+    }
+
+    return Try<std::list<std::string> >::error(message);
+  }
+
+  for (size_t i = 0; i < g.gl_pathc; ++i) {
+    result.push_back(g.gl_pathv[i]);
+  }
+
+  globfree(&g); // Best-effort free of dynamically allocated memory.
+
+  return result;
+}
+
+
 inline int system(const std::string& command)
 {
   return ::system(command.c_str());