You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:49 UTC
[7/7] git commit: Refactored the mesos containerizer launcher to fix
MESOS-1404.
Refactored the mesos containerizer launcher to fix MESOS-1404.
Review: https://reviews.apache.org/r/22852
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0046dca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0046dca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0046dca
Branch: refs/heads/master
Commit: d0046dca732ac4c1636ef85384bda3092ff3fa4f
Parents: ae3c8e2
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 21:40:37 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:36:12 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 21 +-
src/slave/containerizer/containerizer.cpp | 3 +-
src/slave/containerizer/launcher.cpp | 89 +-
src/slave/containerizer/launcher.hpp | 46 +-
src/slave/containerizer/linux_launcher.cpp | 199 +--
src/slave/containerizer/linux_launcher.hpp | 17 +-
src/slave/containerizer/mesos/containerizer.cpp | 1048 ++++++++++++++++
src/slave/containerizer/mesos/containerizer.hpp | 240 ++++
src/slave/containerizer/mesos/launch.cpp | 211 ++++
src/slave/containerizer/mesos/launch.hpp | 60 +
src/slave/containerizer/mesos/main.cpp | 34 +
src/slave/containerizer/mesos_containerizer.cpp | 1174 ------------------
src/slave/containerizer/mesos_containerizer.hpp | 242 ----
src/tests/cgroups_isolator_tests.cpp | 2 +-
src/tests/containerizer_tests.cpp | 3 +-
src/tests/isolator_tests.cpp | 170 ++-
src/tests/master_tests.cpp | 2 +-
src/tests/mesos.cpp | 3 +-
src/tests/mesos.hpp | 6 +-
src/tests/slave_tests.cpp | 3 +-
20 files changed, 1958 insertions(+), 1615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a49a3fe..fb3af9d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -247,10 +247,11 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/slave.cpp \
slave/http.cpp \
slave/containerizer/containerizer.cpp \
+ slave/containerizer/external_containerizer.cpp \
slave/containerizer/isolator.cpp \
slave/containerizer/launcher.cpp \
- slave/containerizer/mesos_containerizer.cpp \
- slave/containerizer/external_containerizer.cpp \
+ slave/containerizer/mesos/containerizer.cpp \
+ slave/containerizer/mesos/launch.cpp \
slave/status_update_manager.cpp \
exec/exec.cpp \
common/lock.cpp \
@@ -365,16 +366,17 @@ libmesos_no_3rdparty_la_SOURCES += \
master/registrar.hpp \
master/master.hpp master/sorter.hpp \
messages/messages.hpp slave/constants.hpp \
- slave/containerizer/linux_launcher.hpp \
slave/containerizer/containerizer.hpp \
+ slave/containerizer/external_containerizer.hpp \
slave/containerizer/isolator.hpp \
+ slave/containerizer/launcher.hpp \
+ slave/containerizer/linux_launcher.hpp \
+ slave/containerizer/mesos/containerizer.hpp \
+ slave/containerizer/mesos/launch.hpp \
+ slave/containerizer/isolators/posix.hpp \
slave/containerizer/isolators/cgroups/cpushare.hpp \
slave/containerizer/isolators/cgroups/mem.hpp \
slave/containerizer/isolators/cgroups/perf_event.hpp \
- slave/containerizer/isolators/posix.hpp \
- slave/containerizer/launcher.hpp \
- slave/containerizer/mesos_containerizer.hpp \
- slave/containerizer/external_containerizer.hpp \
slave/flags.hpp slave/gc.hpp slave/monitor.hpp \
slave/paths.hpp slave/state.hpp \
slave/status_update_manager.hpp \
@@ -562,6 +564,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
mesos_executor_LDADD = libmesos.la
+pkglibexec_PROGRAMS += mesos-containerizer
+mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
+mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_containerizer_LDADD = libmesos.la
+
pkglibexec_PROGRAMS += mesos-health-check
mesos_health_check_SOURCES = health-check/main.cpp
mesos_health_check_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 922ae17..1b71f33 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -37,9 +37,10 @@
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/containerizer/external_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
+
using std::map;
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.cpp b/src/slave/containerizer/launcher.cpp
index 2b13d1f..92c0657 100644
--- a/src/slave/containerizer/launcher.cpp
+++ b/src/slave/containerizer/launcher.cpp
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-#include <unistd.h>
-
#include <process/collect.hpp>
#include <process/delay.hpp>
#include <process/process.hpp>
@@ -32,7 +30,9 @@
using namespace process;
using std::list;
+using std::map;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
@@ -63,11 +63,12 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
pid_t pid = state.forkedPid.get();
if (pids.containsValue(pid)) {
- // This should (almost) never occur. There is the possibility that a new
- // executor is launched with the same pid as one that just exited (highly
- // unlikely) and the slave dies after the new executor is launched but
- // before it hears about the termination of the earlier executor (also
- // unlikely). Regardless, the launcher can't do anything sensible so this
+ // This should (almost) never occur. There is the possibility
+ // that a new executor is launched with the same pid as one that
+ // just exited (highly unlikely) and the slave dies after the
+ // new executor is launched but before it hears about the
+ // termination of the earlier executor (also unlikely).
+ // Regardless, the launcher can't do anything sensible so this
// is considered an error.
return Failure("Detected duplicate pid " + stringify(pid) +
" for container " + stringify(containerId));
@@ -80,46 +81,66 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
}
+// The setup function in child before the exec.
+static int childSetup(const Option<lambda::function<int()> >& setup)
+{
+ // POSIX guarantees a forked child's pid does not match any existing
+ // process group id so only a single setsid() is required and the
+ // session id will be the pid.
+ // TODO(idownes): perror is not listed as async-signal-safe and
+ // should be reimplemented safely.
+ // TODO(jieyu): Move this logic to the subprocess (i.e.,
+ // mesos-containerizer launch).
+ if (::setsid() == -1) {
+ perror("Failed to put child in a new session");
+ _exit(1);
+ }
+
+ if (setup.isSome()) {
+ return setup.get()();
+ }
+
+ return 0;
+}
+
+
Try<pid_t> PosixLauncher::fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction)
+ const string& path,
+ const vector<string>& argv,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<map<string, string> >& environment,
+ const Option<lambda::function<int()> >& setup)
{
if (pids.contains(containerId)) {
return Error("Process has already been forked for container " +
stringify(containerId));
}
- pid_t pid;
-
- if ((pid = ::fork()) == -1) {
- return ErrnoError("Failed to fork");
- }
-
- if (pid == 0) {
- // In child.
- // POSIX guarantees a forked child's pid does not match any existing
- // process group id so only a single setsid() is required and the session
- // id will be the pid.
- // TODO(idownes): perror is not listed as async-signal-safe and should be
- // reimplemented safely.
- if (setsid() == -1) {
- perror("Failed to put child in a new session");
- _exit(1);
- }
-
- // This function should exec() and therefore not return.
- childFunction();
-
- ABORT("Child failed to exec");
+ Try<Subprocess> child = subprocess(
+ path,
+ argv,
+ in,
+ out,
+ err,
+ flags,
+ environment,
+ lambda::bind(&childSetup, setup));
+
+ if (child.isError()) {
+ return Error("Failed to fork a child process: " + child.error());
}
- // parent.
- LOG(INFO) << "Forked child with pid '" << pid
+ LOG(INFO) << "Forked child with pid '" << child.get().pid()
<< "' for container '" << containerId << "'";
+
// Store the pid (session id and process group id).
- pids.put(containerId, pid);
+ pids.put(containerId, child.get().pid());
- return pid;
+ return child.get().pid();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.hpp b/src/slave/containerizer/launcher.hpp
index 835d7a9..18b3546 100644
--- a/src/slave/containerizer/launcher.hpp
+++ b/src/slave/containerizer/launcher.hpp
@@ -20,10 +20,15 @@
#define __LAUNCHER_HPP__
#include <list>
+#include <map>
+#include <string>
#include <process/future.hpp>
+#include <process/subprocess.hpp>
+#include <stout/flags.hpp>
#include <stout/lambda.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
#include "slave/flags.hpp"
@@ -42,23 +47,33 @@ public:
virtual process::Future<Nothing> recover(
const std::list<state::RunState>& states) = 0;
- // Fork a new process in the containerized context. The child will call the
- // specified function and the parent will return the child's pid.
- // NOTE: The function must be async-signal safe and should exec as soon as
- // possible.
+ // Fork a new process in the containerized context. The child will
+ // exec the binary at the given path with the given argv, flags and
+ // environment. The I/O of the child will be redirected according to
+ // the specified I/O descriptors. The user can provide a 'setup'
+ // function which will be invoked in the child process right before
+ // the exec. The 'setup' function has to be async signal safe. The
+ // parent will return the child's pid if the fork is successful.
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction) = 0;
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup) = 0;
// Kill all processes in the containerized context.
virtual process::Future<Nothing> destroy(const ContainerID& containerId) = 0;
};
-// Launcher suitable for any POSIX compliant system. Uses process groups and
-// sessions to track processes in a container. POSIX states that process groups
-// cannot migrate between sessions so all processes for a container will be
-// contained in a session.
+// Launcher suitable for any POSIX compliant system. Uses process
+// groups and sessions to track processes in a container. POSIX states
+// that process groups cannot migrate between sessions so all
+// processes for a container will be contained in a session.
class PosixLauncher : public Launcher
{
public:
@@ -71,15 +86,22 @@ public:
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction);
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup);
virtual process::Future<Nothing> destroy(const ContainerID& containerId);
private:
PosixLauncher() {}
- // The 'pid' is the process id of the first process and also the process
- // group id and session id.
+ // The 'pid' is the process id of the first process and also the
+ // process group id and session id.
hashmap<ContainerID, pid_t> pids;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index 7ebccb4..1ce03e3 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -38,6 +38,7 @@
using namespace process;
using std::list;
+using std::map;
using std::string;
using std::vector;
@@ -59,7 +60,9 @@ LinuxLauncher::LinuxLauncher(
Try<Launcher*> LinuxLauncher::create(const Flags& flags)
{
Try<string> hierarchy = cgroups::prepare(
- flags.cgroups_hierarchy, "freezer", flags.cgroups_root);
+ flags.cgroups_hierarchy,
+ "freezer",
+ flags.cgroups_root);
if (hierarchy.isError()) {
return Error("Failed to create Linux launcher: " + hierarchy.error());
@@ -68,7 +71,8 @@ Try<Launcher*> LinuxLauncher::create(const Flags& flags)
LOG(INFO) << "Using " << hierarchy.get()
<< " as the freezer hierarchy for the Linux launcher";
- // TODO(idownes): Inspect the isolation flag to determine namespaces to use.
+ // TODO(idownes): Inspect the isolation flag to determine namespaces
+ // to use.
int namespaces = 0;
return new LinuxLauncher(flags, namespaces, hierarchy.get());
@@ -94,10 +98,10 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
if (!exists.get()) {
- // This may occur if the freezer cgroup was destroyed but the slave dies
- // before noticing this.
- // The containerizer will monitor the container's pid and notice that it
- // has exited, triggering destruction of the container.
+ // This may occur if the freezer cgroup was destroyed but the
+ // slave dies before noticing this. The containerizer will
+ // monitor the container's pid and notice that it has exited,
+ // triggering destruction of the container.
LOG(INFO) << "Couldn't find freezer cgroup for container " << containerId;
continue;
}
@@ -109,11 +113,12 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
pid_t pid = state.forkedPid.get();
if (pids.containsValue(pid)) {
- // This should (almost) never occur. There is the possibility that a new
- // executor is launched with the same pid as one that just exited (highly
- // unlikely) and the slave dies after the new executor is launched but
- // before it hears about the termination of the earlier executor (also
- // unlikely). Regardless, the launcher can't do anything sensible so this
+ // This should (almost) never occur. There is the possibility
+ // that a new executor is launched with the same pid as one that
+ // just exited (highly unlikely) and the slave dies after the
+ // new executor is launched but before it hears about the
+ // termination of the earlier executor (also unlikely).
+ // Regardless, the launcher can't do anything sensible so this
// is considered an error.
return Failure("Detected duplicate pid " + stringify(pid) +
" for container " + stringify(containerId));
@@ -149,58 +154,84 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
// Helper for clone() which expects an int(void*).
-static int childMain(void* child)
+static int childMain(void* _func)
{
const lambda::function<int()>* func =
- static_cast<const lambda::function<int()>*> (child);
+ static_cast<const lambda::function<int()>*> (_func);
return (*func)();
}
-// Helper that creates a new session then blocks on reading the pipe before
-// calling the supplied function.
-static int _childMain(
- const lambda::function<int()>& childFunction,
- int pipes[2])
+// The customized clone function which will be used by 'subprocess()'.
+static pid_t clone(const lambda::function<int()>& func, int namespaces)
{
- // In child.
- os::close(pipes[1]);
+ // Stack for the child.
+ // - unsigned long long used for best alignment.
+ // - static is ok because each child gets their own copy after the clone.
+ // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
+ static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
+
+ LOG(INFO) << "Cloning child process with flags = " << namespaces;
+
+ return ::clone(
+ childMain,
+ &stack[sizeof(stack)/sizeof(stack[0]) - 1], // stack grows down
+ namespaces | SIGCHLD, // Specify SIGCHLD as child termination signal
+ (void*) &func);
+}
- // Move to a different session (and new process group) so we're independent
- // from the slave's session (otherwise children will receive SIGHUP if the
- // slave exits).
- // TODO(idownes): perror is not listed as async-signal-safe and should be
- // reimplemented safely.
- if (setsid() == -1) {
- perror("Failed to put child in a new session");
- os::close(pipes[0]);
- _exit(1);
- }
- // Do a blocking read on the pipe until the parent signals us to continue.
- int buf;
- int len;
- while ((len = read(pipes[0], &buf, sizeof(buf))) == -1 && errno == EINTR);
+static int childSetup(
+ int pipes[2],
+ const Option<lambda::function<int()> >& setup)
+{
+ // In child.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
- if (len != sizeof(buf)) {
+ // Do a blocking read on the pipe until the parent signals us to
+ // continue.
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
ABORT("Failed to synchronize with parent");
}
- os::close(pipes[0]);
+ while (::close(pipes[0]) == -1 && errno == EINTR);
- // This function should exec() and therefore not return.
- childFunction();
+ // Move to a different session (and new process group) so we're
+ // independent from the slave's session (otherwise children will
+ // receive SIGHUP if the slave exits).
+ // TODO(idownes): perror is not listed as async-signal-safe and
+ // should be reimplemented safely.
+ // TODO(jieyu): Move this logic to the subprocess (i.e.,
+ // mesos-containerizer launch).
+ if (::setsid() == -1) {
+ perror("Failed to put child in a new session");
+ return 1;
+ }
- ABORT("Child failed to exec");
+ if (setup.isSome()) {
+ return setup.get()();
+ }
- return -1;
+ return 0;
}
Try<pid_t> LinuxLauncher::fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction)
+ const string& path,
+ const vector<string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<map<string, string> >& environment,
+ const Option<lambda::function<int()> >& setup)
{
// Create a freezer cgroup for this container if necessary.
Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
@@ -217,70 +248,72 @@ Try<pid_t> LinuxLauncher::fork(
}
}
- // Use a pipe to block the child until it's been moved into the freezer
- // cgroup.
+ // Use a pipe to block the child until it's been moved into the
+ // freezer cgroup.
int pipes[2];
- // We assume this should not fail under reasonable conditions so we use CHECK.
- CHECK(pipe(pipes) == 0);
- // Use the _childMain helper which moves the child into a new session and
- // blocks on the pipe until we're ready for it to run.
- lambda::function<int()> func =
- lambda::bind(&_childMain, childFunction, pipes);
-
- // Stack for the child.
- // - unsigned long long used for best alignment.
- // - static is ok because each child gets their own copy after the clone.
- // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
- static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
-
- LOG(INFO) << "Cloning child process with flags = " << namespaces;
-
- pid_t pid;
- if ((pid = ::clone(
- childMain,
- &stack[sizeof(stack)/sizeof(stack[0]) - 1], // stack grows down
- namespaces | SIGCHLD, // Specify SIGCHLD as child termination signal
- static_cast<void*>(&func))) == -1) {
- return ErrnoError("Failed to clone child process");
+ // We assume this should not fail under reasonable conditions so we
+ // use CHECK.
+ CHECK_EQ(0, ::pipe(pipes));
+
+ Try<Subprocess> child = subprocess(
+ path,
+ argv,
+ in,
+ out,
+ err,
+ flags,
+ environment,
+ lambda::bind(&childSetup, pipes, setup),
+ lambda::bind(&clone, lambda::_1, namespaces));
+
+ if (child.isError()) {
+ return Error("Failed to clone child process: " + child.error());
}
// Parent.
os::close(pipes[0]);
- // Move the child into the freezer cgroup. Any grandchildren will also be
- // contained in the cgroup.
- Try<Nothing> assign = cgroups::assign(hierarchy, cgroup(containerId), pid);
+ // Move the child into the freezer cgroup. Any grandchildren will
+ // also be contained in the cgroup.
+ // TODO(jieyu): Move this logic to the subprocess (i.e.,
+ // mesos-containerizer launch).
+ Try<Nothing> assign = cgroups::assign(
+ hierarchy,
+ cgroup(containerId),
+ child.get().pid());
if (assign.isError()) {
- LOG(ERROR) << "Failed to assign process " << pid
+ LOG(ERROR) << "Failed to assign process " << child.get().pid()
<< " of container '" << containerId << "'"
<< " to its freezer cgroup: " << assign.error();
- kill(pid, SIGKILL);
+
+ ::kill(child.get().pid(), SIGKILL);
return Error("Failed to contain process");
}
- // Now that we've contained the child we can signal it to continue by
- // writing to the pipe.
- int buf;
- ssize_t len;
- while ((len = write(pipes[1], &buf, sizeof(buf))) == -1 && errno == EINTR);
+ // Now that we've contained the child we can signal it to continue
+ // by writing to the pipe.
+ char dummy;
+ ssize_t length;
+ while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
- if (len != sizeof(buf)) {
+ os::close(pipes[1]);
+
+ if (length != sizeof(dummy)) {
// Ensure the child is killed.
- kill(pid, SIGKILL);
- os::close(pipes[1]);
+ ::kill(child.get().pid(), SIGKILL);
return Error("Failed to synchronize child process");
}
- os::close(pipes[1]);
- // Store the pid (session id and process group id) if this is the first
- // process forked for this container.
+ // Store the pid (session id and process group id) if this is the
+ // first process forked for this container.
if (!pids.contains(containerId)) {
- pids.put(containerId, pid);
+ pids.put(containerId, child.get().pid());
}
- return pid;
+ return child.get().pid();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.hpp b/src/slave/containerizer/linux_launcher.hpp
index 622810c..3d9794d 100644
--- a/src/slave/containerizer/linux_launcher.hpp
+++ b/src/slave/containerizer/linux_launcher.hpp
@@ -25,8 +25,8 @@ namespace mesos {
namespace internal {
namespace slave {
-// Launcher for Linux systems with cgroups. Uses a freezer cgroup to track
-// pids.
+// Launcher for Linux systems with cgroups. Uses a freezer cgroup to
+// track pids.
class LinuxLauncher : public Launcher
{
public:
@@ -39,7 +39,14 @@ public:
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction);
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup);
virtual process::Future<Nothing> destroy(const ContainerID& containerId);
@@ -56,8 +63,8 @@ private:
std::string cgroup(const ContainerID& containerId);
- // The 'pid' is the process id of the child process and also the process
- // group id and session id.
+ // The 'pid' is the process id of the child process and also the
+ // process group id and session id.
hashmap<ContainerID, pid_t> pids;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
new file mode 100644
index 0000000..27f8e09
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -0,0 +1,1048 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+#ifdef __linux__
+#include "slave/containerizer/linux_launcher.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/isolators/posix.hpp"
+#ifdef __linux__
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+
+// Local function declaration/definitions.
+Future<Nothing> _nothing() { return Nothing(); }
+
+
+// Helper method to build the environment map used to launch fetcher.
+map<string, string> fetcherEnvironment(
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const Flags& flags)
+{
+ // Prepare the environment variables to pass to mesos-fetcher.
+ string uris = "";
+ foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+ uris += uri.value() + "+" +
+ (uri.has_executable() && uri.executable() ? "1" : "0") +
+ (uri.extract() ? "X" : "N");
+ uris += " ";
+ }
+ // Remove extra space at the end.
+ uris = strings::trim(uris);
+
+ map<string, string> environment;
+ environment["MESOS_EXECUTOR_URIS"] = uris;
+ environment["MESOS_WORK_DIRECTORY"] = directory;
+ if (user.isSome()) {
+ environment["MESOS_USER"] = user.get();
+ }
+ if (!flags.frameworks_home.empty()) {
+ environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+ }
+ if (!flags.hadoop_home.empty()) {
+ environment["HADOOP_HOME"] = flags.hadoop_home;
+ }
+
+ return environment;
+}
+
+
+Try<MesosContainerizer*> MesosContainerizer::create(
+ const Flags& flags,
+ bool local)
+{
+ string isolation;
+ if (flags.isolation == "process") {
+ LOG(WARNING) << "The 'process' isolation flag is deprecated, "
+ << "please update your flags to"
+ << " '--isolation=posix/cpu,posix/mem'.";
+ isolation = "posix/cpu,posix/mem";
+ } else if (flags.isolation == "cgroups") {
+ LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
+ << "please update your flags to"
+ << " '--isolation=cgroups/cpu,cgroups/mem'.";
+ isolation = "cgroups/cpu,cgroups/mem";
+ } else {
+ isolation = flags.isolation;
+ }
+
+ LOG(INFO) << "Using isolation: " << isolation;
+
+ // Create a MesosContainerizerProcess using isolators and a launcher.
+ hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
+
+ creators["posix/cpu"] = &PosixCpuIsolatorProcess::create;
+ creators["posix/mem"] = &PosixMemIsolatorProcess::create;
+#ifdef __linux__
+ creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
+ creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
+ creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
+#endif // __linux__
+
+ vector<Owned<Isolator> > isolators;
+
+ foreach (const string& type, strings::split(isolation, ",")) {
+ if (creators.contains(type)) {
+ Try<Isolator*> isolator = creators[type](flags);
+ if (isolator.isError()) {
+ return Error(
+ "Could not create isolator " + type + ": " + isolator.error());
+ } else {
+ isolators.push_back(Owned<Isolator>(isolator.get()));
+ }
+ } else {
+ return Error("Unknown or unsupported isolator: " + type);
+ }
+ }
+
+#ifdef __linux__
+ // Use cgroups on Linux if any cgroups isolators are used.
+ Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
+ ? LinuxLauncher::create(flags)
+ : PosixLauncher::create(flags);
+#else
+ Try<Launcher*> launcher = PosixLauncher::create(flags);
+#endif // __linux__
+
+ if (launcher.isError()) {
+ return Error("Failed to create launcher: " + launcher.error());
+ }
+
+ return new MesosContainerizer(
+ flags, local, Owned<Launcher>(launcher.get()), isolators);
+}
+
+
+MesosContainerizer::MesosContainerizer(
+ const Flags& flags,
+ bool local,
+ const Owned<Launcher>& launcher,
+ const vector<Owned<Isolator> >& isolators)
+{
+ process = new MesosContainerizerProcess(
+ flags, local, launcher, isolators);
+ spawn(process);
+}
+
+
+MesosContainerizer::~MesosContainerizer()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+Future<Nothing> MesosContainerizer::recover(
+ const Option<state::SlaveState>& state)
+{
+ return dispatch(process, &MesosContainerizerProcess::recover, state);
+}
+
+
+Future<Nothing> MesosContainerizer::launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ return dispatch(process,
+ &MesosContainerizerProcess::launch,
+ containerId,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+}
+
+
+Future<Nothing> MesosContainerizer::launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ return dispatch(process,
+ &MesosContainerizerProcess::launch,
+ containerId,
+ taskInfo,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+}
+
+
+Future<Nothing> MesosContainerizer::update(
+ const ContainerID& containerId,
+ const Resources& resources)
+{
+ return dispatch(process,
+ &MesosContainerizerProcess::update,
+ containerId,
+ resources);
+}
+
+
+Future<ResourceStatistics> MesosContainerizer::usage(
+ const ContainerID& containerId)
+{
+ return dispatch(process, &MesosContainerizerProcess::usage, containerId);
+}
+
+
+Future<containerizer::Termination> MesosContainerizer::wait(
+ const ContainerID& containerId)
+{
+ return dispatch(process, &MesosContainerizerProcess::wait, containerId);
+}
+
+
+void MesosContainerizer::destroy(const ContainerID& containerId)
+{
+ dispatch(process, &MesosContainerizerProcess::destroy, containerId);
+}
+
+
+Future<hashset<ContainerID> > MesosContainerizer::containers()
+{
+ return dispatch(process, &MesosContainerizerProcess::containers);
+}
+
+
+Future<Nothing> MesosContainerizerProcess::recover(
+ const Option<state::SlaveState>& state)
+{
+ LOG(INFO) << "Recovering containerizer";
+
+ // Gather the executor run states that we will attempt to recover.
+ list<RunState> recoverable;
+ if (state.isSome()) {
+ foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+ foreachvalue (const ExecutorState& executor, framework.executors) {
+ if (executor.info.isNone()) {
+ LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its info could not be recovered";
+ continue;
+ }
+
+ if (executor.latest.isNone()) {
+ LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its latest run could not be recovered";
+ continue;
+ }
+
+ // We are only interested in the latest run of the executor!
+ const ContainerID& containerId = executor.latest.get();
+ Option<RunState> run = executor.runs.get(containerId);
+ CHECK_SOME(run);
+
+ // We need the pid so the reaper can monitor the executor so skip this
+ // executor if it's not present. This is not an error because the slave
+ // will try to wait on the container which will return a failed
+ // Termination and everything will get cleaned up.
+ if (!run.get().forkedPid.isSome()) {
+ continue;
+ }
+
+ if (run.get().completed) {
+ VLOG(1) << "Skipping recovery of executor '" << executor.id
+ << "' of framework " << framework.id
+ << " because its latest run "
+ << containerId << " is completed";
+ continue;
+ }
+
+ LOG(INFO) << "Recovering container '" << containerId
+ << "' for executor '" << executor.id
+ << "' of framework " << framework.id;
+
+ recoverable.push_back(run.get());
+ }
+ }
+ }
+
+ // Try to recover the launcher first.
+ return launcher->recover(recoverable)
+ .then(defer(self(), &Self::_recover, recoverable));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::_recover(
+ const list<RunState>& recoverable)
+{
+ // Then recover the isolators.
+ list<Future<Nothing> > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->recover(recoverable));
+ }
+
+ // If all isolators recover then continue.
+ return collect(futures)
+ .then(defer(self(), &Self::__recover, recoverable));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::__recover(
+ const list<RunState>& recovered)
+{
+ foreach (const RunState& run, recovered) {
+ CHECK_SOME(run.id);
+ const ContainerID& containerId = run.id.get();
+
+ Owned<Promise<containerizer::Termination> > promise(
+ new Promise<containerizer::Termination>());
+ promises.put(containerId, promise);
+
+ CHECK_SOME(run.forkedPid);
+ Future<Option<int > > status = process::reap(run.forkedPid.get());
+ statuses[containerId] = status;
+ status.onAny(defer(self(), &Self::reaped, containerId));
+
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ isolator->watch(containerId)
+ .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+ }
+ }
+
+ return Nothing();
+}
+
+
+// Launching an executor involves the following steps:
+// 1. Call prepare on each isolator.
+// 2. Fork the executor. The forked child is blocked from exec'ing until it has
+// been isolated.
+// 3. Isolate the executor. Call isolate with the pid for each isolator.
+// 4. Fetch the executor.
+// 4. Exec the executor. The forked child is signalled to continue. It will
+// first execute any preparation commands from isolators and then exec the
+// executor.
+Future<Nothing> MesosContainerizerProcess::launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ if (promises.contains(containerId)) {
+ LOG(ERROR) << "Cannot start already running container '"
+ << containerId << "'";
+ return Failure("Container already started");
+ }
+
+ // TODO(tillt): The slave should expose which containerization
+ // mechanisms it supports to avoid scheduling tasks that it cannot
+ // run.
+ const CommandInfo& command = executorInfo.command();
+ if (command.has_container()) {
+ // We return a Failure as this containerizer does not support
+ // handling ContainerInfo. Users have to be made aware of this
+ // lack of support to prevent confusion in the task configuration.
+ return Failure("ContainerInfo is not supported");
+ }
+
+ Owned<Promise<containerizer::Termination> > promise(
+ new Promise<containerizer::Termination>());
+ promises.put(containerId, promise);
+
+ // Store the resources for usage().
+ resources.put(containerId, executorInfo.resources());
+
+ LOG(INFO) << "Starting container '" << containerId
+ << "' for executor '" << executorInfo.executor_id()
+ << "' of framework '" << executorInfo.framework_id() << "'";
+
+ return prepare(containerId, executorInfo, directory, user)
+ .then(defer(self(),
+ &Self::_launch,
+ containerId,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint,
+ lambda::_1))
+ .onFailed(defer(self(),
+ &Self::destroy,
+ containerId));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::launch(
+ const ContainerID& containerId,
+ const TaskInfo&,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+{
+ return launch(
+ containerId,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+}
+
+Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user)
+{
+ // Start preparing all isolators (in parallel) and gather any additional
+ // preparation comands that must be run in the forked child before exec'ing
+ // the executor.
+ list<Future<Option<CommandInfo> > > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->prepare(containerId, executorInfo));
+ }
+
+ // Wait for all isolators to complete preparations.
+ return collect(futures);
+}
+
+
+Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const string& directory,
+ const Option<string>& user,
+ const Option<int>& status)
+{
+ if (status.isNone() || (status.get() != 0)) {
+ return Failure("Failed to fetch URIs for container '" +
+ stringify(containerId) + "': exit status " +
+ (status.isNone() ? "none" : stringify(status.get())));
+ }
+
+ // Chown the work directory if a user is provided.
+ if (user.isSome()) {
+ Try<Nothing> chown = os::chown(user.get(), directory);
+ if (chown.isError()) {
+ return Failure("Failed to chown work directory");
+ }
+ }
+
+ return Nothing();
+}
+
+
+Future<Nothing> MesosContainerizerProcess::fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const string& directory,
+ const Option<string>& user)
+{
+ // Determine path for mesos-fetcher.
+ Result<string> realpath = os::realpath(
+ path::join(flags.launcher_dir, "mesos-fetcher"));
+
+ if (!realpath.isSome()) {
+ LOG(ERROR) << "Failed to determine the canonical path "
+ << "for the mesos-fetcher '"
+ << path::join(flags.launcher_dir, "mesos-fetcher")
+ << "': "
+ << (realpath.isError() ? realpath.error()
+ : "No such file or directory");
+ return Failure("Could not fetch URIs: failed to find mesos-fetcher");
+ }
+
+ map<string, string> environment =
+ fetcherEnvironment(commandInfo, directory, user, flags);
+
+ // Now the actual mesos-fetcher command.
+ string command = realpath.get();
+
+ LOG(INFO) << "Fetching URIs for container '" << containerId
+ << "' using command '" << command << "'";
+
+ Try<Subprocess> fetcher = subprocess(
+ command,
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ environment);
+
+ if (fetcher.isError()) {
+ return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+ }
+
+ // Redirect output (stdout and stderr) from the fetcher to log files
+ // in the executor work directory, chown'ing them if a user is
+ // specified.
+ // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
+ // TODO(tillt): Consider adding an overload to io::redirect
+ // that accepts a file path as 'to' for further reducing code. We
+ // would however also need an owner user parameter for such overload
+ // to perfectly replace the below.
+ Try<int> out = os::open(
+ path::join(directory, "stdout"),
+ O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ if (out.isError()) {
+ return Failure("Failed to redirect stdout: " + out.error());
+ }
+
+ if (user.isSome()) {
+ Try<Nothing> chown = os::chown(
+ user.get(), path::join(directory, "stdout"));
+ if (chown.isError()) {
+ os::close(out.get());
+ return Failure(
+ "Failed to redirect stdout: Failed to chown: " +
+ chown.error());
+ }
+ }
+
+ // Redirect takes care of nonblocking and close-on-exec for the
+ // supplied file descriptors.
+ io::redirect(fetcher.get().out().get(), out.get());
+
+ // Redirect does 'dup' the file descriptor, hence we can close the
+ // original now.
+ os::close(out.get());
+
+ // Repeat for stderr.
+ Try<int> err = os::open(
+ path::join(directory, "stderr"),
+ O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
+
+ if (err.isError()) {
+ return Failure(
+ "Failed to redirect stderr: Failed to open: " +
+ err.error());
+ }
+
+ if (user.isSome()) {
+ Try<Nothing> chown = os::chown(
+ user.get(), path::join(directory, "stderr"));
+ if (chown.isError()) {
+ os::close(err.get());
+ return Failure(
+ "Failed to redirect stderr: Failed to chown: " +
+ chown.error());
+ }
+ }
+
+ io::redirect(fetcher.get().err().get(), err.get());
+
+ os::close(err.get());
+
+ return fetcher.get().status()
+ .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::_launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint,
+ const list<Option<CommandInfo> >& commands)
+{
+ // Prepare environment variables for the executor.
+ map<string, string> env = executorEnvironment(
+ executorInfo,
+ directory,
+ slaveId,
+ slavePid,
+ checkpoint,
+ flags.recovery_timeout);
+
+ // Include any enviroment variables from CommandInfo.
+ foreach (const Environment::Variable& variable,
+ executorInfo.command().environment().variables()) {
+ env[variable.name()] = variable.value();
+ }
+
+ // Use a pipe to block the child until it's been isolated.
+ int pipes[2];
+
+ // We assume this should not fail under reasonable conditions so we
+ // use CHECK.
+ CHECK(pipe(pipes) == 0);
+
+ // Prepare the flags to pass to the launch process.
+ MesosContainerizerLaunch::Flags launchFlags;
+
+ launchFlags.command = JSON::Protobuf(executorInfo.command());
+ launchFlags.directory = directory;
+ launchFlags.user = user;
+ launchFlags.pipe_read = pipes[0];
+ launchFlags.pipe_write = pipes[1];
+
+ // Prepare the additional preparation commands.
+ // TODO(jieyu): Use JSON::Array once we have generic parse support.
+ JSON::Object object;
+ JSON::Array array;
+ foreach (const Option<CommandInfo>& command, commands) {
+ if (command.isSome()) {
+ array.values.push_back(JSON::Protobuf(command.get()));
+ }
+ }
+ object.values["commands"] = array;
+
+ launchFlags.commands = object;
+
+ // Fork the child using launcher.
+ vector<string> argv(2);
+ argv[0] = "mesos-containerizer";
+ argv[1] = MesosContainerizerLaunch::NAME;
+
+ Try<pid_t> forked = launcher->fork(
+ containerId,
+ path::join(flags.launcher_dir, "mesos-containerizer"),
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ (local ? Subprocess::FD(STDOUT_FILENO)
+ : Subprocess::PATH(path::join(directory, "stdout"))),
+ (local ? Subprocess::FD(STDERR_FILENO)
+ : Subprocess::PATH(path::join(directory, "stderr"))),
+ launchFlags,
+ env,
+ None());
+
+ if (forked.isError()) {
+ return Failure("Failed to fork executor: " + forked.error());
+ }
+ pid_t pid = forked.get();
+
+ // Checkpoint the executor's pid if requested.
+ if (checkpoint) {
+ const string& path = slave::paths::getForkedPidPath(
+ slave::paths::getMetaRootDir(flags.work_dir),
+ slaveId,
+ executorInfo.framework_id(),
+ executorInfo.executor_id(),
+ containerId);
+
+ LOG(INFO) << "Checkpointing executor's forked pid " << pid
+ << " to '" << path << "'";
+
+ Try<Nothing> checkpointed =
+ slave::state::checkpoint(path, stringify(pid));
+
+ if (checkpointed.isError()) {
+ LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+ << path << "': " << checkpointed.error();
+
+ return Failure("Could not checkpoint executor's pid");
+ }
+ }
+
+ // Monitor the executor's pid. We keep the future because we'll
+ // refer to it again during container destroy.
+ Future<Option<int> > status = process::reap(pid);
+ statuses.put(containerId, status);
+ status.onAny(defer(self(), &Self::reaped, containerId));
+
+ return isolate(containerId, pid)
+ .then(defer(self(),
+ &Self::fetch,
+ containerId,
+ executorInfo.command(),
+ directory,
+ user))
+ .then(defer(self(), &Self::exec, containerId, pipes[1]))
+ .onAny(lambda::bind(&os::close, pipes[0]))
+ .onAny(lambda::bind(&os::close, pipes[1]));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::isolate(
+ const ContainerID& containerId,
+ pid_t _pid)
+{
+ // Set up callbacks for isolator limitations.
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ isolator->watch(containerId)
+ .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+ }
+
+ // Isolate the executor with each isolator.
+ list<Future<Nothing> > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->isolate(containerId, _pid));
+ }
+
+ // Wait for all isolators to complete.
+ return collect(futures)
+ .then(lambda::bind(&_nothing));
+}
+
+
+Future<Nothing> MesosContainerizerProcess::exec(
+ const ContainerID& containerId,
+ int pipeWrite)
+{
+ CHECK(promises.contains(containerId));
+
+ // Now that we've contained the child we can signal it to continue by
+ // writing to the pipe.
+ char dummy;
+ ssize_t length;
+ while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
+ return Failure("Failed to synchronize child process: " +
+ string(strerror(errno)));
+ }
+
+ return Nothing();
+}
+
+
+Future<containerizer::Termination> MesosContainerizerProcess::wait(
+ const ContainerID& containerId)
+{
+ if (!promises.contains(containerId)) {
+ return Failure("Unknown container: " + stringify(containerId));
+ }
+
+ return promises[containerId]->future();
+}
+
+
+Future<Nothing> MesosContainerizerProcess::update(
+ const ContainerID& containerId,
+ const Resources& _resources)
+{
+ // The resources hashmap won't initially contain the container's resources
+ // after recovery so we must check the promises hashmap (which is set during
+ // recovery).
+ if (!promises.contains(containerId)) {
+ // It is not considered a failure if the container is not known
+ // because the slave will attempt to update the container's
+ // resources on a task's terminal state change but the executor
+ // may have already exited and the container cleaned up.
+ LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
+ return Nothing();
+ }
+
+ // Store the resources for usage().
+ resources.put(containerId, _resources);
+
+ // Update each isolator.
+ list<Future<Nothing> > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->update(containerId, _resources));
+ }
+
+ // Wait for all isolators to complete.
+ return collect(futures)
+ .then(lambda::bind(&_nothing));
+}
+
+
+// Resources are used to set the limit fields in the statistics but are
+// optional because they aren't known after recovery until/unless update() is
+// called.
+Future<ResourceStatistics> _usage(
+ const ContainerID& containerId,
+ const Option<Resources>& resources,
+ const list<Future<ResourceStatistics> >& statistics)
+{
+ ResourceStatistics result;
+
+ // Set the timestamp now we have all statistics.
+ result.set_timestamp(Clock::now().secs());
+
+ foreach (const Future<ResourceStatistics>& statistic, statistics) {
+ if (statistic.isReady()) {
+ result.MergeFrom(statistic.get());
+ } else {
+ LOG(WARNING) << "Skipping resource statistic for container "
+ << containerId << " because: "
+ << (statistic.isFailed() ? statistic.failure()
+ : "discarded");
+ }
+ }
+
+ if (resources.isSome()) {
+ // Set the resource allocations.
+ Option<Bytes> mem = resources.get().mem();
+ if (mem.isSome()) {
+ result.set_mem_limit_bytes(mem.get().bytes());
+ }
+
+ Option<double> cpus = resources.get().cpus();
+ if (cpus.isSome()) {
+ result.set_cpus_limit(cpus.get());
+ }
+ }
+
+ return result;
+}
+
+
+Future<ResourceStatistics> MesosContainerizerProcess::usage(
+ const ContainerID& containerId)
+{
+ if (!promises.contains(containerId)) {
+ return Failure("Unknown container: " + stringify(containerId));
+ }
+
+ list<Future<ResourceStatistics> > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->usage(containerId));
+ }
+
+ // Use await() here so we can return partial usage statistics.
+ // TODO(idownes): After recovery resources won't be known until after an
+ // update() because they aren't part of the SlaveState.
+ return await(futures)
+ .then(lambda::bind(
+ _usage, containerId, resources.get(containerId), lambda::_1));
+}
+
+
+void MesosContainerizerProcess::destroy(const ContainerID& containerId)
+{
+ if (!promises.contains(containerId)) {
+ LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
+ return;
+ }
+
+ if (destroying.contains(containerId)) {
+ // Destroy has already been initiated.
+ return;
+ }
+ destroying.insert(containerId);
+
+ LOG(INFO) << "Destroying container '" << containerId << "'";
+
+ if (statuses.contains(containerId)) {
+ // Kill all processes then continue destruction.
+ launcher->destroy(containerId)
+ .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
+ } else {
+ // The executor never forked so no processes to kill, go straight to
+ // __destroy() with status = None().
+ __destroy(containerId, None());
+ }
+}
+
+
+void MesosContainerizerProcess::_destroy(
+ const ContainerID& containerId,
+ const Future<Nothing>& future)
+{
+ // Something has gone wrong and the launcher wasn't able to kill all the
+ // processes in the container. We cannot clean up the isolators because they
+ // may require that all processes have exited so just return the failure to
+ // the slave.
+ // TODO(idownes): This is a pretty bad state to be in but we should consider
+ // cleaning up here.
+ if (!future.isReady()) {
+ promises[containerId]->fail(
+ "Failed to destroy container: " +
+ (future.isFailed() ? future.failure() : "discarded future"));
+
+ destroying.erase(containerId);
+
+ return;
+ }
+
+ // We've successfully killed all processes in the container so get the exit
+ // status of the executor when it's ready (it may already be) and continue
+ // the destroy.
+ statuses.get(containerId).get()
+ .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::__destroy(
+ const ContainerID& containerId,
+ const Future<Option<int > >& status)
+{
+ // Now that all processes have exited we can now clean up all isolators.
+ list<Future<Nothing> > futures;
+ foreach (const Owned<Isolator>& isolator, isolators) {
+ futures.push_back(isolator->cleanup(containerId));
+ }
+
+ // Wait for all isolators to complete cleanup before continuing.
+ collect(futures)
+ .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::___destroy(
+ const ContainerID& containerId,
+ const Future<Option<int > >& status,
+ const Future<list<Nothing> >& futures)
+{
+ // Something has gone wrong with one of the Isolators and cleanup failed.
+ // We'll fail the container termination and remove the 'destroying' flag but
+ // leave all other state. The containerizer is now in a bad state because
+ // at least one isolator has failed to clean up.
+ if (!futures.isReady()) {
+ promises[containerId]->fail(
+ "Failed to clean up isolators when destroying container: " +
+ (futures.isFailed() ? futures.failure() : "discarded future"));
+
+ destroying.erase(containerId);
+
+ return;
+ }
+
+ // A container is 'killed' if any isolator limited it.
+ // Note: We may not see a limitation in time for it to be registered. This
+ // could occur if the limitation (e.g., an OOM) killed the executor and we
+ // triggered destroy() off the executor exit.
+ bool killed = false;
+ string message;
+ if (limitations.contains(containerId)) {
+ killed = true;
+ foreach (const Limitation& limitation, limitations.get(containerId)) {
+ message += limitation.message;
+ }
+ message = strings::trim(message);
+ } else {
+ message = "Executor terminated";
+ }
+
+ containerizer::Termination termination;
+ termination.set_killed(killed);
+ termination.set_message(message);
+ if (status.isReady() && status.get().isSome()) {
+ termination.set_status(status.get().get());
+ }
+
+ promises[containerId]->set(termination);
+
+ promises.erase(containerId);
+ statuses.erase(containerId);
+ limitations.erase(containerId);
+ resources.erase(containerId);
+ destroying.erase(containerId);
+}
+
+
+void MesosContainerizerProcess::reaped(const ContainerID& containerId)
+{
+ if (!promises.contains(containerId)) {
+ return;
+ }
+
+ LOG(INFO) << "Executor for container '" << containerId << "' has exited";
+
+ // The executor has exited so destroy the container.
+ destroy(containerId);
+}
+
+
+void MesosContainerizerProcess::limited(
+ const ContainerID& containerId,
+ const Future<Limitation>& future)
+{
+ if (!promises.contains(containerId)) {
+ return;
+ }
+
+ if (future.isReady()) {
+ LOG(INFO) << "Container " << containerId << " has reached its limit for"
+ << " resource " << future.get().resource
+ << " and will be terminated";
+ limitations.put(containerId, future.get());
+ } else {
+ // TODO(idownes): A discarded future will not be an error when isolators
+ // discard their promises after cleanup.
+ LOG(ERROR) << "Error in a resource limitation for container "
+ << containerId << ": " << (future.isFailed() ? future.failure()
+ : "discarded");
+ }
+
+ // The container has been affected by the limitation so destroy it.
+ destroy(containerId);
+}
+
+
+Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
+{
+ return promises.keys();
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
new file mode 100644
index 0000000..8746968
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_HPP__
+#define __MESOS_CONTAINERIZER_HPP__
+
+#include <list>
+#include <vector>
+
+#include <stout/hashmap.hpp>
+#include <stout/multihashmap.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class MesosContainerizerProcess;
+
+class MesosContainerizer : public Containerizer
+{
+public:
+ static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+
+ MesosContainerizer(
+ const Flags& flags,
+ bool local,
+ const process::Owned<Launcher>& launcher,
+ const std::vector<process::Owned<Isolator> >& isolators);
+
+ virtual ~MesosContainerizer();
+
+ virtual process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ virtual process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ virtual void destroy(const ContainerID& containerId);
+
+ virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+ MesosContainerizerProcess* process;
+};
+
+
+class MesosContainerizerProcess
+ : public process::Process<MesosContainerizerProcess>
+{
+public:
+ MesosContainerizerProcess(
+ const Flags& _flags,
+ bool _local,
+ const process::Owned<Launcher>& _launcher,
+ const std::vector<process::Owned<Isolator> >& _isolators)
+ : flags(_flags),
+ local(_local),
+ launcher(_launcher),
+ isolators(_isolators) {}
+
+ virtual ~MesosContainerizerProcess() {}
+
+ process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ void destroy(const ContainerID& containerId);
+
+ process::Future<hashset<ContainerID> > containers();
+
+private:
+ process::Future<Nothing> _recover(
+ const std::list<state::RunState>& recoverable);
+
+ process::Future<Nothing> __recover(
+ const std::list<state::RunState>& recovered);
+
+ process::Future<std::list<Option<CommandInfo> > > prepare(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user);
+
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user);
+
+ process::Future<Nothing> _launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint,
+ const std::list<Option<CommandInfo> >& scripts);
+
+ process::Future<Nothing> isolate(
+ const ContainerID& containerId,
+ pid_t _pid);
+
+ process::Future<Nothing> exec(
+ const ContainerID& containerId,
+ int pipeWrite);
+
+ // Continues 'destroy()' once all processes have been killed by the launcher.
+ void _destroy(
+ const ContainerID& containerId,
+ const process::Future<Nothing>& future);
+
+ // Continues '_destroy()' once we get the exit status of the executor.
+ void __destroy(
+ const ContainerID& containerId,
+ const process::Future<Option<int > >& status);
+
+ // Continues (and completes) '__destroy()' once all isolators have completed
+ // cleanup.
+ void ___destroy(
+ const ContainerID& containerId,
+ const process::Future<Option<int > >& status,
+ const process::Future<std::list<Nothing> >& futures);
+
+ // Call back for when an isolator limits a container and impacts the
+ // processes. This will trigger container destruction.
+ void limited(
+ const ContainerID& containerId,
+ const process::Future<Limitation>& future);
+
+ // Call back for when the executor exits. This will trigger container
+ // destroy.
+ void reaped(const ContainerID& containerId);
+
+ const Flags flags;
+ const bool local;
+ const process::Owned<Launcher> launcher;
+ const std::vector<process::Owned<Isolator> > isolators;
+
+ // TODO(idownes): Consider putting these per-container variables into a
+ // struct.
+ // Promises for futures returned from wait().
+ hashmap<ContainerID,
+ process::Owned<process::Promise<containerizer::Termination> > > promises;
+
+ // We need to keep track of the future exit status for each executor because
+ // we'll only get a single notification when the executor exits.
+ hashmap<ContainerID, process::Future<Option<int> > > statuses;
+
+ // We keep track of any limitations received from each isolator so we can
+ // determine the cause of an executor termination.
+ multihashmap<ContainerID, Limitation> limitations;
+
+ // We keep track of the resources for each container so we can set the
+ // ResourceStatistics limits in usage().
+ hashmap<ContainerID, Resources> resources;
+
+ // Set of containers that are in process of being destroyed.
+ hashset<ContainerID> destroying;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.cpp b/src/slave/containerizer/mesos/launch.cpp
new file mode 100644
index 0000000..2db1c7a
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.cpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <iostream>
+#include <map>
+
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/execenv.hpp>
+
+#include "mesos/mesos.hpp"
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::cerr;
+using std::endl;
+using std::map;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+const string MesosContainerizerLaunch::NAME = "launch";
+
+
+MesosContainerizerLaunch::Flags::Flags()
+{
+ add(&command,
+ "command",
+ "The command to execute.");
+
+ add(&directory,
+ "directory",
+ "The directory to chdir to.");
+
+ add(&user,
+ "user",
+ "The user to change to.");
+
+ add(&pipe_read,
+ "pipe_read",
+ "The read end of the control pipe.");
+
+ add(&pipe_write,
+ "pipe_write",
+ "The write end of the control pipe.");
+
+ add(&commands,
+ "commands",
+ "The additional preparation commands to execute before\n"
+ "executing the command.");
+}
+
+
+int MesosContainerizerLaunch::execute()
+{
+ // Check command line flags.
+ if (flags.command.isNone()) {
+ cerr << "Flag --command is not specified" << endl;
+ return 1;
+ }
+
+ if (flags.directory.isNone()) {
+ cerr << "Flag --directory is not specified" << endl;
+ return 1;
+ }
+
+ if (flags.pipe_read.isNone()) {
+ cerr << "Flag --pipe_read is not specified" << endl;
+ return 1;
+ }
+
+ if (flags.pipe_write.isNone()) {
+ cerr << "Flag --pipe_write is not specified" << endl;
+ return 1;
+ }
+
+ // Parse the command.
+ Try<CommandInfo> command =
+ ::protobuf::parse<CommandInfo>(flags.command.get());
+
+ if (command.isError()) {
+ cerr << "Failed to parse the command: " << command.error() << endl;
+ return 1;
+ }
+
+ Try<Nothing> close = os::close(flags.pipe_write.get());
+ if (close.isError()) {
+ cerr << "Failed to close pipe[1]: " << close.error() << endl;
+ return 1;
+ }
+
+ // Do a blocking read on the pipe until the parent signals us to continue.
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(
+ flags.pipe_read.get(),
+ &dummy,
+ sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
+ // There's a reasonable probability this will occur during slave
+ // restarts across a large/busy cluster.
+ cerr << "Failed to synchronize with slave (it's probably exited)" << endl;
+ return 1;
+ }
+
+ close = os::close(flags.pipe_read.get());
+ if (close.isError()) {
+ cerr << "Failed to close pipe[0]: " << close.error() << endl;
+ return 1;
+ }
+
+ // Run additional preparation commands. These are run as the same
+ // user and with the environment as the slave.
+ if (flags.commands.isSome()) {
+ // TODO(jieyu): Use JSON::Array if we have generic parse support.
+ JSON::Object object = flags.commands.get();
+ if (object.values.count("commands") == 0) {
+ cerr << "Invalid JSON format for flag --commands" << endl;
+ return 1;
+ }
+
+ if (!object.values["commands"].is<JSON::Array>()) {
+ cerr << "Invalid JSON format for flag --commands" << endl;
+ return 1;
+ }
+
+ JSON::Array array = object.values["commands"].as<JSON::Array>();
+ foreach (const JSON::Value& value, array.values) {
+ if (!value.is<JSON::Object>()) {
+ cerr << "Invalid JSON format for flag --commands" << endl;
+ return 1;
+ }
+
+ Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(value);
+ if (parse.isError()) {
+ cerr << "Failed to parse a preparation command: "
+ << parse.error() << endl;
+ return 1;
+ }
+
+ // Block until the command completes.
+ int status = os::system(parse.get().value());
+ if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
+ cerr << "Failed to execute a preparation command" << endl;
+ return 1;
+ }
+ }
+ }
+
+ // Enter working directory.
+ if (!os::chdir(flags.directory.get())) {
+ cerr << "Failed to chdir into work directory "
+ << flags.directory.get() << endl;
+ return 1;
+ }
+
+ // Change user if provided. Note that we do that after executing the
+ // preparation commands so that those commands will be run with the
+ // same privilege as the mesos-slave.
+ if (flags.user.isSome() && !os::su(flags.user.get())) {
+ cerr << "Failed to change user to " << flags.user.get() << endl;
+ return 1;
+ }
+
+ // Relay the environment variables.
+ // TODO(jieyu): Consider using a clean environment.
+ map<string, string> env;
+ os::ExecEnv envp(env);
+
+ // Execute the command (via '/bin/sh -c command') with its environment.
+ execle(
+ "/bin/sh",
+ "sh",
+ "-c",
+ command.get().value().c_str(),
+ (char*) NULL,
+ envp());
+
+ // If we get here, the execle call failed.
+ cerr << "Failed to execute command" << endl;
+
+ return UNREACHABLE();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.hpp b/src/slave/containerizer/mesos/launch.hpp
new file mode 100644
index 0000000..7c8b535
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_LAUNCH_HPP__
+#define __MESOS_CONTAINERIZER_LAUNCH_HPP__
+
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class MesosContainerizerLaunch : public Subcommand
+{
+public:
+ static const std::string NAME;
+
+ struct Flags : public flags::FlagsBase
+ {
+ Flags();
+
+ Option<JSON::Object> command;
+ Option<std::string> directory;
+ Option<std::string> user;
+ Option<int> pipe_read;
+ Option<int> pipe_write;
+ Option<JSON::Object> commands; // Additional preparation commands.
+ };
+
+ MesosContainerizerLaunch() : Subcommand(NAME) {}
+
+ Flags flags;
+
+protected:
+ virtual int execute();
+ virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_LAUNCH_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/main.cpp b/src/slave/containerizer/mesos/main.cpp
new file mode 100644
index 0000000..0e17931
--- /dev/null
+++ b/src/slave/containerizer/mesos/main.cpp
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/none.hpp>
+#include <stout/subcommand.hpp>
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using namespace mesos::internal::slave;
+
+
+int main(int argc, char** argv)
+{
+ return Subcommand::dispatch(
+ None(),
+ argc,
+ argv,
+ new MesosContainerizerLaunch());
+}