You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2014/05/13 02:30:49 UTC

git commit: Updated 'recover' implementation to forward to the ECP.

Repository: mesos
Updated Branches:
  refs/heads/master 4f58b7ed3 -> 1594cb878


Updated 'recover' implementation to forward to the ECP.

Review: https://reviews.apache.org/r/21130


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1594cb87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1594cb87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1594cb87

Branch: refs/heads/master
Commit: 1594cb878e0ab4cfffe13671c30b6c319e530458
Parents: 4f58b7e
Author: Till Toenshoff <to...@me.com>
Authored: Mon May 12 16:52:55 2014 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Mon May 12 17:10:20 2014 -0700

----------------------------------------------------------------------
 src/examples/python/test_containerizer.py       |  29 ++---
 .../containerizer/external_containerizer.cpp    | 129 ++++++++++++++++++-
 .../containerizer/external_containerizer.hpp    |  19 ++-
 3 files changed, 146 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/examples/python/test_containerizer.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_containerizer.py b/src/examples/python/test_containerizer.py
index 7d1d4b3..c65d891 100644
--- a/src/examples/python/test_containerizer.py
+++ b/src/examples/python/test_containerizer.py
@@ -25,14 +25,12 @@
 # usage < Usage > ResourceStatistics
 # wait < Wait > Termination
 # destroy < Destroy
+# containers > Containers
+# recover
 #
 # 'wait' is expected to block until the task command/executor has
 # terminated.
 
-# TODO(tillt): Implement a protocol for external containerizer
-# recovery by defining needed protobuf/s.
-# Currently we exepect to cover recovery entirely on the slave side.
-
 import fcntl
 import multiprocessing
 import os
@@ -257,25 +255,14 @@ def destroy():
     return 0
 
 
-# Recover the containerized executor.
-# TODO(tillt): It is yet to be defined which data will be forwarded
-# for the external containerizer to be able to recover its internal
-# states. Currently this command is not being invoked.
+# Recover the states of all containerized executors.
 def recover():
-    try:
-        data = receive()
-        if len(data) == 0:
-            return 1
-        containerId = mesos_pb2.ContainerID()
-        containerId.ParseFromString(data)
 
-    except google.protobuf.message.DecodeError:
-        print >> sys.stderr, "Could not deserialise ContainerID protobuf."
-        return 1
-
-    except OSError as e:
-        print >> sys.stderr, e.strerror
-        return 1
+    # This currently does not try to recover any internal state and
+    # should therefore be regarded as incomplete.
+    # A complete implementation would attempt to recover all active
+    # containers by deserializing all previously checkpointed
+    # ContainerIDs.
 
     return 0
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index 385dac6..2ff19b1 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -244,9 +244,132 @@ ExternalContainerizerProcess::ExternalContainerizerProcess(
 Future<Nothing> ExternalContainerizerProcess::recover(
     const Option<state::SlaveState>& state)
 {
-  // TODO(tillt): Consider forwarding the recover command to the
-  // external containerizer. For now, recovery should be entirely
-  // covered by the slave itself.
+  LOG(INFO) << "Recovering containerizer";
+
+  // We need a slave state for recovery as otherwise we will not be
+  // able to reconstruct the sandbox of an active container.
+  if (state.isNone()) {
+    LOG(WARNING) << "No slave state available to recover from";
+    return Nothing();
+  }
+
+  // Ask the external containerizer to recover its internal state.
+  Try<Subprocess> invoked = invoke("recover");
+
+  if (invoked.isError()) {
+    return Failure("Recover failed: " + invoked.error());
+  }
+
+  return invoked.get().status()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::_recover,
+        state.get(),
+        lambda::_1));
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::_recover(
+    const state::SlaveState& state,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Recover validation callback triggered";
+
+  Option<Error> error = validate(future);
+
+  if (error.isSome()) {
+    return Failure("Recover failed: " + error.get().message);
+  }
+
+  // Gather the active containers from the external containerizer.
+  return containers()
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::__recover,
+        state,
+        lambda::_1));
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::__recover(
+    const state::SlaveState& state,
+    const hashset<ContainerID>& containers)
+{
+  VLOG(1) << "Recover continuation triggered";
+
+  foreachvalue (const FrameworkState& framework, state.frameworks) {
+    foreachvalue (const ExecutorState& executor, framework.executors) {
+      if (executor.info.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its info could not be recovered";
+        continue;
+      }
+
+      if (executor.latest.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its latest run could not be recovered";
+        continue;
+      }
+
+      // We are only interested in the latest run of the executor!
+      const ContainerID& containerId = executor.latest.get();
+      Option<RunState> run = executor.runs.get(containerId);
+      CHECK_SOME(run);
+
+      if (run.get().completed) {
+        VLOG(1) << "Skipping recovery of executor '" << executor.id
+                << "' of framework " << framework.id
+                << " because its latest run "
+                << containerId << " is completed";
+        continue;
+      }
+
+      // Containers that the external containerizer does not have
+      // information on should be skipped, as their state is not
+      // recoverable.
+      if (!containers.contains(containerId)) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because the external containerizer has not "
+                     << " identified " << containerId << " as active";
+        continue;
+      }
+
+      // Re-create the sandbox for this container.
+      const string& directory = paths::createExecutorDirectory(
+          flags.work_dir,
+          state.id,
+          framework.id,
+          executor.id,
+          containerId);
+
+      Option<string> user = None();
+      if (flags.switch_user) {
+        // The command (either in form of task or executor command)
+        // can define a specific user to run as. If present, this
+        // takes precedence over the framework user value.
+        if (executor.info.isSome() &&
+            executor.info.get().command().has_user()) {
+          user = executor.info.get().command().user();
+        } else if (framework.info.isSome()) {
+          user = framework.info.get().user();
+        }
+      }
+
+      Sandbox sandbox(directory, user);
+
+      // Collect this container as being active.
+      actives.put(containerId, Owned<Container>(new Container(sandbox)));
+
+      // Assume that this container had been launched; if this proves
+      // to be wrong, the containerizer::Termination delivered by the
+      // subsequent wait invocation will tell us.
+      actives[containerId]->launched.set(Nothing());
+    }
+  }
+
   return Nothing();
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
index a955a38..afffff1 100644
--- a/src/slave/containerizer/external_containerizer.hpp
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -39,7 +39,7 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-// The scheme an external containerizer has to adhere to is;
+// The scheme external containerizer programs have to adhere to is:
 //
 // COMMAND < INPUT-PROTO > RESULT-PROTO
 //
@@ -49,6 +49,7 @@ namespace slave {
 // wait < containerizer::Wait > containerizer::Termination
 // destroy < containerizer::Destroy
 // containers > containerizer::Containers
+// recover
 //
 // 'wait' on the external containerizer side is expected to block
 // until the task command/executor has terminated.
@@ -65,12 +66,8 @@ namespace slave {
 // Check src/examples/python/test_containerizer.py for a rough
 // implementation template of this protocol.
 
-// TODO(tillt): Implement a protocol for external containerizer
-// recovery by defining needed protobuf/s.
-// Currently we expect to cover recovery entirely on the slave side.
-
-// For debugging purposes of an external containerizer, it might be
-// helpful to enable verbose logging on the slave (GLOG_v=2).
+// For debugging purposes of an external containerizer program, it
+// might be helpful to enable verbose logging on the slave (GLOG_v=2).
 
 class ExternalContainerizerProcess;
 
@@ -205,6 +202,14 @@ private:
   // Stores all active containers.
   hashmap<ContainerID, process::Owned<Container> > actives;
 
+  process::Future<Nothing> _recover(
+      const state::SlaveState& state,
+      const process::Future<Option<int> >& future);
+
+  process::Future<Nothing> __recover(
+      const state::SlaveState& state,
+      const hashset<ContainerID>& containers);
+
   process::Future<Nothing> _launch(
       const ContainerID& containerId,
       const process::Future<Option<int> >& future);