You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/10/31 23:06:08 UTC

[09/12] git commit: Even more simplification of launch paths in Docker containerizer.

Even more simplification of launch paths in Docker containerizer.

Review: https://reviews.apache.org/r/26615


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ce1fc54
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ce1fc54
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ce1fc54

Branch: refs/heads/master
Commit: 9ce1fc5486b341782b01698486b3a3d527c57c3c
Parents: 4cbcac4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Oct 11 14:42:20 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Oct 31 15:05:39 2014 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 218 +++++++++++++-------------------
 1 file changed, 85 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9ce1fc54/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index b211db0..73e3c40 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -139,6 +139,10 @@ private:
 
   process::Future<Nothing> _pull(const std::string& image);
 
+  Try<Nothing> checkpoint(
+      const ContainerID& containerId,
+      pid_t pid);
+
   process::Future<Nothing> _recover(
       const std::list<Docker::Container>& containers);
 
@@ -151,29 +155,13 @@ private:
       const std::string& directory);
 
   process::Future<bool> ___launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint);
-
-  process::Future<bool> ___launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const string& directory,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint);
+      const ContainerID& containerId);
 
   process::Future<bool> ____launch(
+      const ContainerID& containerId);
+
+  process::Future<bool> _____launch(
       const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const string& directory,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint,
       const Docker::Container& container);
 
   void _destroy(
@@ -357,7 +345,9 @@ private:
     Future<Nothing> run;
 
     // We keep track of the resources for each container so we can set
-    // the ResourceStatistics limits in usage().
+    // the ResourceStatistics limits in usage(). Note that this is
+    // different than just what we might get from TaskInfo::resources
+    // or ExecutorInfo::resources because they can change dynamically.
     Resources resources;
 
     // The mesos-fetcher subprocess, kept around so that we can do a
@@ -558,6 +548,32 @@ Future<Nothing> DockerContainerizerProcess::_pull(const string& image)
 }
 
 
+Try<Nothing> DockerContainerizerProcess::checkpoint(
+    const ContainerID& containerId,
+    pid_t pid)
+{
+  CHECK(containers_.contains(containerId));
+
+  Container* container = containers_[containerId];
+
+  if (container->checkpoint) {
+    const string& path =
+      slave::paths::getForkedPidPath(
+	  slave::paths::getMetaRootDir(flags.work_dir),
+	  container->slaveId,
+	  container->executor.framework_id(),
+	  container->executor.executor_id(),
+	  containerId);
+
+    LOG(INFO) << "Checkpointing pid " << pid << " to '" << path <<  "'";
+
+    return slave::state::checkpoint(path, stringify(pid));
+  }
+
+  return Nothing();
+}
+
+
 Future<Nothing> DockerContainerizer::recover(
     const Option<SlaveState>& state)
 {
@@ -852,15 +868,7 @@ Future<bool> DockerContainerizerProcess::launch(
   return fetch(containerId, taskInfo.command(), directory)
     .then(defer(self(), &Self::_launch, containerId, directory))
     .then(defer(self(), &Self::__launch, containerId, directory))
-    .then(defer(self(),
-                &Self::___launch,
-                containerId,
-                taskInfo,
-                executorInfo,
-                directory,
-                slaveId,
-                slavePid,
-                checkpoint))
+    .then(defer(self(), &Self::___launch, containerId))
     .onFailed(defer(self(), &Self::destroy, containerId, true));
 }
 
@@ -916,31 +924,27 @@ Future<Nothing> DockerContainerizerProcess::__launch(
 
 
 Future<bool> DockerContainerizerProcess::___launch(
-    const ContainerID& containerId,
-    const TaskInfo& taskInfo,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
+    const ContainerID& containerId)
 {
   // After we do Docker::run we shouldn't remove a container until
   // after we set 'status', which we do in this function.
   CHECK(containers_.contains(containerId));
 
+  Container* container = containers_[containerId];
+
   // Prepare environment variables for the executor.
-  map<string, string> env = executorEnvironment(
-      executorInfo,
-      directory,
-      slaveId,
-      slavePid,
-      checkpoint,
+  map<string, string> environment = executorEnvironment(
+      container->executor,
+      container->directory,
+      container->slaveId,
+      container->slavePid,
+      container->checkpoint,
       flags.recovery_timeout);
 
   // Include any enviroment variables from ExecutorInfo.
   foreach (const Environment::Variable& variable,
-           executorInfo.command().environment().variables()) {
-    env[variable.name()] = variable.value();
+           container->executor.command().environment().variables()) {
+    environment[variable.name()] = variable.value();
   }
 
   // Construct the mesos-executor "override" to do a 'docker wait'
@@ -949,46 +953,30 @@ Future<bool> DockerContainerizerProcess::___launch(
   // don't want the exit status from 'docker wait' but rather the exit
   // status from the container, hence the use of /bin/bash.
   string override =
-    "/bin/sh -c 'exit `" +
-    flags.docker + " wait " + containers_[containerId]->name() + "`'";
+    "/bin/sh -c 'exit `" + flags.docker + " wait " + container->name() + "`'";
 
   Try<Subprocess> s = subprocess(
-      executorInfo.command().value() + " --override " + override,
+      container->executor.command().value() + " --override " + override,
       Subprocess::PIPE(),
-      Subprocess::PATH(path::join(directory, "stdout")),
-      Subprocess::PATH(path::join(directory, "stderr")),
-      env,
-      lambda::bind(&setup, directory));
+      Subprocess::PATH(path::join(container->directory, "stdout")),
+      Subprocess::PATH(path::join(container->directory, "stderr")),
+      environment,
+      lambda::bind(&setup, container->directory));
 
   if (s.isError()) {
     return Failure("Failed to fork executor: " + s.error());
   }
 
-  // Checkpoint the executor's pid if requested.
-  if (checkpoint) {
-    const string& path = slave::paths::getForkedPidPath(
-        slave::paths::getMetaRootDir(flags.work_dir),
-        slaveId,
-        executorInfo.framework_id(),
-        executorInfo.executor_id(),
-        containerId);
-
-    LOG(INFO) << "Checkpointing executor's forked pid "
-              << s.get().pid() << " to '" << path <<  "'";
+  // Checkpoint the executor's pid (if necessary).
+  Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid());
 
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(s.get().pid()));
-
-    if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
-
-      // Close the subprocess's stdin so that it aborts.
-      CHECK_SOME(s.get().in());
-      os::close(s.get().in().get());
+  if (checkpointed.isError()) {
+    // Close the subprocess's stdin so that it aborts.
+    CHECK_SOME(s.get().in());
+    os::close(s.get().in().get());
 
-      return Failure("Could not checkpoint executor's pid");
-    }
+    return Failure(
+	"Failed to checkpoint executor's pid: " + checkpointed.error());
   }
 
   // Checkpoing complete, now synchronize with the process so that it
@@ -1006,16 +994,17 @@ Future<bool> DockerContainerizerProcess::___launch(
   }
 
   // Store the resources for usage().
-  containers_[containerId]->resources = taskInfo.resources();
+  CHECK_SOME(container->task);
+  container->resources = container->task.get().resources();
 
   // And finally watch for when the executor gets reaped.
-  containers_[containerId]->status.set(process::reap(s.get().pid()));
+  container->status.set(process::reap(s.get().pid()));
 
-  containers_[containerId]->status.future().get()
+  container->status.future().get()
     .onAny(defer(self(), &Self::reaped, containerId));
 
   // TODO(benh): Check failure of Docker::logs.
-  docker.logs(containers_[containerId]->name(), directory);
+  docker.logs(container->name(), container->directory);
 
   return true;
 }
@@ -1070,48 +1059,24 @@ Future<bool> DockerContainerizerProcess::launch(
   return fetch(containerId, executorInfo.command(), directory)
     .then(defer(self(), &Self::_launch, containerId, directory))
     .then(defer(self(), &Self::__launch, containerId, directory))
-    .then(defer(self(),
-                &Self::___launch,
-                containerId,
-                executorInfo,
-                directory,
-                slaveId,
-                slavePid,
-                checkpoint))
+    .then(defer(self(), &Self::____launch, containerId))
     .onFailed(defer(self(), &Self::destroy, containerId, true));
 }
 
 
-Future<bool> DockerContainerizerProcess::___launch(
-    const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint)
+Future<bool> DockerContainerizerProcess::____launch(
+    const ContainerID& containerId)
 {
   // We shouldn't remove container until we set 'status'.
   CHECK(containers_.contains(containerId));
+
   return docker.inspect(containers_[containerId]->name())
-     .then(defer(self(),
-                &Self::____launch,
-                containerId,
-                executorInfo,
-                directory,
-                slaveId,
-                slavePid,
-                checkpoint,
-                lambda::_1));
+     .then(defer(self(), &Self::_____launch, containerId, lambda::_1));
 }
 
 
-Future<bool> DockerContainerizerProcess::____launch(
+Future<bool> DockerContainerizerProcess::_____launch(
     const ContainerID& containerId,
-    const ExecutorInfo& executorInfo,
-    const string& directory,
-    const SlaveID& slaveId,
-    const PID<Slave>& slavePid,
-    bool checkpoint,
     const Docker::Container& container)
 {
   // After we do Docker::run we shouldn't remove a container until
@@ -1124,34 +1089,21 @@ Future<bool> DockerContainerizerProcess::____launch(
     return Failure("Unable to get executor pid after launch");
   }
 
-  if (checkpoint) {
-    // TODO(tnachen): We might not be able to checkpoint if the slave
-    // dies before it can checkpoint while the executor is still
-    // running. Optinally we can consider recording the slave id and
-    // executor id as part of the docker container name so we can
-    // recover from this.
-    const string& path =
-      slave::paths::getForkedPidPath(
-          slave::paths::getMetaRootDir(flags.work_dir),
-          slaveId,
-          executorInfo.framework_id(),
-          executorInfo.executor_id(),
-          containerId);
+  // TODO(tnachen): We might not be able to checkpoint if the slave
+  // dies before it can checkpoint while the executor is still
+  // running. Optinally we can consider recording the slave id and
+  // executor id as part of the docker container name so we can
+  // recover from this.
 
-    LOG(INFO) << "Checkpointing executor's forked pid "
-              << pid.get() << " to '" << path <<  "'";
+  Try<Nothing> checkpointed = checkpoint(containerId, pid.get());
 
-    Try<Nothing> checkpointed =
-      slave::state::checkpoint(path, stringify(pid.get()));
-
-    if (checkpointed.isError()) {
-      return Failure("Failed to checkpoint executor's forked pid to '"
-                     + path + "': " + checkpointed.error());
-    }
+  if (checkpointed.isError()) {
+    return Failure(
+	"Failed to checkpoint executor's pid: " + checkpointed.error());
   }
 
   // Store the resources for usage().
-  containers_[containerId]->resources = executorInfo.resources();
+  containers_[containerId]->resources = containers_[containerId]->executor.resources();
 
   // And finally watch for when the container gets reaped.
   containers_[containerId]->status.set(process::reap(pid.get()));
@@ -1160,7 +1112,7 @@ Future<bool> DockerContainerizerProcess::____launch(
     .onAny(defer(self(), &Self::reaped, containerId));
 
   // TODO(benh): Check failure of Docker::logs.
-  docker.logs(containers_[containerId]->name(), directory);
+  docker.logs(containers_[containerId]->name(), containers_[containerId]->directory);
 
   return true;
 }