You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/09/17 18:45:31 UTC
git commit: Add Docker pull to docker abstraction.
Repository: mesos
Updated Branches:
refs/heads/master cc9fd8124 -> a47398bec
Add Docker pull to docker abstraction.
Review: https://reviews.apache.org/r/25523
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a47398be
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a47398be
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a47398be
Branch: refs/heads/master
Commit: a47398bec4d1e24226785571eee8ee8114cd445e
Parents: cc9fd81
Author: Timothy Chen <tn...@apache.org>
Authored: Wed Sep 17 09:09:14 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 17 09:45:23 2014 -0700
----------------------------------------------------------------------
src/docker/docker.cpp | 195 ++++++++++++++++++++++++++++++++
src/docker/docker.hpp | 34 ++++++
src/slave/containerizer/docker.cpp | 110 ++++--------------
src/tests/docker_tests.cpp | 36 +++++-
4 files changed, 283 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 1dd3dd1..6063114 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -232,6 +232,45 @@ Try<Docker::Container> Docker::Container::create(const JSON::Object& json)
}
+Try<Docker::Image> Docker::Image::create(const JSON::Object& json)
+{
+ Result<JSON::Value> entrypoint =
+ json.find<JSON::Value>("ContainerConfig.Entrypoint");
+
+ if (entrypoint.isError()) {
+ return Error("Failed to find 'ContainerConfig.Entrypoint': " +
+ entrypoint.error());
+
+ } else if (entrypoint.isNone()) {
+ return Error("Unable to find 'ContainerConfig.Entrypoint'");
+ }
+
+ if (entrypoint.get().is<JSON::Null>()) {
+ return Docker::Image(None());
+ }
+
+ if (!entrypoint.get().is<JSON::Array>()) {
+ return Error("Unexpected type found for 'ContainerConfig.Entrypoint'");
+ }
+
+ const list<JSON::Value>& values = entrypoint.get().as<JSON::Array>().values;
+ if (values.size() == 0) {
+ return Docker::Image(None());
+ }
+
+ vector<string> result;
+
+ foreach (const JSON::Value& value, values) {
+ if (!value.is<JSON::String>()) {
+ return Error("Expecting 'ContainerConfig.EntryPoint' array of strings");
+ }
+ result.push_back(value.as<JSON::String>().value);
+ }
+
+ return Docker::Image(result);
+}
+
+
Future<Nothing> Docker::run(
const ContainerInfo& containerInfo,
const CommandInfo& commandInfo,
@@ -698,3 +737,159 @@ Future<list<Docker::Container> > Docker::__ps(
return collect(futures);
}
+
+
+Future<Docker::Image> Docker::pull(
+ const string& directory,
+ const string& image)
+{
+ vector<string> argv;
+
+ string dockerImage = image;
+
+ // Check if the specified image has a tag. Also split on "/" in case
+ // the user specified a registry server (e.g., localhost:5000/image)
+ // to get the actual image name. If no tag was given we add a
+ // 'latest' tag to avoid pulling down the repository.
+
+ vector<string> parts = strings::split(image, "/");
+
+ if (!strings::contains(parts.back(), ":")) {
+ dockerImage += ":latest";
+ }
+
+ argv.push_back(path);
+ argv.push_back("inspect");
+ argv.push_back(dockerImage);
+
+ string cmd = strings::join(" ", argv);
+
+ VLOG(1) << "Running " << cmd;
+
+ Try<Subprocess> s = subprocess(
+ path,
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ None());
+
+ if (s.isError()) {
+ return Failure("Failed to execute '" + cmd + "': " + s.error());
+ }
+
+ // We assume 'docker inspect' exits quickly, so the returned future
+ // does not need to support being discarded.
+ return s.get().status()
+ .then(lambda::bind(
+ &Docker::_pull,
+ s.get(),
+ directory,
+ dockerImage,
+ path));
+}
+
+
+Future<Docker::Image> Docker::_pull(
+ const Subprocess& s,
+ const string& directory,
+ const string& image,
+ const string& path)
+{
+ Option<int> status = s.status().get();
+ if (status.isSome() && status.get() == 0) {
+ return io::read(s.out().get())
+ .then(lambda::bind(&Docker::___pull, lambda::_1));
+ }
+
+ vector<string> argv;
+ argv.push_back(path);
+ argv.push_back("pull");
+ argv.push_back(image);
+
+ string cmd = strings::join(" ", argv);
+
+ VLOG(1) << "Running " << cmd;
+
+ // Set HOME variable to pick up .dockercfg.
+ map<string, string> environment;
+
+ environment["HOME"] = directory;
+
+ Try<Subprocess> s_ = subprocess(
+ path,
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ None(),
+ environment);
+
+ if (s_.isError()) {
+ return Failure("Failed to execute '" + cmd + "': " + s_.error());
+ }
+
+ // Docker pull can run for a long time due to large images, so
+ // we allow the future to be discarded and it will kill the pull
+ // process.
+ return s_.get().status()
+ .then(lambda::bind(&Docker::__pull, s_.get(), cmd))
+ .onDiscard(lambda::bind(&Docker::pullDiscarded, s_.get(), cmd));
+}
+
+
+void Docker::pullDiscarded(const Subprocess& s, const string& cmd)
+{
+ VLOG(1) << "'" << cmd << "' is being discarded";
+ os::killtree(s.pid(), SIGKILL);
+}
+
+
+Future<Docker::Image> Docker::__pull(
+ const Subprocess& s,
+ const string& cmd)
+{
+ Option<int> status = s.status().get();
+
+ if (!status.isSome()) {
+ return Failure("No status found from '" + cmd + "'");
+ } else if (status.get() != 0) {
+ return io::read(s.err().get())
+ .then(lambda::bind(&failure<Image>, cmd, status.get(), lambda::_1));
+ }
+
+ return io::read(s.out().get())
+ .then(lambda::bind(&Docker::___pull, lambda::_1));
+}
+
+
+Future<Docker::Image> Docker::___pull(
+ const string& output)
+{
+ Try<JSON::Array> parse = JSON::parse<JSON::Array>(output);
+
+ if (parse.isError()) {
+ return Failure("Failed to parse JSON: " + parse.error());
+ }
+
+ JSON::Array array = parse.get();
+
+ // Only return an image if exactly one was identified by the name.
+ if (array.values.size() == 1) {
+ CHECK(array.values.front().is<JSON::Object>());
+
+ Try<Docker::Image> image =
+ Docker::Image::create(array.values.front().as<JSON::Object>());
+
+ if (image.isError()) {
+ return Failure("Unable to create image: " + image.error());
+ }
+
+ return image.get();
+ }
+
+ // TODO(tnachen): Handle the case where the short image ID was
+ // not sufficiently unique and 'array.values.size() > 1'.
+
+ return Failure("Failed to find image");
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index e7adedb..443db49 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -65,6 +65,19 @@ public:
: id(_id), name(_name), pid(_pid) {}
};
+ class Image
+ {
+ public:
+ static Try<Image> create(const JSON::Object& json);
+
+ Option<std::vector<std::string> > entrypoint;
+
+ private:
+ Image(const Option<std::vector<std::string> >& _entrypoint)
+ : entrypoint(_entrypoint) {}
+ };
+
+
// Performs 'docker run IMAGE'.
process::Future<Nothing> run(
const mesos::ContainerInfo& containerInfo,
@@ -104,6 +117,10 @@ public:
const std::string& container,
const std::string& directory);
+ process::Future<Image> pull(
+ const std::string& directory,
+ const std::string& image);
+
private:
// Uses the specified path to the Docker CLI tool.
Docker(const std::string& _path) : path(_path) {};
@@ -133,6 +150,23 @@ private:
const Option<std::string>& prefix,
const std::string& output);
+ static process::Future<Image> _pull(
+ const process::Subprocess& s,
+ const std::string& directory,
+ const std::string& image,
+ const std::string& path);
+
+ static process::Future<Image> __pull(
+ const process::Subprocess& s,
+ const std::string& cmd);
+
+ static process::Future<Image> ___pull(
+ const std::string& output);
+
+ static void pullDiscarded(
+ const process::Subprocess& s,
+ const std::string& cmd);
+
const std::string path;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 0febbac..9a29489 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -135,14 +135,9 @@ private:
process::Future<Nothing> pull(
const ContainerID& containerId,
const std::string& directory,
- const ContainerInfo::DockerInfo& dockerInfo);
+ const std::string& image);
- process::Future<Nothing> _pull(
- const Subprocess& s);
-
- process::Future<Nothing> __pull(
- const Subprocess& s,
- const string& output);
+ process::Future<Nothing> _pull(const std::string& image);
process::Future<Nothing> _recover(
const std::list<Docker::Container>& containers);
@@ -273,10 +268,10 @@ private:
// state anymore, although it doesn't hurt since it gives us
// better error messages.
enum State {
- FETCHING,
- PULLING,
- RUNNING,
- DESTROYING
+ FETCHING = 1,
+ PULLING = 2,
+ RUNNING = 3,
+ DESTROYING = 4
} state;
ContainerID id;
@@ -302,9 +297,9 @@ private:
// are fetching.
Option<Subprocess> fetcher;
- // The docker pull subprocess is stored so we can killtree the
- // pid when destroy is called while docker is pulling the image.
- Option<Subprocess> pull;
+ // The docker pull future is stored so we can discard it when
+ // destroy is called while docker is pulling the image.
+ Future<Docker::Image> pull;
// Once the container is running, this saves the pid of the
// running container.
@@ -430,86 +425,21 @@ Future<Nothing> DockerContainerizerProcess::_fetch(
}
-// TODO(benh): Move this into Docker::pull after we've correctly made
-// the futures returned from Docker::* functions be discardable.
Future<Nothing> DockerContainerizerProcess::pull(
const ContainerID& containerId,
const string& directory,
- const ContainerInfo::DockerInfo& dockerInfo)
-{
- vector<string> argv;
- argv.push_back(flags.docker);
- argv.push_back("pull");
-
- // Check if the specified image has a tag. Also split on "/" in case
- // the user specified a registry server (ie: localhost:5000/image)
- // to get the actual image name. If no tag was given we add a
- // 'latest' tag to avoid pulling down the repository.
- vector<string> parts = strings::split(dockerInfo.image(), "/");
- if (strings::contains(parts.back(), ":")) {
- argv.push_back(dockerInfo.image());
- } else {
- argv.push_back(dockerInfo.image() + ":latest");
- }
-
- VLOG(1) << "Running " << strings::join(" ", argv);
-
- map<string, string> environment;
- environment["HOME"] = directory;
-
- Try<Subprocess> s = subprocess(
- flags.docker,
- argv,
- Subprocess::PATH("/dev/null"),
- Subprocess::PATH("/dev/null"),
- Subprocess::PIPE(),
- None(),
- environment);
-
- if (s.isError()) {
- return Failure("Failed to execute 'docker pull': " + s.error());
- }
-
- containers_[containerId]->pull = s.get();
-
- return s.get().status()
- .then(defer(self(), &Self::_pull, s.get()));
-}
-
-
-Future<Nothing> DockerContainerizerProcess::_pull(
- const Subprocess& s)
+ const string& image)
{
- CHECK_READY(s.status());
-
- Option<int> status = s.status().get();
-
- if (status.isSome() && status.get() == 0) {
- return Nothing();
- }
-
- CHECK_SOME(s.err());
- return io::read(s.err().get())
- .then(defer(self(), &Self::__pull, s, lambda::_1));
+ Future<Docker::Image> future = docker.pull(directory, image);
+ containers_[containerId]->pull = future;
+ return future.then(defer(self(), &Self::_pull, image));
}
-Future<Nothing> DockerContainerizerProcess::__pull(
- const Subprocess& s,
- const string& output)
+Future<Nothing> DockerContainerizerProcess::_pull(const string& image)
{
- CHECK_READY(s.status());
-
- Option<int> status = s.status().get();
-
- if (status.isNone()) {
- return Failure("No exit status available from 'docker pull': \n" + output);
- }
-
- CHECK_NE(0, status.get());
-
- return Failure("Failed to execute 'docker pull', exited with status (" +
- WSTRINGIFY(status.get()) + "): \n" + output);
+ VLOG(1) << "Docker pull " << image << " completed";
+ return Nothing();
}
@@ -849,7 +779,7 @@ Future<bool> DockerContainerizerProcess::_launch(
containers_[containerId]->state = Container::PULLING;
- return pull(containerId, directory, taskInfo.container().docker())
+ return pull(containerId, directory, taskInfo.container().docker().image())
.then(defer(self(),
&Self::__launch,
containerId,
@@ -1087,7 +1017,7 @@ Future<bool> DockerContainerizerProcess::_launch(
containers_[containerId]->state = Container::PULLING;
- return pull(containerId, directory, executorInfo.container().docker())
+ return pull(containerId, directory, executorInfo.container().docker().image())
.then(defer(self(),
&Self::__launch,
containerId,
@@ -1587,9 +1517,7 @@ void DockerContainerizerProcess::destroy(
LOG(INFO) << "Destroying Container '"
<< containerId << "' in PULLING state";
- if (container->pull.isSome()) {
- os::killtree(container->pull.get().pid(), SIGKILL);
- }
+ container->pull.discard();
containerizer::Termination termination;
termination.set_killed(killed);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a47398be/src/tests/docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_tests.cpp b/src/tests/docker_tests.cpp
index e6c228a..04139af 100644
--- a/src/tests/docker_tests.cpp
+++ b/src/tests/docker_tests.cpp
@@ -20,6 +20,8 @@
#include <process/future.hpp>
#include <process/gtest.hpp>
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
#include <stout/option.hpp>
#include <stout/gtest.hpp>
@@ -260,7 +262,7 @@ TEST(DockerTest, ROOT_DOCKER_CheckPortResource)
resources);
// Port should be out side of the provided ranges.
- AWAIT_EXPECTED_FAILED(run);
+ AWAIT_EXPECT_FAILED(run);
resources = Resources::parse("ports:[9998-9999];ports:[10000-11000]").get();
@@ -280,3 +282,35 @@ TEST(DockerTest, ROOT_DOCKER_CheckPortResource)
Future<Nothing> status = docker.rm(containerName, true);
AWAIT_READY(status);
}
+
+
+TEST(DockerTest, ROOT_DOCKER_CancelPull)
+{
+ // Delete the test image if it exists.
+
+ Try<Subprocess> s = process::subprocess(
+ tests::flags.docker + " rmi lingmann/1gb",
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"));
+
+ ASSERT_SOME(s);
+
+ AWAIT_READY_FOR(s.get().status(), Seconds(30));
+
+ Docker docker = Docker::create(tests::flags.docker, false).get();
+
+ Try<string> directory = environment->mkdtemp();
+
+ CHECK_SOME(directory) << "Failed to create temporary directory";
+
+ // Assume that pulling the very large image 'lingmann/1gb' will take
+ // sufficiently long that we can start it and discard (i.e., cancel
+ // it) right away and the future will indeed get discarded.
+ Future<Docker::Image> future =
+ docker.pull(directory.get(), "lingmann/1gb");
+
+ future.discard();
+
+ AWAIT_DISCARDED(future);
+}