You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:48 UTC
[6/7] Refactored the mesos containerizer launcher to fix MESOS-1404.
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
deleted file mode 100644
index 8a109f4..0000000
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sstream>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/reap.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/fatal.hpp>
-#include <stout/os.hpp>
-#include <stout/unreachable.hpp>
-
-#include <stout/os/execenv.hpp>
-
-#include "slave/paths.hpp"
-#include "slave/slave.hpp"
-
-#ifdef __linux__
-#include "slave/containerizer/linux_launcher.hpp"
-#endif // __linux__
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
-
-#include "slave/containerizer/isolators/posix.hpp"
-#ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
-#endif // __linux__
-
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-
-// Local helper: returns a Future<Nothing> built from Nothing(). It is bound
-// into .then() below to turn the result of collect() into a Future<Nothing>.
-Future<Nothing> _nothing()
-{
-  return Nothing();
-}
-
-
-// Builds the environment map handed to the mesos-fetcher subprocess.
-//
-// MESOS_EXECUTOR_URIS is a space separated list holding one entry per URI
-// of the form "<value>+<E><X>": <E> is "1" when the URI is flagged as
-// executable (otherwise "0") and <X> is "X" when uri.extract() is true
-// (otherwise "N"). MESOS_USER, MESOS_FRAMEWORKS_HOME and HADOOP_HOME are
-// only set when the corresponding input is present / non-empty.
-map<string, string> fetcherEnvironment(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags)
-{
-  string encoded;
-  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
-    const bool executable = uri.has_executable() && uri.executable();
-
-    encoded.append(uri.value());
-    encoded.append("+");
-    encoded.append(executable ? "1" : "0");
-    encoded.append(uri.extract() ? "X" : "N");
-    encoded.append(" ");
-  }
-
-  map<string, string> result;
-
-  // Trimming removes the separator appended after the last URI.
-  result["MESOS_EXECUTOR_URIS"] = strings::trim(encoded);
-  result["MESOS_WORK_DIRECTORY"] = directory;
-
-  if (user.isSome()) {
-    result["MESOS_USER"] = user.get();
-  }
-
-  if (!flags.frameworks_home.empty()) {
-    result["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
-  }
-
-  if (!flags.hadoop_home.empty()) {
-    result["HADOOP_HOME"] = flags.hadoop_home;
-  }
-
-  return result;
-}
-
-
-// Factory for the containerizer. Maps the deprecated 'process' and
-// 'cgroups' isolation values onto their explicit equivalents, creates one
-// isolator per comma separated entry of the isolation flag and picks a
-// launcher. Returns an Error for an unknown isolator or when an isolator
-// or the launcher cannot be created.
-Try<MesosContainerizer*> MesosContainerizer::create(
-    const Flags& flags,
-    bool local)
-{
-  string isolation = flags.isolation;
-
-  if (flags.isolation == "process") {
-    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=posix/cpu,posix/mem'.";
-    isolation = "posix/cpu,posix/mem";
-  } else if (flags.isolation == "cgroups") {
-    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
-    isolation = "cgroups/cpu,cgroups/mem";
-  }
-
-  LOG(INFO) << "Using isolation: " << isolation;
-
-  // Table of the known isolator names and their factory functions.
-  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
-
-  creators["posix/cpu"] = &PosixCpuIsolatorProcess::create;
-  creators["posix/mem"] = &PosixMemIsolatorProcess::create;
-#ifdef __linux__
-  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
-  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
-  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
-#endif // __linux__
-
-  vector<Owned<Isolator> > isolators;
-
-  foreach (const string& type, strings::split(isolation, ",")) {
-    if (!creators.contains(type)) {
-      return Error("Unknown or unsupported isolator: " + type);
-    }
-
-    Try<Isolator*> created = creators[type](flags);
-    if (created.isError()) {
-      return Error(
-          "Could not create isolator " + type + ": " + created.error());
-    }
-
-    isolators.push_back(Owned<Isolator>(created.get()));
-  }
-
-#ifdef __linux__
-  // On Linux the cgroups aware launcher is used as soon as any cgroups
-  // isolator was requested.
-  Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
-    ? LinuxLauncher::create(flags)
-    : PosixLauncher::create(flags);
-#else
-  Try<Launcher*> launcher = PosixLauncher::create(flags);
-#endif // __linux__
-
-  if (launcher.isError()) {
-    return Error("Failed to create launcher: " + launcher.error());
-  }
-
-  return new MesosContainerizer(
-      flags,
-      local,
-      Owned<Launcher>(launcher.get()),
-      isolators);
-}
-
-
-// Creates the backing MesosContainerizerProcess and spawns it. The public
-// methods of this class dispatch onto that process.
-MesosContainerizer::MesosContainerizer(
-    const Flags& flags,
-    bool local,
-    const Owned<Launcher>& launcher,
-    const vector<Owned<Isolator> >& isolators)
-{
-  MesosContainerizerProcess* backend =
-    new MesosContainerizerProcess(flags, local, launcher, isolators);
-
-  process = backend;
-  spawn(process);
-}
-
-
-MesosContainerizer::~MesosContainerizer() // Stops and frees the backing process.
-{
- terminate(process); // Ask the backing process to terminate.
- process::wait(process); // Block until it has terminated before freeing it.
- delete process; // Allocated with 'new' in the constructor.
-}
-
-
-// Forwards to MesosContainerizerProcess::recover via dispatch.
-Future<Nothing> MesosContainerizer::recover(
-    const Option<state::SlaveState>& state)
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::recover,
-      state);
-}
-
-
-// Forwards to the executor-only overload of
-// MesosContainerizerProcess::launch via dispatch.
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::launch,
-      containerId, executorInfo, directory, user,
-      slaveId, slavePid, checkpoint);
-}
-
-
-// Forwards to the task overload of MesosContainerizerProcess::launch via
-// dispatch.
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const TaskInfo& taskInfo,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::launch,
-      containerId, taskInfo, executorInfo, directory, user,
-      slaveId, slavePid, checkpoint);
-}
-
-
-// Forwards to MesosContainerizerProcess::update via dispatch.
-Future<Nothing> MesosContainerizer::update(
-    const ContainerID& containerId,
-    const Resources& resources)
-{
-  return dispatch(
-      process, &MesosContainerizerProcess::update, containerId, resources);
-}
-
-
-// Forwards to MesosContainerizerProcess::usage via dispatch.
-Future<ResourceStatistics> MesosContainerizer::usage(
-    const ContainerID& containerId)
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::usage,
-      containerId);
-}
-
-
-// Forwards to MesosContainerizerProcess::wait via dispatch.
-Future<containerizer::Termination> MesosContainerizer::wait(
-    const ContainerID& containerId)
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::wait,
-      containerId);
-}
-
-
-// Forwards to MesosContainerizerProcess::destroy via dispatch; the result
-// of the dispatch is not used.
-void MesosContainerizer::destroy(const ContainerID& containerId)
-{
-  dispatch(
-      process,
-      &MesosContainerizerProcess::destroy,
-      containerId);
-}
-
-
-// Forwards to MesosContainerizerProcess::containers via dispatch.
-Future<hashset<ContainerID> > MesosContainerizer::containers()
-{
-  return dispatch(
-      process,
-      &MesosContainerizerProcess::containers);
-}
-
-
-// First recovery step. Collects, from the supplied slave state, the latest
-// run of every executor that has its info, a latest run, a forked pid and
-// is not completed; then recovers the launcher and continues in _recover().
-Future<Nothing> MesosContainerizerProcess::recover(
-    const Option<state::SlaveState>& state)
-{
-  LOG(INFO) << "Recovering containerizer";
-
-  // Executor runs that the launcher and the isolators are asked to recover.
-  list<RunState> runs;
-
-  if (state.isSome()) {
-    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
-      foreachvalue (const ExecutorState& executor, framework.executors) {
-        if (executor.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its info could not be recovered";
-          continue;
-        }
-
-        if (executor.latest.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its latest run could not be recovered";
-          continue;
-        }
-
-        // Only the latest run of the executor is of interest.
-        const ContainerID& containerId = executor.latest.get();
-        Option<RunState> latest = executor.runs.get(containerId);
-        CHECK_SOME(latest);
-
-        const RunState& run = latest.get();
-
-        // Without a pid the reaper cannot monitor the executor, so the run
-        // is skipped. This is not an error: the slave will wait on the
-        // container, get a failed Termination and clean everything up.
-        if (run.forkedPid.isNone()) {
-          continue;
-        }
-
-        if (run.completed) {
-          VLOG(1) << "Skipping recovery of executor '" << executor.id
-                  << "' of framework " << framework.id
-                  << " because its latest run "
-                  << containerId << " is completed";
-          continue;
-        }
-
-        LOG(INFO) << "Recovering container '" << containerId
-                  << "' for executor '" << executor.id
-                  << "' of framework " << framework.id;
-
-        runs.push_back(run);
-      }
-    }
-  }
-
-  // The launcher is recovered first, the isolators follow in _recover().
-  Future<Nothing> launched = launcher->recover(runs);
-
-  return launched.then(defer(self(), &Self::_recover, runs));
-}
-
-
-// Second recovery step: asks every isolator to recover the given runs and,
-// once all of them have succeeded, continues in __recover().
-Future<Nothing> MesosContainerizerProcess::_recover(
-    const list<RunState>& recoverable)
-{
-  list<Future<Nothing> > recoveries;
-
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Nothing> recovery = isolator->recover(recoverable);
-    recoveries.push_back(recovery);
-  }
-
-  // collect() only succeeds if every isolator recovered.
-  return collect(recoveries)
-    .then(defer(self(), &Self::__recover, recoverable));
-}
-
-
-// Final recovery step: for every recovered run, registers a termination
-// promise, starts reaping the checkpointed pid and watches every isolator
-// for limitations.
-Future<Nothing> MesosContainerizerProcess::__recover(
-    const list<RunState>& recovered)
-{
-  foreach (const RunState& run, recovered) {
-    CHECK_SOME(run.id);
-    const ContainerID& containerId = run.id.get();
-
-    promises.put(
-        containerId,
-        Owned<Promise<containerizer::Termination> >(
-            new Promise<containerizer::Termination>()));
-
-    CHECK_SOME(run.forkedPid);
-
-    // Keep the exit status future and trigger reaped() when it completes.
-    Future<Option<int > > exited = process::reap(run.forkedPid.get());
-    statuses[containerId] = exited;
-    exited.onAny(defer(self(), &Self::reaped, containerId));
-
-    foreach (const Owned<Isolator>& isolator, isolators) {
-      Future<Limitation> watched = isolator->watch(containerId);
-      watched.onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-    }
-  }
-
-  return Nothing();
-}
-
-
-// This function is executed by the forked child and must remain
-// async-signal-safe.
-int execute(
- const CommandInfo& command, // Command exec'ed via '/bin/sh -c' at the end.
- const string& directory, // Working directory entered before the exec.
- const os::ExecEnv& envp, // Pre-built environment handed to execle().
- uid_t uid, // User id applied with setuid().
- gid_t gid, // Group id applied with setgid().
- bool redirectIO, // If true, stdout/stderr are appended to files in the work directory.
- int pipeRead, // Read end of the synchronization pipe.
- int pipeWrite, // Write end of the synchronization pipe; closed right away.
- const list<Option<CommandInfo> >& commands) // Preparation commands run before the exec.
-{
- if (close(pipeWrite) != 0) {
- ABORT("Failed to close pipe[1]");
- }
-
- // Do a blocking read on the pipe until the parent signals us to continue.
- char dummy;
- ssize_t length;
- while ((length = read(pipeRead, &dummy, sizeof(dummy))) == -1 &&
- errno == EINTR); // Retry reads that were interrupted by a signal.
-
- if (length != sizeof(dummy)) {
- // This will occur if the slave terminates during executor launch.
- // There's a reasonable probability this will occur during slave restarts
- // across a large/busy cluster so we log and exit non-zero rather than
- // ABORT.
- const char* message =
- "Failed to synchronize with slave (it has probably exited)";
-
- while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
- errno == EINTR);
-
- _exit(1);
- }
-
- if (close(pipeRead) != 0) {
- ABORT("Failed to close pipe[0]");
- }
-
- // Change gid and uid.
- if (setgid(gid) != 0) {
- ABORT("Failed to set gid");
- }
-
- if (setuid(uid) != 0) {
- ABORT("Failed to set uid");
- }
-
- // Enter working directory.
- if (chdir(directory.c_str()) != 0) {
- ABORT("Failed to chdir into work directory");
- }
-
- // Redirect output to files in working dir if required. We append because
- // others (e.g., mesos-fetcher) may have already logged to the files.
- // TODO(bmahler): It would be best if instead of closing stderr /
- // stdout and redirecting, we instead always output to stderr /
- // stdout. Also tee'ing their output into the work directory files
- // when redirection is desired.
- if (redirectIO) {
- int fd;
- while ((fd = open(
- "stdout", // Relative path: resolved against the work directory entered above.
- O_CREAT | O_WRONLY | O_APPEND,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
- errno == EINTR);
- if (fd == -1) {
- ABORT("Failed to open stdout");
- }
-
- int status;
- while ((status = dup2(fd, STDOUT_FILENO)) == -1 && errno == EINTR);
- if (status == -1) {
- ABORT("Failed to dup2 for stdout");
- }
-
- if (close(fd) == -1) { // STDOUT_FILENO now refers to the file; drop the extra descriptor.
- ABORT("Failed to close stdout fd");
- }
-
- while ((fd = open(
- "stderr",
- O_CREAT | O_WRONLY | O_APPEND,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
- errno == EINTR);
- if (fd == -1) {
- ABORT("Failed to open stderr");
- }
-
- while ((status = dup2(fd, STDERR_FILENO)) == -1 && errno == EINTR);
- if (status == -1) {
- ABORT("Failed to dup2 for stderr");
- }
-
- if (close(fd) == -1) {
- ABORT("Failed to close stderr fd");
- }
- }
-
- // Run additional preparation commands. These are run as the same user and
- // with the environment as the slave.
- // NOTE: os::system() is async-signal-safe.
- foreach (const Option<CommandInfo>& command, commands) { // NOTE: shadows the 'command' parameter inside this loop.
- if (command.isSome()) {
- // Block until the command completes.
- int status = os::system(command.get().value());
-
- if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
- ABORT("Command '",
- command.get().value().c_str(),
- "' failed to execute successfully");
- }
- }
- }
-
- // Execute the command (via '/bin/sh -c command') with its environment.
- execle("/bin/sh", "sh", "-c", command.value().c_str(), (char*) NULL, envp());
-
- // If we get here, the execle call failed.
- ABORT("Failed to execute command");
-
- // This should not be reached.
- return -1;
-}
-
-
-// Launching an executor involves the following steps:
-// 1. Call prepare on each isolator.
-// 2. Fork the executor. The forked child is blocked from exec'ing until it
-//    has been isolated.
-// 3. Isolate the executor. Call isolate with the pid for each isolator.
-// 4. Fetch the executor.
-// 5. Exec the executor. The forked child is signalled to continue. It will
-//    first execute any preparation commands from isolators and then exec
-//    the executor.
-//
-// This entry point performs step 1 and hands over to _launch() for the
-// rest. If the chain fails, destroy() is invoked for the container.
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  if (promises.contains(containerId)) {
-    LOG(ERROR) << "Cannot start already running container '"
-               << containerId << "'";
-    return Failure("Container already started");
-  }
-
-  // TODO(tillt): The slave should expose which containerization
-  // mechanisms it supports to avoid scheduling tasks that it cannot
-  // run.
-  //
-  // This containerizer does not handle ContainerInfo, so the launch is
-  // failed to make users aware of the unsupported task configuration.
-  if (executorInfo.command().has_container()) {
-    return Failure("ContainerInfo is not supported");
-  }
-
-  promises.put(
-      containerId,
-      Owned<Promise<containerizer::Termination> >(
-          new Promise<containerizer::Termination>()));
-
-  // Remember the resources; usage() reports them as limits.
-  resources.put(containerId, executorInfo.resources());
-
-  LOG(INFO) << "Starting container '" << containerId
-            << "' for executor '" << executorInfo.executor_id()
-            << "' of framework '" << executorInfo.framework_id() << "'";
-
-  Future<list<Option<CommandInfo> > > prepared =
-    prepare(containerId, executorInfo, directory, user);
-
-  return prepared
-    .then(defer(self(),
-                &Self::_launch,
-                containerId,
-                executorInfo,
-                directory,
-                user,
-                slaveId,
-                slavePid,
-                checkpoint,
-                lambda::_1))
-    .onFailed(defer(self(), &Self::destroy, containerId));
-}
-
-
-// Task overload of launch(): the TaskInfo argument is ignored and the call
-// is delegated to the executor-only overload.
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const TaskInfo&,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return launch(
-      containerId, executorInfo, directory, user,
-      slaveId, slavePid, checkpoint);
-}
-
-// Asks every isolator to prepare for the container and gathers the
-// optional preparation commands they return; those commands must be run in
-// the forked child before exec'ing the executor. 'directory' and 'user'
-// are not used here.
-Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  list<Future<Option<CommandInfo> > > preparations;
-
-  // All prepare() calls are started before any result is awaited.
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Option<CommandInfo> > preparation =
-      isolator->prepare(containerId, executorInfo);
-    preparations.push_back(preparation);
-  }
-
-  // Completes once every isolator has finished its preparation.
-  return collect(preparations);
-}
-
-
-// Continuation run once mesos-fetcher has exited. Fails unless the exit
-// status is present and zero; on success the work directory is chown'ed to
-// 'user' when one is provided.
-Future<Nothing> _fetch(
-    const ContainerID& containerId,
-    const string& directory,
-    const Option<string>& user,
-    const Option<int>& status)
-{
-  const bool succeeded = status.isSome() && (status.get() == 0);
-
-  if (!succeeded) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "': exit status " +
-                   (status.isNone() ? "none" : stringify(status.get())));
-  }
-
-  if (user.isNone()) {
-    return Nothing();
-  }
-
-  // Hand the work directory over to the provided user.
-  Try<Nothing> chown = os::chown(user.get(), directory);
-  if (chown.isError()) {
-    return Failure("Failed to chown work directory");
-  }
-
-  return Nothing();
-}
-
-
-// Helper for fetch(): opens (creating / truncating) the log file at 'path',
-// chowns it to 'user' if one is given and redirects everything read from
-// the descriptor 'from' into it. The returned error starts with
-// "Failed to open: " or "Failed to chown: ".
-static Try<Nothing> redirectFetcherOutput(
-    int from,
-    const string& path,
-    const Option<string>& user)
-{
-  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
-  // TODO(tillt): Consider adding an overload to io::redirect
-  // that accepts a file path as 'to' for further reducing code. We
-  // would however also need an owner user parameter for such overload
-  // to perfectly replace the below.
-  //
-  // The file is created with mode 0644. The previous mode used S_IRWXO,
-  // which made the executor's log files writable and executable by every
-  // user on the host.
-  Try<int> fd = os::open(
-      path,
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
-
-  if (fd.isError()) {
-    return Error("Failed to open: " + fd.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), path);
-    if (chown.isError()) {
-      os::close(fd.get());
-      return Error("Failed to chown: " + chown.error());
-    }
-  }
-
-  // Redirect takes care of nonblocking and close-on-exec for the
-  // supplied file descriptors.
-  io::redirect(from, fd.get());
-
-  // Redirect does 'dup' the file descriptor, hence we can close the
-  // original now.
-  os::close(fd.get());
-
-  return Nothing();
-}
-
-
-// Runs mesos-fetcher (looked up in flags.launcher_dir) as a subprocess to
-// fetch the URIs of 'commandInfo' into 'directory'. The fetcher's stdout
-// and stderr are redirected into the 'stdout' / 'stderr' files of the work
-// directory. The returned future completes via _fetch() once the fetcher
-// has exited, and fails if the fetcher cannot be found or started or if
-// its output cannot be redirected.
-Future<Nothing> MesosContainerizerProcess::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  // Determine path for mesos-fetcher.
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
-
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-               << "for the mesos-fetcher '"
-               << path::join(flags.launcher_dir, "mesos-fetcher")
-               << "': "
-               << (realpath.isError() ? realpath.error()
-                                      : "No such file or directory");
-    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
-  }
-
-  map<string, string> environment =
-    fetcherEnvironment(commandInfo, directory, user, flags);
-
-  // Now the actual mesos-fetcher command.
-  string command = realpath.get();
-
-  LOG(INFO) << "Fetching URIs for container '" << containerId
-            << "' using command '" << command << "'";
-
-  Try<Subprocess> fetcher = subprocess(
-      command,
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      environment);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  // Redirect output (stdout and stderr) from the fetcher to log files
-  // in the executor work directory, chown'ing them if a user is
-  // specified.
-  Try<Nothing> out = redirectFetcherOutput(
-      fetcher.get().out().get(),
-      path::join(directory, "stdout"),
-      user);
-
-  if (out.isError()) {
-    return Failure("Failed to redirect stdout: " + out.error());
-  }
-
-  Try<Nothing> err = redirectFetcherOutput(
-      fetcher.get().err().get(),
-      path::join(directory, "stderr"),
-      user);
-
-  if (err.isError()) {
-    return Failure("Failed to redirect stderr: " + err.error());
-  }
-
-  return fetcher.get().status()
-    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
-}
-
-
-// Forks the executor and drives the remaining launch steps: isolate the
-// forked pid, fetch the executor's URIs and finally signal the blocked
-// child to exec. 'commands' are the preparation commands gathered from the
-// isolators; the child runs them just before the exec.
-//
-// The synchronization pipe is released on every exit path: the uid/gid
-// lookup happens before the pipe is created, the fork and checkpoint
-// failure paths close both ends explicitly, and otherwise both ends are
-// closed once the isolate/fetch/exec chain has completed.
-Future<Nothing> MesosContainerizerProcess::_launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint,
-    const list<Option<CommandInfo> >& commands)
-{
-  // Prepare environment variables for the executor.
-  map<string, string> env = executorEnvironment(
-      executorInfo,
-      directory,
-      slaveId,
-      slavePid,
-      checkpoint,
-      flags.recovery_timeout);
-
-  // Include any environment variables from CommandInfo.
-  foreach (const Environment::Variable& variable,
-           executorInfo.command().environment().variables()) {
-    env[variable.name()] = variable.value();
-  }
-
-  // Construct a representation of the environment suitable for
-  // passing to execle in the child. We construct it here because it
-  // is not async-signal-safe.
-  os::ExecEnv envp(env);
-
-  // Determine the uid and gid for the child now because getpwnam is
-  // not async signal safe. This is done before the pipe is created so
-  // that failing here cannot leak the pipe's file descriptors.
-  Result<uid_t> uid = os::getuid(user);
-  if (uid.isError() || uid.isNone()) {
-    return Failure("Invalid user: " + (uid.isError() ? uid.error()
-                                                     : "nonexistent"));
-  }
-
-  Result<gid_t> gid = os::getgid(user);
-  if (gid.isError() || gid.isNone()) {
-    return Failure("Invalid user: " + (gid.isError() ? gid.error()
-                                                     : "nonexistent"));
-  }
-
-  // Use a pipe to block the child until it's been isolated.
-  int pipes[2];
-  // We assume this should not fail under reasonable conditions so we
-  // use CHECK.
-  CHECK(pipe(pipes) == 0);
-
-  // Prepare a function for the forked child to exec() the executor.
-  lambda::function<int()> childFunction = lambda::bind(
-      &execute,
-      executorInfo.command(),
-      directory,
-      envp,
-      uid.get(),
-      gid.get(),
-      !local,
-      pipes[0],
-      pipes[1],
-      commands);
-
-  Try<pid_t> forked = launcher->fork(containerId, childFunction);
-
-  if (forked.isError()) {
-    // No child exists, so nobody else will ever use the pipe.
-    os::close(pipes[0]);
-    os::close(pipes[1]);
-
-    return Failure("Failed to fork executor: " + forked.error());
-  }
-  pid_t pid = forked.get();
-
-  // Checkpoint the executor's pid if requested.
-  if (checkpoint) {
-    const string& path = slave::paths::getForkedPidPath(
-        slave::paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        executorInfo.framework_id(),
-        executorInfo.executor_id(),
-        containerId);
-
-    LOG(INFO) << "Checkpointing executor's forked pid " << pid
-              << " to '" << path << "'";
-
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(pid));
-
-    if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
-
-      // Closing our ends of the pipe lets the child, which is blocked
-      // reading from it, see end-of-file and exit instead of waiting
-      // forever for a signal that will never be sent.
-      os::close(pipes[0]);
-      os::close(pipes[1]);
-
-      return Failure("Could not checkpoint executor's pid");
-    }
-  }
-
-  // Monitor the executor's pid. We keep the future because we'll refer to it
-  // again during container destroy.
-  Future<Option<int> > status = process::reap(pid);
-  statuses.put(containerId, status);
-  status.onAny(defer(self(), &Self::reaped, containerId));
-
-  return isolate(containerId, pid)
-    .then(defer(self(),
-                &Self::fetch,
-                containerId,
-                executorInfo.command(),
-                directory,
-                user))
-    .then(defer(self(), &Self::exec, containerId, pipes[1]))
-    .onAny(lambda::bind(&os::close, pipes[0]))
-    .onAny(lambda::bind(&os::close, pipes[1]));
-}
-
-
-// Registers limitation callbacks with every isolator and then asks each of
-// them to isolate the forked pid. Completes once all isolators are done.
-Future<Nothing> MesosContainerizerProcess::isolate(
-    const ContainerID& containerId,
-    pid_t _pid)
-{
-  // A limitation reported by any isolator ends up in limited().
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Limitation> watched = isolator->watch(containerId);
-    watched.onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-  }
-
-  // The watches above are all set up before the first isolate() call.
-  list<Future<Nothing> > isolations;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Nothing> isolation = isolator->isolate(containerId, _pid);
-    isolations.push_back(isolation);
-  }
-
-  // Collapse the collected list into a plain Future<Nothing>.
-  return collect(isolations)
-    .then(lambda::bind(&_nothing));
-}
-
-
-// Signals the forked child to continue by writing a single byte to the
-// write end of the synchronization pipe. Fails if the byte could not be
-// written.
-Future<Nothing> MesosContainerizerProcess::exec(
-    const ContainerID& containerId,
-    int pipeWrite)
-{
-  CHECK(promises.contains(containerId));
-
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe. The value is irrelevant to the child but it is
-  // initialized so that no indeterminate byte is read and written.
-  char dummy = 0;
-  ssize_t length;
-  while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
-         errno == EINTR);
-
-  if (length != sizeof(dummy)) {
-    return Failure("Failed to synchronize child process: " +
-                   string(strerror(errno)));
-  }
-
-  return Nothing();
-}
-
-
-// Returns the termination future of a known container, or a Failure for a
-// container without a registered promise.
-Future<containerizer::Termination> MesosContainerizerProcess::wait(
-    const ContainerID& containerId)
-{
-  if (promises.contains(containerId)) {
-    return promises[containerId]->future();
-  }
-
-  return Failure("Unknown container: " + stringify(containerId));
-}
-
-
-// Records the new resources of a container and forwards them to every
-// isolator. Updates for unknown containers are ignored with a warning.
-Future<Nothing> MesosContainerizerProcess::update(
-    const ContainerID& containerId,
-    const Resources& _resources)
-{
-  // The promises hashmap is consulted (rather than the resources hashmap)
-  // because only the former is populated during recovery.
-  const bool known = promises.contains(containerId);
-
-  if (!known) {
-    // Not a failure: the slave updates a container's resources on a
-    // task's terminal state change, at which point the executor may have
-    // already exited and the container been cleaned up.
-    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
-    return Nothing();
-  }
-
-  // Remember the resources; usage() reports them as limits.
-  resources.put(containerId, _resources);
-
-  list<Future<Nothing> > updates;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Nothing> updated = isolator->update(containerId, _resources);
-    updates.push_back(updated);
-  }
-
-  // Completes once every isolator has applied the update.
-  return collect(updates)
-    .then(lambda::bind(&_nothing));
-}
-
-
-// Merges the statistics reported by the isolators into a single result.
-// Statistics that are not ready are skipped with a warning. 'resources' is
-// used to fill in the limit fields; it is optional because the resources
-// aren't known after recovery until/unless update() is called.
-Future<ResourceStatistics> _usage(
-    const ContainerID& containerId,
-    const Option<Resources>& resources,
-    const list<Future<ResourceStatistics> >& statistics)
-{
-  ResourceStatistics merged;
-
-  // All statistics are available at this point, so stamp the result now.
-  merged.set_timestamp(Clock::now().secs());
-
-  foreach (const Future<ResourceStatistics>& statistic, statistics) {
-    if (!statistic.isReady()) {
-      LOG(WARNING) << "Skipping resource statistic for container "
-                   << containerId << " because: "
-                   << (statistic.isFailed() ? statistic.failure()
-                                            : "discarded");
-      continue;
-    }
-
-    merged.MergeFrom(statistic.get());
-  }
-
-  if (resources.isNone()) {
-    return merged;
-  }
-
-  // Report the resource allocations as limits.
-  Option<Bytes> mem = resources.get().mem();
-  if (mem.isSome()) {
-    merged.set_mem_limit_bytes(mem.get().bytes());
-  }
-
-  Option<double> cpus = resources.get().cpus();
-  if (cpus.isSome()) {
-    merged.set_cpus_limit(cpus.get());
-  }
-
-  return merged;
-}
-
-
-// Queries every isolator for the container's resource statistics and
-// merges the answers in _usage(). Fails for unknown containers.
-Future<ResourceStatistics> MesosContainerizerProcess::usage(
-    const ContainerID& containerId)
-{
-  const bool known = promises.contains(containerId);
-  if (!known) {
-    return Failure("Unknown container: " + stringify(containerId));
-  }
-
-  list<Future<ResourceStatistics> > statistics;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<ResourceStatistics> statistic = isolator->usage(containerId);
-    statistics.push_back(statistic);
-  }
-
-  // await() (rather than collect()) is used so that partial usage
-  // statistics can be returned.
-  // TODO(idownes): After recovery resources won't be known until after an
-  // update() because they aren't part of the SlaveState.
-  return await(statistics)
-    .then(lambda::bind(
-        _usage,
-        containerId,
-        resources.get(containerId),
-        lambda::_1));
-}
-
-
-// Starts destroying a container. Unknown containers and containers whose
-// destruction is already in progress are ignored. If the executor was
-// forked, the launcher is asked to kill its processes first.
-void MesosContainerizerProcess::destroy(const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
-    return;
-  }
-
-  // Nothing to do if a destroy has already been initiated.
-  if (destroying.contains(containerId)) {
-    return;
-  }
-
-  destroying.insert(containerId);
-
-  LOG(INFO) << "Destroying container '" << containerId << "'";
-
-  if (!statuses.contains(containerId)) {
-    // The executor never forked so there are no processes to kill; go
-    // straight to __destroy() with status = None().
-    __destroy(containerId, None());
-    return;
-  }
-
-  // Kill all processes, then continue the destruction in _destroy().
-  launcher->destroy(containerId)
-    .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
-}
-
-
-// Continuation of destroy() once the launcher has tried to kill all
-// processes of the container. 'future' is the launcher's result.
-void MesosContainerizerProcess::_destroy(
-    const ContainerID& containerId,
-    const Future<Nothing>& future)
-{
-  if (future.isReady()) {
-    // All processes in the container were killed. Pick up the executor's
-    // exit status when it is ready (it may already be) and continue the
-    // destroy.
-    statuses.get(containerId).get()
-      .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
-    return;
-  }
-
-  // Something has gone wrong and the launcher wasn't able to kill all the
-  // processes in the container. The isolators cannot be cleaned up because
-  // they may require that all processes have exited, so the failure is
-  // simply returned to the slave.
-  // TODO(idownes): This is a pretty bad state to be in but we should consider
-  // cleaning up here.
-  promises[containerId]->fail(
-      "Failed to destroy container: " +
-      (future.isFailed() ? future.failure() : "discarded future"));
-
-  destroying.erase(containerId);
-}
-
-
-// Asks every isolator to clean up the container and continues in
-// ___destroy() once all cleanups have completed (successfully or not).
-// 'status' is the executor's exit status future and is passed through.
-void MesosContainerizerProcess::__destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status)
-{
-  // All processes have exited by now, so the isolators can clean up.
-  list<Future<Nothing> > cleanups;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    Future<Nothing> cleanup = isolator->cleanup(containerId);
-    cleanups.push_back(cleanup);
-  }
-
-  collect(cleanups)
-    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
-}
-
-
-// Last step of the destruction. If the isolators failed to clean up, the
-// termination promise is failed and only the 'destroying' flag is reset.
-// Otherwise the Termination is built, the promise is set and all state
-// kept for the container is dropped.
-void MesosContainerizerProcess::___destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status,
-    const Future<list<Nothing> >& futures)
-{
-  // Something has gone wrong with one of the Isolators and cleanup failed.
-  // We'll fail the container termination and remove the 'destroying' flag but
-  // leave all other state. The containerizer is now in a bad state because
-  // at least one isolator has failed to clean up.
-  if (!futures.isReady()) {
-    promises[containerId]->fail(
-        "Failed to clean up isolators when destroying container: " +
-        (futures.isFailed() ? futures.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
-  }
-
-  // A container is 'killed' if any isolator limited it.
-  // Note: We may not see a limitation in time for it to be registered. This
-  // could occur if the limitation (e.g., an OOM) killed the executor and we
-  // triggered destroy() off the executor exit.
-  bool killed = false;
-  string message;
-  if (limitations.contains(containerId)) {
-    killed = true;
-    foreach (const Limitation& limitation, limitations.get(containerId)) {
-      const string text = strings::trim(limitation.message);
-      if (text.empty()) {
-        continue;
-      }
-
-      // Separate the messages of several limitations so that they do not
-      // run into each other; a single message is reported unchanged
-      // (trimmed, as before).
-      if (!message.empty()) {
-        message += "; ";
-      }
-      message += text;
-    }
-  } else {
-    message = "Executor terminated";
-  }
-
-  containerizer::Termination termination;
-  termination.set_killed(killed);
-  termination.set_message(message);
-
-  // The exit status is only reported when it is known.
-  if (status.isReady() && status.get().isSome()) {
-    termination.set_status(status.get().get());
-  }
-
-  promises[containerId]->set(termination);
-
-  promises.erase(containerId);
-  statuses.erase(containerId);
-  limitations.erase(containerId);
-  resources.erase(containerId);
-  destroying.erase(containerId);
-}
-
-
-// Callback run when the reaped executor's status future completes:
-// destroys the container unless it is no longer known.
-void MesosContainerizerProcess::reaped(const ContainerID& containerId)
-{
-  const bool known = promises.contains(containerId);
-
-  if (known) {
-    LOG(INFO) << "Executor for container '" << containerId << "' has exited";
-
-    // With the executor gone the container is torn down.
-    destroy(containerId);
-  }
-}
-
-
-// Callback run when an isolator's watch() future completes. A ready
-// limitation is recorded for the Termination message; in every case the
-// (still known) container is destroyed afterwards.
-void MesosContainerizerProcess::limited(
-    const ContainerID& containerId,
-    const Future<Limitation>& future)
-{
-  if (!promises.contains(containerId)) {
-    return;
-  }
-
-  if (!future.isReady()) {
-    // TODO(idownes): A discarded future will not be an error when isolators
-    // discard their promises after cleanup.
-    LOG(ERROR) << "Error in a resource limitation for container "
-               << containerId << ": " << (future.isFailed() ? future.failure()
-                                                            : "discarded");
-  } else {
-    LOG(INFO) << "Container " << containerId << " has reached its limit for"
-              << " resource " << future.get().resource
-              << " and will be terminated";
-    limitations.put(containerId, future.get());
-  }
-
-  // The container has been affected by the limitation so destroy it.
-  destroy(containerId);
-}
-
-
-// Returns the ids of all containers that have a termination promise
-// registered.
-Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
-{
-  hashset<ContainerID> known = promises.keys();
-  return known;
-}
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.hpp b/src/slave/containerizer/mesos_containerizer.hpp
deleted file mode 100644
index 21affae..0000000
--- a/src/slave/containerizer/mesos_containerizer.hpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MESOS_CONTAINERIZER_HPP__
-#define __MESOS_CONTAINERIZER_HPP__
-
-#include <list>
-#include <vector>
-
-#include <stout/hashmap.hpp>
-#include <stout/lambda.hpp>
-#include <stout/multihashmap.hpp>
-
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class MesosContainerizerProcess;
-
-class MesosContainerizer : public Containerizer
-{
-public:
- static Try<MesosContainerizer*> create(const Flags& flags, bool local);
-
- MesosContainerizer(
- const Flags& flags,
- bool local,
- const process::Owned<Launcher>& launcher,
- const std::vector<process::Owned<Isolator> >& isolators);
-
- virtual ~MesosContainerizer();
-
- virtual process::Future<Nothing> recover(
- const Option<state::SlaveState>& state);
-
- virtual process::Future<Nothing> launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<Nothing> launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- virtual process::Future<containerizer::Termination> wait(
- const ContainerID& containerId);
-
- virtual void destroy(const ContainerID& containerId);
-
- virtual process::Future<hashset<ContainerID> > containers();
-
-private:
- MesosContainerizerProcess* process;
-};
-
-
-class MesosContainerizerProcess
- : public process::Process<MesosContainerizerProcess>
-{
-public:
- MesosContainerizerProcess(
- const Flags& _flags,
- bool _local,
- const process::Owned<Launcher>& _launcher,
- const std::vector<process::Owned<Isolator> >& _isolators)
- : flags(_flags),
- local(_local),
- launcher(_launcher),
- isolators(_isolators) {}
-
- virtual ~MesosContainerizerProcess() {}
-
- process::Future<Nothing> recover(
- const Option<state::SlaveState>& state);
-
- process::Future<Nothing> launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- process::Future<Nothing> launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint);
-
- process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- process::Future<containerizer::Termination> wait(
- const ContainerID& containerId);
-
- void destroy(const ContainerID& containerId);
-
- process::Future<hashset<ContainerID> > containers();
-
-private:
- process::Future<Nothing> _recover(
- const std::list<state::RunState>& recoverable);
-
- process::Future<Nothing> __recover(
- const std::list<state::RunState>& recovered);
-
- process::Future<std::list<Option<CommandInfo> > > prepare(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- process::Future<Nothing> fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const std::string& directory,
- const Option<std::string>& user);
-
- process::Future<Nothing> _launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const process::PID<Slave>& slavePid,
- bool checkpoint,
- const std::list<Option<CommandInfo> >& scripts);
-
- process::Future<Nothing> isolate(
- const ContainerID& containerId,
- pid_t _pid);
-
- process::Future<Nothing> exec(
- const ContainerID& containerId,
- int pipeWrite);
-
- // Continues 'destroy()' once all processes have been killed by the launcher.
- void _destroy(
- const ContainerID& containerId,
- const process::Future<Nothing>& future);
-
- // Continues '_destroy()' once we get the exit status of the executor.
- void __destroy(
- const ContainerID& containerId,
- const process::Future<Option<int > >& status);
-
- // Continues (and completes) '__destroy()' once all isolators have completed
- // cleanup.
- void ___destroy(
- const ContainerID& containerId,
- const process::Future<Option<int > >& status,
- const process::Future<std::list<Nothing> >& futures);
-
- // Call back for when an isolator limits a container and impacts the
- // processes. This will trigger container destruction.
- void limited(
- const ContainerID& containerId,
- const process::Future<Limitation>& future);
-
- // Call back for when the executor exits. This will trigger container
- // destroy.
- void reaped(const ContainerID& containerId);
-
- const Flags flags;
- const bool local;
- const process::Owned<Launcher> launcher;
- const std::vector<process::Owned<Isolator> > isolators;
-
- // TODO(idownes): Consider putting these per-container variables into a
- // struct.
- // Promises for futures returned from wait().
- hashmap<ContainerID,
- process::Owned<process::Promise<containerizer::Termination> > > promises;
-
- // We need to keep track of the future exit status for each executor because
- // we'll only get a single notification when the executor exits.
- hashmap<ContainerID, process::Future<Option<int> > > statuses;
-
- // We keep track of any limitations received from each isolator so we can
- // determine the cause of an executor termination.
- multihashmap<ContainerID, Limitation> limitations;
-
- // We keep track of the resources for each container so we can set the
- // ResourceStatistics limits in usage().
- hashmap<ContainerID, Resources> resources;
-
- // Set of containers that are in process of being destroyed.
- hashset<ContainerID> destroying;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MESOS_CONTAINERIZER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_isolator_tests.cpp b/src/tests/cgroups_isolator_tests.cpp
index 5a9704d..2d9738f 100644
--- a/src/tests/cgroups_isolator_tests.cpp
+++ b/src/tests/cgroups_isolator_tests.cpp
@@ -25,7 +25,7 @@
#include <stout/proc.hpp>
#include <stout/stringify.hpp>
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/script.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 8ea7974..70e1245 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,7 +31,8 @@
#include "slave/containerizer/isolator.hpp"
#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/isolator.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 0bbec09..5a141e3 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -29,6 +29,7 @@
#include <process/owned.hpp>
#include <process/reap.hpp>
+#include <stout/abort.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
@@ -84,23 +85,24 @@ using testing::Return;
using testing::SaveArg;
-int execute(const std::string& command, int pipes[2])
+static int childSetup(int pipes[2])
{
- // In child process
- ::close(pipes[1]);
+ // In child process.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
// Wait until the parent signals us to continue.
- int buf;
- while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
- ::close(pipes[0]);
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
- execl("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL);
+ if (length != sizeof(dummy)) {
+ ABORT("Failed to synchronize with parent");
+ }
- const char* message = "Child failed to exec";
- while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
- errno == EINTR);
+ while (::close(pipes[0]) == -1 && errno == EINTR);
- _exit(1);
+ return 0;
}
@@ -147,24 +149,38 @@ TYPED_TEST(CpuIsolatorTest, UserCpuUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to start.
while (!os::exists(file));
@@ -238,24 +254,38 @@ TYPED_TEST(CpuIsolatorTest, SystemCpuUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to start.
while (!os::exists(file));
@@ -335,24 +365,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = command;
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to complete.
AWAIT_READY(status);
@@ -413,24 +457,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs_Big_Quota)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(&execute, "exit 0", pipes);
+ vector<string> argv(3);
+ argv[0] = "sh";
+ argv[1] = "-c";
+ argv[2] = "exit 0";
+
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ argv,
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&childSetup, pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Reap the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait for the command to complete successfully.
AWAIT_READY(status);
@@ -466,13 +524,20 @@ TYPED_TEST_CASE(MemIsolatorTest, MemIsolatorTypes);
// posix_memalign, mlock, memset and perror are not safe.
int consumeMemory(const Bytes& _size, const Duration& duration, int pipes[2])
{
- // In child process
- ::close(pipes[1]);
+ // In child process.
+ while (::close(pipes[1]) == -1 && errno == EINTR);
- int buf;
// Wait until the parent signals us to continue.
- while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
- ::close(pipes[0]);
+ char dummy;
+ ssize_t length;
+ while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+ errno == EINTR);
+
+ if (length != sizeof(dummy)) {
+ ABORT("Failed to synchronize with parent");
+ }
+
+ while (::close(pipes[0]) == -1 && errno == EINTR);
size_t size = static_cast<size_t>(_size.bytes());
void* buffer = NULL;
@@ -522,28 +587,33 @@ TYPED_TEST(MemIsolatorTest, MemUsage)
int pipes[2];
ASSERT_NE(-1, ::pipe(pipes));
- lambda::function<int()> inChild = lambda::bind(
- &consumeMemory,
- Megabytes(256),
- Seconds(10),
- pipes);
+ Try<pid_t> pid = launcher.get()->fork(
+ containerId,
+ "/bin/sh",
+ vector<string>(),
+ Subprocess::FD(STDIN_FILENO),
+ Subprocess::FD(STDOUT_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ None(),
+ None(),
+ lambda::bind(&consumeMemory, Megabytes(256), Seconds(10), pipes));
- Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
ASSERT_SOME(pid);
// Set up the reaper to wait on the forked child.
Future<Option<int> > status = process::reap(pid.get());
// Continue in the parent.
- ::close(pipes[0]);
+ ASSERT_SOME(os::close(pipes[0]));
// Isolate the forked child.
AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
// Now signal the child to continue.
- int buf;
- ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
- ::close(pipes[1]);
+ char dummy;
+ ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+ ASSERT_SOME(os::close(pipes[1]));
// Wait up to 5 seconds for the child process to consume 256 MB of memory;
ResourceStatistics statistics;
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 885e0fb..c498d33 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -50,7 +50,7 @@
#include "slave/flags.hpp"
#include "slave/slave.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 24af32b..216bd6f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -34,7 +34,8 @@
#endif
#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
#include "tests/containerizer.hpp"
#include "tests/environment.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eeb8631..ae38a13 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -50,10 +50,12 @@
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
#include "tests/cluster.hpp"
#include "tests/utils.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 873f22d..371a5b8 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -43,11 +43,12 @@
#include "master/master.hpp"
#include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
#include "slave/gc.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
+
#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"