You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by nn...@apache.org on 2014/05/13 02:30:49 UTC
git commit: Updated 'recover' implementation to forward to the ECP.
Repository: mesos
Updated Branches:
refs/heads/master 4f58b7ed3 -> 1594cb878
Updated 'recover' implementation to forward to the external containerizer program (ECP).
Review: https://reviews.apache.org/r/21130
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1594cb87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1594cb87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1594cb87
Branch: refs/heads/master
Commit: 1594cb878e0ab4cfffe13671c30b6c319e530458
Parents: 4f58b7e
Author: Till Toenshoff <to...@me.com>
Authored: Mon May 12 16:52:55 2014 -0700
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Mon May 12 17:10:20 2014 -0700
----------------------------------------------------------------------
src/examples/python/test_containerizer.py | 29 ++---
.../containerizer/external_containerizer.cpp | 129 ++++++++++++++++++-
.../containerizer/external_containerizer.hpp | 19 ++-
3 files changed, 146 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/examples/python/test_containerizer.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_containerizer.py b/src/examples/python/test_containerizer.py
index 7d1d4b3..c65d891 100644
--- a/src/examples/python/test_containerizer.py
+++ b/src/examples/python/test_containerizer.py
@@ -25,14 +25,12 @@
# usage < Usage > ResourceStatistics
# wait < Wait > Termination
# destroy < Destroy
+# containers > Containers
+# recover
#
# 'wait' is expected to block until the task command/executor has
# terminated.
-# TODO(tillt): Implement a protocol for external containerizer
-# recovery by defining needed protobuf/s.
-# Currently we exepect to cover recovery entirely on the slave side.
-
import fcntl
import multiprocessing
import os
@@ -257,25 +255,14 @@ def destroy():
return 0
-# Recover the containerized executor.
-# TODO(tillt): It is yet to be defined which data will be forwarded
-# for the external containerizer to be able to recover its internal
-# states. Currently this command is not being invoked.
+# Recover the states of all containerized executors.
def recover():
- try:
- data = receive()
- if len(data) == 0:
- return 1
- containerId = mesos_pb2.ContainerID()
- containerId.ParseFromString(data)
- except google.protobuf.message.DecodeError:
- print >> sys.stderr, "Could not deserialise ContainerID protobuf."
- return 1
-
- except OSError as e:
- print >> sys.stderr, e.strerror
- return 1
+ # This template does not yet try to recover any internal state and
+ # is therefore to be regarded as incomplete.
+ # A complete implementation would attempt to recover all active
+ # containers by deserializing every previously checkpointed
+ # ContainerID.
return 0
http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index 385dac6..2ff19b1 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -244,9 +244,132 @@ ExternalContainerizerProcess::ExternalContainerizerProcess(
Future<Nothing> ExternalContainerizerProcess::recover(
const Option<state::SlaveState>& state)
{
- // TODO(tillt): Consider forwarding the recover command to the
- // external containerizer. For now, recovery should be entirely
- // covered by the slave itself.
+ LOG(INFO) << "Recovering containerizer";
+
+ // Recovery requires a slave state; without one the sandbox of an
+ // active container cannot be reconstructed.
+ if (state.isNone()) {
+ LOG(WARNING) << "No slave state available to recover from";
+ return Nothing(); // Nothing to recover; not reported as a failure.
+ }
+
+ // Invoke 'recover' on the external containerizer so it restores its state.
+ Try<Subprocess> invoked = invoke("recover");
+
+ if (invoked.isError()) { // invoke() itself reported an error.
+ return Failure("Recover failed: " + invoked.error());
+ }
+
+ return invoked.get().status() // Continue in _recover() with the status.
+ .then(defer(
+ PID<ExternalContainerizerProcess>(this),
+ &ExternalContainerizerProcess::_recover,
+ state.get(),
+ lambda::_1));
+}
+
+
+// Continuation of recover(): validates the outcome of the external
+// 'recover' invocation and, when it is valid, requests the active
+// containers from the external containerizer before handing over to
+// __recover().
+Future<Nothing> ExternalContainerizerProcess::_recover(
+    const state::SlaveState& state,
+    const Future<Option<int> >& future)
+{
+  VLOG(1) << "Recover validation callback triggered";
+
+  // Give up early when the invocation did not pass validation.
+  const Option<Error> invalid = validate(future);
+  if (invalid.isSome()) {
+    return Failure("Recover failed: " + invalid.get().message);
+  }
+
+  // Fetch the containers the external containerizer regards as active
+  // and resume in __recover() once they are available.
+  const PID<ExternalContainerizerProcess> pid(this);
+
+  return containers()
+    .then(defer(
+        pid,
+        &ExternalContainerizerProcess::__recover,
+        state,
+        lambda::_1));
+}
+
+
+// Last step of recovery: walks the recovered slave state and registers
+// as active the latest run of every executor that is not completed and
+// whose container id is contained in 'containers' (the ids gathered
+// from the external containerizer). Executors that do not meet these
+// conditions are logged and skipped. Returns Nothing() once all
+// executors have been processed.
+Future<Nothing> ExternalContainerizerProcess::__recover(
+    const state::SlaveState& state,
+    const hashset<ContainerID>& containers)
+{
+  VLOG(1) << "Recover continuation triggered";
+
+  foreachvalue (const FrameworkState& framework, state.frameworks) {
+    foreachvalue (const ExecutorState& executor, framework.executors) {
+      if (executor.info.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its info could not be recovered";
+        continue;
+      }
+
+      if (executor.latest.isNone()) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because its latest run could not be recovered";
+        continue;
+      }
+
+      // We are only interested in the latest run of the executor!
+      const ContainerID& containerId = executor.latest.get();
+      Option<RunState> run = executor.runs.get(containerId);
+      CHECK_SOME(run);
+
+      if (run.get().completed) {
+        VLOG(1) << "Skipping recovery of executor '" << executor.id
+                << "' of framework " << framework.id
+                << " because its latest run "
+                << containerId << " is completed";
+        continue;
+      }
+
+      // Containers the external containerizer does not have
+      // information on should be skipped, as their state is not
+      // recoverable.
+      if (!containers.contains(containerId)) {
+        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                     << "' of framework " << framework.id
+                     << " because the external containerizer has not "
+                     << "identified " << containerId << " as active";
+        continue;
+      }
+
+      // Re-create the sandbox for this container.
+      const string directory = paths::createExecutorDirectory(
+          flags.work_dir,
+          state.id,
+          framework.id,
+          executor.id,
+          containerId);
+
+      Option<string> user = None();
+      if (flags.switch_user) {
+        // The command (either in form of task or executor command)
+        // can define a specific user to run as. If present, this
+        // precedes the framework user value. 'executor.info' is known
+        // to be set here because of the guard at the top of the loop.
+        if (executor.info.get().command().has_user()) {
+          user = executor.info.get().command().user();
+        } else if (framework.info.isSome()) {
+          user = framework.info.get().user();
+        }
+      }
+
+      Sandbox sandbox(directory, user);
+
+      Owned<Container> container(new Container(sandbox));
+
+      // Assume that this container had been launched, if this proves
+      // to be wrong, the containerizer::Termination delivered by the
+      // subsequent wait invocation will tell us.
+      container->launched.set(Nothing());
+
+      // Collect this container as being active.
+      actives.put(containerId, container);
+    }
+  }
+
+  return Nothing();
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/1594cb87/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
index a955a38..afffff1 100644
--- a/src/slave/containerizer/external_containerizer.hpp
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -39,7 +39,7 @@ namespace mesos {
namespace internal {
namespace slave {
-// The scheme an external containerizer has to adhere to is;
+// The scheme external containerizer programs have to adhere to is:
//
// COMMAND < INPUT-PROTO > RESULT-PROTO
//
@@ -49,6 +49,7 @@ namespace slave {
// wait < containerizer::Wait > containerizer::Termination
// destroy < containerizer::Destroy
// containers > containerizer::Containers
+// recover
//
// 'wait' on the external containerizer side is expected to block
// until the task command/executor has terminated.
@@ -65,12 +66,8 @@ namespace slave {
// Check src/examples/python/test_containerizer.py for a rough
// implementation template of this protocol.
-// TODO(tillt): Implement a protocol for external containerizer
-// recovery by defining needed protobuf/s.
-// Currently we expect to cover recovery entirely on the slave side.
-
-// For debugging purposes of an external containerizer, it might be
-// helpful to enable verbose logging on the slave (GLOG_v=2).
+// When debugging an external containerizer program, it might be
+// helpful to enable verbose logging on the slave (GLOG_v=2).
class ExternalContainerizerProcess;
@@ -205,6 +202,14 @@ private:
// Stores all active containers.
hashmap<ContainerID, process::Owned<Container> > actives;
+ process::Future<Nothing> _recover(
+ const state::SlaveState& state,
+ const process::Future<Option<int> >& future);
+
+ process::Future<Nothing> __recover(
+ const state::SlaveState& state,
+ const hashset<ContainerID>& containers);
+
process::Future<Nothing> _launch(
const ContainerID& containerId,
const process::Future<Option<int> >& future);