You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2014/06/25 23:37:48 UTC

[6/7] Refactored the mesos containerizer launcher to fix MESOS-1404.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.cpp b/src/slave/containerizer/mesos_containerizer.cpp
deleted file mode 100644
index 8a109f4..0000000
--- a/src/slave/containerizer/mesos_containerizer.cpp
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sstream>
-
-#include <process/collect.hpp>
-#include <process/defer.hpp>
-#include <process/io.hpp>
-#include <process/reap.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/fatal.hpp>
-#include <stout/os.hpp>
-#include <stout/unreachable.hpp>
-
-#include <stout/os/execenv.hpp>
-
-#include "slave/paths.hpp"
-#include "slave/slave.hpp"
-
-#ifdef __linux__
-#include "slave/containerizer/linux_launcher.hpp"
-#endif // __linux__
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
-
-#include "slave/containerizer/isolators/posix.hpp"
-#ifdef __linux__
-#include "slave/containerizer/isolators/cgroups/cpushare.hpp"
-#include "slave/containerizer/isolators/cgroups/mem.hpp"
-#include "slave/containerizer/isolators/cgroups/perf_event.hpp"
-#endif // __linux__
-
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
-using namespace process;
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-using state::SlaveState;
-using state::FrameworkState;
-using state::ExecutorState;
-using state::RunState;
-
-// Local function declaration/definitions.
-Future<Nothing> _nothing() { return Nothing(); }
-
-
-// Helper method to build the environment map used to launch fetcher.
-map<string, string> fetcherEnvironment(
-    const CommandInfo& commandInfo,
-    const std::string& directory,
-    const Option<std::string>& user,
-    const Flags& flags)
-{
-  // Prepare the environment variables to pass to mesos-fetcher.
-  string uris = "";
-  foreach (const CommandInfo::URI& uri, commandInfo.uris()) {
-    uris += uri.value() + "+" +
-            (uri.has_executable() && uri.executable() ? "1" : "0") +
-            (uri.extract() ? "X" : "N");
-    uris += " ";
-  }
-  // Remove extra space at the end.
-  uris = strings::trim(uris);
-
-  map<string, string> environment;
-  environment["MESOS_EXECUTOR_URIS"] = uris;
-  environment["MESOS_WORK_DIRECTORY"] = directory;
-  if (user.isSome()) {
-    environment["MESOS_USER"] = user.get();
-  }
-  if (!flags.frameworks_home.empty()) {
-    environment["MESOS_FRAMEWORKS_HOME"] = flags.frameworks_home;
-  }
-  if (!flags.hadoop_home.empty()) {
-    environment["HADOOP_HOME"] = flags.hadoop_home;
-  }
-
-  return environment;
-}
-
-
-Try<MesosContainerizer*> MesosContainerizer::create(
-    const Flags& flags,
-    bool local)
-{
-  string isolation;
-  if (flags.isolation == "process") {
-    LOG(WARNING) << "The 'process' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=posix/cpu,posix/mem'.";
-    isolation = "posix/cpu,posix/mem";
-  } else if (flags.isolation == "cgroups") {
-    LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
-                 << "please update your flags to"
-                 << " '--isolation=cgroups/cpu,cgroups/mem'.";
-    isolation = "cgroups/cpu,cgroups/mem";
-  } else {
-    isolation = flags.isolation;
-  }
-
-  LOG(INFO) << "Using isolation: " << isolation;
-
-  // Create a MesosContainerizerProcess using isolators and a launcher.
-  hashmap<std::string, Try<Isolator*> (*)(const Flags&)> creators;
-
-  creators["posix/cpu"]   = &PosixCpuIsolatorProcess::create;
-  creators["posix/mem"]   = &PosixMemIsolatorProcess::create;
-#ifdef __linux__
-  creators["cgroups/cpu"] = &CgroupsCpushareIsolatorProcess::create;
-  creators["cgroups/mem"] = &CgroupsMemIsolatorProcess::create;
-  creators["cgroups/perf_event"] = &CgroupsPerfEventIsolatorProcess::create;
-#endif // __linux__
-
-  vector<Owned<Isolator> > isolators;
-
-  foreach (const string& type, strings::split(isolation, ",")) {
-    if (creators.contains(type)) {
-      Try<Isolator*> isolator = creators[type](flags);
-      if (isolator.isError()) {
-        return Error(
-            "Could not create isolator " + type + ": " + isolator.error());
-      } else {
-        isolators.push_back(Owned<Isolator>(isolator.get()));
-      }
-    } else {
-      return Error("Unknown or unsupported isolator: " + type);
-    }
-  }
-
-#ifdef __linux__
-  // Use cgroups on Linux if any cgroups isolators are used.
-  Try<Launcher*> launcher = strings::contains(isolation, "cgroups")
-    ? LinuxLauncher::create(flags)
-    : PosixLauncher::create(flags);
-#else
-  Try<Launcher*> launcher = PosixLauncher::create(flags);
-#endif // __linux__
-
-  if (launcher.isError()) {
-    return Error("Failed to create launcher: " + launcher.error());
-  }
-
-  return new MesosContainerizer(
-      flags, local, Owned<Launcher>(launcher.get()), isolators);
-}
-
-
-MesosContainerizer::MesosContainerizer(
-    const Flags& flags,
-    bool local,
-    const Owned<Launcher>& launcher,
-    const vector<Owned<Isolator> >& isolators)
-{
-  process = new MesosContainerizerProcess(
-      flags, local, launcher, isolators);
-  spawn(process);
-}
-
-
-MesosContainerizer::~MesosContainerizer()
-{
-  terminate(process);
-  process::wait(process);
-  delete process;
-}
-
-
-Future<Nothing> MesosContainerizer::recover(
-    const Option<state::SlaveState>& state)
-{
-  return dispatch(process, &MesosContainerizerProcess::recover, state);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::launch,
-                  containerId,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::launch(
-    const ContainerID& containerId,
-    const TaskInfo& taskInfo,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::launch,
-                  containerId,
-                  taskInfo,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
-}
-
-
-Future<Nothing> MesosContainerizer::update(
-    const ContainerID& containerId,
-    const Resources& resources)
-{
-  return dispatch(process,
-                  &MesosContainerizerProcess::update,
-                  containerId,
-                  resources);
-}
-
-
-Future<ResourceStatistics> MesosContainerizer::usage(
-    const ContainerID& containerId)
-{
-  return dispatch(process, &MesosContainerizerProcess::usage, containerId);
-}
-
-
-Future<containerizer::Termination> MesosContainerizer::wait(
-    const ContainerID& containerId)
-{
-  return dispatch(process, &MesosContainerizerProcess::wait, containerId);
-}
-
-
-void MesosContainerizer::destroy(const ContainerID& containerId)
-{
-  dispatch(process, &MesosContainerizerProcess::destroy, containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizer::containers()
-{
-  return dispatch(process, &MesosContainerizerProcess::containers);
-}
-
-
-Future<Nothing> MesosContainerizerProcess::recover(
-    const Option<state::SlaveState>& state)
-{
-  LOG(INFO) << "Recovering containerizer";
-
-  // Gather the executor run states that we will attempt to recover.
-  list<RunState> recoverable;
-  if (state.isSome()) {
-    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
-      foreachvalue (const ExecutorState& executor, framework.executors) {
-        if (executor.info.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its info could not be recovered";
-          continue;
-        }
-
-        if (executor.latest.isNone()) {
-          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                       << "' of framework " << framework.id
-                       << " because its latest run could not be recovered";
-          continue;
-        }
-
-        // We are only interested in the latest run of the executor!
-        const ContainerID& containerId = executor.latest.get();
-        Option<RunState> run = executor.runs.get(containerId);
-        CHECK_SOME(run);
-
-        // We need the pid so the reaper can monitor the executor so skip this
-        // executor if it's not present. This is not an error because the slave
-        // will try to wait on the container which will return a failed
-        // Termination and everything will get cleaned up.
-        if (!run.get().forkedPid.isSome()) {
-          continue;
-        }
-
-        if (run.get().completed) {
-          VLOG(1) << "Skipping recovery of executor '" << executor.id
-                  << "' of framework " << framework.id
-                  << " because its latest run "
-                  << containerId << " is completed";
-          continue;
-        }
-
-        LOG(INFO) << "Recovering container '" << containerId
-                  << "' for executor '" << executor.id
-                  << "' of framework " << framework.id;
-
-        recoverable.push_back(run.get());
-      }
-    }
-  }
-
-  // Try to recover the launcher first.
-  return launcher->recover(recoverable)
-    .then(defer(self(), &Self::_recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_recover(
-    const list<RunState>& recoverable)
-{
-  // Then recover the isolators.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->recover(recoverable));
-  }
-
-  // If all isolators recover then continue.
-  return collect(futures)
-    .then(defer(self(), &Self::__recover, recoverable));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::__recover(
-    const list<RunState>& recovered)
-{
-  foreach (const RunState& run, recovered) {
-    CHECK_SOME(run.id);
-    const ContainerID& containerId = run.id.get();
-
-    Owned<Promise<containerizer::Termination> > promise(
-        new Promise<containerizer::Termination>());
-    promises.put(containerId, promise);
-
-    CHECK_SOME(run.forkedPid);
-    Future<Option<int > > status = process::reap(run.forkedPid.get());
-    statuses[containerId] = status;
-    status.onAny(defer(self(), &Self::reaped, containerId));
-
-    foreach (const Owned<Isolator>& isolator, isolators) {
-      isolator->watch(containerId)
-        .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-    }
-  }
-
-  return Nothing();
-}
-
-
-// This function is executed by the forked child and must remain
-// async-signal-safe.
-int execute(
-    const CommandInfo& command,
-    const string& directory,
-    const os::ExecEnv& envp,
-    uid_t uid,
-    gid_t gid,
-    bool redirectIO,
-    int pipeRead,
-    int pipeWrite,
-    const list<Option<CommandInfo> >& commands)
-{
-  if (close(pipeWrite) != 0) {
-    ABORT("Failed to close pipe[1]");
-  }
-
-  // Do a blocking read on the pipe until the parent signals us to continue.
-  char dummy;
-  ssize_t length;
-  while ((length = read(pipeRead, &dummy, sizeof(dummy))) == -1 &&
-         errno == EINTR);
-
-  if (length != sizeof(dummy)) {
-    // This will occur if the slave terminates during executor launch.
-    // There's a reasonable probability this will occur during slave restarts
-    // across a large/busy cluster so we log and exit non-zero rather than
-    // ABORT.
-    const char* message =
-      "Failed to synchronize with slave (it has probably exited)";
-
-    while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
-           errno == EINTR);
-
-    _exit(1);
-  }
-
-  if (close(pipeRead) != 0) {
-    ABORT("Failed to close pipe[0]");
-  }
-
-  // Change gid and uid.
-  if (setgid(gid) != 0) {
-    ABORT("Failed to set gid");
-  }
-
-  if (setuid(uid) != 0) {
-    ABORT("Failed to set uid");
-  }
-
-  // Enter working directory.
-  if (chdir(directory.c_str()) != 0) {
-    ABORT("Failed to chdir into work directory");
-  }
-
-  // Redirect output to files in working dir if required. We append because
-  // others (e.g., mesos-fetcher) may have already logged to the files.
-  // TODO(bmahler): It would be best if instead of closing stderr /
-  // stdout and redirecting, we instead always output to stderr /
-  // stdout. Also tee'ing their output into the work directory files
-  // when redirection is desired.
-  if (redirectIO) {
-    int fd;
-    while ((fd = open(
-        "stdout",
-        O_CREAT | O_WRONLY | O_APPEND,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
-            errno == EINTR);
-    if (fd == -1) {
-      ABORT("Failed to open stdout");
-    }
-
-    int status;
-    while ((status = dup2(fd, STDOUT_FILENO)) == -1 && errno == EINTR);
-    if (status == -1) {
-      ABORT("Failed to dup2 for stdout");
-    }
-
-    if (close(fd) == -1) {
-      ABORT("Failed to close stdout fd");
-    }
-
-    while ((fd = open(
-        "stderr",
-        O_CREAT | O_WRONLY | O_APPEND,
-        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH)) == -1 &&
-            errno == EINTR);
-    if (fd == -1) {
-      ABORT("Failed to open stderr");
-    }
-
-    while ((status = dup2(fd, STDERR_FILENO)) == -1 && errno == EINTR);
-    if (status == -1) {
-      ABORT("Failed to dup2 for stderr");
-    }
-
-    if (close(fd) == -1) {
-      ABORT("Failed to close stderr fd");
-    }
-  }
-
-  // Run additional preparation commands. These are run as the same user and
-  // with the environment as the slave.
-  // NOTE: os::system() is async-signal-safe.
-  foreach (const Option<CommandInfo>& command, commands) {
-    if (command.isSome()) {
-      // Block until the command completes.
-      int status = os::system(command.get().value());
-
-      if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
-        ABORT("Command '",
-              command.get().value().c_str(),
-              "' failed to execute successfully");
-      }
-    }
-  }
-
-  // Execute the command (via '/bin/sh -c command') with its environment.
-  execle("/bin/sh", "sh", "-c", command.value().c_str(), (char*) NULL, envp());
-
-  // If we get here, the execv call failed.
-  ABORT("Failed to execute command");
-
-  // This should not be reached.
-  return -1;
-}
-
-
-// Launching an executor involves the following steps:
-// 1. Call prepare on each isolator.
-// 2. Fork the executor. The forked child is blocked from exec'ing until it has
-//    been isolated.
-// 3. Isolate the executor. Call isolate with the pid for each isolator.
-// 4. Fetch the executor.
-// 4. Exec the executor. The forked child is signalled to continue. It will
-//    first execute any preparation commands from isolators and then exec the
-//    executor.
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  if (promises.contains(containerId)) {
-    LOG(ERROR) << "Cannot start already running container '"
-               << containerId << "'";
-    return Failure("Container already started");
-  }
-
-  // TODO(tillt): The slave should expose which containerization
-  // mechanisms it supports to avoid scheduling tasks that it cannot
-  // run.
-  const CommandInfo& command = executorInfo.command();
-  if (command.has_container()) {
-    // We return a Failure as this containerizer does not support
-    // handling ContainerInfo. Users have to be made aware of this
-    // lack of support to prevent confusion in the task configuration.
-    return Failure("ContainerInfo is not supported");
-  }
-
-  Owned<Promise<containerizer::Termination> > promise(
-      new Promise<containerizer::Termination>());
-  promises.put(containerId, promise);
-
-  // Store the resources for usage().
-  resources.put(containerId, executorInfo.resources());
-
-  LOG(INFO) << "Starting container '" << containerId
-            << "' for executor '" << executorInfo.executor_id()
-            << "' of framework '" << executorInfo.framework_id() << "'";
-
-  return prepare(containerId, executorInfo, directory, user)
-    .then(defer(self(),
-                &Self::_launch,
-                containerId,
-                executorInfo,
-                directory,
-                user,
-                slaveId,
-                slavePid,
-                checkpoint,
-                lambda::_1))
-    .onFailed(defer(self(),
-                    &Self::destroy,
-                    containerId));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::launch(
-    const ContainerID& containerId,
-    const TaskInfo&,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
-{
-  return launch(
-      containerId,
-      executorInfo,
-      directory,
-      user,
-      slaveId,
-      slavePid,
-      checkpoint);
-}
-
-Future<list<Option<CommandInfo> > > MesosContainerizerProcess::prepare(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  // Start preparing all isolators (in parallel) and gather any additional
-  // preparation comands that must be run in the forked child before exec'ing
-  // the executor.
-  list<Future<Option<CommandInfo> > > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->prepare(containerId, executorInfo));
-  }
-
-  // Wait for all isolators to complete preparations.
-  return collect(futures);
-}
-
-
-Future<Nothing> _fetch(
-    const ContainerID& containerId,
-    const string& directory,
-    const Option<string>& user,
-    const Option<int>& status)
-{
-  if (status.isNone() || (status.get() != 0)) {
-    return Failure("Failed to fetch URIs for container '" +
-                   stringify(containerId) + "': exit status " +
-                   (status.isNone() ? "none" : stringify(status.get())));
-  }
-
-  // Chown the work directory if a user is provided.
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(user.get(), directory);
-    if (chown.isError()) {
-      return Failure("Failed to chown work directory");
-    }
-  }
-
-  return Nothing();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::fetch(
-    const ContainerID& containerId,
-    const CommandInfo& commandInfo,
-    const string& directory,
-    const Option<string>& user)
-{
-  // Determine path for mesos-fetcher.
-  Result<string> realpath = os::realpath(
-      path::join(flags.launcher_dir, "mesos-fetcher"));
-
-  if (!realpath.isSome()) {
-    LOG(ERROR) << "Failed to determine the canonical path "
-                << "for the mesos-fetcher '"
-                << path::join(flags.launcher_dir, "mesos-fetcher")
-                << "': "
-                << (realpath.isError() ? realpath.error()
-                                       : "No such file or directory");
-    return Failure("Could not fetch URIs: failed to find mesos-fetcher");
-  }
-
-  map<string, string> environment =
-    fetcherEnvironment(commandInfo, directory, user, flags);
-
-  // Now the actual mesos-fetcher command.
-  string command = realpath.get();
-
-  LOG(INFO) << "Fetching URIs for container '" << containerId
-            << "' using command '" << command << "'";
-
-  Try<Subprocess> fetcher = subprocess(
-      command,
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      Subprocess::PIPE(),
-      environment);
-
-  if (fetcher.isError()) {
-    return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
-  }
-
-  // Redirect output (stdout and stderr) from the fetcher to log files
-  // in the executor work directory, chown'ing them if a user is
-  // specified.
-  // TODO(tillt): Consider adding O_CLOEXEC for atomic close-on-exec.
-  // TODO(tillt): Consider adding an overload to io::redirect
-  // that accepts a file path as 'to' for further reducing code. We
-  // would however also need an owner user parameter for such overload
-  // to perfectly replace the below.
-  Try<int> out = os::open(
-      path::join(directory, "stdout"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (out.isError()) {
-    return Failure("Failed to redirect stdout: " + out.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stdout"));
-    if (chown.isError()) {
-      os::close(out.get());
-      return Failure(
-          "Failed to redirect stdout: Failed to chown: " +
-          chown.error());
-    }
-  }
-
-  // Redirect takes care of nonblocking and close-on-exec for the
-  // supplied file descriptors.
-  io::redirect(fetcher.get().out().get(), out.get());
-
-  // Redirect does 'dup' the file descriptor, hence we can close the
-  // original now.
-  os::close(out.get());
-
-  // Repeat for stderr.
-  Try<int> err = os::open(
-      path::join(directory, "stderr"),
-      O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK,
-      S_IRUSR | S_IWUSR | S_IRGRP | S_IRWXO);
-
-  if (err.isError()) {
-    return Failure(
-        "Failed to redirect stderr: Failed to open: " +
-        err.error());
-  }
-
-  if (user.isSome()) {
-    Try<Nothing> chown = os::chown(
-        user.get(), path::join(directory, "stderr"));
-    if (chown.isError()) {
-      os::close(err.get());
-      return Failure(
-          "Failed to redirect stderr: Failed to chown: " +
-          chown.error());
-    }
-  }
-
-  io::redirect(fetcher.get().err().get(), err.get());
-
-  os::close(err.get());
-
-  return fetcher.get().status()
-    .then(lambda::bind(&_fetch, containerId, directory, user, lambda::_1));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::_launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const Option<string>& user,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint,
-    const list<Option<CommandInfo> >& commands)
-{
-  // Prepare environment variables for the executor.
-  map<string, string> env = executorEnvironment(
-      executorInfo,
-      directory,
-      slaveId,
-      slavePid,
-      checkpoint,
-      flags.recovery_timeout);
-
-  // Include any enviroment variables from CommandInfo.
-  foreach (const Environment::Variable& variable,
-           executorInfo.command().environment().variables()) {
-    env[variable.name()] = variable.value();
-  }
-
-  // Construct a representation of the environment suitable for
-  // passing to execle in the child. We construct it here because it
-  // is not async-signal-safe.
-  os::ExecEnv envp(env);
-
-  // Use a pipe to block the child until it's been isolated.
-  int pipes[2];
-  // We assume this should not fail under reasonable conditions so we
-  // use CHECK.
-  CHECK(pipe(pipes) == 0);
-
-  // Determine the uid and gid for the child now because getpwnam is
-  // not async signal safe.
-  Result<uid_t> uid = os::getuid(user);
-  if (uid.isError() || uid.isNone()) {
-    return Failure("Invalid user: " + (uid.isError() ? uid.error()
-                                                     : "nonexistent"));
-  }
-
-  Result<gid_t> gid = os::getgid(user);
-  if (gid.isError() || gid.isNone()) {
-    return Failure("Invalid user: " + (gid.isError() ? gid.error()
-                                                     : "nonexistent"));
-  }
-
-
-  // Prepare a function for the forked child to exec() the executor.
-  lambda::function<int()> childFunction = lambda::bind(
-      &execute,
-      executorInfo.command(),
-      directory,
-      envp,
-      uid.get(),
-      gid.get(),
-      !local,
-      pipes[0],
-      pipes[1],
-      commands);
-
-  Try<pid_t> forked = launcher->fork(containerId, childFunction);
-
-  if (forked.isError()) {
-    return Failure("Failed to fork executor: " + forked.error());
-  }
-  pid_t pid = forked.get();
-
-  // Checkpoint the executor's pid if requested.
-  if (checkpoint) {
-    const string& path = slave::paths::getForkedPidPath(
-        slave::paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        executorInfo.framework_id(),
-        executorInfo.executor_id(),
-        containerId);
-
-    LOG(INFO) << "Checkpointing executor's forked pid " << pid
-              << " to '" << path <<  "'";
-
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(pid));
-
-    if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
-
-      return Failure("Could not checkpoint executor's pid");
-    }
-  }
-
-  // Monitor the executor's pid. We keep the future because we'll refer to it
-  // again during container destroy.
-  Future<Option<int> > status = process::reap(pid);
-  statuses.put(containerId, status);
-  status.onAny(defer(self(), &Self::reaped, containerId));
-
-  return isolate(containerId, pid)
-    .then(defer(self(),
-                &Self::fetch,
-                containerId,
-                executorInfo.command(),
-                directory,
-                user))
-    .then(defer(self(), &Self::exec, containerId, pipes[1]))
-    .onAny(lambda::bind(&os::close, pipes[0]))
-    .onAny(lambda::bind(&os::close, pipes[1]));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::isolate(
-    const ContainerID& containerId,
-    pid_t _pid)
-{
-  // Set up callbacks for isolator limitations.
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    isolator->watch(containerId)
-      .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
-  }
-
-  // Isolate the executor with each isolator.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->isolate(containerId, _pid));
-  }
-
-  // Wait for all isolators to complete.
-  return collect(futures)
-    .then(lambda::bind(&_nothing));
-}
-
-
-Future<Nothing> MesosContainerizerProcess::exec(
-    const ContainerID& containerId,
-    int pipeWrite)
-{
-  CHECK(promises.contains(containerId));
-
-  // Now that we've contained the child we can signal it to continue by
-  // writing to the pipe.
-  char dummy;
-  ssize_t length;
-  while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
-         errno == EINTR);
-
-  if (length != sizeof(dummy)) {
-    return Failure("Failed to synchronize child process: " +
-                   string(strerror(errno)));
-  }
-
-  return Nothing();
-}
-
-
-Future<containerizer::Termination> MesosContainerizerProcess::wait(
-    const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return Failure("Unknown container: " + stringify(containerId));
-  }
-
-  return promises[containerId]->future();
-}
-
-
-Future<Nothing> MesosContainerizerProcess::update(
-    const ContainerID& containerId,
-    const Resources& _resources)
-{
-  // The resources hashmap won't initially contain the container's resources
-  // after recovery so we must check the promises hashmap (which is set during
-  // recovery).
-  if (!promises.contains(containerId)) {
-    // It is not considered a failure if the container is not known
-    // because the slave will attempt to update the container's
-    // resources on a task's terminal state change but the executor
-    // may have already exited and the container cleaned up.
-    LOG(WARNING) << "Ignoring update for unknown container: " << containerId;
-    return Nothing();
-  }
-
-  // Store the resources for usage().
-  resources.put(containerId, _resources);
-
-  // Update each isolator.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->update(containerId, _resources));
-  }
-
-  // Wait for all isolators to complete.
-  return collect(futures)
-    .then(lambda::bind(&_nothing));
-}
-
-
-// Resources are used to set the limit fields in the statistics but are
-// optional because they aren't known after recovery until/unless update() is
-// called.
-Future<ResourceStatistics> _usage(
-    const ContainerID& containerId,
-    const Option<Resources>& resources,
-    const list<Future<ResourceStatistics> >& statistics)
-{
-  ResourceStatistics result;
-
-  // Set the timestamp now we have all statistics.
-  result.set_timestamp(Clock::now().secs());
-
-  foreach (const Future<ResourceStatistics>& statistic, statistics) {
-    if (statistic.isReady()) {
-      result.MergeFrom(statistic.get());
-    } else {
-      LOG(WARNING) << "Skipping resource statistic for container "
-                   << containerId << " because: "
-                   << (statistic.isFailed() ? statistic.failure()
-                                            : "discarded");
-    }
-  }
-
-  if (resources.isSome()) {
-    // Set the resource allocations.
-    Option<Bytes> mem = resources.get().mem();
-    if (mem.isSome()) {
-      result.set_mem_limit_bytes(mem.get().bytes());
-    }
-
-    Option<double> cpus = resources.get().cpus();
-    if (cpus.isSome()) {
-      result.set_cpus_limit(cpus.get());
-    }
-  }
-
-  return result;
-}
-
-
-Future<ResourceStatistics> MesosContainerizerProcess::usage(
-    const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return Failure("Unknown container: " + stringify(containerId));
-  }
-
-  list<Future<ResourceStatistics> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->usage(containerId));
-  }
-
-  // Use await() here so we can return partial usage statistics.
-  // TODO(idownes): After recovery resources won't be known until after an
-  // update() because they aren't part of the SlaveState.
-  return await(futures)
-    .then(lambda::bind(
-          _usage, containerId, resources.get(containerId), lambda::_1));
-}
-
-
-void MesosContainerizerProcess::destroy(const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
-    return;
-  }
-
-  if (destroying.contains(containerId)) {
-    // Destroy has already been initiated.
-    return;
-  }
-  destroying.insert(containerId);
-
-  LOG(INFO) << "Destroying container '" << containerId << "'";
-
-  if (statuses.contains(containerId)) {
-    // Kill all processes then continue destruction.
-    launcher->destroy(containerId)
-      .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
-  } else {
-    // The executor never forked so no processes to kill, go straight to
-    // __destroy() with status = None().
-    __destroy(containerId, None());
-  }
-}
-
-
-void MesosContainerizerProcess::_destroy(
-    const ContainerID& containerId,
-    const Future<Nothing>& future)
-{
-  // Something has gone wrong and the launcher wasn't able to kill all the
-  // processes in the container. We cannot clean up the isolators because they
-  // may require that all processes have exited so just return the failure to
-  // the slave.
-  // TODO(idownes): This is a pretty bad state to be in but we should consider
-  // cleaning up here.
-  if (!future.isReady()) {
-    promises[containerId]->fail(
-        "Failed to destroy container: " +
-        (future.isFailed() ? future.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
-  }
-
-  // We've successfully killed all processes in the container so get the exit
-  // status of the executor when it's ready (it may already be) and continue
-  // the destroy.
-  statuses.get(containerId).get()
-    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::__destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status)
-{
-  // Now that all processes have exited we can now clean up all isolators.
-  list<Future<Nothing> > futures;
-  foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->cleanup(containerId));
-  }
-
-  // Wait for all isolators to complete cleanup before continuing.
-  collect(futures)
-    .onAny(defer(self(), &Self::___destroy, containerId, status, lambda::_1));
-}
-
-
-void MesosContainerizerProcess::___destroy(
-    const ContainerID& containerId,
-    const Future<Option<int > >& status,
-    const Future<list<Nothing> >& futures)
-{
-  // Something has gone wrong with one of the Isolators and cleanup failed.
-  // We'll fail the container termination and remove the 'destroying' flag but
-  // leave all other state. The containerizer is now in a bad state because
-  // at least one isolator has failed to clean up.
-  if (!futures.isReady()) {
-    promises[containerId]->fail(
-        "Failed to clean up isolators when destroying container: " +
-        (futures.isFailed() ? futures.failure() : "discarded future"));
-
-    destroying.erase(containerId);
-
-    return;
-  }
-
-  // A container is 'killed' if any isolator limited it.
-  // Note: We may not see a limitation in time for it to be registered. This
-  // could occur if the limitation (e.g., an OOM) killed the executor and we
-  // triggered destroy() off the executor exit.
-  bool killed = false;
-  string message;
-  if (limitations.contains(containerId)) {
-    killed = true;
-    foreach (const Limitation& limitation, limitations.get(containerId)) {
-      message += limitation.message;
-    }
-    message = strings::trim(message);
-  } else {
-    message = "Executor terminated";
-  }
-
-  containerizer::Termination termination;
-  termination.set_killed(killed);
-  termination.set_message(message);
-  if (status.isReady() && status.get().isSome()) {
-    termination.set_status(status.get().get());
-  }
-
-  promises[containerId]->set(termination);
-
-  promises.erase(containerId);
-  statuses.erase(containerId);
-  limitations.erase(containerId);
-  resources.erase(containerId);
-  destroying.erase(containerId);
-}
-
-
-void MesosContainerizerProcess::reaped(const ContainerID& containerId)
-{
-  if (!promises.contains(containerId)) {
-    return;
-  }
-
-  LOG(INFO) << "Executor for container '" << containerId << "' has exited";
-
-  // The executor has exited so destroy the container.
-  destroy(containerId);
-}
-
-
-void MesosContainerizerProcess::limited(
-    const ContainerID& containerId,
-    const Future<Limitation>& future)
-{
-  if (!promises.contains(containerId)) {
-    return;
-  }
-
-  if (future.isReady()) {
-    LOG(INFO) << "Container " << containerId << " has reached its limit for"
-              << " resource " << future.get().resource
-              << " and will be terminated";
-    limitations.put(containerId, future.get());
-  } else {
-    // TODO(idownes): A discarded future will not be an error when isolators
-    // discard their promises after cleanup.
-    LOG(ERROR) << "Error in a resource limitation for container "
-               << containerId << ": " << (future.isFailed() ? future.failure()
-                                                            : "discarded");
-  }
-
-  // The container has been affected by the limitation so destroy it.
-  destroy(containerId);
-}
-
-
-Future<hashset<ContainerID> > MesosContainerizerProcess::containers()
-{
-  return promises.keys();
-}
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/slave/containerizer/mesos_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos_containerizer.hpp b/src/slave/containerizer/mesos_containerizer.hpp
deleted file mode 100644
index 21affae..0000000
--- a/src/slave/containerizer/mesos_containerizer.hpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MESOS_CONTAINERIZER_HPP__
-#define __MESOS_CONTAINERIZER_HPP__
-
-#include <list>
-#include <vector>
-
-#include <stout/hashmap.hpp>
-#include <stout/lambda.hpp>
-#include <stout/multihashmap.hpp>
-
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/isolator.hpp"
-#include "slave/containerizer/launcher.hpp"
-
-namespace mesos {
-namespace internal {
-namespace slave {
-
-// Forward declaration.
-class MesosContainerizerProcess;
-
-class MesosContainerizer : public Containerizer
-{
-public:
-  static Try<MesosContainerizer*> create(const Flags& flags, bool local);
-
-  MesosContainerizer(
-      const Flags& flags,
-      bool local,
-      const process::Owned<Launcher>& launcher,
-      const std::vector<process::Owned<Isolator> >& isolators);
-
-  virtual ~MesosContainerizer();
-
-  virtual process::Future<Nothing> recover(
-      const Option<state::SlaveState>& state);
-
-  virtual process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  virtual process::Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
-  virtual void destroy(const ContainerID& containerId);
-
-  virtual process::Future<hashset<ContainerID> > containers();
-
-private:
-  MesosContainerizerProcess* process;
-};
-
-
-class MesosContainerizerProcess
-  : public process::Process<MesosContainerizerProcess>
-{
-public:
-  MesosContainerizerProcess(
-      const Flags& _flags,
-      bool _local,
-      const process::Owned<Launcher>& _launcher,
-      const std::vector<process::Owned<Isolator> >& _isolators)
-    : flags(_flags),
-      local(_local),
-      launcher(_launcher),
-      isolators(_isolators) {}
-
-  virtual ~MesosContainerizerProcess() {}
-
-  process::Future<Nothing> recover(
-      const Option<state::SlaveState>& state);
-
-  process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  process::Future<Nothing> launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint);
-
-  process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  process::Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
-  void destroy(const ContainerID& containerId);
-
-  process::Future<hashset<ContainerID> > containers();
-
-private:
-  process::Future<Nothing> _recover(
-      const std::list<state::RunState>& recoverable);
-
-  process::Future<Nothing> __recover(
-      const std::list<state::RunState>& recovered);
-
-  process::Future<std::list<Option<CommandInfo> > > prepare(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  process::Future<Nothing> fetch(
-      const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& directory,
-      const Option<std::string>& user);
-
-  process::Future<Nothing> _launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<std::string>& user,
-      const SlaveID& slaveId,
-      const process::PID<Slave>& slavePid,
-      bool checkpoint,
-      const std::list<Option<CommandInfo> >& scripts);
-
-  process::Future<Nothing> isolate(
-      const ContainerID& containerId,
-      pid_t _pid);
-
-  process::Future<Nothing> exec(
-      const ContainerID& containerId,
-      int pipeWrite);
-
-  // Continues 'destroy()' once all processes have been killed by the launcher.
-  void _destroy(
-      const ContainerID& containerId,
-      const process::Future<Nothing>& future);
-
-  // Continues '_destroy()' once we get the exit status of the executor.
-  void __destroy(
-      const ContainerID& containerId,
-      const process::Future<Option<int > >& status);
-
-  // Continues (and completes) '__destroy()' once all isolators have completed
-  // cleanup.
-  void ___destroy(
-      const ContainerID& containerId,
-      const process::Future<Option<int > >& status,
-      const process::Future<std::list<Nothing> >& futures);
-
-  // Call back for when an isolator limits a container and impacts the
-  // processes. This will trigger container destruction.
-  void limited(
-      const ContainerID& containerId,
-      const process::Future<Limitation>& future);
-
-  // Call back for when the executor exits. This will trigger container
-  // destroy.
-  void reaped(const ContainerID& containerId);
-
-  const Flags flags;
-  const bool local;
-  const process::Owned<Launcher> launcher;
-  const std::vector<process::Owned<Isolator> > isolators;
-
-  // TODO(idownes): Consider putting these per-container variables into a
-  // struct.
-  // Promises for futures returned from wait().
-  hashmap<ContainerID,
-    process::Owned<process::Promise<containerizer::Termination> > > promises;
-
-  // We need to keep track of the future exit status for each executor because
-  // we'll only get a single notification when the executor exits.
-  hashmap<ContainerID, process::Future<Option<int> > > statuses;
-
-  // We keep track of any limitations received from each isolator so we can
-  // determine the cause of an executor termination.
-  multihashmap<ContainerID, Limitation> limitations;
-
-  // We keep track of the resources for each container so we can set the
-  // ResourceStatistics limits in usage().
-  hashmap<ContainerID, Resources> resources;
-
-  // Set of containers that are in process of being destroyed.
-  hashset<ContainerID> destroying;
-};
-
-
-} // namespace slave {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MESOS_CONTAINERIZER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/cgroups_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cgroups_isolator_tests.cpp b/src/tests/cgroups_isolator_tests.cpp
index 5a9704d..2d9738f 100644
--- a/src/tests/cgroups_isolator_tests.cpp
+++ b/src/tests/cgroups_isolator_tests.cpp
@@ -25,7 +25,7 @@
 #include <stout/proc.hpp>
 #include <stout/stringify.hpp>
 
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/script.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index 8ea7974..70e1245 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -31,7 +31,8 @@
 
 #include "slave/containerizer/isolator.hpp"
 #include "slave/containerizer/launcher.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/flags.hpp"
 #include "tests/isolator.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 0bbec09..5a141e3 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -29,6 +29,7 @@
 #include <process/owned.hpp>
 #include <process/reap.hpp>
 
+#include <stout/abort.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 
@@ -84,23 +85,24 @@ using testing::Return;
 using testing::SaveArg;
 
 
-int execute(const std::string& command, int pipes[2])
+static int childSetup(int pipes[2])
 {
-  // In child process
-  ::close(pipes[1]);
+  // In child process.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
   // Wait until the parent signals us to continue.
-  int buf;
-  while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
-  ::close(pipes[0]);
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
 
-  execl("/bin/sh", "sh", "-c", command.c_str(), (char*) NULL);
+  if (length != sizeof(dummy)) {
+    ABORT("Failed to synchronize with parent");
+  }
 
-  const char* message = "Child failed to exec";
-  while (write(STDERR_FILENO, message, strlen(message)) == -1 &&
-         errno == EINTR);
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
-  _exit(1);
+  return 0;
 }
 
 
@@ -147,24 +149,38 @@ TYPED_TEST(CpuIsolatorTest, UserCpuUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to start.
   while (!os::exists(file));
@@ -238,24 +254,38 @@ TYPED_TEST(CpuIsolatorTest, SystemCpuUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to start.
   while (!os::exists(file));
@@ -335,24 +365,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, command, pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = command;
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to complete.
   AWAIT_READY(status);
@@ -413,24 +457,38 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs_Big_Quota)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(&execute, "exit 0", pipes);
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = "exit 0";
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Reap the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1],  &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1],  &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait for the command to complete successfully.
   AWAIT_READY(status);
@@ -466,13 +524,20 @@ TYPED_TEST_CASE(MemIsolatorTest, MemIsolatorTypes);
 // posix_memalign, mlock, memset and perror are not safe.
 int consumeMemory(const Bytes& _size, const Duration& duration, int pipes[2])
 {
-  // In child process
-  ::close(pipes[1]);
+  // In child process.
+  while (::close(pipes[1]) == -1 && errno == EINTR);
 
-  int buf;
   // Wait until the parent signals us to continue.
-  while (::read(pipes[0], &buf, sizeof(buf)) == -1 && errno == EINTR);
-  ::close(pipes[0]);
+  char dummy;
+  ssize_t length;
+  while ((length = ::read(pipes[0], &dummy, sizeof(dummy))) == -1 &&
+         errno == EINTR);
+
+  if (length != sizeof(dummy)) {
+    ABORT("Failed to synchronize with parent");
+  }
+
+  while (::close(pipes[0]) == -1 && errno == EINTR);
 
   size_t size = static_cast<size_t>(_size.bytes());
   void* buffer = NULL;
@@ -522,28 +587,33 @@ TYPED_TEST(MemIsolatorTest, MemUsage)
   int pipes[2];
   ASSERT_NE(-1, ::pipe(pipes));
 
-  lambda::function<int()> inChild = lambda::bind(
-      &consumeMemory,
-      Megabytes(256),
-      Seconds(10),
-      pipes);
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      vector<string>(),
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&consumeMemory, Megabytes(256), Seconds(10), pipes));
 
-  Try<pid_t> pid = launcher.get()->fork(containerId, inChild);
   ASSERT_SOME(pid);
 
   // Set up the reaper to wait on the forked child.
   Future<Option<int> > status = process::reap(pid.get());
 
   // Continue in the parent.
-  ::close(pipes[0]);
+  ASSERT_SOME(os::close(pipes[0]));
 
   // Isolate the forked child.
   AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
 
   // Now signal the child to continue.
-  int buf;
-  ASSERT_LT(0, ::write(pipes[1], &buf, sizeof(buf)));
-  ::close(pipes[1]);
+  char dummy;
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
 
   // Wait up to 5 seconds for the child process to consume 256 MB of memory;
   ResourceStatistics statistics;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 885e0fb..c498d33 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -50,7 +50,7 @@
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
-#include "slave/containerizer/mesos_containerizer.hpp"
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 24af32b..216bd6f 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -34,7 +34,8 @@
 #endif
 
 #include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
 
 #include "tests/containerizer.hpp"
 #include "tests/environment.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eeb8631..ae38a13 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -50,10 +50,12 @@
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
-#include "slave/containerizer/containerizer.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/containerizer.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/cluster.hpp"
 #include "tests/utils.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d0046dca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 873f22d..371a5b8 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -43,11 +43,12 @@
 #include "master/master.hpp"
 
 #include "slave/constants.hpp"
-#include "slave/containerizer/mesos_containerizer.hpp"
 #include "slave/gc.hpp"
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/containerizer.hpp"
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"