You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:43 UTC
[1/7] git commit: Refactored Subprocess to support execve style
launch and customized clone function.
Repository: mesos
Updated Branches:
refs/heads/master b2d13bc3b -> d0046dca7
Refactored Subprocess to support execve style launch and customized
clone function.
Review: https://reviews.apache.org/r/22831
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1b0fdf01
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1b0fdf01
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1b0fdf01
Branch: refs/heads/master
Commit: 1b0fdf01b47591ebd966eba3e0df360caa40888e
Parents: b2d13bc
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 12:28:02 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:31:29 2014 -0700
----------------------------------------------------------------------
.../libprocess/include/process/subprocess.hpp | 101 ++++++++--
3rdparty/libprocess/src/subprocess.cpp | 201 ++++++++++++-------
.../libprocess/src/tests/subprocess_tests.cpp | 19 +-
3 files changed, 223 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/include/process/subprocess.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/subprocess.hpp b/3rdparty/libprocess/include/process/subprocess.hpp
index d6e2c1f..7ff5a10 100644
--- a/3rdparty/libprocess/include/process/subprocess.hpp
+++ b/3rdparty/libprocess/include/process/subprocess.hpp
@@ -7,6 +7,7 @@
#include <map>
#include <string>
+#include <vector>
#include <process/future.hpp>
@@ -38,17 +39,25 @@ public:
// 3. FD: Redirect to an open file descriptor.
class IO
{
+ public:
+ bool isPipe() const { return mode == PIPE; }
+ bool isPath() const { return mode == PATH; }
+ bool isFd() const { return mode == FD; }
+
private:
friend class Subprocess;
friend Try<Subprocess> subprocess(
- const std::string& command,
- const IO& in,
- const IO& out,
- const IO& err,
+ const std::string& path,
+ std::vector<std::string> argv,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
const Option<flags::FlagsBase>& flags,
const Option<std::map<std::string, std::string> >& environment,
- const Option<lambda::function<int()> >& setup);
+ const Option<lambda::function<int()> >& setup,
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone);
enum Mode
{
@@ -95,13 +104,16 @@ public:
private:
friend Try<Subprocess> subprocess(
- const std::string& command,
- const IO& in,
- const IO& out,
- const IO& err,
+ const std::string& path,
+ std::vector<std::string> argv,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
const Option<flags::FlagsBase>& flags,
const Option<std::map<std::string, std::string> >& environment,
- const Option<lambda::function<int()> >& setup);
+ const Option<lambda::function<int()> >& setup,
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone);
struct Data
{
@@ -141,29 +153,86 @@ private:
// must not contain any async unsafe code.
// TODO(dhamon): Add an option to not combine the two environments.
Try<Subprocess> subprocess(
- const std::string& command,
+ const std::string& path,
+ std::vector<std::string> argv,
const Subprocess::IO& in,
const Subprocess::IO& out,
const Subprocess::IO& err,
const Option<flags::FlagsBase>& flags = None(),
const Option<std::map<std::string, std::string> >& environment = None(),
- const Option<lambda::function<int()> >& setup = None());
+ const Option<lambda::function<int()> >& setup = None(),
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone = None());
inline Try<Subprocess> subprocess(
- const std::string& command,
+ const std::string& path,
+ std::vector<std::string> argv,
const Option<flags::FlagsBase>& flags = None(),
const Option<std::map<std::string, std::string> >& environment = None(),
- const Option<lambda::function<int()> >& setup = None())
+ const Option<lambda::function<int()> >& setup = None(),
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone = None())
{
return subprocess(
- command,
+ path,
+ argv,
Subprocess::FD(STDIN_FILENO),
Subprocess::FD(STDOUT_FILENO),
Subprocess::FD(STDERR_FILENO),
flags,
environment,
- setup);
+ setup,
+ clone);
+}
+
+
+// Overloads for launching a shell command. Currently, we do not
+// support flags for shell command variants due to the complexity
+// involved in escaping quotes in flags.
+inline Try<Subprocess> subprocess(
+ const std::string& command,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
+ const Option<std::map<std::string, std::string> >& environment = None(),
+ const Option<lambda::function<int()> >& setup = None(),
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+ std::vector<std::string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ return subprocess(
+ "/bin/sh",
+ argv,
+ in,
+ out,
+ err,
+ None(),
+ environment,
+ setup,
+ clone);
+}
+
+
+inline Try<Subprocess> subprocess(
+ const std::string& command,
+ const Option<std::map<std::string, std::string> >& environment = None(),
+ const Option<lambda::function<int()> >& setup = None(),
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& clone = None())
+{
+ return subprocess(
+ command,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ environment,
+ setup,
+ clone);
}
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/subprocess.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/subprocess.cpp b/3rdparty/libprocess/src/subprocess.cpp
index 6ac7b5b..68bfd5d 100644
--- a/3rdparty/libprocess/src/subprocess.cpp
+++ b/3rdparty/libprocess/src/subprocess.cpp
@@ -24,6 +24,7 @@
using std::map;
using std::string;
+using std::vector;
namespace process {
namespace internal {
@@ -85,15 +86,99 @@ static Try<Nothing> cloexec(int stdinFd[2], int stdoutFd[2], int stderrFd[2])
} // namespace internal {
-// Runs the provided command in a subprocess.
+static pid_t defaultClone(const lambda::function<int()>& func)
+{
+ pid_t pid = ::fork();
+ if (pid == -1) {
+ return -1;
+ } else if (pid == 0) {
+ // Child.
+ ::exit(func());
+ return UNREACHABLE();
+ } else {
+ // Parent.
+ return pid;
+ }
+}
+
+
+// The main entry of the child process. Note that this function has to
+// be async signal safe.
+static int childMain(
+ const string& path,
+ char** argv,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
+ os::ExecEnv* envp,
+ const Option<lambda::function<int()> >& setup,
+ int stdinFd[2],
+ int stdoutFd[2],
+ int stderrFd[2])
+{
+ // Close parent's end of the pipes.
+ if (in.isPipe()) {
+ while (::close(stdinFd[1]) == -1 && errno == EINTR);
+ }
+ if (out.isPipe()) {
+ while (::close(stdoutFd[0]) == -1 && errno == EINTR);
+ }
+ if (err.isPipe()) {
+ while (::close(stderrFd[0]) == -1 && errno == EINTR);
+ }
+
+ // Redirect I/O for stdin/stdout/stderr.
+ while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
+ while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
+ while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
+
+ // Close the copies. We need to make sure that we do not close the
+ // file descriptor assigned to stdin/stdout/stderr in case the
+ // parent has closed stdin/stdout/stderr when calling this
+ // function (in that case, a dup'ed file descriptor may have the
+ // same file descriptor number as stdin/stdout/stderr).
+ if (stdinFd[0] != STDIN_FILENO &&
+ stdinFd[0] != STDOUT_FILENO &&
+ stdinFd[0] != STDERR_FILENO) {
+ while (::close(stdinFd[0]) == -1 && errno == EINTR);
+ }
+ if (stdoutFd[1] != STDIN_FILENO &&
+ stdoutFd[1] != STDOUT_FILENO &&
+ stdoutFd[1] != STDERR_FILENO) {
+ while (::close(stdoutFd[1]) == -1 && errno == EINTR);
+ }
+ if (stderrFd[1] != STDIN_FILENO &&
+ stderrFd[1] != STDOUT_FILENO &&
+ stderrFd[1] != STDERR_FILENO) {
+ while (::close(stderrFd[1]) == -1 && errno == EINTR);
+ }
+
+ if (setup.isSome()) {
+ int status = setup.get()();
+ if (status != 0) {
+ _exit(status);
+ }
+ }
+
+ execve(path.c_str(), argv, (*envp)());
+
+ ABORT("Failed to execve in childMain\n");
+
+ return UNREACHABLE();
+}
+
+
Try<Subprocess> subprocess(
- const string& _command,
+ const string& path,
+ vector<string> argv,
const Subprocess::IO& in,
const Subprocess::IO& out,
const Subprocess::IO& err,
const Option<flags::FlagsBase>& flags,
const Option<map<string, string> >& environment,
- const Option<lambda::function<int()> >& setup)
+ const Option<lambda::function<int()> >& setup,
+ const Option<lambda::function<
+ pid_t(const lambda::function<int()>&)> >& _clone)
{
// File descriptors for redirecting stdin/stdout/stderr. These file
// descriptors are used for different purposes depending on the
@@ -116,7 +201,7 @@ Try<Subprocess> subprocess(
break;
}
case Subprocess::IO::PIPE: {
- if (pipe(stdinFd) == -1) {
+ if (::pipe(stdinFd) == -1) {
return ErrnoError("Failed to create pipe");
}
break;
@@ -147,7 +232,7 @@ Try<Subprocess> subprocess(
break;
}
case Subprocess::IO::PIPE: {
- if (pipe(stdoutFd) == -1) {
+ if (::pipe(stdoutFd) == -1) {
// Save the errno as 'close' below might overwrite it.
ErrnoError error("Failed to create pipe");
internal::close(stdinFd, stdoutFd, stderrFd);
@@ -186,7 +271,7 @@ Try<Subprocess> subprocess(
break;
}
case Subprocess::IO::PIPE: {
- if (pipe(stderrFd) == -1) {
+ if (::pipe(stderrFd) == -1) {
// Save the errno as 'close' below might overwrite it.
ErrnoError error("Failed to create pipe");
internal::close(stdinFd, stdoutFd, stderrFd);
@@ -219,95 +304,65 @@ Try<Subprocess> subprocess(
return Error("Failed to cloexec: " + cloexec.error());
}
- // Prepare the command to execute. If the user specifies the
- // 'flags', we will stringify it and append it to the command.
- string command = _command;
-
+ // Prepare the arguments. If the user specifies the 'flags', we will
+ // stringify them and append them to the existing arguments.
if (flags.isSome()) {
foreachpair (const string& name, const flags::Flag& flag, flags.get()) {
Option<string> value = flag.stringify(flags.get());
if (value.isSome()) {
- // TODO(jieyu): Need a better way to escape quotes. For
- // example, what if 'value.get()' contains a single quote?
- string argument = "--" + name + "='" + value.get() + "'";
- command = strings::join(" ", command, argument);
+ argv.push_back("--" + name + "=" + value.get());
}
}
}
- // We need to do this construction before doing the fork as it
+ // The real arguments that will be passed to 'execve'. We need to
+ // construct them here before doing the clone as it might not be
+ // async signal safe.
+ char** _argv = new char*[argv.size() + 1];
+ for (int i = 0; i < argv.size(); i++) {
+ _argv[i] = (char*) argv[i].c_str();
+ }
+ _argv[argv.size()] = NULL;
+
+ // We need to do this construction before doing the clone as it
// might not be async-safe.
// TODO(tillt): Consider optimizing this to not pass an empty map
// into the constructor or even further to use execl instead of
// execle once we have no user supplied environment.
os::ExecEnv envp(environment.get(map<string, string>()));
- pid_t pid;
- if ((pid = fork()) == -1) {
+ // Determine the function to clone the child process. If the user
+ // does not specify the clone function, we will use the default.
+ lambda::function<pid_t(const lambda::function<int()>&)> clone =
+ (_clone.isSome() ? _clone.get() : defaultClone);
+
+ // Now, clone the child process.
+ pid_t pid = clone(lambda::bind(
+ &childMain,
+ path,
+ _argv,
+ in,
+ out,
+ err,
+ &envp,
+ setup,
+ stdinFd,
+ stdoutFd,
+ stderrFd));
+
+ delete[] _argv;
+
+ if (pid == -1) {
// Save the errno as 'close' below might overwrite it.
- ErrnoError error("Failed to fork");
+ ErrnoError error("Failed to clone");
internal::close(stdinFd, stdoutFd, stderrFd);
return error;
}
+ // Parent.
Subprocess process;
process.data->pid = pid;
- if (process.data->pid == 0) {
- // Child.
- // Close parent's end of the pipes.
- if (in.mode == Subprocess::IO::PIPE) {
- while (::close(stdinFd[1]) == -1 && errno == EINTR);
- }
- if (out.mode == Subprocess::IO::PIPE) {
- while (::close(stdoutFd[0]) == -1 && errno == EINTR);
- }
- if (err.mode == Subprocess::IO::PIPE) {
- while (::close(stderrFd[0]) == -1 && errno == EINTR);
- }
-
- // Redirect I/O for stdin/stdout/stderr.
- while (::dup2(stdinFd[0], STDIN_FILENO) == -1 && errno == EINTR);
- while (::dup2(stdoutFd[1], STDOUT_FILENO) == -1 && errno == EINTR);
- while (::dup2(stderrFd[1], STDERR_FILENO) == -1 && errno == EINTR);
-
- // Close the copies. We need to make sure that we do not close the
- // file descriptor assigned to stdin/stdout/stderr in case the
- // parent has closed stdin/stdout/stderr when calling this
- // function (in that case, a dup'ed file descriptor may have the
- // same file descriptor number as stdin/stdout/stderr).
- if (stdinFd[0] != STDIN_FILENO &&
- stdinFd[0] != STDOUT_FILENO &&
- stdinFd[0] != STDERR_FILENO) {
- while (::close(stdinFd[0]) == -1 && errno == EINTR);
- }
- if (stdoutFd[1] != STDIN_FILENO &&
- stdoutFd[1] != STDOUT_FILENO &&
- stdoutFd[1] != STDERR_FILENO) {
- while (::close(stdoutFd[1]) == -1 && errno == EINTR);
- }
- if (stderrFd[1] != STDIN_FILENO &&
- stderrFd[1] != STDOUT_FILENO &&
- stderrFd[1] != STDERR_FILENO) {
- while (::close(stderrFd[1]) == -1 && errno == EINTR);
- }
-
- if (setup.isSome()) {
- int status = setup.get()();
- if (status != 0) {
- _exit(status);
- }
- }
-
- // TODO(jieyu): Consider providing an optional way to launch the
- // subprocess without using the shell (similar to 'shell=False'
- // used in python subprocess.Popen).
- execle("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL, envp());
-
- ABORT("Failed to execle '/bin/sh -c ", command.c_str(), "'\n");
- }
-
- // Parent.
// Close the file descriptors that are created by this function. For
// pipes, we close the child ends and store the parent ends (see the
// code below).
http://git-wip-us.apache.org/repos/asf/mesos/blob/1b0fdf01/3rdparty/libprocess/src/tests/subprocess_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/subprocess_tests.cpp b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
index 7dfa384..98a4e44 100644
--- a/3rdparty/libprocess/src/tests/subprocess_tests.cpp
+++ b/3rdparty/libprocess/src/tests/subprocess_tests.cpp
@@ -509,6 +509,8 @@ struct Flags : public flags::FlagsBase
add(&b, "b", "bool");
add(&i, "i", "int");
add(&s, "s", "string");
+ add(&s2, "s2", "string with single quote");
+ add(&s3, "s3", "string with double quote");
add(&d, "d", "Duration");
add(&y, "y", "Bytes");
add(&j, "j", "JSON::Object");
@@ -517,6 +519,8 @@ struct Flags : public flags::FlagsBase
Option<bool> b;
Option<int> i;
Option<string> s;
+ Option<string> s2;
+ Option<string> s3;
Option<Duration> d;
Option<Bytes> y;
Option<JSON::Object> j;
@@ -531,6 +535,8 @@ TEST_F(SubprocessTest, Flags)
flags.b = true;
flags.i = 42;
flags.s = "hello";
+ flags.s2 = "we're";
+ flags.s3 = "\"geek\"";
flags.d = Seconds(10);
flags.y = Bytes(100);
@@ -555,7 +561,8 @@ TEST_F(SubprocessTest, Flags)
string out = path::join(os::getcwd(), "stdout");
Try<Subprocess> s = subprocess(
- "echo",
+ "/bin/echo",
+ vector<string>(1, "echo"),
Subprocess::PIPE(),
Subprocess::PATH(out),
Subprocess::PIPE(),
@@ -597,6 +604,8 @@ TEST_F(SubprocessTest, Flags)
EXPECT_EQ(flags.b, flags2.b);
EXPECT_EQ(flags.i, flags2.i);
EXPECT_EQ(flags.s, flags2.s);
+ EXPECT_EQ(flags.s2, flags2.s2);
+ EXPECT_EQ(flags.s3, flags2.s3);
EXPECT_EQ(flags.d, flags2.d);
EXPECT_EQ(flags.y, flags2.y);
EXPECT_EQ(flags.j, flags2.j);
@@ -623,7 +632,6 @@ TEST_F(SubprocessTest, Environment)
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(s);
@@ -654,7 +662,6 @@ TEST_F(SubprocessTest, Environment)
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(s);
@@ -692,7 +699,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpaces)
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(s);
@@ -730,7 +736,6 @@ TEST_F(SubprocessTest, EnvironmentWithSpacesAndQuotes)
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(s);
@@ -770,7 +775,6 @@ TEST_F(SubprocessTest, EnvironmentOverride)
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(s);
@@ -820,7 +824,6 @@ TEST_F(SubprocessTest, Setup)
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
- None(),
lambda::bind(&setupChdir, directory.get()));
ASSERT_SOME(s);
@@ -862,7 +865,6 @@ TEST_F(SubprocessTest, SetupStatus)
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
- None(),
lambda::bind(&setupStatus, 1));
ASSERT_SOME(s);
@@ -889,7 +891,6 @@ TEST_F(SubprocessTest, SetupStatus)
Subprocess::PIPE(),
Subprocess::PIPE(),
None(),
- None(),
lambda::bind(&setupStatus, 0));
ASSERT_SOME(s);
[6/7] Refactored the mesos containerizer launcher to fix MESOS-1404.
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
deleted file mode 100644
index 8a109f4..0000000
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sstream>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/reap.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/fatal.hpp>
-#include <stout/os.hpp>
-#include <stout/unreachable.hpp>
-
-#include <stout/os/execenv.hpp>
-
-#include "slave/paths.hpp"
-#include "slave/slave.hpp"
-
-#ifdef __linux__
-#include "slave/containerizer/linux_launcher.hpp"
-#endif // __linux__
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
-
-#include "slave/containerizer/isolators/posix.hpp"
-#ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
-#endif // __linux__
-
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-
-// Local function declaration/definitions.
-Future<Nothing> _nothing() { return Nothing(); }
-
-
-// Helper method to build the environment map used to launch fetcher.
-map<string, string> fetcherEnvironment(
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const Flags& flags)
-{
- // Prepare the environment variables to pass to mesos-fetcher.
- string uris = "";
- foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
- uris += uri.value() + "+" +
- (uri.has_executable() && uri.executable() ? "1" : "0") +
- (uri.extract() ? "X" : "N");
- uris += " ";
- }
- // Remove extra space at the end.
- uris = strings::trim(uris);
-
- map<string, string> environment;
- environment["MESOS_EXECUTOR_URIS"] = uris;
- environment["MESOS_WORK_DIRECTORY"] = directory;
- if (user.isSome()) {
- environment["MESOS_USER"] = user.get();
- }
- if (!flags.frameworks_home.empty()) {
- environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
- }
- if (!flags.hadoop_home.empty()) {
- environment["HADOOP_HOME"] = flags.hadoop_home;
- }
-
- return environment;
-}
-
-
-Try<MesosContainerizer*> MesosContainerizer::create(
- const Flags& flags,
- bool local)
-{
- string isolation;
- if (flags.isolation == "process") {
- LOG(WARNING) << "The 'process' isolation flag is deprecated, "
- << "please update your flags to"
- << " '--isolation=posix/cpu,posix/mem'.";
- isolation = "posix/cpu,posix/mem";
- } else if (flags.isolation == "cgroups") {
- LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
- << "please update your flags to"
- << " '--isolation=cgroups/cpu,cgroups/mem'.";
- isolation = "cgroups/cpu,cgroups/mem";
- } else {
- isolation = flags.isolation;
- }
-
- LOG(INFO) << "Using isolation: " << isolation;
-
- // Create a MesosContainerizerProcess using isolators and a launcher.
- hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
-
- creators["posix/cpu"] = &PosixCpuIsolatorProcess::create;
- creators["posix/mem"] = &PosixMemIsolatorProcess::create;
-#ifdef __linux__
- creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
- creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
- creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
-#endif // __linux__
-
- vector<Owned<Isolator> > isolators;
-
- foreach (const string& type, strings::split(isolation, ",")) {
- if (creators.contains(type)) {
- Try<Isolator*> isolator = creators[type](flags);
- if (isolator.isError()) {
- return Error(
- "Could not create isolator " + type + ": " + isolator.error());
- } else {
- isolators.push_back(Owned<Isolator>(isolator.get()));
- }
- } else {
- return Error("Unknown or unsupported isolator: " + type);
- }
- }
-
-#ifdef __linux__
- // Use cgroups on Linux if any cgroups isolators are used.
- Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
- ? LinuxLauncher::create(flags)
- : PosixLauncher::create(flags);
-#else
- Try<Launcher*> launcher = PosixLauncher::create(flags);
-#endif // __linux__
-
- if (launcher.isError()) {
- return Error("Failed to create launcher: " + launcher.error());
- }
-
- return new MesosContainerizer(
- flags, local, Owned<Launcher>(launcher.get()), isolators);
-}
-
-
-MesosContainerizer::MesosContainerizer(
- const Flags& flags,
- bool local,
- const Owned<Launcher>& launcher,
- const vector<Owned<Isolator> >& isolators)
-{
- process = new MesosContainerizerProcess(
- flags, local, launcher, isolators);
- spawn(process);
-}
-
-
-MesosContainerizer::~MesosContainerizer()
-{
- terminate(process);
- process::wait(process);
- delete process;
-}
-
-
-Future<Nothing> MesosContainerizer::recover(
- const Option<state::SlaveState>& state)
-{
- return dispatch(process, &MesosContainerizerProcess::recover, state);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
-{
- return dispatch(process,
- &MesosContainerizerProcess::launch,
- containerId,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
-{
- return dispatch(process,
- &MesosContainerizerProcess::launch,
- containerId,
- taskInfo,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::update(
- const ContainerID& containerId,
- const Resources& resources)
-{
- return dispatch(process,
- &MesosContainerizerProcess::update,
- containerId,
- resources);
-}
-
-
-Future<ResourceStatistics> MesosContainerizer::usage(
- const ContainerID& containerId)
-{
- return dispatch(process, &MesosContainerizerProcess::usage, containerId);
-}
-
-
-Future<containerizer::Termination> MesosContainerizer::wait(
- const ContainerID& containerId)
-{
- return dispatch(process, &MesosContainerizerProcess::wait, containerId);
-}
-
-
-void MesosContainerizer::destroy(const ContainerID& containerId)
-{
- dispatch(process, &MesosContainerizerProcess::destroy, containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizer::containers()
-{
- return dispatch(process, &MesosContainerizerProcess::containers);
-}
-
-
-Future<Nothing> MesosContainerizerProcess::recover(
- const Option<state::SlaveState>& state)
-{
- LOG(INFO) << "Recovering containerizer";
-
- // Gather the executor run states that we will attempt to recover.
- list<RunState> recoverable;
- if (state.isSome()) {
- foreachvalue (const FrameworkState& framework, state.get().frameworks) {
- foreachvalue (const ExecutorState& executor, framework.executors) {
- if (executor.info.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << executor.id
- << "' of framework " << framework.id
- << " because its info could not be recovered";
- continue;
- }
-
- if (executor.latest.isNone()) {
- LOG(WARNING) << "Skipping recovery of executor '" << executor.id
- << "' of framework " << framework.id
- << " because its latest run could not be recovered";
- continue;
- }
-
- // We are only interested in the latest run of the executor!
- const ContainerID& containerId = executor.latest.get();
- Option<RunState> run = executor.runs.get(containerId);
- CHECK_SOME(run);
-
- // We need the pid so the reaper can monitor the executor so skip this
- // executor if it's not present. This is not an error because the slave
- // will try to wait on the container which will return a failed
- // Termination and everything will get cleaned up.
- if (!run.get().forkedPid.isSome()) {
- continue;
- }
-
- if (run.get().completed) {
- VLOG(1) << "Skipping recovery of executor '" << executor.id
- << "' of framework " << framework.id
- << " because its latest run "
- << containerId << " is completed";
- continue;
- }
-
- LOG(INFO) << "Recovering container '" << containerId
- << "' for executor '" << executor.id
- << "' of framework " << framework.id;
-
- recoverable.push_back(run.get());
- }
- }
- }
-
- // Try to recover the launcher first.
- return launcher->recover(recoverable)
- .then(defer(self(), &Self::_recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_recover(
- const list<RunState>& recoverable)
-{
- // Then recover the isolators.
- list<Future<Nothing> > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->recover(recoverable));
- }
-
- // If all isolators recover then continue.
- return collect(futures)
- .then(defer(self(), &Self::__recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::__recover(
- const list<RunState>& recovered)
-{
- foreach (const RunState& run, recovered) {
- CHECK_SOME(run.id);
- const ContainerID& containerId = run.id.get();
-
- Owned<Promise<containerizer::Termination> > promise(
- new Promise<containerizer::Termination>());
- promises.put(containerId, promise);
-
- CHECK_SOME(run.forkedPid);
- Future<Option<int > > status = process::reap(run.forkedPid.get());
- statuses[containerId] = status;
- status.onAny(defer(self(), &Self::reaped, containerId));
-
- foreach (const Owned<Isolator>& isolator, isolators) {
- isolator->watch(containerId)
- .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
- }
- }
-
- return Nothing();
-}
-
-
-// This function is executed by the forked child and must remain
-// async-signal-safe.
-int execute(
- const CommandInfo& command,
- const string& directory,
- const os::ExecEnv& envp,
- uid_t uid,
- gid_t gid,
- bool redirectIO,
- int pipeRead,
- int pipeWrite,
- const list<Option<CommandInfo> >& commands)
-{
- if (close(pipeWrite) != 0) {
- ABORT("Failed to close pipe[1]");
- }
-
- // Do a blocking read on the pipe until the parent signals us to continue.
- char dummy;
- ssize_t length;
- while ((length = read(pipeRead, &dummy, sizeof(dummy))) == -1 &&
- errno == EINTR);
-
- if (length != sizeof(dummy)) {
- // This will occur if the slave terminates during executor launch.
- // There's a reasonable probability this will occur during slave restarts
- // across a large/busy cluster so we log and exit non-zero rather than
- // ABORT.
- const char* message =
- "Failed to synchronize with slave (it has probably exited)";
-
- while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
- errno == EINTR);
-
- _exit(1);
- }
-
- if (close(pipeRead) != 0) {
- ABORT("Failed to close pipe[0]");
- }
-
- // Change gid and uid.
- if (setgid(gid) != 0) {
- ABORT("Failed to set gid");
- }
-
- if (setuid(uid) != 0) {
- ABORT("Failed to set uid");
- }
-
- // Enter working directory.
- if (chdir(directory.c_str()) != 0) {
- ABORT("Failed to chdir into work directory");
- }
-
- // Redirect output to files in working dir if required. We append because
- // others (e.g., mesos-fetcher) may have already logged to the files.
- // TODO(bmahler): It would be best if instead of closing stderr /
- // stdout and redirecting, we instead always output to stderr /
- // stdout. Also tee'ing their output into the work directory files
- // when redirection is desired.
- if (redirectIO) {
- int fd;
- while ((fd = open(
- "stdout",
- O_CREAT | O_WRONLY | O_APPEND,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
- errno == EINTR);
- if (fd == -1) {
- ABORT("Failed to open stdout");
- }
-
- int status;
- while ((status = dup2(fd, STDOUT_FILENO)) == -1 && errno == EINTR);
- if (status == -1) {
- ABORT("Failed to dup2 for stdout");
- }
-
- if (close(fd) == -1) {
- ABORT("Failed to close stdout fd");
- }
-
- while ((fd = open(
- "stderr",
- O_CREAT | O_WRONLY | O_APPEND,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
- errno == EINTR);
- if (fd == -1) {
- ABORT("Failed to open stderr");
- }
-
- while ((status = dup2(fd, STDERR_FILENO)) == -1 && errno == EINTR);
- if (status == -1) {
- ABORT("Failed to dup2 for stderr");
- }
-
- if (close(fd) == -1) {
- ABORT("Failed to close stderr fd");
- }
- }
-
- // Run additional preparation commands. These are run as the same user and
- // with the environment as the slave.
- // NOTE: os::system() is async-signal-safe.
- foreach (const Option<CommandInfo>& command, commands) {
- if (command.isSome()) {
- // Block until the command completes.
- int status = os::system(command.get().value());
-
- if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
- ABORT("Command '",
- command.get().value().c_str(),
- "' failed to execute successfully");
- }
- }
- }
-
- // Execute the command (via '/bin/sh -c command') with its environment.
- execle("/bin/sh", "sh", "-c", command.value().c_str(), (char*) NULL, envp());
-
- // If we get here, the execv call failed.
- ABORT("Failed to execute command");
-
- // This should not be reached.
- return -1;
-}
-
-
-// Launching an executor involves the following steps:
-// 1. Call prepare on each isolator.
-// 2. Fork the executor. The forked child is blocked from exec'ing until it has
-// been isolated.
-// 3. Isolate the executor. Call isolate with the pid for each isolator.
-// 4. Fetch the executor.
-// 5. Exec the executor. The forked child is signalled to continue. It will
-// first execute any preparation commands from isolators and then exec the
-// executor.
-Future<Nothing> MesosContainerizerProcess::launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
-{
- if (promises.contains(containerId)) {
- LOG(ERROR) << "Cannot start already running container '"
- << containerId << "'";
- return Failure("Container already started");
- }
-
- // TODO(tillt): The slave should expose which containerization
- // mechanisms it supports to avoid scheduling tasks that it cannot
- // run.
- const CommandInfo& command = executorInfo.command();
- if (command.has_container()) {
- // We return a Failure as this containerizer does not support
- // handling ContainerInfo. Users have to be made aware of this
- // lack of support to prevent confusion in the task configuration.
- return Failure("ContainerInfo is not supported");
- }
-
- Owned<Promise<containerizer::Termination> > promise(
- new Promise<containerizer::Termination>());
- promises.put(containerId, promise);
-
- // Store the resources for usage().
- resources.put(containerId, executorInfo.resources());
-
- LOG(INFO) << "Starting container '" << containerId
- << "' for executor '" << executorInfo.executor_id()
- << "' of framework '" << executorInfo.framework_id() << "'";
-
- return prepare(containerId, executorInfo, directory, user)
- .then(defer(self(),
- &Self::_launch,
- containerId,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint,
- lambda::_1))
- .onFailed(defer(self(),
- &Self::destroy,
- containerId));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::launch(
- const ContainerID& containerId,
- const TaskInfo&,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
-{
- return launch(
- containerId,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint);
-}
-
-Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user)
-{
- // Start preparing all isolators (in parallel) and gather any additional
- // preparation commands that must be run in the forked child before exec'ing
- // the executor.
- list<Future<Option<CommandInfo> > > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->prepare(containerId, executorInfo));
- }
-
- // Wait for all isolators to complete preparations.
- return collect(futures);
-}
-
-
-Future<Nothing> _fetch(
- const ContainerID& containerId,
- const string& directory,
- const Option<string>& user,
- const Option<int>& status)
-{
- if (status.isNone() || (status.get() != 0)) {
- return Failure("Failed to fetch URIs for container '" +
- stringify(containerId) + "': exit status " +
- (status.isNone() ? "none" : stringify(status.get())));
- }
-
- // Chown the work directory if a user is provided.
- if (user.isSome()) {
- Try<Nothing> chown = os::chown(user.get(), directory);
- if (chown.isError()) {
- return Failure("Failed to chown work directory");
- }
- }
-
- return Nothing();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const string& directory,
- const Option<string>& user)
-{
- // Determine path for mesos-fetcher.
- Result<string> realpath = os::realpath(
- path::join(flags.launcher_dir, "mesos-fetcher"));
-
- if (!realpath.isSome()) {
- LOG(ERROR) << "Failed to determine the canonical path "
- << "for the mesos-fetcher '"
- << path::join(flags.launcher_dir, "mesos-fetcher")
- << "': "
- << (realpath.isError() ? realpath.error()
- : "No such file or directory");
- return Failure("Could not fetch URIs: failed to find mesos-fetcher");
- }
-
- map<string, string> environment =
- fetcherEnvironment(commandInfo, directory, user, flags);
-
- // Now the actual mesos-fetcher command.
- string command = realpath.get();
-
- LOG(INFO) << "Fetching URIs for container '" << containerId
- << "' using command '" << command << "'";
-
- Try<Subprocess> fetcher = subprocess(
- command,
- Subprocess::PIPE(),
- Subprocess::PIPE(),
- Subprocess::PIPE(),
- environment);
-
- if (fetcher.isError()) {
- return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
- }
-
- // Redirect output (stdout and stderr) from the fetcher to log files
- // in the executor work directory, chown'ing them if a user is
- // specified.
- // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
- // TODO(tillt): Consider adding an overload to io::redirect
- // that accepts a file path as 'to' for further reducing code. We
- // would however also need an owner user parameter for such overload
- // to perfectly replace the below.
- Try<int> out = os::open(
- path::join(directory, "stdout"),
- O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
- if (out.isError()) {
- return Failure("Failed to redirect stdout: " + out.error());
- }
-
- if (user.isSome()) {
- Try<Nothing> chown = os::chown(
- user.get(), path::join(directory, "stdout"));
- if (chown.isError()) {
- os::close(out.get());
- return Failure(
- "Failed to redirect stdout: Failed to chown: " +
- chown.error());
- }
- }
-
- // Redirect takes care of nonblocking and close-on-exec for the
- // supplied file descriptors.
- io::redirect(fetcher.get().out().get(), out.get());
-
- // Redirect does 'dup' the file descriptor, hence we can close the
- // original now.
- os::close(out.get());
-
- // Repeat for stderr.
- Try<int> err = os::open(
- path::join(directory, "stderr"),
- O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
- if (err.isError()) {
- return Failure(
- "Failed to redirect stderr: Failed to open: " +
- err.error());
- }
-
- if (user.isSome()) {
- Try<Nothing> chown = os::chown(
- user.get(), path::join(directory, "stderr"));
- if (chown.isError()) {
- os::close(err.get());
- return Failure(
- "Failed to redirect stderr: Failed to chown: " +
- chown.error());
- }
- }
-
- io::redirect(fetcher.get().err().get(), err.get());
-
- os::close(err.get());
-
- return fetcher.get().status()
- .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint,
- const list<Option<CommandInfo> >& commands)
-{
- // Prepare environment variables for the executor.
- map<string, string> env = executorEnvironment(
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint,
- flags.recovery_timeout);
-
- // Include any environment variables from CommandInfo.
- foreach (const Environment::Variable& variable,
- executorInfo.command().environment().variables()) {
- env[variable.name()] = variable.value();
- }
-
- // Construct a representation of the environment suitable for
- // passing to execle in the child. We construct it here because it
- // is not async-signal-safe.
- os::ExecEnv envp(env);
-
- // Use a pipe to block the child until it's been isolated.
- int pipes[2];
- // We assume this should not fail under reasonable conditions so we
- // use CHECK.
- CHECK(pipe(pipes) == 0);
-
- // Determine the uid and gid for the child now because getpwnam is
- // not async-signal-safe.
- Result<uid_t> uid = os::getuid(user);
- if (uid.isError() || uid.isNone()) {
- return Failure("Invalid user: " + (uid.isError() ? uid.error()
- : "nonexistent"));
- }
-
- Result<gid_t> gid = os::getgid(user);
- if (gid.isError() || gid.isNone()) {
- return Failure("Invalid user: " + (gid.isError() ? gid.error()
- : "nonexistent"));
- }
-
-
- // Prepare a function for the forked child to exec() the executor.
- lambda::function<int()> childFunction = lambda::bind(
- &execute,
- executorInfo.command(),
- directory,
- envp,
- uid.get(),
- gid.get(),
- !local,
- pipes[0],
- pipes[1],
- commands);
-
- Try<pid_t> forked = launcher->fork(containerId, childFunction);
-
- if (forked.isError()) {
- return Failure("Failed to fork executor: " + forked.error());
- }
- pid_t pid = forked.get();
-
- // Checkpoint the executor's pid if requested.
- if (checkpoint) {
- const string& path = slave::paths::getForkedPidPath(
- slave::paths::getMetaRootDir(flags.work_dir),
- slaveId,
- executorInfo.framework_id(),
- executorInfo.executor_id(),
- containerId);
-
- LOG(INFO) << "Checkpointing executor's forked pid " << pid
- << " to '" << path << "'";
-
- Try<Nothing> checkpointed =
- slave::state::checkpoint(path, stringify(pid));
-
- if (checkpointed.isError()) {
- LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
- << path << "': " << checkpointed.error();
-
- return Failure("Could not checkpoint executor's pid");
- }
- }
-
- // Monitor the executor's pid. We keep the future because we'll refer to it
- // again during container destroy.
- Future<Option<int> > status = process::reap(pid);
- statuses.put(containerId, status);
- status.onAny(defer(self(), &Self::reaped, containerId));
-
- return isolate(containerId, pid)
- .then(defer(self(),
- &Self::fetch,
- containerId,
- executorInfo.command(),
- directory,
- user))
- .then(defer(self(), &Self::exec, containerId, pipes[1]))
- .onAny(lambda::bind(&os::close, pipes[0]))
- .onAny(lambda::bind(&os::close, pipes[1]));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::isolate(
- const ContainerID& containerId,
- pid_t _pid)
-{
- // Set up callbacks for isolator limitations.
- foreach (const Owned<Isolator>& isolator, isolators) {
- isolator->watch(containerId)
- .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
- }
-
- // Isolate the executor with each isolator.
- list<Future<Nothing> > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->isolate(containerId, _pid));
- }
-
- // Wait for all isolators to complete.
- return collect(futures)
- .then(lambda::bind(&_nothing));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::exec(
- const ContainerID& containerId,
- int pipeWrite)
-{
- CHECK(promises.contains(containerId));
-
- // Now that we've contained the child we can signal it to continue by
- // writing to the pipe.
- char dummy;
- ssize_t length;
- while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
- errno == EINTR);
-
- if (length != sizeof(dummy)) {
- return Failure("Failed to synchronize child process: " +
- string(strerror(errno)));
- }
-
- return Nothing();
-}
-
-
-Future<containerizer::Termination> MesosContainerizerProcess::wait(
- const ContainerID& containerId)
-{
- if (!promises.contains(containerId)) {
- return Failure("Unknown container: " + stringify(containerId));
- }
-
- return promises[containerId]->future();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::update(
- const ContainerID& containerId,
- const Resources& _resources)
-{
- // The resources hashmap won't initially contain the container's resources
- // after recovery so we must check the promises hashmap (which is set during
- // recovery).
- if (!promises.contains(containerId)) {
- // It is not considered a failure if the container is not known
- // because the slave will attempt to update the container's
- // resources on a task's terminal state change but the executor
- // may have already exited and the container cleaned up.
- LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
- return Nothing();
- }
-
- // Store the resources for usage().
- resources.put(containerId, _resources);
-
- // Update each isolator.
- list<Future<Nothing> > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->update(containerId, _resources));
- }
-
- // Wait for all isolators to complete.
- return collect(futures)
- .then(lambda::bind(&_nothing));
-}
-
-
-// Resources are used to set the limit fields in the statistics but are
-// optional because they aren't known after recovery until/unless update() is
-// called.
-Future<ResourceStatistics> _usage(
- const ContainerID& containerId,
- const Option<Resources>& resources,
- const list<Future<ResourceStatistics> >& statistics)
-{
- ResourceStatistics result;
-
- // Set the timestamp now we have all statistics.
- result.set_timestamp(Clock::now().secs());
-
- foreach (const Future<ResourceStatistics>& statistic, statistics) {
- if (statistic.isReady()) {
- result.MergeFrom(statistic.get());
- } else {
- LOG(WARNING) << "Skipping resource statistic for container "
- << containerId << " because: "
- << (statistic.isFailed() ? statistic.failure()
- : "discarded");
- }
- }
-
- if (resources.isSome()) {
- // Set the resource allocations.
- Option<Bytes> mem = resources.get().mem();
- if (mem.isSome()) {
- result.set_mem_limit_bytes(mem.get().bytes());
- }
-
- Option<double> cpus = resources.get().cpus();
- if (cpus.isSome()) {
- result.set_cpus_limit(cpus.get());
- }
- }
-
- return result;
-}
-
-
-Future<ResourceStatistics> MesosContainerizerProcess::usage(
- const ContainerID& containerId)
-{
- if (!promises.contains(containerId)) {
- return Failure("Unknown container: " + stringify(containerId));
- }
-
- list<Future<ResourceStatistics> > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->usage(containerId));
- }
-
- // Use await() here so we can return partial usage statistics.
- // TODO(idownes): After recovery resources won't be known until after an
- // update() because they aren't part of the SlaveState.
- return await(futures)
- .then(lambda::bind(
- _usage, containerId, resources.get(containerId), lambda::_1));
-}
-
-
-void MesosContainerizerProcess::destroy(const ContainerID& containerId)
-{
- if (!promises.contains(containerId)) {
- LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
- return;
- }
-
- if (destroying.contains(containerId)) {
- // Destroy has already been initiated.
- return;
- }
- destroying.insert(containerId);
-
- LOG(INFO) << "Destroying container '" << containerId << "'";
-
- if (statuses.contains(containerId)) {
- // Kill all processes then continue destruction.
- launcher->destroy(containerId)
- .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
- } else {
- // The executor never forked so no processes to kill, go straight to
- // __destroy() with status = None().
- __destroy(containerId, None());
- }
-}
-
-
-void MesosContainerizerProcess::_destroy(
- const ContainerID& containerId,
- const Future<Nothing>& future)
-{
- // Something has gone wrong and the launcher wasn't able to kill all the
- // processes in the container. We cannot clean up the isolators because they
- // may require that all processes have exited so just return the failure to
- // the slave.
- // TODO(idownes): This is a pretty bad state to be in but we should consider
- // cleaning up here.
- if (!future.isReady()) {
- promises[containerId]->fail(
- "Failed to destroy container: " +
- (future.isFailed() ? future.failure() : "discarded future"));
-
- destroying.erase(containerId);
-
- return;
- }
-
- // We've successfully killed all processes in the container so get the exit
- // status of the executor when it's ready (it may already be) and continue
- // the destroy.
- statuses.get(containerId).get()
- .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::__destroy(
- const ContainerID& containerId,
- const Future<Option<int > >& status)
-{
- // Now that all processes have exited we can now clean up all isolators.
- list<Future<Nothing> > futures;
- foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->cleanup(containerId));
- }
-
- // Wait for all isolators to complete cleanup before continuing.
- collect(futures)
- .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::___destroy(
- const ContainerID& containerId,
- const Future<Option<int > >& status,
- const Future<list<Nothing> >& futures)
-{
- // Something has gone wrong with one of the Isolators and cleanup failed.
- // We'll fail the container termination and remove the 'destroying' flag but
- // leave all other state. The containerizer is now in a bad state because
- // at least one isolator has failed to clean up.
- if (!futures.isReady()) {
- promises[containerId]->fail(
- "Failed to clean up isolators when destroying container: " +
- (futures.isFailed() ? futures.failure() : "discarded future"));
-
- destroying.erase(containerId);
-
- return;
- }
-
- // A container is 'killed' if any isolator limited it.
- // Note: We may not see a limitation in time for it to be registered. This
- // could occur if the limitation (e.g., an OOM) killed the executor and we
- // triggered destroy() off the executor exit.
- bool killed = false;
- string message;
- if (limitations.contains(containerId)) {
- killed = true;
- foreach (const Limitation& limitation, limitations.get(containerId)) {
- message += limitation.message;
- }
- message = strings::trim(message);
- } else {
- message = "Executor terminated";
- }
-
- containerizer::Termination termination;
- termination.set_killed(killed);
- termination.set_message(message);
- if (status.isReady() && status.get().isSome()) {
- termination.set_status(status.get().get());
- }
-
- promises[containerId]->set(termination);
-
- promises.erase(containerId);
- statuses.erase(containerId);
- limitations.erase(containerId);
- resources.erase(containerId);
- destroying.erase(containerId);
-}
-
-
-void MesosContainerizerProcess::reaped(const ContainerID& containerId)
-{
- if (!promises.contains(containerId)) {
- return;
- }
-
- LOG(INFO) << "Executor for container '" << containerId << "' has exited";
-
- // The executor has exited so destroy the container.
- destroy(containerId);
-}
-
-
-void MesosContainerizerProcess::limited(
- const ContainerID& containerId,
- const Future<Limitation>& future)
-{
- if (!promises.contains(containerId)) {
- return;
- }
-
- if (future.isReady()) {
- LOG(INFO) << "Container " << containerId << " has reached its limit for"
- << " resource " << future.get().resource
- << " and will be terminated";
- limitations.put(containerId, future.get());
- } else {
- // TODO(idownes): A discarded future will not be an error when isolators
- // discard their promises after cleanup.
- LOG(ERROR) << "Error in a resource limitation for container "
- << containerId << ": " << (future.isFailed() ? future.failure()
- : "discarded");
- }
-
- // The container has been affected by the limitation so destroy it.
- destroy(containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
-{
- return promises.keys();
-}
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.hpp b/src/slave/containerizer/mesos_containerizer.hpp
deleted file mode 100644
index 21affae..0000000
--- a/src/slave/containerizer/mesos_containerizer.hpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MESOS_CONTAINERIZER_HPP__
-#define __MESOS_CONTAINERIZER_HPP__
-
-#include <list>
-#include <vector>
-
-#include <stout/hashmap.hpp>
-#include <stout/lambda.hpp>
-#include <stout/multihashmap.hpp>
-
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class MesosContainerizerProcess;
-
-class MesosContainerizer : public Containerizer
-{
-public:
- static Try<MesosContainerizer*> create(const Flags& flags, bool local);
-
- MesosContainerizer(
- const Flags& flags,
- bool local,
- const process::Owned<Launcher>& launcher,
- const std::vector<process::Owned<Isolator> >& isolators);
-
- virtual ~MesosContainerizer();
-
- virtual process::Future<Nothing> recover(
- const Option<state::SlaveState>& state);
-
- virtual process::Future<Nothing> launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<Nothing> launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- virtual process::Future<containerizer::Termination> wait(
- const ContainerID& containerId);
-
- virtual void destroy(const ContainerID& containerId);
-
- virtual process::Future<hashset<ContainerID> > containers();
-
-private:
- MesosContainerizerProcess* process;
-};
-
-
-class MesosContainerizerProcess
- : public process::Process<MesosContainerizerProcess>
-{
-public:
- MesosContainerizerProcess(
- const Flags& _flags,
- bool _local,
- const process::Owned<Launcher>& _launcher,
- const std::vector<process::Owned<Isolator> >& _isolators)
- : flags(_flags),
- local(_local),
- launcher(_launcher),
- isolators(_isolators) {}
-
- virtual ~MesosContainerizerProcess() {}
-
- process::Future<Nothing> recover(
- const Option<state::SlaveState>& state);
-
- process::Future<Nothing> launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- process::Future<Nothing> launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- process::Future<containerizer::Termination> wait(
- const ContainerID& containerId);
-
- void destroy(const ContainerID& containerId);
-
- process::Future<hashset<ContainerID> > containers();
-
-private:
- process::Future<Nothing> _recover(
- const std::list<state::RunState>& recoverable);
-
- process::Future<Nothing> __recover(
- const std::list<state::RunState>& recovered);
-
- process::Future<std::list<Option<CommandInfo> > > prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- process::Future<Nothing> fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- process::Future<Nothing> _launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint,
- const std::list<Option<CommandInfo> >& scripts);
-
- process::Future<Nothing> isolate(
- const ContainerID& containerId,
- pid_t _pid);
-
- process::Future<Nothing> exec(
- const ContainerID& containerId,
- int pipeWrite);
-
- // Continues 'destroy()' once all processes have been killed by the launcher.
- void _destroy(
- const ContainerID& containerId,
- const process::Future<Nothing>& future);
-
- // Continues '_destroy()' once we get the exit status of the executor.
- void __destroy(
- const ContainerID& containerId,
- const process::Future<Option<int > >& status);
-
- // Continues (and completes) '__destroy()' once all isolators have completed
- // cleanup.
- void ___destroy(
- const ContainerID& containerId,
- const process::Future<Option<int > >& status,
- const process::Future<std::list<Nothing> >& futures);
-
- // Call back for when an isolator limits a container and impacts the
- // processes. This will trigger container destruction.
- void limited(
- const ContainerID& containerId,
- const process::Future<Limitation>& future);
-
- // Call back for when the executor exits. This will trigger container
- // destroy.
- void reaped(const ContainerID& containerId);
-
- const Flags flags;
- const bool local;
- const process::Owned<Launcher> launcher;
- const std::vector<process::Owned<Isolator> > isolators;
-
- // TODO(idownes): Consider putting these per-container variables into a
- // struct.
- // Promises for futures returned from wait().
- hashmap<ContainerID,
- process::Owned<process::Promise<containerizer::Termination> > > promises;
-
- // We need to keep track of the future exit status for each executor because
- // we'll only get a single notification when the executor exits.
- hashmap<ContainerID, process::Future<Option<int> > > statuses;
-
- // We keep track of any limitations received from each isolator so we can
- // determine the cause of an executor termination.
- multihashmap<ContainerID, Limitation> limitations;
-
- // We keep track of the resources for each container so we can set the
- // ResourceStatistics limits in usage().
- hashmap<ContainerID, Resources> resources;
-
- // Set of containers that are in the process of being destroyed.
- hashset<ContainerID> destroying;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MESOS_CONTAINERIZER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_isolator_tests.cpp b/src/tests/cgroups_isolator_tests.cpp
index 5a9704d..2d9738f 100644
--- a/src/tests/cgroups_isolator_tests.cpp
+++ b/src/tests/cgroups_isolator_tests.cpp
@@ -25,7 +25,7 @@
#include <stout/proc.hpp>
#include <stout/stringify.hpp>
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/script.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 8ea7974..70e1245 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,7 +31,8 @@
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/isolator.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 0bbec09..5a141e3 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -29,6 +29,7 @@
#include <process/owned.hpp>
#include <process/reap.hpp>
+#include <stout/abort.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
@@ -84,23 +85,24 @@ using testing::Return;
using testing::SaveArg;
-int execute(const std::string& command, int pipes[2])
+static int childSetup(int pipes[2])
{
- // In child process
- ::close(pipes[1]);
+ // In child process.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
// Wait until the parent signals us to continue.
- int buf;
- while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
- ::close(pipes[0]);
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
- execl("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL);
+ if (length != sizeof(dummy)) {
+ ABORT("Failed to synchronize with parent");
+ }
- const char* message = "Child failed to exec";
- while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
- errno == EINTR);
+ while (::close(pipes[0]) == -1 && errno == EINTR);
- _exit(1);
+ return 0;
}
@@ -147,24 +149,38 @@ TYPED_TEST(CpuIsolatorTest, UserCpuUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to start.
while (!os::exists(file));
@@ -238,24 +254,38 @@ TYPED_TEST(CpuIsolatorTest, SystemCpuUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to start.
while (!os::exists(file));
@@ -335,24 +365,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to complete.
AWAIT_READY(status);
@@ -413,24 +457,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs_Big_Quota)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, "exit 0", pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = "exit 0";
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to complete successfully.
AWAIT_READY(status);
@@ -466,13 +524,20 @@ TYPED_TEST_CASE(MemIsolatorTest, MemIsolatorTypes);
// posix_memalign, mlock, memset and perror are not safe.
int consumeMemory(const Bytes& _size, const Duration& duration, int pipes[2])
{
- // In child process
- ::close(pipes[1]);
+ // In child process.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
- int buf;
// Wait until the parent signals us to continue.
- while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
- ::close(pipes[0]);
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
+ ABORT("Failed to synchronize with parent");
+ }
+
+ while (::close(pipes[0]) == -1 && errno == EINTR);
size_t size = static_cast<size_t>(_size.bytes());
void* buffer = NULL;
@@ -522,28 +587,33 @@ TYPED_TEST(MemIsolatorTest, MemUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(
- &consumeMemory,
- Megabytes(256),
- Seconds(10),
- pipes);
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ vector<string>(),
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&consumeMemory, Megabytes(256), Seconds(10), pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Set up the reaper to wait on the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait up to 5 seconds for the child process to consume 256 MB of memory;
ResourceStatistics statistics;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 885e0fb..c498d33 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -50,7 +50,7 @@
#include "slave/flags.hpp"
#include "slave/slave.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 24af32b..216bd6f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -34,7 +34,8 @@
#endif
#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
#include "tests/environment.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eeb8631..ae38a13 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -50,10 +50,12 @@
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
#include "tests/cluster.hpp"
#include "tests/utils.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 873f22d..371a5b8 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -43,11 +43,12 @@
#include "master/master.hpp"
#include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
[5/7] git commit: Reverted Operation in favor of using Subcommand.
Posted by ji...@apache.org.
Reverted Operation in favor of using Subcommand.
Review: https://reviews.apache.org/r/22764
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ae3c8e22
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ae3c8e22
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ae3c8e22
Branch: refs/heads/master
Commit: ae3c8e221c39ac9ef4b7f59fb833bf78904cc111
Parents: 6b7e657
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:35:22 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:35:27 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 8 --
src/launcher/launcher.cpp | 228 --------------------------------------
src/launcher/launcher.hpp | 135 ----------------------
src/launcher/main.cpp | 29 -----
src/tests/environment.cpp | 5 -
src/tests/launcher_tests.cpp | 79 -------------
6 files changed, 484 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 861aad2..a49a3fe 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -230,7 +230,6 @@ libmesos_no_3rdparty_la_SOURCES = \
sched/sched.cpp \
scheduler/scheduler.cpp \
local/local.cpp \
- launcher/launcher.cpp \
master/contender.cpp \
master/constants.cpp \
master/detector.cpp \
@@ -352,7 +351,6 @@ libmesos_no_3rdparty_la_SOURCES += \
credentials/credentials.hpp \
examples/utils.hpp files/files.hpp \
hdfs/hdfs.hpp \
- launcher/launcher.hpp \
linux/cgroups.hpp \
linux/perf.hpp \
linux/fs.hpp local/flags.hpp local/local.hpp \
@@ -574,11 +572,6 @@ mesos_usage_SOURCES = usage/main.cpp
mesos_usage_CPPFLAGS = $(MESOS_CPPFLAGS)
mesos_usage_LDADD = libmesos.la
-pkglibexec_PROGRAMS += mesos-launcher
-mesos_launcher_SOURCES = launcher/main.cpp
-mesos_launcher_CPPFLAGS = $(MESOS_CPPFLAGS)
-mesos_launcher_LDADD = libmesos.la
-
bin_PROGRAMS += mesos-log
mesos_log_SOURCES = log/main.cpp
mesos_log_CPPFLAGS = $(MESOS_CPPFLAGS)
@@ -995,7 +988,6 @@ mesos_tests_SOURCES = \
tests/gc_tests.cpp \
tests/isolator_tests.cpp \
tests/external_containerizer_test.cpp \
- tests/launcher_tests.cpp \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
deleted file mode 100644
index 5585aad..0000000
--- a/src/launcher/launcher.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <process/internal.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/foreach.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/strings.hpp>
-
-#include "launcher/launcher.hpp"
-
-using namespace process;
-
-using std::cerr;
-using std::endl;
-using std::map;
-using std::string;
-
-namespace mesos {
-namespace internal {
-namespace launcher {
-
-// The default executable.
-const string DEFAULT_EXECUTABLE = "mesos-launcher";
-
-
-// The prefix of the environment variables that launcher uses.
-static const string LAUNCHER_PREFIX = "MESOS_LAUNCHER_";
-
-
-// The default directory to search for the executable.
-static Option<string> defaultPath;
-static int defaultPathLock = 0;
-
-
-// Stores all the registered operations.
-static hashmap<string, Owned<Operation> > operations;
-
-
-static void usage(const char* argv0)
-{
- cerr << "Usage: " << argv0 << " <operation> [OPTIONS]" << endl
- << endl
- << "Available operations:" << endl
- << " help" << endl;
-
- // Get a list of available operations.
- foreachkey (const string& name, operations) {
- cerr << " " << name << endl;
- }
-}
-
-
-void setDefaultPath(const string& path)
-{
- process::internal::acquire(&defaultPathLock);
- {
- defaultPath = path;
- }
- process::internal::release(&defaultPathLock);
-}
-
-
-static Option<string> getDefaultPath()
-{
- Option<string> path;
-
- process::internal::acquire(&defaultPathLock);
- {
- path = defaultPath;
- }
- process::internal::release(&defaultPathLock);
-
- return path;
-}
-
-
-void add(const Owned<Operation>& operation)
-{
- operations[operation->name()] = operation;
-}
-
-
-int main(int argc, char** argv)
-{
- if (argc < 2) {
- usage(argv[0]);
- return 1;
- }
-
- if (!strcmp(argv[1], "help")) {
- if (argc == 2) {
- usage(argv[0]);
- return 1;
- }
-
- // 'argv[0] help operation' => 'argv[0] operation --help'
- argv[1] = argv[2];
- argv[2] = (char*) "--help";
- }
-
- const string operation = argv[1];
-
- if (!operations.contains(operation)) {
- cerr << "Operation '" << operation << "' is not available" << endl;
- usage(argv[0]);
- return 1;
- }
-
- // Create the operation specific flags.
- flags::FlagsBase* flags = operations[operation]->getFlags();
-
- // Parse the flags from the environment and the command line.
- Try<Nothing> load = flags->load(LAUNCHER_PREFIX, argc, argv);
- if (load.isError()) {
- cerr << "Failed to parse the flags: " << load.error() << endl;
- return 1;
- }
-
- // Execute the operation.
- return operations[operation]->execute();
-}
-
-
-ShellOperation::Flags::Flags()
-{
- add(&command,
- "command",
- "The shell command to be executed");
-}
-
-
-int ShellOperation::execute()
-{
- if (flags.command.isNone()) {
- cerr << "The command is not specified" << endl;
- return 1;
- }
-
- int status = os::system(flags.command.get());
- if (!WIFEXITED(status)) {
- return 1;
- }
-
- return WEXITSTATUS(status);
-}
-
-
-process::Future<Option<int> > Operation::launch(
- const Option<int>& stdout,
- const Option<int>& stderr,
- const string& executable,
- const Option<string>& _path)
-{
- // Determine the path to search for the executable. If the path is
- // specified by the user, use it. Otherwise, use the default path.
- // If both are not specified, return failure.
- string path;
- if (_path.isSome()) {
- path = _path.get();
- } else {
- Option<string> _defaultPath = getDefaultPath();
- if (_defaultPath.isNone()) {
- return Failure("Path is not specified and no default path is found");
- }
- path = _defaultPath.get();
- }
-
- Result<string> realpath = os::realpath(path::join(path, executable));
- if (!realpath.isSome()) {
- return Failure(
- "Failed to determine the canonical path for '" + executable + "': " +
- (realpath.isError() ? realpath.error() : "No such file or directory"));
- }
-
- // Prepare the environment variables.
- map<string, string> environment;
- foreachpair (const string& name, const flags::Flag& flag, *getFlags()) {
- Option<string> value = flag.stringify(*getFlags());
- if (value.isSome()) {
- string key = LAUNCHER_PREFIX + name;
- environment[key] = value.get();
- VLOG(1) << "Setting launcher environment " << key << "=" << value.get();
- }
- }
-
- // Prepare the command: 'mesos-launcher <operation_name> ...'.
- string command = strings::join(" ", realpath.get(), name());
-
- Try<Subprocess> s = subprocess(
- command,
- Subprocess::PIPE(),
- Subprocess::PIPE(),
- Subprocess::PIPE(),
- environment);
-
- if (s.isError()) {
- return Failure("Launch subprocess failed: " + s.error());
- }
-
- io::redirect(s.get().out().get(), stdout);
- io::redirect(s.get().err().get(), stderr);
-
- return s.get().status();
-}
-
-} // namespace launcher {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.hpp b/src/launcher/launcher.hpp
deleted file mode 100644
index 35cdc69..0000000
--- a/src/launcher/launcher.hpp
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __LAUNCHER_LAUNCHER_HPP__
-#define __LAUNCHER_LAUNCHER_HPP__
-
-#include <string>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-
-#include <stout/flags.hpp>
-#include <stout/none.hpp>
-#include <stout/nothing.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace launcher {
-
-// The default executable used by the launcher.
-extern const std::string DEFAULT_EXECUTABLE;
-
-
-// Represents an operation to be executed by a subprocess.
-class Operation
-{
-public:
- virtual ~Operation() {}
-
- // Launches this operation in a subprocess. The user may choose to
- // specify the executable and the path in which to search for the
- // executable. If not specified, the default executable and the
- // default path will be used.
- process::Future<Option<int> > launch(
- const Option<int>& stdout = None(),
- const Option<int>& stderr = None(),
- const std::string& executable = DEFAULT_EXECUTABLE,
- const Option<std::string>& path = None());
-
-protected:
- // Returns the name of this operation.
- virtual std::string name() const = 0;
-
- // Defines the operation that will be executed by a subprocess. The
- // return value will be the exit code of the subprocess.
- virtual int execute() = 0;
-
- // Returns the pointer to the flags that will be used for this
- // operation. By default, the flags is empty.
- virtual flags::FlagsBase* getFlags() { return &flags; }
-
-private:
- friend void add(const process::Owned<Operation>& operation);
- friend int main(int argc, char** argv);
-
- // The default flags which is empty.
- flags::FlagsBase flags;
-};
-
-
-// Tell the launcher which directory to search for the executable by
-// default if it is not specified by the user. When launching an
-// operation, if the user does not specify the 'path' and no default
-// 'path' is set, the 'launch' will fail.
-void setDefaultPath(const std::string& path);
-
-
-// Register an operation. This is supposed to be called in the main
-// function of the subprocess.
-void add(const process::Owned<Operation>& operation);
-
-
-// Syntactic sugar for registering an operation. For example, the
-// following code shows a typical main function of the subprocess.
-//
-// int main(int argc, char** argv)
-// {
-// launcher::add<Operation1>();
-// launcher::add<OPeration2>();
-//
-// return launcher::main(argc, argv);
-// }
-template <typename T>
-void add()
-{
- add(process::Owned<Operation>(new T()));
-}
-
-
-// The main entry of the subprocess.
-int main(int argc, char** argv);
-
-
-// An operation which takes a shell command and executes it. This is
-// mainly used for testing.
-class ShellOperation : public Operation
-{
-public:
- struct Flags : public flags::FlagsBase
- {
- Flags();
-
- Option<std::string> command;
- };
-
- Flags flags;
-
-protected:
- virtual std::string name() const { return "shell"; }
- virtual int execute();
- virtual flags::FlagsBase* getFlags() { return &flags; }
-};
-
-} // namespace launcher {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __LAUNCHER_LAUNCHER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/launcher/main.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/main.cpp b/src/launcher/main.cpp
deleted file mode 100644
index b497e98..0000000
--- a/src/launcher/main.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "launcher/launcher.hpp"
-
-using namespace mesos::internal;
-
-
-int main(int argc, char** argv)
-{
- launcher::add<launcher::ShellOperation>();
-
- return launcher::main(argc, argv);
-}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/tests/environment.cpp
----------------------------------------------------------------------
diff --git a/src/tests/environment.cpp b/src/tests/environment.cpp
index 21b9d1d..e991d57 100644
--- a/src/tests/environment.cpp
+++ b/src/tests/environment.cpp
@@ -38,8 +38,6 @@
#include "linux/cgroups.hpp"
#endif
-#include "launcher/launcher.hpp"
-
#include "logging/logging.hpp"
#include "tests/environment.hpp"
@@ -246,9 +244,6 @@ void Environment::SetUp()
os::setenv("MESOS_NATIVE_JAVA_LIBRARY", path);
}
- // Set the default path for the launcher.
- launcher::setDefaultPath(path::join(tests::flags.build_dir, "src"));
-
if (!GTEST_IS_THREADSAFE) {
EXIT(1) << "Testing environment is not thread safe, bailing!";
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae3c8e22/src/tests/launcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/launcher_tests.cpp b/src/tests/launcher_tests.cpp
deleted file mode 100644
index e293cc5..0000000
--- a/src/tests/launcher_tests.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <stdio.h>
-
-#include <gtest/gtest.h>
-
-#include <process/gtest.hpp>
-
-#include <stout/gtest.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-
-#include "launcher/launcher.hpp"
-
-#include "tests/flags.hpp"
-#include "tests/utils.hpp"
-
-using namespace process;
-
-using namespace mesos::internal;
-using namespace mesos::internal::launcher;
-
-using std::string;
-
-
-class LauncherTest: public tests::TemporaryDirectoryTest {};
-
-
-TEST_F(LauncherTest, Launch)
-{
- Option<int> stdout = None();
- Option<int> stderr = None();
-
- // Redirect output if running the tests verbosely.
- if (tests::flags.verbose) {
- stdout = STDOUT_FILENO;
- stderr = STDERR_FILENO;
- }
-
- string temp1 = path::join(os::getcwd(), "temp1");
- string temp2 = path::join(os::getcwd(), "temp2");
-
- ASSERT_SOME(os::write(temp1, "hello world"));
-
- ShellOperation operation;
- operation.flags.command = "cp " + temp1 + " " + temp2;
-
- Future<Option<int> > launch = operation.launch(stdout, stderr);
- AWAIT_READY(launch);
- EXPECT_SOME_EQ(0, launch.get());
- ASSERT_SOME_EQ("hello world", os::read(temp2));
-
- AWAIT_FAILED(operation.launch(
- stdout,
- stderr,
- "non-exist"));
-
- AWAIT_FAILED(operation.launch(
- stdout,
- stderr,
- launcher::DEFAULT_EXECUTABLE,
- "non-exist"));
-}
[3/7] git commit: Introduced a Subcommand abstraction in stout.
Posted by ji...@apache.org.
Introduced a Subcommand abstraction in stout.
Review: https://reviews.apache.org/r/22764/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3279c406
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3279c406
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3279c406
Branch: refs/heads/master
Commit: 3279c406c7ff12f869189f7b20dbef17310bce49
Parents: 7f1774b
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:33:16 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:33:21 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/Makefile.am | 2 +
.../3rdparty/stout/include/stout/subcommand.hpp | 191 +++++++++++++++++++
.../3rdparty/stout/tests/subcommand_tests.cpp | 181 ++++++++++++++++++
3 files changed, 374 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am b/3rdparty/libprocess/3rdparty/stout/Makefile.am
index eac7ab5..b562e87 100644
--- a/3rdparty/libprocess/3rdparty/stout/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am
@@ -68,6 +68,7 @@ EXTRA_DIST = \
include/stout/stopwatch.hpp \
include/stout/stringify.hpp \
include/stout/strings.hpp \
+ include/stout/subcommand.hpp \
include/stout/tests/utils.hpp \
include/stout/thread.hpp \
include/stout/try.hpp \
@@ -105,5 +106,6 @@ EXTRA_DIST = \
tests/set_tests.cpp \
tests/some_tests.cpp \
tests/strings_tests.cpp \
+ tests/subcommand_tests.cpp \
tests/thread_tests.cpp \
tests/uuid_tests.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
new file mode 100644
index 0000000..b121836
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/subcommand.hpp
@@ -0,0 +1,191 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STOUT_SUBCOMMAND_HPP__
+#define __STOUT_SUBCOMMAND_HPP__
+
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/option.hpp>
+#include <stout/preprocessor.hpp>
+
+// Subcommand is an abstraction for creating command binaries that
+// encompass many subcommands. For example:
+//
+// $ ./runner start --arg1=1 --arg2=2
+// $ ./runner stop --arg3=3 --arg4=4
+//
+// Here, the 'runner' command contains two subcommand implementations:
+// StartCommand and StopCommand. Each subcommand needs to define a
+// name, implement an 'execute' function, and provide the address of a
+// flags object into which the command line arguments are parsed. To
+// simplify creating command binaries that encompass many subcommands,
+// we provide a 'dispatch' function which will look at argv[1] to
+// decide which subcommand to execute (based on its name) and then
+// parse the command line flags for you.
+class Subcommand
+{
+public:
+ // This function is supposed to be called by the main function of
+ // the command binary. A user needs to register at least one
+ // subcommand. Here is a typical example of the main function of the
+ // command binary:
+ //
+ // int main(int argc, char** argv)
+ // {
+ // return Subcommand::dispatch(
+ // None(),
+ // argc,
+ // argv,
+ // new Subcommand1(),
+ // new Subcommand2(),
+ // new Subcommand3());
+ // }
+#define INSERT(z, N, _) subcommands.push_back( c ## N );
+#define TEMPLATE(Z, N, DATA) \
+ static int dispatch( \
+ const Option<std::string>& prefix, \
+ int argc, \
+ char** argv, \
+ ENUM_PARAMS(N, Subcommand* c)) \
+ { \
+ std::vector<Subcommand*> subcommands; \
+ REPEAT_FROM_TO(0, N, INSERT, _) \
+ return dispatch(prefix, argc, argv, subcommands); \
+ }
+
+ REPEAT_FROM_TO(1, 11, TEMPLATE, _) // Args c0 -> c9.
+#undef TEMPLATE
+#undef INSERT
+
+ explicit Subcommand(const std::string& _name) : name_(_name) {}
+ virtual ~Subcommand() {}
+
+ std::string name() const { return name_; }
+
+protected:
+ // Defines the main function of this subcommand. The return value
+ // will be used as the exit code.
+ // TODO(jieyu): Consider passing in argc and argv as some users
+ // might want to access the remaining command line arguments.
+ virtual int execute() = 0;
+
+ // Returns the pointer to the flags that will be used for this
+ // subcommand. If the user does not provide an override, the default
+ // empty flags will be used.
+ virtual flags::FlagsBase* getFlags() { return &flags_; }
+
+private:
+ // Returns the usage by listing all the registered subcommands.
+ static std::string usage(
+ const std::string& argv0,
+ const std::vector<Subcommand*>& subcommands);
+
+ static int dispatch(
+ const Option<std::string>& prefix,
+ int argc,
+ char** argv,
+ const std::vector<Subcommand*>& subcommands);
+
+ // The name of this subcommand.
+ std::string name_;
+
+ // The default flags, which are empty.
+ flags::FlagsBase flags_;
+};
+
+
+inline std::string Subcommand::usage(
+ const std::string& argv0,
+ const std::vector<Subcommand*>& subcommands)
+{
+ std::ostringstream stream;
+
+ stream << "Usage: " << argv0 << " <subcommand> [OPTIONS]\n\n"
+ << "Available subcommands:\n"
+ << " help\n";
+
+ // Get a list of available subcommands.
+ foreach (Subcommand* subcommand, subcommands) {
+ stream << " " << subcommand->name() << "\n";
+ }
+
+ return stream.str();
+}
+
+
+inline int Subcommand::dispatch(
+ const Option<std::string>& prefix,
+ int argc,
+ char** argv,
+ const std::vector<Subcommand*>& subcommands)
+{
+ if (subcommands.empty()) {
+ std::cerr << "No subcommand is found" << std::endl;
+ return 1;
+ }
+
+ // Check for duplicated subcommand names.
+ hashset<std::string> names;
+ foreach (Subcommand* subcommand, subcommands) {
+ if (names.contains(subcommand->name())) {
+ std::cerr << "Multiple subcommands have name '"
+ << subcommand->name() << "'" << std::endl;
+ return 1;
+ }
+ names.insert(subcommand->name());
+ }
+
+ if (argc < 2) {
+ std::cerr << usage(argv[0], subcommands) << std::endl;
+ return 1;
+ }
+
+ if (std::string(argv[1]) == "help") {
+ if (argc == 2) {
+ std::cout << usage(argv[0], subcommands) << std::endl;
+ return 0;
+ }
+
+ // 'argv[0] help subcommand' => 'argv[0] subcommand --help'
+ argv[1] = argv[2];
+ argv[2] = (char*) "--help";
+ }
+
+ foreach (Subcommand* subcommand, subcommands) {
+ if (subcommand->name() == argv[1]) {
+ flags::FlagsBase* flags = subcommand->getFlags();
+
+ Try<Nothing> load = flags->load(prefix, argc - 1, argv + 1);
+ if (load.isError()) {
+ std::cerr << "Failed to parse the flags: " << load.error() << std::endl;
+ return 1;
+ }
+
+ return subcommand->execute();
+ }
+ }
+
+ std::cerr << "Subcommand '" << argv[1] << "' is not available\n"
+ << usage(argv[0], subcommands) << std::endl;
+ return 1;
+}
+
+#endif // __STOUT_SUBCOMMAND_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/3279c406/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
new file mode 100644
index 0000000..c40bba4
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/subcommand_tests.cpp
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string.h>
+#include <stdlib.h>
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/subcommand.hpp>
+
+using std::string;
+using std::vector;
+
+
+// A subcommand used by the tests in this file. It exposes one flag
+// per supported value type and always executes successfully.
+class TestSubcommand : public Subcommand
+{
+public:
+  struct Flags : public flags::FlagsBase
+  {
+    // Registers every member below as a command line flag.
+    Flags()
+    {
+      add(&b, "b", "bool");
+      add(&i, "i", "int");
+      add(&s, "s", "string");
+      add(&s2, "s2", "string with single quote");
+      add(&s3, "s3", "string with double quote");
+      add(&d, "d", "Duration");
+      add(&y, "y", "Bytes");
+      add(&j, "j", "JSON::Object");
+    }
+
+    // Assigns a value to every flag.
+    void populate()
+    {
+      b = true;
+      i = 42;
+      s = "hello";
+      s2 = "we're";
+      s3 = "\"geek\"";
+      d = Seconds(10);
+      y = Bytes(100);
+
+      // Build the JSON value inside out: the nested object first,
+      // then the array holding it, then the enclosing object.
+      JSON::Object nested;
+      nested.values["string"] = "string";
+
+      JSON::Array array;
+      array.values.push_back(nested);
+
+      JSON::Object object;
+      object.values["strings"] = "string";
+      object.values["integer1"] = 1;
+      object.values["integer2"] = -1;
+      object.values["double1"] = 1;
+      object.values["double2"] = -1;
+      object.values["double3"] = -1.42;
+      object.values["nested"] = nested;
+      object.values["array"] = array;
+
+      j = object;
+    }
+
+    Option<bool> b;
+    Option<int> i;
+    Option<string> s;
+    Option<string> s2;
+    Option<string> s3;
+    Option<Duration> d;
+    Option<Bytes> y;
+    Option<JSON::Object> j;
+  };
+
+  explicit TestSubcommand(const string& name) : Subcommand(name) {}
+
+  // Public so that tests can inspect the loaded flag values.
+  Flags flags;
+
+protected:
+  virtual int execute() { return 0; }
+  virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+
+// Renders every flag that stringifies to a value as a
+// '--name=value' command line argument. Flags without a value are
+// left out.
+static vector<string> getArgv(const flags::FlagsBase& flags)
+{
+  vector<string> arguments;
+
+  foreachpair (const string& name, const flags::Flag& flag, flags) {
+    const Option<string> rendered = flag.stringify(flags);
+    if (rendered.isSome()) {
+      arguments.push_back("--" + name + "=" + rendered.get());
+    }
+  }
+
+  return arguments;
+}
+
+
+// Verifies that flag values survive a round trip: they are rendered
+// as command line arguments, handed to 'Subcommand::dispatch' and
+// must be loaded unchanged into the flags of the subcommand.
+TEST(SubcommandTest, Flags)
+{
+  TestSubcommand::Flags flags;
+  flags.populate();
+
+  // Construct the command line arguments. The first two entries point
+  // at string literals; only the flag arguments (index >= 2) are heap
+  // allocated copies that have to be freed below.
+  vector<string> _argv = getArgv(flags);
+  int argc = _argv.size() + 2;
+  char** argv = new char*[argc];
+  argv[0] = (char*) "command";
+  argv[1] = (char*) "subcommand";
+  for (int i = 2; i < argc; i++) {
+    argv[i] = ::strdup(_argv[i - 2].c_str());
+  }
+
+  TestSubcommand subcommand("subcommand");
+
+  ASSERT_EQ(0, Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &subcommand));
+
+  EXPECT_EQ(flags.b, subcommand.flags.b);
+  EXPECT_EQ(flags.i, subcommand.flags.i);
+  EXPECT_EQ(flags.s, subcommand.flags.s);
+  EXPECT_EQ(flags.s2, subcommand.flags.s2);
+  EXPECT_EQ(flags.s3, subcommand.flags.s3);
+  EXPECT_EQ(flags.d, subcommand.flags.d);
+  EXPECT_EQ(flags.y, subcommand.flags.y);
+  EXPECT_EQ(flags.j, subcommand.flags.j);
+
+  // Release the strdup'ed arguments and then the array itself. The
+  // array was allocated with 'new[]', so it must be released with
+  // 'delete[]'; a plain 'delete' is undefined behavior.
+  for (int i = 2; i < argc; i++) {
+    ::free(argv[i]);
+  }
+  delete[] argv;
+}
+
+
+// Checks the exit status of 'Subcommand::dispatch' for the command
+// line 'command subcommand' against different sets of registered
+// subcommands.
+TEST(SubcommandTest, Dispatch)
+{
+  TestSubcommand requested("subcommand");
+  TestSubcommand other("subcommand2");
+
+  int argc = 2;
+  char* argv[2];
+  argv[0] = (char*) "command";
+  argv[1] = (char*) "subcommand";
+
+  // The requested subcommand is not registered.
+  const int unavailable = Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &other);
+
+  EXPECT_EQ(1, unavailable);
+
+  // Duplicated subcommand names.
+  const int duplicated = Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &requested,
+      &requested);
+
+  EXPECT_EQ(1, duplicated);
+
+  // The requested subcommand is registered exactly once.
+  const int dispatched = Subcommand::dispatch(
+      None(),
+      argc,
+      argv,
+      &requested,
+      &other);
+
+  EXPECT_EQ(0, dispatched);
+}
[2/7] git commit: Updated mesos to adapt to the new Subprocess
interfaces.
Posted by ji...@apache.org.
Updated mesos to adapt to the new Subprocess interfaces.
Review: https://reviews.apache.org/r/22851
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7f1774b8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7f1774b8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7f1774b8
Branch: refs/heads/master
Commit: 7f1774b8aee8ad85f80c0d26fac1321cde3ee1cc
Parents: 1b0fdf0
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 12:46:08 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:32:16 2014 -0700
----------------------------------------------------------------------
src/health-check/main.cpp | 1 -
src/launcher/executor.cpp | 41 +++++++++++---------
src/launcher/launcher.cpp | 1 -
.../containerizer/external_containerizer.cpp | 1 -
src/slave/containerizer/mesos_containerizer.cpp | 1 -
src/tests/slave_tests.cpp | 1 -
6 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/health-check/main.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/main.cpp b/src/health-check/main.cpp
index 05e2924..707810a 100644
--- a/src/health-check/main.cpp
+++ b/src/health-check/main.cpp
@@ -154,7 +154,6 @@ private:
Subprocess::PIPE(),
Subprocess::FD(STDERR_FILENO),
Subprocess::FD(STDERR_FILENO),
- None(),
environment);
if (external.isError()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 9f48c88..9c80848 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -62,6 +62,7 @@ using std::cout;
using std::cerr;
using std::endl;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
@@ -402,31 +403,35 @@ private:
void launchHealthCheck(const TaskInfo& task)
{
if (task.has_health_check()) {
- const HealthCheck& healthCheck = task.health_check();
- JSON::Object json = JSON::Protobuf(healthCheck);
- // TODO(tnachen): Use flags when subprocess handle arguments
- // with quotes.
- const string& healthCommand =
- path::join(healthCheckDir, "mesos-health-check") + " --executor=\"" +
- stringify(self()) + "\" --health_check_json='" + stringify(json) +
- "' --task_id=" + task.task_id().value();
- cout << "Launching health check process: " << healthCommand << endl;
+ JSON::Object json = JSON::Protobuf(task.health_check());
+
+ // Launch the subprocess using 'execve' style so that quotes can
+ // be properly handled.
+ vector<string> argv(4);
+ argv[0] = "mesos-health-check";
+ argv[1] = "--executor=" + stringify(self());
+ argv[2] = "--health_check_json=" + stringify(json);
+ argv[3] = "--task_id=" + task.task_id().value();
+
+ cout << "Launching health check process: "
+ << path::join(healthCheckDir, "mesos-health-check")
+ << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
+
Try<Subprocess> healthProcess =
process::subprocess(
- healthCommand,
+ path::join(healthCheckDir, "mesos-health-check"),
+ argv,
Subprocess::PIPE(),
Subprocess::FD(STDOUT_FILENO),
- Subprocess::FD(STDERR_FILENO),
- None(),
- None(),
- None());
+ Subprocess::FD(STDERR_FILENO));
+
if (healthProcess.isError()) {
cerr << "Unable to launch health process: " << healthProcess.error();
} else {
- const Subprocess& health = healthProcess.get();
- healthPid = health.pid();
- cout << "Health check process launched at pid: " << stringify(healthPid)
- << endl;
+ healthPid = healthProcess.get().pid();
+
+ cout << "Health check process launched at pid: "
+ << stringify(healthPid) << endl;
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/launcher/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/launcher.cpp b/src/launcher/launcher.cpp
index 872e2e8..5585aad 100644
--- a/src/launcher/launcher.cpp
+++ b/src/launcher/launcher.cpp
@@ -211,7 +211,6 @@ process::Future<Option<int> > Operation::launch(
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
if (s.isError()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index bb3e5cc..3f28d85 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -1117,7 +1117,6 @@ Try<Subprocess> ExternalContainerizerProcess::invoke(
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment,
lambda::bind(&setup, sandbox.isSome() ? sandbox.get().directory
: string()));
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
index 76c101c..8a109f4 100644
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ b/src/slave/containerizer/mesos_containerizer.cpp
@@ -666,7 +666,6 @@ Future<Nothing> MesosContainerizerProcess::fetch(
Subprocess::PIPE(),
Subprocess::PIPE(),
Subprocess::PIPE(),
- None(),
environment);
if (fetcher.isError()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/7f1774b8/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 21fe685..873f22d 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -351,7 +351,6 @@ TEST_F(SlaveTest, MesosExecutorWithOverride)
process::Subprocess::PIPE(),
process::Subprocess::PIPE(),
process::Subprocess::PIPE(),
- None(),
environment);
ASSERT_SOME(executor);
[7/7] git commit: Refactored the mesos containerizer launcher to fix
MESOS-1404.
Posted by ji...@apache.org.
Refactored the mesos containerizer launcher to fix MESOS-1404.
Review: https://reviews.apache.org/r/22852
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0046dca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0046dca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0046dca
Branch: refs/heads/master
Commit: d0046dca732ac4c1636ef85384bda3092ff3fa4f
Parents: ae3c8e2
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jun 20 21:40:37 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:36:12 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 21 +-
src/slave/containerizer/containerizer.cpp | 3 +-
src/slave/containerizer/launcher.cpp | 89 +-
src/slave/containerizer/launcher.hpp | 46 +-
src/slave/containerizer/linux_launcher.cpp | 199 +--
src/slave/containerizer/linux_launcher.hpp | 17 +-
src/slave/containerizer/mesos/containerizer.cpp | 1048 ++++++++++++++++
src/slave/containerizer/mesos/containerizer.hpp | 240 ++++
src/slave/containerizer/mesos/launch.cpp | 211 ++++
src/slave/containerizer/mesos/launch.hpp | 60 +
src/slave/containerizer/mesos/main.cpp | 34 +
src/slave/containerizer/mesos_containerizer.cpp | 1174 ------------------
src/slave/containerizer/mesos_containerizer.hpp | 242 ----
src/tests/cgroups_isolator_tests.cpp | 2 +-
src/tests/containerizer_tests.cpp | 3 +-
src/tests/isolator_tests.cpp | 170 ++-
src/tests/master_tests.cpp | 2 +-
src/tests/mesos.cpp | 3 +-
src/tests/mesos.hpp | 6 +-
src/tests/slave_tests.cpp | 3 +-
20 files changed, 1958 insertions(+), 1615 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a49a3fe..fb3af9d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -247,10 +247,11 @@ libmesos_no_3rdparty_la_SOURCES = \
slave/slave.cpp \
slave/http.cpp \
slave/containerizer/containerizer.cpp \
+ slave/containerizer/external_containerizer.cpp \
slave/containerizer/isolator.cpp \
slave/containerizer/launcher.cpp \
- slave/containerizer/mesos_containerizer.cpp \
- slave/containerizer/external_containerizer.cpp \
+ slave/containerizer/mesos/containerizer.cpp \
+ slave/containerizer/mesos/launch.cpp \
slave/status_update_manager.cpp \
exec/exec.cpp \
common/lock.cpp \
@@ -365,16 +366,17 @@ libmesos_no_3rdparty_la_SOURCES += \
master/registrar.hpp \
master/master.hpp master/sorter.hpp \
messages/messages.hpp slave/constants.hpp \
- slave/containerizer/linux_launcher.hpp \
slave/containerizer/containerizer.hpp \
+ slave/containerizer/external_containerizer.hpp \
slave/containerizer/isolator.hpp \
+ slave/containerizer/launcher.hpp \
+ slave/containerizer/linux_launcher.hpp \
+ slave/containerizer/mesos/containerizer.hpp \
+ slave/containerizer/mesos/launch.hpp \
+ slave/containerizer/isolators/posix.hpp \
slave/containerizer/isolators/cgroups/cpushare.hpp \
slave/containerizer/isolators/cgroups/mem.hpp \
slave/containerizer/isolators/cgroups/perf_event.hpp \
- slave/containerizer/isolators/posix.hpp \
- slave/containerizer/launcher.hpp \
- slave/containerizer/mesos_containerizer.hpp \
- slave/containerizer/external_containerizer.hpp \
slave/flags.hpp slave/gc.hpp slave/monitor.hpp \
slave/paths.hpp slave/state.hpp \
slave/status_update_manager.hpp \
@@ -562,6 +564,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
mesos_executor_LDADD = libmesos.la
+pkglibexec_PROGRAMS += mesos-containerizer
+mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
+mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_containerizer_LDADD = libmesos.la
+
pkglibexec_PROGRAMS += mesos-health-check
mesos_health_check_SOURCES = health-check/main.cpp
mesos_health_check_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index 922ae17..1b71f33 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -37,9 +37,10 @@
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/containerizer/external_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
+
using std::map;
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.cpp b/src/slave/containerizer/launcher.cpp
index 2b13d1f..92c0657 100644
--- a/src/slave/containerizer/launcher.cpp
+++ b/src/slave/containerizer/launcher.cpp
@@ -16,8 +16,6 @@
* limitations under the License.
*/
-#include <unistd.h>
-
#include <process/collect.hpp>
#include <process/delay.hpp>
#include <process/process.hpp>
@@ -32,7 +30,9 @@
using namespace process;
using std::list;
+using std::map;
using std::string;
+using std::vector;
namespace mesos {
namespace internal {
@@ -63,11 +63,12 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
pid_t pid = state.forkedPid.get();
if (pids.containsValue(pid)) {
- // This should (almost) never occur. There is the possibility that a new
- // executor is launched with the same pid as one that just exited (highly
- // unlikely) and the slave dies after the new executor is launched but
- // before it hears about the termination of the earlier executor (also
- // unlikely). Regardless, the launcher can't do anything sensible so this
+ // This should (almost) never occur. There is the possibility
+ // that a new executor is launched with the same pid as one that
+ // just exited (highly unlikely) and the slave dies after the
+ // new executor is launched but before it hears about the
+ // termination of the earlier executor (also unlikely).
+ // Regardless, the launcher can't do anything sensible so this
// is considered an error.
return Failure("Detected duplicate pid " + stringify(pid) +
" for container " + stringify(containerId));
@@ -80,46 +81,66 @@ Future<Nothing> PosixLauncher::recover(const list<RunState>& states)
}
+// The setup function run in the child before the exec: puts the
+// child in a new session and then runs the optional user supplied
+// 'setup' function. Returns the result of 'setup', or 0 if none was
+// given. Exits the child with status 1 if the session can not be
+// created.
+static int childSetup(const Option<lambda::function<int()> >& setup)
+{
+  // POSIX guarantees a forked child's pid does not match any existing
+  // process group id so only a single setsid() is required and the
+  // session id will be the pid.
+  // TODO(idownes): perror is not listed as async-signal-safe and
+  // should be reimplemented safely.
+  // TODO(jieyu): Move this logic to the subprocess (i.e.,
+  // mesos-containerizer launch).
+  const pid_t session = ::setsid();
+  if (session == -1) {
+    perror("Failed to put child in a new session");
+    _exit(1);
+  }
+
+  return setup.isSome() ? setup.get()() : 0;
+}
+
+
Try<pid_t> PosixLauncher::fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction)
+ const string& path,
+ const vector<string>& argv,
+ const Subprocess::IO& in,
+ const Subprocess::IO& out,
+ const Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<map<string, string> >& environment,
+ const Option<lambda::function<int()> >& setup)
{
if (pids.contains(containerId)) {
return Error("Process has already been forked for container " +
stringify(containerId));
}
- pid_t pid;
-
- if ((pid = ::fork()) == -1) {
- return ErrnoError("Failed to fork");
- }
-
- if (pid == 0) {
- // In child.
- // POSIX guarantees a forked child's pid does not match any existing
- // process group id so only a single setsid() is required and the session
- // id will be the pid.
- // TODO(idownes): perror is not listed as async-signal-safe and should be
- // reimplemented safely.
- if (setsid() == -1) {
- perror("Failed to put child in a new session");
- _exit(1);
- }
-
- // This function should exec() and therefore not return.
- childFunction();
-
- ABORT("Child failed to exec");
+ Try<Subprocess> child = subprocess(
+ path,
+ argv,
+ in,
+ out,
+ err,
+ flags,
+ environment,
+ lambda::bind(&childSetup, setup));
+
+ if (child.isError()) {
+ return Error("Failed to fork a child process: " + child.error());
}
- // parent.
- LOG(INFO) << "Forked child with pid '" << pid
+ LOG(INFO) << "Forked child with pid '" << child.get().pid()
<< "' for container '" << containerId << "'";
+
// Store the pid (session id and process group id).
- pids.put(containerId, pid);
+ pids.put(containerId, child.get().pid());
- return pid;
+ return child.get().pid();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/launcher.hpp b/src/slave/containerizer/launcher.hpp
index 835d7a9..18b3546 100644
--- a/src/slave/containerizer/launcher.hpp
+++ b/src/slave/containerizer/launcher.hpp
@@ -20,10 +20,15 @@
#define __LAUNCHER_HPP__
#include <list>
+#include <map>
+#include <string>
#include <process/future.hpp>
+#include <process/subprocess.hpp>
+#include <stout/flags.hpp>
#include <stout/lambda.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
#include "slave/flags.hpp"
@@ -42,23 +47,33 @@ public:
virtual process::Future<Nothing> recover(
const std::list<state::RunState>& states) = 0;
- // Fork a new process in the containerized context. The child will call the
- // specified function and the parent will return the child's pid.
- // NOTE: The function must be async-signal safe and should exec as soon as
- // possible.
+ // Fork a new process in the containerized context. The child will
+ // exec the binary at the given path with the given argv, flags and
+ // environment. The I/O of the child will be redirected according to
+ // the specified I/O descriptors. The user can provide a 'setup'
+ // function which will be invoked in the child process right before
+ // the exec. The 'setup' function has to be async signal safe. The
+ // parent will return the child's pid if the fork is successful.
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction) = 0;
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup) = 0;
// Kill all processes in the containerized context.
virtual process::Future<Nothing> destroy(const ContainerID& containerId) = 0;
};
-// Launcher suitable for any POSIX compliant system. Uses process groups and
-// sessions to track processes in a container. POSIX states that process groups
-// cannot migrate between sessions so all processes for a container will be
-// contained in a session.
+// Launcher suitable for any POSIX compliant system. Uses process
+// groups and sessions to track processes in a container. POSIX states
+// that process groups cannot migrate between sessions so all
+// processes for a container will be contained in a session.
class PosixLauncher : public Launcher
{
public:
@@ -71,15 +86,22 @@ public:
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction);
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup);
virtual process::Future<Nothing> destroy(const ContainerID& containerId);
private:
PosixLauncher() {}
- // The 'pid' is the process id of the first process and also the process
- // group id and session id.
+ // The 'pid' is the process id of the first process and also the
+ // process group id and session id.
hashmap<ContainerID, pid_t> pids;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.cpp b/src/slave/containerizer/linux_launcher.cpp
index 7ebccb4..1ce03e3 100644
--- a/src/slave/containerizer/linux_launcher.cpp
+++ b/src/slave/containerizer/linux_launcher.cpp
@@ -38,6 +38,7 @@
using namespace process;
using std::list;
+using std::map;
using std::string;
using std::vector;
@@ -59,7 +60,9 @@ LinuxLauncher::LinuxLauncher(
Try<Launcher*> LinuxLauncher::create(const Flags& flags)
{
Try<string> hierarchy = cgroups::prepare(
- flags.cgroups_hierarchy, "freezer", flags.cgroups_root);
+ flags.cgroups_hierarchy,
+ "freezer",
+ flags.cgroups_root);
if (hierarchy.isError()) {
return Error("Failed to create Linux launcher: " + hierarchy.error());
@@ -68,7 +71,8 @@ Try<Launcher*> LinuxLauncher::create(const Flags& flags)
LOG(INFO) << "Using " << hierarchy.get()
<< " as the freezer hierarchy for the Linux launcher";
- // TODO(idownes): Inspect the isolation flag to determine namespaces to use.
+ // TODO(idownes): Inspect the isolation flag to determine namespaces
+ // to use.
int namespaces = 0;
return new LinuxLauncher(flags, namespaces, hierarchy.get());
@@ -94,10 +98,10 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
if (!exists.get()) {
- // This may occur if the freezer cgroup was destroyed but the slave dies
- // before noticing this.
- // The containerizer will monitor the container's pid and notice that it
- // has exited, triggering destruction of the container.
+ // This may occur if the freezer cgroup was destroyed but the
+ // slave dies before noticing this. The containerizer will
+ // monitor the container's pid and notice that it has exited,
+ // triggering destruction of the container.
LOG(INFO) << "Couldn't find freezer cgroup for container " << containerId;
continue;
}
@@ -109,11 +113,12 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
pid_t pid = state.forkedPid.get();
if (pids.containsValue(pid)) {
- // This should (almost) never occur. There is the possibility that a new
- // executor is launched with the same pid as one that just exited (highly
- // unlikely) and the slave dies after the new executor is launched but
- // before it hears about the termination of the earlier executor (also
- // unlikely). Regardless, the launcher can't do anything sensible so this
+ // This should (almost) never occur. There is the possibility
+ // that a new executor is launched with the same pid as one that
+ // just exited (highly unlikely) and the slave dies after the
+ // new executor is launched but before it hears about the
+ // termination of the earlier executor (also unlikely).
+ // Regardless, the launcher can't do anything sensible so this
// is considered an error.
return Failure("Detected duplicate pid " + stringify(pid) +
" for container " + stringify(containerId));
@@ -149,58 +154,84 @@ Future<Nothing> LinuxLauncher::recover(const std::list<state::RunState>& states)
// Helper for clone() which expects an int(void*).
-static int childMain(void* child)
+static int childMain(void* _func)
{
const lambda::function<int()>* func =
- static_cast<const lambda::function<int()>*> (child);
+ static_cast<const lambda::function<int()>*> (_func);
return (*func)();
}
-// Helper that creates a new session then blocks on reading the pipe before
-// calling the supplied function.
-static int _childMain(
- const lambda::function<int()>& childFunction,
- int pipes[2])
+// The customized clone function which will be used by 'subprocess()'.
+// Clones a child that runs 'func' (through 'childMain') using the
+// given namespace flags and returns the result of '::clone'.
+static pid_t clone(const lambda::function<int()>& func, int namespaces)
+{
+  // Stack for the child.
+  // - unsigned long long used for best alignment.
+  // - static is ok because each child gets their own copy after the clone.
+  // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
+  static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
+
+  LOG(INFO) << "Cloning child process with flags = " << namespaces;
+
+  // The stack grows down, so the child starts at the last element.
+  unsigned long long* top = &stack[sizeof(stack)/sizeof(stack[0]) - 1];
+
+  // Specify SIGCHLD as child termination signal.
+  const int flags = namespaces | SIGCHLD;
+
+  return ::clone(childMain, top, flags, (void*) &func);
+}
- // Move to a different session (and new process group) so we're independent
- // from the slave's session (otherwise children will receive SIGHUP if the
- // slave exits).
- // TODO(idownes): perror is not listed as async-signal-safe and should be
- // reimplemented safely.
- if (setsid() == -1) {
- perror("Failed to put child in a new session");
- os::close(pipes[0]);
- _exit(1);
- }
- // Do a blocking read on the pipe until the parent signals us to continue.
- int buf;
- int len;
- while ((len = read(pipes[0], &buf, sizeof(buf))) == -1 && errno == EINTR);
+static int childSetup(
+ int pipes[2],
+ const Option<lambda::function<int()> >& setup)
+{
+ // In child.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
- if (len != sizeof(buf)) {
+ // Do a blocking read on the pipe until the parent signals us to
+ // continue.
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
ABORT("Failed to synchronize with parent");
}
- os::close(pipes[0]);
+ while (::close(pipes[0]) == -1 && errno == EINTR);
- // This function should exec() and therefore not return.
- childFunction();
+ // Move to a different session (and new process group) so we're
+ // independent from the slave's session (otherwise children will
+ // receive SIGHUP if the slave exits).
+ // TODO(idownes): perror is not listed as async-signal-safe and
+ // should be reimplemented safely.
+ // TODO(jieyu): Move this logic to the subprocess (i.e.,
+ // mesos-containerizer launch).
+ if (::setsid() == -1) {
+ perror("Failed to put child in a new session");
+ return 1;
+ }
- ABORT("Child failed to exec");
+ if (setup.isSome()) {
+ return setup.get()();
+ }
- return -1;
+ return 0;
}
Try<pid_t> LinuxLauncher::fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction)
+ const string& path,
+ const vector<string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<map<string, string> >& environment,
+ const Option<lambda::function<int()> >& setup)
{
// Create a freezer cgroup for this container if necessary.
Try<bool> exists = cgroups::exists(hierarchy, cgroup(containerId));
@@ -217,70 +248,72 @@ Try<pid_t> LinuxLauncher::fork(
}
}
- // Use a pipe to block the child until it's been moved into the freezer
- // cgroup.
+ // Use a pipe to block the child until it's been moved into the
+ // freezer cgroup.
int pipes[2];
- // We assume this should not fail under reasonable conditions so we use CHECK.
- CHECK(pipe(pipes) == 0);
- // Use the _childMain helper which moves the child into a new session and
- // blocks on the pipe until we're ready for it to run.
- lambda::function<int()> func =
- lambda::bind(&_childMain, childFunction, pipes);
-
- // Stack for the child.
- // - unsigned long long used for best alignment.
- // - static is ok because each child gets their own copy after the clone.
- // - 8 MiB appears to be the default for "ulimit -s" on OSX and Linux.
- static unsigned long long stack[(8*1024*1024)/sizeof(unsigned long long)];
-
- LOG(INFO) << "Cloning child process with flags = " << namespaces;
-
- pid_t pid;
- if ((pid = ::clone(
- childMain,
- &stack[sizeof(stack)/sizeof(stack[0]) - 1], // stack grows down
- namespaces | SIGCHLD, // Specify SIGCHLD as child termination signal
- static_cast<void*>(&func))) == -1) {
- return ErrnoError("Failed to clone child process");
+ // We assume this should not fail under reasonable conditions so we
+ // use CHECK.
+ CHECK_EQ(0, ::pipe(pipes));
+
+ Try<Subprocess> child = subprocess(
+ path,
+ argv,
+ in,
+ out,
+ err,
+ flags,
+ environment,
+ lambda::bind(&childSetup, pipes, setup),
+ lambda::bind(&clone, lambda::_1, namespaces));
+
+ if (child.isError()) {
+ return Error("Failed to clone child process: " + child.error());
}
// Parent.
os::close(pipes[0]);
- // Move the child into the freezer cgroup. Any grandchildren will also be
- // contained in the cgroup.
- Try<Nothing> assign = cgroups::assign(hierarchy, cgroup(containerId), pid);
+ // Move the child into the freezer cgroup. Any grandchildren will
+ // also be contained in the cgroup.
+ // TODO(jieyu): Move this logic to the subprocess (i.e.,
+ // mesos-containerizer launch).
+ Try<Nothing> assign = cgroups::assign(
+ hierarchy,
+ cgroup(containerId),
+ child.get().pid());
if (assign.isError()) {
- LOG(ERROR) << "Failed to assign process " << pid
+ LOG(ERROR) << "Failed to assign process " << child.get().pid()
<< " of container '" << containerId << "'"
<< " to its freezer cgroup: " << assign.error();
- kill(pid, SIGKILL);
+
+ ::kill(child.get().pid(), SIGKILL);
return Error("Failed to contain process");
}
- // Now that we've contained the child we can signal it to continue by
- // writing to the pipe.
- int buf;
- ssize_t len;
- while ((len = write(pipes[1], &buf, sizeof(buf))) == -1 && errno == EINTR);
+ // Now that we've contained the child we can signal it to continue
+ // by writing to the pipe.
+ char dummy;
+ ssize_t length;
+ while ((length = ::write(pipes[1], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
- if (len != sizeof(buf)) {
+ os::close(pipes[1]);
+
+ if (length != sizeof(dummy)) {
// Ensure the child is killed.
- kill(pid, SIGKILL);
- os::close(pipes[1]);
+ ::kill(child.get().pid(), SIGKILL);
return Error("Failed to synchronize child process");
}
- os::close(pipes[1]);
- // Store the pid (session id and process group id) if this is the first
- // process forked for this container.
+ // Store the pid (session id and process group id) if this is the
+ // first process forked for this container.
if (!pids.contains(containerId)) {
- pids.put(containerId, pid);
+ pids.put(containerId, child.get().pid());
}
- return pid;
+ return child.get().pid();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/linux_launcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/linux_launcher.hpp b/src/slave/containerizer/linux_launcher.hpp
index 622810c..3d9794d 100644
--- a/src/slave/containerizer/linux_launcher.hpp
+++ b/src/slave/containerizer/linux_launcher.hpp
@@ -25,8 +25,8 @@ namespace mesos {
namespace internal {
namespace slave {
-// Launcher for Linux systems with cgroups. Uses a freezer cgroup to track
-// pids.
+// Launcher for Linux systems with cgroups. Uses a freezer cgroup to
+// track pids.
class LinuxLauncher : public Launcher
{
public:
@@ -39,7 +39,14 @@ public:
virtual Try<pid_t> fork(
const ContainerID& containerId,
- const lambda::function<int()>& childFunction);
+ const std::string& path,
+ const std::vector<std::string>& argv,
+ const process::Subprocess::IO& in,
+ const process::Subprocess::IO& out,
+ const process::Subprocess::IO& err,
+ const Option<flags::FlagsBase>& flags,
+ const Option<std::map<std::string, std::string> >& environment,
+ const Option<lambda::function<int()> >& setup);
virtual process::Future<Nothing> destroy(const ContainerID& containerId);
@@ -56,8 +63,8 @@ private:
std::string cgroup(const ContainerID& containerId);
- // The 'pid' is the process id of the child process and also the process
- // group id and session id.
+ // The 'pid' is the process id of the child process and also the
+ // process group id and session id.
hashmap<ContainerID, pid_t> pids;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
new file mode 100644
index 0000000..27f8e09
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -0,0 +1,1048 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/io.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+#ifdef __linux__
+#include "slave/containerizer/linux_launcher.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/isolators/posix.hpp"
+#ifdef __linux__
+#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
+#include "slave/containerizer/isolators/cgroups/mem.hpp"
+#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
+#endif // __linux__
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::list;
+using std::map;
+using std::string;
+using std::vector;
+
+using namespace process;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+using state::SlaveState;
+using state::FrameworkState;
+using state::ExecutorState;
+using state::RunState;
+
+// Local helper: produces a Future<Nothing> from Nothing(). Used below
+// to turn the result of a collect() into a plain Future<Nothing>.
+Future<Nothing> _nothing()
+{
+  return Nothing();
+}
+
+
+// Builds the environment map that is handed to mesos-fetcher.
+//
+// MESOS_EXECUTOR_URIS holds one entry per URI of 'commandInfo', each
+// encoded as "<value>+<1|0><X|N>" (executable bit, extract marker),
+// with entries separated by a single space.
+map<string, string> fetcherEnvironment(
+    const CommandInfo& commandInfo,
+    const std::string& directory,
+    const Option<std::string>& user,
+    const Flags& flags)
+{
+  string encoded = "";
+  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
+    const bool executable = uri.has_executable() && uri.executable();
+
+    encoded += uri.value() + "+";
+    encoded += (executable ? "1" : "0");
+    encoded += (uri.extract() ? "X" : "N");
+    encoded += " ";
+  }
+
+  map<string, string> result;
+
+  // Trimming removes the separator appended after the last entry.
+  result["MESOS_EXECUTOR_URIS"] = strings::trim(encoded);
+  result["MESOS_WORK_DIRECTORY"] = directory;
+
+  // The remaining variables are only exported when a value is known.
+  if (user.isSome()) {
+    result["MESOS_USER"] = user.get();
+  }
+
+  if (!flags.frameworks_home.empty()) {
+    result["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
+  }
+
+  if (!flags.hadoop_home.empty()) {
+    result["HADOOP_HOME"] = flags.hadoop_home;
+  }
+
+  return result;
+}
+
+
+// Factory for MesosContainerizer. Resolves the isolation flag
+// (rewriting the deprecated 'process' and 'cgroups' aliases), creates
+// one isolator per comma separated entry plus a launcher, and returns a
+// new containerizer owning them. Returns an Error if an isolator is
+// unknown or if an isolator or the launcher cannot be created.
+Try<MesosContainerizer*> MesosContainerizer::create(
+    const Flags& flags,
+    bool local)
+{
+  // Use the flag as given unless it is one of the deprecated aliases.
+  string isolation = flags.isolation;
+
+  if (flags.isolation == "process") {
+    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=posix/cpu,posix/mem'.";
+    isolation = "posix/cpu,posix/mem";
+  } else if (flags.isolation == "cgroups") {
+    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
+                 << "please update your flags to"
+                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
+    isolation = "cgroups/cpu,cgroups/mem";
+  }
+
+  LOG(INFO) << "Using isolation: " << isolation;
+
+  // Table mapping an isolator name to the function that creates it.
+  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
+
+  creators["posix/cpu"] = &PosixCpuIsolatorProcess::create;
+  creators["posix/mem"] = &PosixMemIsolatorProcess::create;
+#ifdef __linux__
+  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
+  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
+  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
+#endif // __linux__
+
+  vector<Owned<Isolator> > isolators;
+
+  foreach (const string& type, strings::split(isolation, ",")) {
+    if (!creators.contains(type)) {
+      return Error("Unknown or unsupported isolator: " + type);
+    }
+
+    Try<Isolator*> isolator = creators[type](flags);
+    if (isolator.isError()) {
+      return Error(
+          "Could not create isolator " + type + ": " + isolator.error());
+    }
+
+    isolators.push_back(Owned<Isolator>(isolator.get()));
+  }
+
+#ifdef __linux__
+  // On Linux the cgroups based launcher is selected whenever the
+  // isolation string mentions cgroups.
+  Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
+    ? LinuxLauncher::create(flags)
+    : PosixLauncher::create(flags);
+#else
+  Try<Launcher*> launcher = PosixLauncher::create(flags);
+#endif // __linux__
+
+  if (launcher.isError()) {
+    return Error("Failed to create launcher: " + launcher.error());
+  }
+
+  return new MesosContainerizer(
+      flags,
+      local,
+      Owned<Launcher>(launcher.get()),
+      isolators);
+}
+
+
+// Creates the backing MesosContainerizerProcess and spawns it; all
+// other methods of this class dispatch onto that process.
+MesosContainerizer::MesosContainerizer(
+    const Flags& flags,
+    bool local,
+    const Owned<Launcher>& launcher,
+    const vector<Owned<Isolator> >& isolators)
+{
+  process = new MesosContainerizerProcess(flags, local, launcher, isolators);
+
+  spawn(process);
+}
+
+
+// Terminates the backing process and waits for it to finish before
+// freeing it.
+MesosContainerizer::~MesosContainerizer()
+{
+  terminate(process);
+  process::wait(process);
+
+  delete process;
+}
+
+
+// Forwards to MesosContainerizerProcess::recover.
+Future<Nothing> MesosContainerizer::recover(
+    const Option<state::SlaveState>& state)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::recover,
+      state);
+}
+
+
+// Forwards to the executor-only overload of
+// MesosContainerizerProcess::launch.
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::launch,
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
+}
+
+
+// Forwards to the TaskInfo overload of
+// MesosContainerizerProcess::launch.
+Future<Nothing> MesosContainerizer::launch(
+    const ContainerID& containerId,
+    const TaskInfo& taskInfo,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::launch,
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
+}
+
+
+// Forwards to MesosContainerizerProcess::update.
+Future<Nothing> MesosContainerizer::update(
+    const ContainerID& containerId,
+    const Resources& resources)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::update,
+      containerId,
+      resources);
+}
+
+
+// Forwards to MesosContainerizerProcess::usage.
+Future<ResourceStatistics> MesosContainerizer::usage(
+    const ContainerID& containerId)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::usage,
+      containerId);
+}
+
+
+// Forwards to MesosContainerizerProcess::wait.
+Future<containerizer::Termination> MesosContainerizer::wait(
+    const ContainerID& containerId)
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::wait,
+      containerId);
+}
+
+
+// Forwards to MesosContainerizerProcess::destroy; the result of the
+// dispatch is not observed.
+void MesosContainerizer::destroy(const ContainerID& containerId)
+{
+  dispatch(
+      process,
+      &MesosContainerizerProcess::destroy,
+      containerId);
+}
+
+
+// Forwards to MesosContainerizerProcess::containers.
+Future<hashset<ContainerID> > MesosContainerizer::containers()
+{
+  return dispatch(
+      process,
+      &MesosContainerizerProcess::containers);
+}
+
+
+// First stage of recovery: selects the executor runs worth recovering
+// from the (optional) slave state, recovers the launcher with them and
+// then continues in _recover().
+Future<Nothing> MesosContainerizerProcess::recover(
+    const Option<state::SlaveState>& state)
+{
+  LOG(INFO) << "Recovering containerizer";
+
+  // Executor run states that we will attempt to recover.
+  list<RunState> recoverable;
+
+  if (state.isSome()) {
+    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+      foreachvalue (const ExecutorState& executor, framework.executors) {
+        if (executor.info.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its info could not be recovered";
+          continue;
+        }
+
+        if (executor.latest.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its latest run could not be recovered";
+          continue;
+        }
+
+        // Only the latest run of the executor is of interest.
+        const ContainerID& containerId = executor.latest.get();
+        Option<RunState> runState = executor.runs.get(containerId);
+        CHECK_SOME(runState);
+
+        // Without a pid the reaper cannot monitor the executor, so the
+        // run is skipped. This is not an error because the slave will
+        // try to wait on the container, which will return a failed
+        // Termination, and everything will get cleaned up.
+        if (!runState.get().forkedPid.isSome()) {
+          continue;
+        }
+
+        if (runState.get().completed) {
+          VLOG(1) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its latest run "
+                  << containerId << " is completed";
+          continue;
+        }
+
+        LOG(INFO) << "Recovering container '" << containerId
+                  << "' for executor '" << executor.id
+                  << "' of framework " << framework.id;
+
+        recoverable.push_back(runState.get());
+      }
+    }
+  }
+
+  // The launcher is recovered before the isolators.
+  return launcher->recover(recoverable)
+    .then(defer(self(), &Self::_recover, recoverable));
+}
+
+
+// Second stage of recovery: recovers every isolator with the given
+// runs and continues in __recover() once all of them have recovered.
+Future<Nothing> MesosContainerizerProcess::_recover(
+    const list<RunState>& recoverable)
+{
+  list<Future<Nothing> > recoveries;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    recoveries.push_back(isolator->recover(recoverable));
+  }
+
+  // collect() only becomes ready if all isolators recover.
+  return collect(recoveries)
+    .then(defer(self(), &Self::__recover, recoverable));
+}
+
+
+// Final stage of recovery: for every recovered run, registers a
+// termination promise, starts reaping the executor pid and watches
+// each isolator for limitations.
+Future<Nothing> MesosContainerizerProcess::__recover(
+    const list<RunState>& recovered)
+{
+  foreach (const RunState& run, recovered) {
+    CHECK_SOME(run.id);
+    const ContainerID& containerId = run.id.get();
+
+    promises.put(
+        containerId,
+        Owned<Promise<containerizer::Termination> >(
+            new Promise<containerizer::Termination>()));
+
+    CHECK_SOME(run.forkedPid);
+
+    // Keep the reaper future so destroy can refer to it later, and get
+    // notified through reaped() however the future completes.
+    Future<Option<int > > reaping = process::reap(run.forkedPid.get());
+    statuses[containerId] = reaping;
+    reaping.onAny(defer(self(), &Self::reaped, containerId));
+
+    foreach (const Owned<Isolator>& isolator, isolators) {
+      isolator->watch(containerId)
+        .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Launching an executor involves the following steps:
+// 1. Prepare: call prepare on each isolator.
+// 2. Fork: fork the executor. The forked child is blocked from
+//    exec'ing until it has been isolated.
+// 3. Isolate: call isolate with the pid for each isolator.
+// 4. Fetch: fetch the executor.
+// 5. Exec: signal the forked child to continue. It will first execute
+//    any preparation commands from isolators and then exec the
+//    executor.
+//
+// This entry point registers the container and runs step 1; the
+// remaining steps happen in _launch(). If the chain fails, the
+// container is destroyed.
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  if (promises.contains(containerId)) {
+    LOG(ERROR) << "Cannot start already running container '"
+               << containerId << "'";
+    return Failure("Container already started");
+  }
+
+  // TODO(tillt): The slave should expose which containerization
+  // mechanisms it supports to avoid scheduling tasks that it cannot
+  // run.
+  if (executorInfo.command().has_container()) {
+    // This containerizer does not support handling ContainerInfo, so a
+    // Failure is returned. Users have to be made aware of this lack of
+    // support to prevent confusion in the task configuration.
+    return Failure("ContainerInfo is not supported");
+  }
+
+  promises.put(
+      containerId,
+      Owned<Promise<containerizer::Termination> >(
+          new Promise<containerizer::Termination>()));
+
+  // Remember the resources so usage() can report limits.
+  resources.put(containerId, executorInfo.resources());
+
+  LOG(INFO) << "Starting container '" << containerId
+            << "' for executor '" << executorInfo.executor_id()
+            << "' of framework '" << executorInfo.framework_id() << "'";
+
+  return prepare(containerId, executorInfo, directory, user)
+    .then(defer(
+        self(),
+        &Self::_launch,
+        containerId,
+        executorInfo,
+        directory,
+        user,
+        slaveId,
+        slavePid,
+        checkpoint,
+        lambda::_1))
+    .onFailed(defer(
+        self(),
+        &Self::destroy,
+        containerId));
+}
+
+
+// TaskInfo overload: the task argument is ignored and the call is
+// delegated to the executor-only launch().
+Future<Nothing> MesosContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const TaskInfo&,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  return launch(
+      containerId, executorInfo, directory, user, slaveId, slavePid,
+      checkpoint);
+}
+
+// Asks every isolator to prepare for the container and gathers the
+// optional preparation commands they return; those commands must be
+// run in the forked child before exec'ing the executor. 'directory'
+// and 'user' are not used here.
+Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  // All isolators are started before any result is awaited.
+  list<Future<Option<CommandInfo> > > preparations;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    preparations.push_back(isolator->prepare(containerId, executorInfo));
+  }
+
+  // Ready once every isolator has completed its preparation.
+  return collect(preparations);
+}
+
+
+// Continuation of MesosContainerizerProcess::fetch(), invoked with the
+// exit status of mesos-fetcher.
+//
+// Fails if the status is unknown or non-zero. Otherwise chowns the
+// work directory to 'user' (when one is provided) and fails if that
+// chown fails.
+Future<Nothing> _fetch(
+    const ContainerID& containerId,
+    const string& directory,
+    const Option<string>& user,
+    const Option<int>& status)
+{
+  if (status.isNone() || (status.get() != 0)) {
+    return Failure("Failed to fetch URIs for container '" +
+                   stringify(containerId) + "': exit status " +
+                   (status.isNone() ? "none" : stringify(status.get())));
+  }
+
+  // Chown the work directory if a user is provided.
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(user.get(), directory);
+    if (chown.isError()) {
+      // Report the underlying reason, like the chown failures in
+      // fetch() do, instead of silently dropping it.
+      return Failure("Failed to chown work directory: " + chown.error());
+    }
+  }
+
+  return Nothing();
+}
+
+
+// Runs mesos-fetcher (looked up in flags.launcher_dir) with the
+// environment built by fetcherEnvironment() and redirects its stdout
+// and stderr into the "stdout" and "stderr" files of 'directory',
+// chown'ing those files to 'user' when one is specified. The returned
+// future is completed by _fetch() once the fetcher's exit status is
+// available.
+Future<Nothing> MesosContainerizerProcess::fetch(
+    const ContainerID& containerId,
+    const CommandInfo& commandInfo,
+    const string& directory,
+    const Option<string>& user)
+{
+  // Determine path for mesos-fetcher.
+  Result<string> realpath = os::realpath(
+      path::join(flags.launcher_dir, "mesos-fetcher"));
+
+  if (!realpath.isSome()) {
+    LOG(ERROR) << "Failed to determine the canonical path "
+               << "for the mesos-fetcher '"
+               << path::join(flags.launcher_dir, "mesos-fetcher")
+               << "': "
+               << (realpath.isError() ? realpath.error()
+                                      : "No such file or directory");
+    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
+  }
+
+  map<string, string> environment =
+    fetcherEnvironment(commandInfo, directory, user, flags);
+
+  // Now the actual mesos-fetcher command.
+  string command = realpath.get();
+
+  LOG(INFO) << "Fetching URIs for container '" << containerId
+            << "' using command '" << command << "'";
+
+  Try<Subprocess> fetcher = subprocess(
+      command,
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      environment);
+
+  if (fetcher.isError()) {
+    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
+  }
+
+  // Redirect output (stdout and stderr) from the fetcher to log files
+  // in the executor work directory, chown'ing them if a user is
+  // specified.
+  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
+  // TODO(tillt): Consider adding an overload to io::redirect
+  // that accepts a file path as 'to' for further reducing code. We
+  // would however also need an owner user parameter for such overload
+  // to perfectly replace the below.
+  //
+  // The files are created owner read/write, group read and others
+  // read. 'S_IROTH' is used rather than 'S_IRWXO' so that "others" are
+  // not given write and execute permission on the log files.
+  Try<int> out = os::open(
+      path::join(directory, "stdout"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  if (out.isError()) {
+    return Failure("Failed to redirect stdout: " + out.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        user.get(), path::join(directory, "stdout"));
+    if (chown.isError()) {
+      os::close(out.get());
+      return Failure(
+          "Failed to redirect stdout: Failed to chown: " +
+          chown.error());
+    }
+  }
+
+  // Redirect takes care of nonblocking and close-on-exec for the
+  // supplied file descriptors.
+  io::redirect(fetcher.get().out().get(), out.get());
+
+  // Redirect does 'dup' the file descriptor, hence we can close the
+  // original now.
+  os::close(out.get());
+
+  // Repeat for stderr.
+  Try<int> err = os::open(
+      path::join(directory, "stderr"),
+      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  if (err.isError()) {
+    return Failure(
+        "Failed to redirect stderr: Failed to open: " +
+        err.error());
+  }
+
+  if (user.isSome()) {
+    Try<Nothing> chown = os::chown(
+        user.get(), path::join(directory, "stderr"));
+    if (chown.isError()) {
+      os::close(err.get());
+      return Failure(
+          "Failed to redirect stderr: Failed to chown: " +
+          chown.error());
+    }
+  }
+
+  io::redirect(fetcher.get().err().get(), err.get());
+
+  os::close(err.get());
+
+  return fetcher.get().status()
+    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
+}
+
+
+// Continuation of launch() once all isolators have prepared.
+//
+// Forks the 'mesos-containerizer launch' helper through the launcher,
+// optionally checkpoints its pid, starts reaping it and then chains
+// isolate() -> fetch() -> exec(). A pipe created here keeps the child
+// blocked until exec() writes to it; 'commands' are the optional
+// preparation commands returned by the isolators, passed on to the
+// helper through its flags.
+Future<Nothing> MesosContainerizerProcess::_launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint,
+    const list<Option<CommandInfo> >& commands)
+{
+  // Prepare environment variables for the executor.
+  map<string, string> env = executorEnvironment(
+      executorInfo,
+      directory,
+      slaveId,
+      slavePid,
+      checkpoint,
+      flags.recovery_timeout);
+
+  // Include any environment variables from CommandInfo.
+  foreach (const Environment::Variable& variable,
+           executorInfo.command().environment().variables()) {
+    env[variable.name()] = variable.value();
+  }
+
+  // Use a pipe to block the child until it's been isolated.
+  int pipes[2];
+
+  // We assume this should not fail under reasonable conditions so we
+  // use CHECK.
+  CHECK(pipe(pipes) == 0);
+
+  // Prepare the flags to pass to the launch process.
+  MesosContainerizerLaunch::Flags launchFlags;
+
+  launchFlags.command = JSON::Protobuf(executorInfo.command());
+  launchFlags.directory = directory;
+  launchFlags.user = user;
+  launchFlags.pipe_read = pipes[0];
+  launchFlags.pipe_write = pipes[1];
+
+  // Prepare the additional preparation commands.
+  // TODO(jieyu): Use JSON::Array once we have generic parse support.
+  JSON::Object object;
+  JSON::Array array;
+  foreach (const Option<CommandInfo>& command, commands) {
+    if (command.isSome()) {
+      array.values.push_back(JSON::Protobuf(command.get()));
+    }
+  }
+  object.values["commands"] = array;
+
+  launchFlags.commands = object;
+
+  // Fork the child using launcher.
+  vector<string> argv(2);
+  argv[0] = "mesos-containerizer";
+  argv[1] = MesosContainerizerLaunch::NAME;
+
+  Try<pid_t> forked = launcher->fork(
+      containerId,
+      path::join(flags.launcher_dir, "mesos-containerizer"),
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      (local ? Subprocess::FD(STDOUT_FILENO)
+             : Subprocess::PATH(path::join(directory, "stdout"))),
+      (local ? Subprocess::FD(STDERR_FILENO)
+             : Subprocess::PATH(path::join(directory, "stderr"))),
+      launchFlags,
+      env,
+      None());
+
+  if (forked.isError()) {
+    // The handlers that close the pipe are only attached to the future
+    // chain at the end of this function, so both ends have to be
+    // closed here to avoid leaking the descriptors.
+    os::close(pipes[0]);
+    os::close(pipes[1]);
+
+    return Failure("Failed to fork executor: " + forked.error());
+  }
+  pid_t pid = forked.get();
+
+  // Checkpoint the executor's pid if requested.
+  if (checkpoint) {
+    const string& path = slave::paths::getForkedPidPath(
+        slave::paths::getMetaRootDir(flags.work_dir),
+        slaveId,
+        executorInfo.framework_id(),
+        executorInfo.executor_id(),
+        containerId);
+
+    LOG(INFO) << "Checkpointing executor's forked pid " << pid
+              << " to '" << path << "'";
+
+    Try<Nothing> checkpointed =
+      slave::state::checkpoint(path, stringify(pid));
+
+    if (checkpointed.isError()) {
+      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
+                 << path << "': " << checkpointed.error();
+
+      // As above: no close handler has been attached yet, so release
+      // both pipe ends before bailing out.
+      os::close(pipes[0]);
+      os::close(pipes[1]);
+
+      return Failure("Could not checkpoint executor's pid");
+    }
+  }
+
+  // Monitor the executor's pid. We keep the future because we'll
+  // refer to it again during container destroy.
+  Future<Option<int> > status = process::reap(pid);
+  statuses.put(containerId, status);
+  status.onAny(defer(self(), &Self::reaped, containerId));
+
+  // Whatever the outcome of the chain, both pipe ends get closed.
+  return isolate(containerId, pid)
+    .then(defer(self(),
+                &Self::fetch,
+                containerId,
+                executorInfo.command(),
+                directory,
+                user))
+    .then(defer(self(), &Self::exec, containerId, pipes[1]))
+    .onAny(lambda::bind(&os::close, pipes[0]))
+    .onAny(lambda::bind(&os::close, pipes[1]));
+}
+
+
+// Starts watching every isolator for limitations on the container and
+// then isolates the process '_pid' with each isolator. The returned
+// future is ready once all isolators have finished isolating.
+Future<Nothing> MesosContainerizerProcess::isolate(
+    const ContainerID& containerId,
+    pid_t _pid)
+{
+  // Register the limitation callbacks on all isolators first.
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    isolator->watch(containerId)
+      .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
+  }
+
+  // Then ask each isolator to isolate the executor.
+  list<Future<Nothing> > isolations;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    isolations.push_back(isolator->isolate(containerId, _pid));
+  }
+
+  return collect(isolations)
+    .then(lambda::bind(&_nothing));
+}
+
+
+// Signals the blocked child to continue by writing a single byte to
+// the write end of the synchronization pipe ('pipeWrite'). Returns a
+// Failure if that byte could not be written.
+Future<Nothing> MesosContainerizerProcess::exec(
+    const ContainerID& containerId,
+    int pipeWrite)
+{
+  CHECK(promises.contains(containerId));
+
+  // Now that we've contained the child we can signal it to continue by
+  // writing to the pipe. The byte is given a defined value so that
+  // write() is not handed uninitialized memory.
+  char dummy = 0;
+  ssize_t length;
+
+  // Retry for as long as the write is interrupted by a signal.
+  while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    return Failure("Failed to synchronize child process: " +
+                   string(strerror(errno)));
+  }
+
+  return Nothing();
+}
+
+
+// Returns the termination future of a known container, or a Failure
+// for a container that has no registered promise.
+Future<containerizer::Termination> MesosContainerizerProcess::wait(
+    const ContainerID& containerId)
+{
+  if (promises.contains(containerId)) {
+    return promises[containerId]->future();
+  }
+
+  return Failure("Unknown container: " + stringify(containerId));
+}
+
+
+// Records the new resources of a container and pushes them to every
+// isolator. Updates for unknown containers are ignored with a warning.
+Future<Nothing> MesosContainerizerProcess::update(
+    const ContainerID& containerId,
+    const Resources& _resources)
+{
+  // The promises hashmap is checked rather than the resources hashmap
+  // because the latter won't initially contain the container's
+  // resources after recovery, whereas promises is set during recovery.
+  if (!promises.contains(containerId)) {
+    // An unknown container is not considered a failure: the slave will
+    // attempt to update the container's resources on a task's terminal
+    // state change, but the executor may have already exited and the
+    // container been cleaned up.
+    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
+    return Nothing();
+  }
+
+  // Remember the resources so usage() can report limits.
+  resources.put(containerId, _resources);
+
+  list<Future<Nothing> > updates;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    updates.push_back(isolator->update(containerId, _resources));
+  }
+
+  // Ready once every isolator has applied the update.
+  return collect(updates)
+    .then(lambda::bind(&_nothing));
+}
+
+
+// Merges the per-isolator statistics of a container into one
+// ResourceStatistics. Statistics that are not ready are skipped with a
+// warning. 'resources' is used to set the limit fields but is optional
+// because the resources aren't known after recovery until/unless
+// update() is called.
+Future<ResourceStatistics> _usage(
+    const ContainerID& containerId,
+    const Option<Resources>& resources,
+    const list<Future<ResourceStatistics> >& statistics)
+{
+  ResourceStatistics merged;
+
+  // All statistics are available now, so this is the timestamp.
+  merged.set_timestamp(Clock::now().secs());
+
+  foreach (const Future<ResourceStatistics>& statistic, statistics) {
+    if (!statistic.isReady()) {
+      LOG(WARNING) << "Skipping resource statistic for container "
+                   << containerId << " because: "
+                   << (statistic.isFailed() ? statistic.failure()
+                                            : "discarded");
+      continue;
+    }
+
+    merged.MergeFrom(statistic.get());
+  }
+
+  // Fill in the resource allocations when they are known.
+  if (resources.isSome()) {
+    Option<Bytes> mem = resources.get().mem();
+    if (mem.isSome()) {
+      merged.set_mem_limit_bytes(mem.get().bytes());
+    }
+
+    Option<double> cpus = resources.get().cpus();
+    if (cpus.isSome()) {
+      merged.set_cpus_limit(cpus.get());
+    }
+  }
+
+  return merged;
+}
+
+
+// Queries every isolator for the usage of a known container and
+// combines the answers through _usage(). Fails for unknown containers.
+Future<ResourceStatistics> MesosContainerizerProcess::usage(
+    const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    return Failure("Unknown container: " + stringify(containerId));
+  }
+
+  list<Future<ResourceStatistics> > statistics;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    statistics.push_back(isolator->usage(containerId));
+  }
+
+  // await() is used (rather than collect()) so that partial usage
+  // statistics can be returned.
+  // TODO(idownes): After recovery resources won't be known until after an
+  // update() because they aren't part of the SlaveState.
+  return await(statistics)
+    .then(lambda::bind(
+        _usage,
+        containerId,
+        resources.get(containerId),
+        lambda::_1));
+}
+
+
+// Starts the destruction of a container. Unknown containers and
+// containers that are already being destroyed are ignored. Continues
+// in _destroy() after the launcher is done, or directly in __destroy()
+// when no exit status is being tracked for the container.
+void MesosContainerizerProcess::destroy(const ContainerID& containerId)
+{
+  if (!promises.contains(containerId)) {
+    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
+    return;
+  }
+
+  // A destroy that was already initiated must not be started again.
+  if (destroying.contains(containerId)) {
+    return;
+  }
+
+  destroying.insert(containerId);
+
+  LOG(INFO) << "Destroying container '" << containerId << "'";
+
+  if (!statuses.contains(containerId)) {
+    // The executor never forked so there are no processes to kill; go
+    // straight to __destroy() with status = None().
+    __destroy(containerId, None());
+    return;
+  }
+
+  // Kill all processes, then continue the destruction.
+  launcher->destroy(containerId)
+    .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
+}
+
+
+// Continuation of destroy() with the result of launcher->destroy().
+// On success, waits for the executor's exit status and continues in
+// __destroy(); otherwise fails the container's termination promise.
+void MesosContainerizerProcess::_destroy(
+    const ContainerID& containerId,
+    const Future<Nothing>& future)
+{
+  if (!future.isReady()) {
+    // Something has gone wrong and the launcher wasn't able to kill
+    // all the processes in the container. The isolators cannot be
+    // cleaned up because they may require that all processes have
+    // exited, so the failure is simply returned to the slave.
+    // TODO(idownes): This is a pretty bad state to be in but we should
+    // consider cleaning up here.
+    promises[containerId]->fail(
+        "Failed to destroy container: " +
+        (future.isFailed() ? future.failure() : "discarded future"));
+
+    destroying.erase(containerId);
+
+    return;
+  }
+
+  // All processes in the container were killed successfully, so pick
+  // up the executor's exit status when it's ready (it may already be)
+  // and continue the destroy.
+  statuses.get(containerId).get()
+    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+}
+
+
+// Asks every isolator to clean up the container and continues in
+// ___destroy() once all cleanups have completed (or any has failed).
+void MesosContainerizerProcess::__destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status)
+{
+  // All processes have exited at this point, so the isolators can be
+  // cleaned up.
+  list<Future<Nothing> > cleanups;
+  foreach (const Owned<Isolator>& isolator, isolators) {
+    cleanups.push_back(isolator->cleanup(containerId));
+  }
+
+  collect(cleanups)
+    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
+}
+
+
+// Last step of destroy: builds the Termination for the container from
+// the recorded limitations and the executor's exit status, fulfills
+// the termination promise and drops all state kept for the container.
+// If isolator cleanup failed, the promise is failed instead.
+void MesosContainerizerProcess::___destroy(
+    const ContainerID& containerId,
+    const Future<Option<int > >& status,
+    const Future<list<Nothing> >& futures)
+{
+  if (!futures.isReady()) {
+    // Something has gone wrong with one of the Isolators and cleanup
+    // failed. The container termination is failed and the 'destroying'
+    // flag removed, but all other state is left in place. The
+    // containerizer is now in a bad state because at least one
+    // isolator has failed to clean up.
+    promises[containerId]->fail(
+        "Failed to clean up isolators when destroying container: " +
+        (futures.isFailed() ? futures.failure() : "discarded future"));
+
+    destroying.erase(containerId);
+
+    return;
+  }
+
+  // A container is 'killed' if any isolator limited it.
+  // Note: A limitation may not be seen in time for it to be registered.
+  // This could occur if the limitation (e.g., an OOM) killed the
+  // executor and destroy() was triggered off the executor exit.
+  const bool killed = limitations.contains(containerId);
+
+  string message;
+  if (killed) {
+    foreach (const Limitation& limitation, limitations.get(containerId)) {
+      message += limitation.message;
+    }
+    message = strings::trim(message);
+  } else {
+    message = "Executor terminated";
+  }
+
+  containerizer::Termination termination;
+  termination.set_killed(killed);
+  termination.set_message(message);
+
+  // The exit status is only reported when it is actually known.
+  if (status.isReady() && status.get().isSome()) {
+    termination.set_status(status.get().get());
+  }
+
+  promises[containerId]->set(termination);
+
+  // Forget everything that was tracked for this container.
+  promises.erase(containerId);
+  statuses.erase(containerId);
+  limitations.erase(containerId);
+  resources.erase(containerId);
+  destroying.erase(containerId);
+}
+
+
+// Callback for the reaper future of a container's executor: destroys
+// the container if it is still known.
+void MesosContainerizerProcess::reaped(const ContainerID& containerId)
+{
+  if (promises.contains(containerId)) {
+    LOG(INFO) << "Executor for container '" << containerId << "' has exited";
+
+    // The executor has exited so destroy the container.
+    destroy(containerId);
+  }
+}
+
+
+// Callback for an isolator's watch() future. Records the limitation
+// when the future is ready (logging an error otherwise) and destroys
+// the container in either case. Unknown containers are ignored.
+void MesosContainerizerProcess::limited(
+    const ContainerID& containerId,
+    const Future<Limitation>& future)
+{
+  if (!promises.contains(containerId)) {
+    return;
+  }
+
+  if (!future.isReady()) {
+    // TODO(idownes): A discarded future will not be an error when isolators
+    // discard their promises after cleanup.
+    LOG(ERROR) << "Error in a resource limitation for container "
+               << containerId << ": " << (future.isFailed() ? future.failure()
+                                                            : "discarded");
+  } else {
+    LOG(INFO) << "Container " << containerId << " has reached its limit for"
+              << " resource " << future.get().resource
+              << " and will be terminated";
+    limitations.put(containerId, future.get());
+  }
+
+  // The container has been affected by the limitation so destroy it.
+  destroy(containerId);
+}
+
+
+// Returns the ids of all containers that currently have a termination
+// promise registered.
+Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
+{
+  return promises.keys();
+}
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
new file mode 100644
index 0000000..8746968
--- /dev/null
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_HPP__
+#define __MESOS_CONTAINERIZER_HPP__
+
+#include <list>
+#include <vector>
+
+#include <stout/hashmap.hpp>
+#include <stout/multihashmap.hpp>
+
+#include "slave/containerizer/containerizer.hpp"
+#include "slave/containerizer/isolator.hpp"
+#include "slave/containerizer/launcher.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Forward declaration.
+class MesosContainerizerProcess;
+
+class MesosContainerizer : public Containerizer
+{
+public:
+ static Try<MesosContainerizer*> create(const Flags& flags, bool local);
+
+ MesosContainerizer(
+ const Flags& flags,
+ bool local,
+ const process::Owned<Launcher>& launcher,
+ const std::vector<process::Owned<Isolator> >& isolators);
+
+ virtual ~MesosContainerizer();
+
+ virtual process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ virtual process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ virtual void destroy(const ContainerID& containerId);
+
+ virtual process::Future<hashset<ContainerID> > containers();
+
+private:
+ MesosContainerizerProcess* process; // NOTE(review): raw pointer; who deletes it is not visible in this header.
+};
+
+
+class MesosContainerizerProcess
+ : public process::Process<MesosContainerizerProcess>
+{
+public:
+ MesosContainerizerProcess(
+ const Flags& _flags,
+ bool _local,
+ const process::Owned<Launcher>& _launcher,
+ const std::vector<process::Owned<Isolator> >& _isolators)
+ : flags(_flags),
+ local(_local),
+ launcher(_launcher),
+ isolators(_isolators) {}
+
+ virtual ~MesosContainerizerProcess() {}
+
+ process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ process::Future<Nothing> launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ process::Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ void destroy(const ContainerID& containerId);
+
+ process::Future<hashset<ContainerID> > containers();
+
+private:
+ process::Future<Nothing> _recover(
+ const std::list<state::RunState>& recoverable);
+
+ process::Future<Nothing> __recover(
+ const std::list<state::RunState>& recovered);
+
+ process::Future<std::list<Option<CommandInfo> > > prepare(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user);
+
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& directory,
+ const Option<std::string>& user);
+
+ process::Future<Nothing> _launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint,
+ const std::list<Option<CommandInfo> >& scripts);
+
+ process::Future<Nothing> isolate(
+ const ContainerID& containerId,
+ pid_t _pid);
+
+ process::Future<Nothing> exec(
+ const ContainerID& containerId,
+ int pipeWrite);
+
+ // Continues 'destroy()' once all processes have been killed by the launcher.
+ void _destroy(
+ const ContainerID& containerId,
+ const process::Future<Nothing>& future);
+
+ // Continues '_destroy()' once we get the exit status of the executor.
+ void __destroy(
+ const ContainerID& containerId,
+ const process::Future<Option<int > >& status);
+
+ // Continues (and completes) '__destroy()' once all isolators have completed
+ // cleanup.
+ void ___destroy(
+ const ContainerID& containerId,
+ const process::Future<Option<int > >& status,
+ const process::Future<std::list<Nothing> >& futures);
+
+ // Callback for when an isolator limits a container and impacts the
+ // processes. This will trigger container destruction.
+ void limited(
+ const ContainerID& containerId,
+ const process::Future<Limitation>& future);
+
+ // Callback for when the executor exits. This will trigger container
+ // destruction.
+ void reaped(const ContainerID& containerId);
+
+ const Flags flags;
+ const bool local;
+ const process::Owned<Launcher> launcher;
+ const std::vector<process::Owned<Isolator> > isolators;
+
+ // TODO(idownes): Consider putting these per-container variables into a
+ // struct.
+ // Promises for futures returned from wait().
+ hashmap<ContainerID,
+ process::Owned<process::Promise<containerizer::Termination> > > promises;
+
+ // We need to keep track of the future exit status for each executor because
+ // we'll only get a single notification when the executor exits.
+ hashmap<ContainerID, process::Future<Option<int> > > statuses;
+
+ // We keep track of any limitations received from each isolator so we can
+ // determine the cause of an executor termination.
+ multihashmap<ContainerID, Limitation> limitations;
+
+ // We keep track of the resources for each container so we can set the
+ // ResourceStatistics limits in usage().
+ hashmap<ContainerID, Resources> resources;
+
+ // Set of containers that are in the process of being destroyed.
+ hashset<ContainerID> destroying;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.cpp b/src/slave/containerizer/mesos/launch.cpp
new file mode 100644
index 0000000..2db1c7a
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.cpp
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <unistd.h>
+
+#include <iostream>
+#include <map>
+
+#include <stout/foreach.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/execenv.hpp>
+
+#include "mesos/mesos.hpp"
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using std::cerr;
+using std::endl;
+using std::map;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+const string MesosContainerizerLaunch::NAME = "launch";
+
+
+MesosContainerizerLaunch::Flags::Flags() // Declares each launch flag via add(member, name, description).
+{
+ add(&command,
+ "command",
+ "The command to execute.");
+
+ add(&directory,
+ "directory",
+ "The directory to chdir to.");
+
+ add(&user,
+ "user",
+ "The user to change to.");
+
+ add(&pipe_read,
+ "pipe_read",
+ "The read end of the control pipe.");
+
+ add(&pipe_write,
+ "pipe_write",
+ "The write end of the control pipe.");
+
+ add(&commands,
+ "commands",
+ "The additional preparation commands to execute before\n"
+ "executing the command.");
+}
+
+
+// Runs the 'launch' subcommand: blocks on the control pipe until the parent
+// signals, runs any preparation commands, enters the work directory, changes
+// user if requested, and finally execs the command via '/bin/sh -c'.
+// Returns 1 on any failure; does not return if the exec succeeds.
+int MesosContainerizerLaunch::execute()
+{
+  // Check command line flags.
+  if (flags.command.isNone()) {
+    cerr << "Flag --command is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.directory.isNone()) {
+    cerr << "Flag --directory is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_read.isNone()) {
+    cerr << "Flag --pipe_read is not specified" << endl;
+    return 1;
+  }
+
+  if (flags.pipe_write.isNone()) {
+    cerr << "Flag --pipe_write is not specified" << endl;
+    return 1;
+  }
+
+  // Parse the command.
+  Try<CommandInfo> command =
+    ::protobuf::parse<CommandInfo>(flags.command.get());
+
+  if (command.isError()) {
+    cerr << "Failed to parse the command: " << command.error() << endl;
+    return 1;
+  }
+
+  Try<Nothing> close = os::close(flags.pipe_write.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[1]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Do a blocking read on the pipe until the parent signals us to continue.
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(
+              flags.pipe_read.get(),
+              &dummy,
+              sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    // There's a reasonable probability this will occur during slave
+    // restarts across a large/busy cluster.
+    cerr << "Failed to synchronize with slave (it's probably exited)" << endl;
+    return 1;
+  }
+
+  close = os::close(flags.pipe_read.get());
+  if (close.isError()) {
+    cerr << "Failed to close pipe[0]: " << close.error() << endl;
+    return 1;
+  }
+
+  // Run additional preparation commands. These are run as the same
+  // user and with the same environment as the slave.
+  if (flags.commands.isSome()) {
+    // TODO(jieyu): Use JSON::Array if we have generic parse support.
+    JSON::Object object = flags.commands.get();
+    if (object.values.count("commands") == 0 ||
+        !object.values["commands"].is<JSON::Array>()) {
+      cerr << "Invalid JSON format for flag --commands" << endl;
+      return 1;
+    }
+
+    JSON::Array array = object.values["commands"].as<JSON::Array>();
+    foreach (const JSON::Value& value, array.values) {
+      if (!value.is<JSON::Object>()) {
+        cerr << "Invalid JSON format for flag --commands" << endl;
+        return 1;
+      }
+
+      Try<CommandInfo> parse = ::protobuf::parse<CommandInfo>(value);
+      if (parse.isError()) {
+        cerr << "Failed to parse a preparation command: "
+             << parse.error() << endl;
+        return 1;
+      }
+
+      // Block until the command completes.
+      int status = os::system(parse.get().value());
+      if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
+        cerr << "Failed to execute a preparation command" << endl;
+        return 1;
+      }
+    }
+  }
+
+  // Enter working directory.
+  if (!os::chdir(flags.directory.get())) {
+    cerr << "Failed to chdir into work directory "
+         << flags.directory.get() << endl;
+    return 1;
+  }
+
+  // Change user if provided. Note that we do that after executing the
+  // preparation commands so that those commands will be run with the
+  // same privilege as the mesos-slave.
+  if (flags.user.isSome() && !os::su(flags.user.get())) {
+    cerr << "Failed to change user to " << flags.user.get() << endl;
+    return 1;
+  }
+
+  // Relay the environment variables.
+  // TODO(jieyu): Consider using a clean environment.
+  map<string, string> env;
+  os::ExecEnv envp(env);
+
+  // Execute the command (via '/bin/sh -c command') with its environment.
+  execle(
+      "/bin/sh",
+      "sh",
+      "-c",
+      command.get().value().c_str(),
+      (char*) NULL,
+      envp());
+
+  // execle() only returns on failure, so report it like any other error.
+  cerr << "Failed to execute command" << endl;
+
+  return 1;
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/launch.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/launch.hpp b/src/slave/containerizer/mesos/launch.hpp
new file mode 100644
index 0000000..7c8b535
--- /dev/null
+++ b/src/slave/containerizer/mesos/launch.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_CONTAINERIZER_LAUNCH_HPP__
+#define __MESOS_CONTAINERIZER_LAUNCH_HPP__
+
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/subcommand.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class MesosContainerizerLaunch : public Subcommand
+{
+public:
+ static const std::string NAME; // Name of this subcommand; passed to the Subcommand base.
+
+ struct Flags : public flags::FlagsBase
+ {
+ Flags();
+
+ Option<JSON::Object> command; // The command to execute (JSON form of a CommandInfo).
+ Option<std::string> directory; // The directory to chdir to.
+ Option<std::string> user; // The user to change to.
+ Option<int> pipe_read; // The read end of the control pipe.
+ Option<int> pipe_write; // The write end of the control pipe.
+ Option<JSON::Object> commands; // Additional preparation commands (object holding a "commands" array).
+ };
+
+ MesosContainerizerLaunch() : Subcommand(NAME) {}
+
+ Flags flags;
+
+protected:
+ virtual int execute();
+ virtual flags::FlagsBase* getFlags() { return &flags; }
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_CONTAINERIZER_LAUNCH_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/main.cpp b/src/slave/containerizer/mesos/main.cpp
new file mode 100644
index 0000000..0e17931
--- /dev/null
+++ b/src/slave/containerizer/mesos/main.cpp
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/none.hpp>
+#include <stout/subcommand.hpp>
+
+#include "slave/containerizer/mesos/launch.hpp"
+
+using namespace mesos::internal::slave;
+
+
+int main(int argc, char** argv)
+{
+ return Subcommand::dispatch( // The process exit code is whatever dispatch() returns.
+ None(), // NOTE(review): meaning of this first argument is defined by Subcommand::dispatch; confirm there.
+ argc,
+ argv,
+ new MesosContainerizerLaunch()); // The only subcommand passed to dispatch() here.
+}
[4/7] git commit: Added subcommand tests to stout tests.
Posted by ji...@apache.org.
Added subcommand tests to stout tests.
Review: https://reviews.apache.org/r/22764
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6b7e6572
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6b7e6572
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6b7e6572
Branch: refs/heads/master
Commit: 6b7e6572d27ec56847b55157fb000547707b87d5
Parents: 3279c40
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 25 14:34:12 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Jun 25 14:34:15 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/Makefile.am | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6b7e6572/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 3359907..429c956 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -156,6 +156,7 @@ stout_tests_SOURCES = \
$(STOUT)/tests/set_tests.cpp \
$(STOUT)/tests/some_tests.cpp \
$(STOUT)/tests/strings_tests.cpp \
+ $(STOUT)/tests/subcommand_tests.cpp \
$(STOUT)/tests/thread_tests.cpp \
$(STOUT)/tests/uuid_tests.cpp