You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2015/05/25 07:31:37 UTC

[02/20] mesos git commit: Add option to launch docker containers with helper containers.

Add option to launch docker containers with helper containers.

Review: https://reviews.apache.org/r/29334


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/280e4659
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/280e4659
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/280e4659

Branch: refs/heads/master
Commit: 280e4659b922d84c5c405a176b8d5db2f9203928
Parents: 168ba44
Author: Timothy Chen <tn...@apache.org>
Authored: Tue Dec 2 18:23:17 2014 -0800
Committer: Timothy Chen <tn...@apache.org>
Committed: Fri May 22 23:13:50 2015 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 212 +++++++++++++++++++++++++++++---
 src/slave/containerizer/docker.hpp |  32 ++++-
 2 files changed, 226 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/280e4659/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index c4e636c..c996afe 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -649,11 +649,29 @@ Future<bool> DockerContainerizerProcess::launch(
             << "' (and executor '" << executorInfo.executor_id()
             << "') of framework '" << executorInfo.framework_id() << "'";
 
-  return container.get()->launch = fetch(containerId)
+  Future<Nothing> future = fetch(containerId)
     .then(defer(self(), &Self::_launch, containerId))
-    .then(defer(self(), &Self::__launch, containerId))
-    .then(defer(self(), &Self::___launch, containerId))
-    .then(defer(self(), &Self::______launch, containerId, lambda::_1));
+    .then(defer(self(), &Self::__launch, containerId));
+
+  if (flags.docker_mesos_image.isNone()) {
+    // Launch executor and logs with subprocess.
+    return container.get()->launch = future
+      .then(defer(self(), &Self::___launch, containerId))
+      .then(defer(self(), &Self::______launch, containerId, lambda::_1))
+      .then(defer(self(), &Self::_______launch, containerId, lambda::_1))
+      .onFailed(defer(self(), &Self::destroy, containerId, true));
+  }
+
+  // Launch executor and logs with docker containers.
+  return container.get()->launch = future
+    .then(defer(self(), &Self::___launchInContainer, containerId))
+    .then(defer(
+        self(),
+        &Self::____launchInContainer,
+        containerId))
+    .then(defer(self(), &Self::______launch, containerId, lambda::_1))
+    .then(defer(self(), &Self::_______launch, containerId, lambda::_1))
+    .onFailed(defer(self(), &Self::destroy, containerId, true));
 }
 
 
@@ -777,6 +795,79 @@ Future<pid_t> DockerContainerizerProcess::___launch(
 }
 
 
+Future<Nothing> DockerContainerizerProcess::___launchInContainer(
+    const ContainerID& containerId)
+{
+  // After we do Docker::run we shouldn't remove a container until
+  // after we set Container::status.
+  CHECK(containers_.contains(containerId));
+  CHECK(flags.docker_mesos_image.isSome());
+
+  Container* container = containers_[containerId];
+
+  // Prepare environment variables for the executor.
+  map<string, string> environment = executorEnvironment(
+      container->executor,
+      container->directory,
+      container->slaveId,
+      container->slavePid,
+      container->checkpoint,
+      flags.recovery_timeout);
+
+  // Include any environment variables from ExecutorInfo.
+  foreach (const Environment::Variable& variable,
+           container->executor.command().environment().variables()) {
+    environment[variable.name()] = variable.value();
+  }
+
+  ContainerInfo containerInfo;
+  ContainerInfo::DockerInfo dockerInfo;
+
+  dockerInfo.set_image(flags.docker_mesos_image.get());
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  string command = "mesos-docker-executor --docker=" + flags.docker +
+                   " --container=" + container->name();
+
+  command = path::join(flags.launcher_dir, command);
+
+  CommandInfo commandInfo;
+  commandInfo.set_value(command);
+  commandInfo.set_shell(true);
+
+  VLOG(2) << "Launching docker executor in container with command: " << command;
+
+  return docker->run(
+      containerInfo,
+      commandInfo,
+      container->executorName(),
+      container->directory,
+      flags.docker_sandbox_directory,
+      None(),
+      environment);
+}
+
+
+// Launches a docker wait process on given container name.
+// Returns the wait process pid.
+Try<pid_t> launchWaitProcess(const string& docker, const string& name)
+{
+  string command = "exit `" + docker + " wait " + name + "`";
+
+  Try<Subprocess> wait = subprocess(
+      command,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"));
+
+  if (wait.isError()) {
+    return Error("Unable to launch docker wait on executor: " + wait.error());
+  }
+
+  return wait.get().pid();
+}
+
+
 Future<bool> DockerContainerizerProcess::launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
@@ -828,20 +919,17 @@ Future<bool> DockerContainerizerProcess::launch(
     .then(defer(self(), &Self::__launch, containerId))
     .then(defer(self(), &Self::____launch, containerId))
     .then(defer(self(), &Self::_____launch, containerId, lambda::_1))
-    .then(defer(self(), &Self::______launch, containerId, lambda::_1));
+    .then(defer(self(), &Self::______launch, containerId, lambda::_1))
+    .then(defer(self(), &Self::_______launch, containerId, lambda::_1))
 }
 
 
 Future<Docker::Container> DockerContainerizerProcess::____launch(
     const ContainerID& containerId)
 {
-  // After we do Docker::run we shouldn't remove a container until
-  // after we set Container::status.
   CHECK(containers_.contains(containerId));
 
-  Container* container = containers_[containerId];
-
-  return docker->inspect(container->name());
+  return docker->inspect(containers_[containerId]->name());
 }
 
 
@@ -859,12 +947,6 @@ Future<pid_t> DockerContainerizerProcess::_____launch(
     return Failure("Unable to get executor pid after launch");
   }
 
-  // TODO(tnachen): We might not be able to checkpoint if the slave
-  // dies before it can checkpoint while the executor is still
-  // running. Optinally we can consider recording the slave id and
-  // executor id as part of the docker container name so we can
-  // recover from this.
-
   Try<Nothing> checkpointed = checkpoint(containerId, pid.get());
 
   if (checkpointed.isError()) {
@@ -876,7 +958,96 @@ Future<pid_t> DockerContainerizerProcess::_____launch(
 }
 
 
-Future<bool> DockerContainerizerProcess::______launch(
+Future<pid_t> DockerContainerizerProcess::______launch(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  CHECK(containers_.contains(containerId));
+
+  Container* container = containers_[containerId];
+
+  docker->logs(container->name(), container->directory);
+
+  return pid;
+}
+
+
+Future<pid_t> DockerContainerizerProcess::______launchInContainer(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  CHECK(containers_.contains(containerId));
+  CHECK(flags.docker_mesos_image.isSome());
+
+  Container* container = containers_[containerId];
+
+  // We are launching a docker container to read the logs from
+  // a given launched container into the sandbox stdout and stderr
+  // files. This requires the docker container to run docker logs,
+  // which in turn requires us to mount the docker socket and the
+  // docker CLI binary into it.
+  ContainerInfo containerInfo;
+  Volume* volume = containerInfo.add_volumes();
+  volume->set_host_path(flags.docker_socket);
+  volume->set_container_path(flags.docker_socket);
+  volume->set_mode(Volume::RO);
+
+  volume = containerInfo.add_volumes();
+  volume->set_host_path(flags.docker);
+  volume->set_container_path(flags.docker);
+  volume->set_mode(Volume::RO);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image(flags.docker_mesos_image.get());
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  string command = flags.docker + " logs --follow " + container->name() +
+                   " 2>> " +
+                   path::join(flags.docker_sandbox_directory, "stderr") +
+                   " 1>> " +
+                   path::join(flags.docker_sandbox_directory, "stdout");
+
+  VLOG(1) << "Running docker logs in container with command: " << command;
+
+  CommandInfo commandInfo;
+  commandInfo.set_value(command);
+  commandInfo.set_shell(true);
+
+  docker->run(
+      containerInfo,
+      commandInfo,
+      container->logName(),
+      container->directory,
+      flags.docker_sandbox_directory);
+
+  return pid;
+}
+
+
+Future<pid_t> DockerContainerizerProcess::____launchInContainer(
+    const ContainerID& containerId)
+{
+  CHECK(containers_.contains(containerId));
+
+  Try<pid_t> waitPid =
+    launchWaitProcess(flags.docker, containers_[containerId]->executorName());
+
+  if (waitPid.isError()) {
+    return Failure(waitPid.error());
+  }
+
+  Try<Nothing> checkpointed = checkpoint(containerId, waitPid.get());
+
+  if (checkpointed.isError()) {
+    return Failure(
+        "Failed to checkpoint executor's pid: " + checkpointed.error());
+  }
+
+  return waitPid.get();
+}
+
+
+Future<bool> DockerContainerizerProcess::_______launch(
     const ContainerID& containerId,
     pid_t pid)
 {
@@ -1287,6 +1458,13 @@ void DockerContainerizerProcess::destroy(
 
   CHECK(container->state == Container::RUNNING);
 
+  // Remove the executor and log docker containers. They might not
+  // have been configured to launch, but we might have recovered
+  // containers from a previous slave run that was configured to
+  // launch the executor in docker.
+  docker->rm(container->logName(), true);
+  docker->rm(container->executorName(), true);
+
   container->state = Container::DESTROYING;
 
   if (container->executorPid.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/280e4659/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index f08520a..a54e0d4 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -196,6 +196,16 @@ private:
       const ContainerID& containerId);
 
   // NOTE: This continuation is only applicable when launching a
+  // container for a task.
+  process::Future<Nothing> ___launchInContainer(
+      const ContainerID& containerId);
+
+  // NOTE: This continuation is only applicable when launching a
+  // container for a task.
+  process::Future<pid_t> ____launchInContainer(
+      const ContainerID& containerId);
+
+  // NOTE: This continuation is only applicable when launching a
   // container for an executor.
   process::Future<Docker::Container> ____launch(
       const ContainerID& containerId);
@@ -206,7 +216,17 @@ private:
       const ContainerID& containerId,
       const Docker::Container& container);
 
-  process::Future<bool> ______launch(
+  process::Future<pid_t> ______launch(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  // NOTE: This continuation is only applicable when launching a
+  // container for a task.
+  process::Future<pid_t> ______launchInContainer(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  process::Future<bool> _______launch(
     const ContainerID& containerId,
     pid_t pid);
 
@@ -324,6 +344,16 @@ private:
       return DOCKER_NAME_PREFIX + slaveId.value() + "/" + stringify(id);
     }
 
+    std::string logName()
+    {
+      return name() + "/log";
+    }
+
+    std::string executorName()
+    {
+      return name() + "/executor";
+    }
+
     std::string image() const
     {
       if (task.isSome()) {