You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/10/31 23:06:08 UTC
[09/12] git commit: Even more simplification of launch paths in
Docker containerizer.
Even more simplification of launch paths in Docker containerizer.
Review: https://reviews.apache.org/r/26615
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ce1fc54
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ce1fc54
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ce1fc54
Branch: refs/heads/master
Commit: 9ce1fc5486b341782b01698486b3a3d527c57c3c
Parents: 4cbcac4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Oct 11 14:42:20 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Oct 31 15:05:39 2014 -0700
----------------------------------------------------------------------
src/slave/containerizer/docker.cpp | 218 +++++++++++++-------------------
1 file changed, 85 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ce1fc54/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index b211db0..73e3c40 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -139,6 +139,10 @@ private:
process::Future<Nothing> _pull(const std::string& image);
+ Try<Nothing> checkpoint(
+ const ContainerID& containerId,
+ pid_t pid);
+
process::Future<Nothing> _recover(
const std::list<Docker::Container>& containers);
@@ -151,29 +155,13 @@ private:
const std::string& directory);
process::Future<bool> ___launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint);
-
- process::Future<bool> ___launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint);
+ const ContainerID& containerId);
process::Future<bool> ____launch(
+ const ContainerID& containerId);
+
+ process::Future<bool> _____launch(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint,
const Docker::Container& container);
void _destroy(
@@ -357,7 +345,9 @@ private:
Future<Nothing> run;
// We keep track of the resources for each container so we can set
- // the ResourceStatistics limits in usage().
+ // the ResourceStatistics limits in usage(). Note that this is
+ // different than just what we might get from TaskInfo::resources
+ // or ExecutorInfo::resources because they can change dynamically.
Resources resources;
// The mesos-fetcher subprocess, kept around so that we can do a
@@ -558,6 +548,32 @@ Future<Nothing> DockerContainerizerProcess::_pull(const string& image)
}
+Try<Nothing> DockerContainerizerProcess::checkpoint(
+ const ContainerID& containerId,
+ pid_t pid)
+{
+ CHECK(containers_.contains(containerId));
+
+ Container* container = containers_[containerId];
+
+ if (container->checkpoint) {
+ const string& path =
+ slave::paths::getForkedPidPath(
+ slave::paths::getMetaRootDir(flags.work_dir),
+ container->slaveId,
+ container->executor.framework_id(),
+ container->executor.executor_id(),
+ containerId);
+
+ LOG(INFO) << "Checkpointing pid " << pid << " to '" << path << "'";
+
+ return slave::state::checkpoint(path, stringify(pid));
+ }
+
+ return Nothing();
+}
+
+
Future<Nothing> DockerContainerizer::recover(
const Option<SlaveState>& state)
{
@@ -852,15 +868,7 @@ Future<bool> DockerContainerizerProcess::launch(
return fetch(containerId, taskInfo.command(), directory)
.then(defer(self(), &Self::_launch, containerId, directory))
.then(defer(self(), &Self::__launch, containerId, directory))
- .then(defer(self(),
- &Self::___launch,
- containerId,
- taskInfo,
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint))
+ .then(defer(self(), &Self::___launch, containerId))
.onFailed(defer(self(), &Self::destroy, containerId, true));
}
@@ -916,31 +924,27 @@ Future<Nothing> DockerContainerizerProcess::__launch(
Future<bool> DockerContainerizerProcess::___launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
+ const ContainerID& containerId)
{
// After we do Docker::run we shouldn't remove a container until
// after we set 'status', which we do in this function.
CHECK(containers_.contains(containerId));
+ Container* container = containers_[containerId];
+
// Prepare environment variables for the executor.
- map<string, string> env = executorEnvironment(
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint,
+ map<string, string> environment = executorEnvironment(
+ container->executor,
+ container->directory,
+ container->slaveId,
+ container->slavePid,
+ container->checkpoint,
flags.recovery_timeout);
// Include any environment variables from ExecutorInfo.
foreach (const Environment::Variable& variable,
- executorInfo.command().environment().variables()) {
- env[variable.name()] = variable.value();
+ container->executor.command().environment().variables()) {
+ environment[variable.name()] = variable.value();
}
// Construct the mesos-executor "override" to do a 'docker wait'
@@ -949,46 +953,30 @@ Future<bool> DockerContainerizerProcess::___launch(
// don't want the exit status from 'docker wait' but rather the exit
// status from the container, hence the use of /bin/sh.
string override =
- "/bin/sh -c 'exit `" +
- flags.docker + " wait " + containers_[containerId]->name() + "`'";
+ "/bin/sh -c 'exit `" + flags.docker + " wait " + container->name() + "`'";
Try<Subprocess> s = subprocess(
- executorInfo.command().value() + " --override " + override,
+ container->executor.command().value() + " --override " + override,
Subprocess::PIPE(),
- Subprocess::PATH(path::join(directory, "stdout")),
- Subprocess::PATH(path::join(directory, "stderr")),
- env,
- lambda::bind(&setup, directory));
+ Subprocess::PATH(path::join(container->directory, "stdout")),
+ Subprocess::PATH(path::join(container->directory, "stderr")),
+ environment,
+ lambda::bind(&setup, container->directory));
if (s.isError()) {
return Failure("Failed to fork executor: " + s.error());
}
- // Checkpoint the executor's pid if requested.
- if (checkpoint) {
- const string& path = slave::paths::getForkedPidPath(
- slave::paths::getMetaRootDir(flags.work_dir),
- slaveId,
- executorInfo.framework_id(),
- executorInfo.executor_id(),
- containerId);
-
- LOG(INFO) << "Checkpointing executor's forked pid "
- << s.get().pid() << " to '" << path << "'";
+ // Checkpoint the executor's pid (if necessary).
+ Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid());
- Try<Nothing> checkpointed =
- slave::state::checkpoint(path, stringify(s.get().pid()));
-
- if (checkpointed.isError()) {
- LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
- << path << "': " << checkpointed.error();
-
- // Close the subprocess's stdin so that it aborts.
- CHECK_SOME(s.get().in());
- os::close(s.get().in().get());
+ if (checkpointed.isError()) {
+ // Close the subprocess's stdin so that it aborts.
+ CHECK_SOME(s.get().in());
+ os::close(s.get().in().get());
- return Failure("Could not checkpoint executor's pid");
- }
+ return Failure(
+ "Failed to checkpoint executor's pid: " + checkpointed.error());
}
// Checkpointing complete, now synchronize with the process so that it
@@ -1006,16 +994,17 @@ Future<bool> DockerContainerizerProcess::___launch(
}
// Store the resources for usage().
- containers_[containerId]->resources = taskInfo.resources();
+ CHECK_SOME(container->task);
+ container->resources = container->task.get().resources();
// And finally watch for when the executor gets reaped.
- containers_[containerId]->status.set(process::reap(s.get().pid()));
+ container->status.set(process::reap(s.get().pid()));
- containers_[containerId]->status.future().get()
+ container->status.future().get()
.onAny(defer(self(), &Self::reaped, containerId));
// TODO(benh): Check failure of Docker::logs.
- docker.logs(containers_[containerId]->name(), directory);
+ docker.logs(container->name(), container->directory);
return true;
}
@@ -1070,48 +1059,24 @@ Future<bool> DockerContainerizerProcess::launch(
return fetch(containerId, executorInfo.command(), directory)
.then(defer(self(), &Self::_launch, containerId, directory))
.then(defer(self(), &Self::__launch, containerId, directory))
- .then(defer(self(),
- &Self::___launch,
- containerId,
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint))
+ .then(defer(self(), &Self::____launch, containerId))
.onFailed(defer(self(), &Self::destroy, containerId, true));
}
-Future<bool> DockerContainerizerProcess::___launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint)
+Future<bool> DockerContainerizerProcess::____launch(
+ const ContainerID& containerId)
{
// We shouldn't remove container until we set 'status'.
CHECK(containers_.contains(containerId));
+
return docker.inspect(containers_[containerId]->name())
- .then(defer(self(),
- &Self::____launch,
- containerId,
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint,
- lambda::_1));
+ .then(defer(self(), &Self::_____launch, containerId, lambda::_1));
}
-Future<bool> DockerContainerizerProcess::____launch(
+Future<bool> DockerContainerizerProcess::_____launch(
const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const string& directory,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint,
const Docker::Container& container)
{
// After we do Docker::run we shouldn't remove a container until
@@ -1124,34 +1089,21 @@ Future<bool> DockerContainerizerProcess::____launch(
return Failure("Unable to get executor pid after launch");
}
- if (checkpoint) {
- // TODO(tnachen): We might not be able to checkpoint if the slave
- // dies before it can checkpoint while the executor is still
- // running. Optinally we can consider recording the slave id and
- // executor id as part of the docker container name so we can
- // recover from this.
- const string& path =
- slave::paths::getForkedPidPath(
- slave::paths::getMetaRootDir(flags.work_dir),
- slaveId,
- executorInfo.framework_id(),
- executorInfo.executor_id(),
- containerId);
+ // TODO(tnachen): We might not be able to checkpoint if the slave
+ // dies before it can checkpoint while the executor is still
+ // running. Optionally we can consider recording the slave id and
+ // executor id as part of the docker container name so we can
+ // recover from this.
- LOG(INFO) << "Checkpointing executor's forked pid "
- << pid.get() << " to '" << path << "'";
+ Try<Nothing> checkpointed = checkpoint(containerId, pid.get());
- Try<Nothing> checkpointed =
- slave::state::checkpoint(path, stringify(pid.get()));
-
- if (checkpointed.isError()) {
- return Failure("Failed to checkpoint executor's forked pid to '"
- + path + "': " + checkpointed.error());
- }
+ if (checkpointed.isError()) {
+ return Failure(
+ "Failed to checkpoint executor's pid: " + checkpointed.error());
}
// Store the resources for usage().
- containers_[containerId]->resources = executorInfo.resources();
+ containers_[containerId]->resources = containers_[containerId]->executor.resources();
// And finally watch for when the container gets reaped.
containers_[containerId]->status.set(process::reap(pid.get()));
@@ -1160,7 +1112,7 @@ Future<bool> DockerContainerizerProcess::____launch(
.onAny(defer(self(), &Self::reaped, containerId));
// TODO(benh): Check failure of Docker::logs.
- docker.logs(containers_[containerId]->name(), directory);
+ docker.logs(containers_[containerId]->name(), containers_[containerId]->directory);
return true;
}