You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2014/11/10 08:27:20 UTC
mesos git commit: Add destroy tests for docker containerizer.
Repository: mesos
Updated Branches:
refs/heads/master 4a1bbf7d3 -> 3bfb136e9
Add destroy tests for docker containerizer.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bfb136e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bfb136e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bfb136e
Branch: refs/heads/master
Commit: 3bfb136e9dcc83715657cf7862000987fc0699b6
Parents: 4a1bbf7
Author: Timothy Chen <tn...@apache.org>
Authored: Mon Nov 3 13:59:34 2014 -0800
Committer: Timothy Chen <tn...@gmail.com>
Committed: Sun Nov 9 23:26:14 2014 -0800
----------------------------------------------------------------------
src/slave/containerizer/docker.cpp | 408 ++++----------------------
src/slave/containerizer/docker.hpp | 332 ++++++++++++++++++++-
src/tests/docker_containerizer_tests.cpp | 272 +++++++++++++++++
3 files changed, 654 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 37f422a..5978ec2 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -79,329 +79,6 @@ const string DOCKER_NAME_PREFIX = "mesos-";
// Declared in header, see explanation there.
const string DOCKER_SYMLINK_DIRECTORY = "docker/links";
-
-class DockerContainerizerProcess
- : public process::Process<DockerContainerizerProcess>
-{
-public:
- DockerContainerizerProcess(
- const Flags& _flags,
- Shared<Docker> _docker)
- : flags(_flags),
- docker(_docker) {}
-
- virtual process::Future<Nothing> recover(
- const Option<state::SlaveState>& state);
-
- virtual process::Future<bool> launch(
- const ContainerID& containerId,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<bool> launch(
- const ContainerID& containerId,
- const TaskInfo& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint);
-
- virtual process::Future<Nothing> update(
- const ContainerID& containerId,
- const Resources& resources);
-
- virtual process::Future<ResourceStatistics> usage(
- const ContainerID& containerId);
-
- virtual Future<containerizer::Termination> wait(
- const ContainerID& containerId);
-
- virtual void destroy(
- const ContainerID& containerId,
- bool killed = true); // process is either killed or reaped.
-
- virtual process::Future<hashset<ContainerID> > containers();
-
-private:
- // Continuations and helpers.
- process::Future<Nothing> fetch(const ContainerID& containerId);
-
- process::Future<Nothing> _fetch(
- const ContainerID& containerId,
- const Option<int>& status);
-
- process::Future<Nothing> pull(
- const ContainerID& containerId,
- const std::string& directory,
- const std::string& image);
-
- process::Future<Nothing> _pull(const std::string& image);
-
- Try<Nothing> checkpoint(
- const ContainerID& containerId,
- pid_t pid);
-
- process::Future<Nothing> _recover(
- const std::list<Docker::Container>& containers);
-
- process::Future<Nothing> _launch(
- const ContainerID& containerId);
-
- process::Future<Nothing> __launch(
- const ContainerID& containerId);
-
- // NOTE: This continuation is only applicable when launching a
- // container for a task.
- process::Future<pid_t> ___launch(
- const ContainerID& containerId);
-
- // NOTE: This continuation is only applicable when launching a
- // container for an executor.
- process::Future<Docker::Container> ____launch(
- const ContainerID& containerId);
-
- // NOTE: This continuation is only applicable when launching a
- // container for an executor.
- process::Future<pid_t> _____launch(
- const ContainerID& containerId,
- const Docker::Container& container);
-
- process::Future<bool> ______launch(
- const ContainerID& containerId,
- pid_t pid);
-
- void _destroy(
- const ContainerID& containerId,
- bool killed);
-
- void __destroy(
- const ContainerID& containerId,
- bool killed,
- const Future<Nothing>& future);
-
- void ___destroy(
- const ContainerID& containerId,
- bool killed,
- const Future<Option<int> >& status);
-
- process::Future<Nothing> _update(
- const ContainerID& containerId,
- const Resources& resources,
- const Docker::Container& container);
-
- process::Future<Nothing> __update(
- const ContainerID& containerId,
- const Resources& resources,
- pid_t pid);
-
- Future<ResourceStatistics> _usage(
- const ContainerID& containerId,
- const Docker::Container& container);
-
- Future<ResourceStatistics> __usage(
- const ContainerID& containerId,
- pid_t pid);
-
- // Call back for when the executor exits. This will trigger
- // container destroy.
- void reaped(const ContainerID& containerId);
-
- // Removes the docker container.
- void remove(const std::string& container);
-
- const Flags flags;
-
- Shared<Docker> docker;
-
- struct Container
- {
- static Try<Container*> create(
- const ContainerID& id,
- const Option<TaskInfo>& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint,
- const Flags& flags);
-
- Container(const ContainerID& id)
- : state(FETCHING), id(id) {}
-
- Container(const ContainerID& id,
- const Option<TaskInfo>& taskInfo,
- const ExecutorInfo& executorInfo,
- const std::string& directory,
- const Option<std::string>& user,
- const SlaveID& slaveId,
- const PID<Slave>& slavePid,
- bool checkpoint,
- bool symlinked,
- const Flags& flags)
- : state(FETCHING),
- id(id),
- task(taskInfo),
- executor(executorInfo),
- directory(directory),
- user(user),
- slaveId(slaveId),
- slavePid(slavePid),
- checkpoint(checkpoint),
- symlinked(symlinked),
- flags(flags)
- {
- if (task.isSome()) {
- resources = task.get().resources();
- } else {
- resources = executor.resources();
- }
- }
-
- ~Container()
- {
- if (symlinked) {
- // The sandbox directory is a symlink, remove it at container
- // destroy.
- os::rm(directory);
- }
- }
-
- std::string name()
- {
- return DOCKER_NAME_PREFIX + stringify(id);
- }
-
- std::string image() const
- {
- if (task.isSome()) {
- return task.get().container().docker().image();
- }
-
- return executor.container().docker().image();
- }
-
- ContainerInfo container() const
- {
- if (task.isSome()) {
- return task.get().container();
- }
-
- return executor.container();
- }
-
- CommandInfo command() const
- {
- if (task.isSome()) {
- return task.get().command();
- }
-
- return executor.command();
- }
-
- // Returns any extra environment varaibles to set when launching
- // the Docker container (beyond the those found in CommandInfo).
- std::map<std::string, std::string> environment() const
- {
- if (task.isNone()) {
- return executorEnvironment(
- executor,
- directory,
- slaveId,
- slavePid,
- checkpoint,
- flags.recovery_timeout);
- }
-
- return std::map<std::string, std::string>();
- }
-
- // The DockerContainerier needs to be able to properly clean up
- // Docker containers, regardless of when they are destroyed. For
- // example, if a container gets destroyed while we are fetching,
- // we need to not keep running the fetch, nor should we try and
- // start the Docker container. For this reason, we've split out
- // the states into:
- //
- // FETCHING
- // PULLING
- // RUNNING
- // DESTROYING
- //
- // In particular, we made 'PULLING' be it's own state so that we
- // could easily destroy and cleanup when a user initiated pulling
- // a really big image but we timeout due to the executor
- // registration timeout. Since we curently have no way to discard
- // a Docker::run, we needed to explicitely do the pull (which is
- // the part that takes the longest) so that we can also explicitly
- // kill it when asked. Once the functions at Docker::* get support
- // for discarding, then we won't need to make pull be it's own
- // state anymore, although it doesn't hurt since it gives us
- // better error messages.
- enum State {
- FETCHING = 1,
- PULLING = 2,
- RUNNING = 3,
- DESTROYING = 4
- } state;
-
- ContainerID id;
- Option<TaskInfo> task;
- ExecutorInfo executor;
-
- // The sandbox directory for the container. This holds the
- // symlinked path if symlinked boolean is true.
- std::string directory;
-
- Option<std::string> user;
- SlaveID slaveId;
- PID<Slave> slavePid;
- bool checkpoint;
- bool symlinked;
- Flags flags;
-
- // Promise for future returned from wait().
- Promise<containerizer::Termination> termination;
-
- // Exit status of executor or container (depending on whether or
- // not we used the command executor). Represented as a promise so
- // that destroying can chain with it being set.
- Promise<Future<Option<int> > > status;
-
- // Future that tells us whether or not the run is still pending or
- // has failed so we know whether or not to wait for 'status'.
- Future<Nothing> run;
-
- // We keep track of the resources for each container so we can set
- // the ResourceStatistics limits in usage(). Note that this is
- // different than just what we might get from TaskInfo::resources
- // or ExecutorInfo::resources because they can change dynamically.
- Resources resources;
-
- // The mesos-fetcher subprocess, kept around so that we can do a
- // killtree on it if we're asked to destroy a container while we
- // are fetching.
- Option<Subprocess> fetcher;
-
- // The docker pull future is stored so we can discard when
- // destroy is called while docker is pulling the image.
- Future<Docker::Image> pull;
-
- // Once the container is running, this saves the pid of the
- // running container.
- Option<pid_t> pid;
- };
-
- hashmap<ContainerID, Container*> containers_;
-};
-
-
// Parse the ContainerID from a Docker container and return None if
// the container was not launched from Mesos.
Option<ContainerID> parse(const Docker::Container& container)
@@ -438,19 +115,26 @@ Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
DockerContainerizer::DockerContainerizer(
+ const Owned<DockerContainerizerProcess>& _process)
+ : process(_process)
+{
+ spawn(process.get());
+}
+
+
+DockerContainerizer::DockerContainerizer(
const Flags& flags,
Shared<Docker> docker)
+ : process(new DockerContainerizerProcess(flags, docker))
{
- process = new DockerContainerizerProcess(flags, docker);
- spawn(process);
+ spawn(process.get());
}
DockerContainerizer::~DockerContainerizer()
{
- terminate(process);
- process::wait(process);
- delete process;
+ terminate(process.get());
+ process::wait(process.get());
}
@@ -644,7 +328,7 @@ Try<Nothing> DockerContainerizerProcess::checkpoint(
Future<Nothing> DockerContainerizer::recover(
const Option<SlaveState>& state)
{
- return dispatch(process, &DockerContainerizerProcess::recover, state);
+ return dispatch(process.get(), &DockerContainerizerProcess::recover, state);
}
@@ -657,15 +341,16 @@ Future<bool> DockerContainerizer::launch(
const PID<Slave>& slavePid,
bool checkpoint)
{
- return dispatch(process,
- &DockerContainerizerProcess::launch,
- containerId,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint);
+ return dispatch(
+ process.get(),
+ &DockerContainerizerProcess::launch,
+ containerId,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
}
@@ -679,16 +364,17 @@ Future<bool> DockerContainerizer::launch(
const PID<Slave>& slavePid,
bool checkpoint)
{
- return dispatch(process,
- &DockerContainerizerProcess::launch,
- containerId,
- taskInfo,
- executorInfo,
- directory,
- user,
- slaveId,
- slavePid,
- checkpoint);
+ return dispatch(
+ process.get(),
+ &DockerContainerizerProcess::launch,
+ containerId,
+ taskInfo,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
}
@@ -696,36 +382,46 @@ Future<Nothing> DockerContainerizer::update(
const ContainerID& containerId,
const Resources& resources)
{
- return dispatch(process,
- &DockerContainerizerProcess::update,
- containerId,
- resources);
+ return dispatch(
+ process.get(),
+ &DockerContainerizerProcess::update,
+ containerId,
+ resources);
}
Future<ResourceStatistics> DockerContainerizer::usage(
const ContainerID& containerId)
{
- return dispatch(process, &DockerContainerizerProcess::usage, containerId);
+ return dispatch(
+ process.get(),
+ &DockerContainerizerProcess::usage,
+ containerId);
}
Future<containerizer::Termination> DockerContainerizer::wait(
const ContainerID& containerId)
{
- return dispatch(process, &DockerContainerizerProcess::wait, containerId);
+ return dispatch(
+ process.get(),
+ &DockerContainerizerProcess::wait,
+ containerId);
}
void DockerContainerizer::destroy(const ContainerID& containerId)
{
- dispatch(process, &DockerContainerizerProcess::destroy, containerId, true);
+ dispatch(
+ process.get(),
+ &DockerContainerizerProcess::destroy,
+ containerId, true);
}
Future<hashset<ContainerID> > DockerContainerizer::containers()
{
- return dispatch(process, &DockerContainerizerProcess::containers);
+ return dispatch(process.get(), &DockerContainerizerProcess::containers);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index ec6b9cd..f9f3ffb 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -56,6 +56,10 @@ public:
const Flags& flags,
process::Shared<Docker> docker);
+ // This is only public for tests.
+ DockerContainerizer(
+ const process::Owned<DockerContainerizerProcess>& _process);
+
virtual ~DockerContainerizer();
virtual process::Future<Nothing> recover(
@@ -92,12 +96,336 @@ public:
virtual void destroy(const ContainerID& containerId);
- virtual process::Future<hashset<ContainerID> > containers();
+ virtual process::Future<hashset<ContainerID>> containers();
private:
- DockerContainerizerProcess* process;
+ process::Owned<DockerContainerizerProcess> process;
};
+
+
+class DockerContainerizerProcess
+ : public process::Process<DockerContainerizerProcess>
+{
+public:
+ DockerContainerizerProcess(
+ const Flags& _flags,
+ process::Shared<Docker> _docker)
+ : flags(_flags),
+ docker(_docker) {}
+
+ virtual process::Future<Nothing> recover(
+ const Option<state::SlaveState>& state);
+
+ virtual process::Future<bool> launch(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<bool> launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint);
+
+ virtual process::Future<Nothing> update(
+ const ContainerID& containerId,
+ const Resources& resources);
+
+ virtual process::Future<ResourceStatistics> usage(
+ const ContainerID& containerId);
+
+ virtual Future<containerizer::Termination> wait(
+ const ContainerID& containerId);
+
+ virtual void destroy(
+ const ContainerID& containerId,
+ bool killed = true); // process is either killed or reaped.
+
+ virtual process::Future<Nothing> fetch(const ContainerID& containerId);
+
+ virtual process::Future<Nothing> pull(
+ const ContainerID& containerId,
+ const std::string& directory,
+ const std::string& image);
+
+ virtual process::Future<hashset<ContainerID>> containers();
+
+private:
+ // Continuations and helpers.
+ process::Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const Option<int>& status);
+
+ process::Future<Nothing> _pull(const std::string& image);
+
+ Try<Nothing> checkpoint(
+ const ContainerID& containerId,
+ pid_t pid);
+
+ process::Future<Nothing> _recover(
+ const std::list<Docker::Container>& containers);
+
+ process::Future<Nothing> _launch(
+ const ContainerID& containerId);
+
+ process::Future<Nothing> __launch(
+ const ContainerID& containerId);
+
+ // NOTE: This continuation is only applicable when launching a
+ // container for a task.
+ process::Future<pid_t> ___launch(
+ const ContainerID& containerId);
+
+ // NOTE: This continuation is only applicable when launching a
+ // container for an executor.
+ process::Future<Docker::Container> ____launch(
+ const ContainerID& containerId);
+
+ // NOTE: This continuation is only applicable when launching a
+ // container for an executor.
+ process::Future<pid_t> _____launch(
+ const ContainerID& containerId,
+ const Docker::Container& container);
+
+ process::Future<bool> ______launch(
+ const ContainerID& containerId,
+ pid_t pid);
+
+ void _destroy(
+ const ContainerID& containerId,
+ bool killed);
+
+ void __destroy(
+ const ContainerID& containerId,
+ bool killed,
+ const Future<Nothing>& future);
+
+ void ___destroy(
+ const ContainerID& containerId,
+ bool killed,
+ const Future<Option<int>>& status);
+
+ process::Future<Nothing> _update(
+ const ContainerID& containerId,
+ const Resources& resources,
+ const Docker::Container& container);
+
+ process::Future<Nothing> __update(
+ const ContainerID& containerId,
+ const Resources& resources,
+ pid_t pid);
+
+ Future<ResourceStatistics> _usage(
+ const ContainerID& containerId,
+ const Docker::Container& container);
+
+ Future<ResourceStatistics> __usage(
+ const ContainerID& containerId,
+ pid_t pid);
+
+ // Call back for when the executor exits. This will trigger
+ // container destroy.
+ void reaped(const ContainerID& containerId);
+
+ // Removes the docker container.
+ void remove(const std::string& container);
+
+ const Flags flags;
+
+ process::Shared<Docker> docker;
+
+ struct Container
+ {
+ static Try<Container*> create(
+ const ContainerID& id,
+ const Option<TaskInfo>& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint,
+ const Flags& flags);
+
+ Container(const ContainerID& id)
+ : state(FETCHING), id(id) {}
+
+ Container(const ContainerID& id,
+ const Option<TaskInfo>& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory,
+ const Option<std::string>& user,
+ const SlaveID& slaveId,
+ const process::PID<Slave>& slavePid,
+ bool checkpoint,
+ bool symlinked,
+ const Flags& flags)
+ : state(FETCHING),
+ id(id),
+ task(taskInfo),
+ executor(executorInfo),
+ directory(directory),
+ user(user),
+ slaveId(slaveId),
+ slavePid(slavePid),
+ checkpoint(checkpoint),
+ symlinked(symlinked),
+ flags(flags)
+ {
+ if (task.isSome()) {
+ resources = task.get().resources();
+ } else {
+ resources = executor.resources();
+ }
+ }
+
+ ~Container()
+ {
+ if (symlinked) {
+ // The sandbox directory is a symlink, remove it at container
+ // destroy.
+ os::rm(directory);
+ }
+ }
+
+ std::string name()
+ {
+ return DOCKER_NAME_PREFIX + stringify(id);
+ }
+
+ std::string image() const
+ {
+ if (task.isSome()) {
+ return task.get().container().docker().image();
+ }
+
+ return executor.container().docker().image();
+ }
+
+ ContainerInfo container() const
+ {
+ if (task.isSome()) {
+ return task.get().container();
+ }
+
+ return executor.container();
+ }
+
+ CommandInfo command() const
+ {
+ if (task.isSome()) {
+ return task.get().command();
+ }
+
+ return executor.command();
+ }
+
+ // Returns any extra environment variables to set when launching
+ // the Docker container (beyond those found in CommandInfo).
+ std::map<std::string, std::string> environment() const
+ {
+ if (task.isNone()) {
+ return executorEnvironment(
+ executor,
+ directory,
+ slaveId,
+ slavePid,
+ checkpoint,
+ flags.recovery_timeout);
+ }
+
+ return std::map<std::string, std::string>();
+ }
+
+ // The DockerContainerizer needs to be able to properly clean up
+ // Docker containers, regardless of when they are destroyed. For
+ // example, if a container gets destroyed while we are fetching,
+ // we need to not keep running the fetch, nor should we try and
+ // start the Docker container. For this reason, we've split out
+ // the states into:
+ //
+ // FETCHING
+ // PULLING
+ // RUNNING
+ // DESTROYING
+ //
+ // In particular, we made 'PULLING' be its own state so that we
+ // could easily destroy and clean up when a user initiated pulling
+ // a really big image but we time out due to the executor
+ // registration timeout. Since we currently have no way to discard
+ // a Docker::run, we needed to explicitly do the pull (which is
+ // the part that takes the longest) so that we can also explicitly
+ // kill it when asked. Once the functions at Docker::* get support
+ // for discarding, then we won't need to make pull be its own
+ // state anymore, although it doesn't hurt since it gives us
+ // better error messages.
+ enum State {
+ FETCHING = 1,
+ PULLING = 2,
+ RUNNING = 3,
+ DESTROYING = 4
+ } state;
+
+ ContainerID id;
+ Option<TaskInfo> task;
+ ExecutorInfo executor;
+
+ // The sandbox directory for the container. This holds the
+ // symlinked path if symlinked boolean is true.
+ std::string directory;
+
+ Option<std::string> user;
+ SlaveID slaveId;
+ process::PID<Slave> slavePid;
+ bool checkpoint;
+ bool symlinked;
+ Flags flags;
+
+ // Promise for future returned from wait().
+ Promise<containerizer::Termination> termination;
+
+ // Exit status of executor or container (depending on whether or
+ // not we used the command executor). Represented as a promise so
+ // that destroying can chain with it being set.
+ Promise<Future<Option<int>>> status;
+
+ // Future that tells us whether or not the run is still pending or
+ // has failed so we know whether or not to wait for 'status'.
+ Future<Nothing> run;
+
+ // We keep track of the resources for each container so we can set
+ // the ResourceStatistics limits in usage(). Note that this is
+ // different than just what we might get from TaskInfo::resources
+ // or ExecutorInfo::resources because they can change dynamically.
+ Resources resources;
+
+ // The mesos-fetcher subprocess, kept around so that we can do a
+ // killtree on it if we're asked to destroy a container while we
+ // are fetching.
+ Option<Subprocess> fetcher;
+
+ // The docker pull future is stored so we can discard when
+ // destroy is called while docker is pulling the image.
+ Future<Docker::Image> pull;
+
+ // Once the container is running, this saves the pid of the
+ // running container.
+ Option<pid_t> pid;
+ };
+
+ hashmap<ContainerID, Container*> containers_;
+};
+
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 9d4ccc5..66552ad 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -21,6 +21,7 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
+#include <process/owned.hpp>
#include <process/subprocess.hpp>
#include "linux/cgroups.hpp"
@@ -48,6 +49,7 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
using mesos::internal::slave::DockerContainerizer;
+using mesos::internal::slave::DockerContainerizerProcess;
using process::Future;
using process::Message;
@@ -59,6 +61,7 @@ using std::list;
using std::string;
using testing::_;
+using testing::DoAll;
using testing::DoDefault;
using testing::Eq;
using testing::Invoke;
@@ -164,6 +167,17 @@ public:
Shared<Docker> docker)
: DockerContainerizer(flags, docker)
{
+ initialize();
+ }
+
+ MockDockerContainerizer(const Owned<DockerContainerizerProcess>& process)
+ : DockerContainerizer(process)
+ {
+ initialize();
+ }
+
+ void initialize()
+ {
// NOTE: See TestContainerizer::setup for why we use
// 'EXPECT_CALL' and 'WillRepeatedly' here instead of
// 'ON_CALL' and 'WillByDefault'.
@@ -259,6 +273,50 @@ public:
};
+class MockDockerContainerizerProcess : public DockerContainerizerProcess
+{
+public:
+ MockDockerContainerizerProcess(
+ const slave::Flags& flags,
+ const Shared<Docker>& docker)
+ : DockerContainerizerProcess(flags, docker)
+ {
+ EXPECT_CALL(*this, fetch(_))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
+
+ EXPECT_CALL(*this, pull(_, _, _))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
+ }
+
+ MOCK_METHOD1(
+ fetch,
+ process::Future<Nothing>(const ContainerID& containerId));
+
+ MOCK_METHOD3(
+ pull,
+ process::Future<Nothing>(
+ const ContainerID& containerId,
+ const std::string& directory,
+ const std::string& image));
+
+ process::Future<Nothing> _fetch(const ContainerID& containerId)
+ {
+ return DockerContainerizerProcess::fetch(containerId);
+ }
+
+ process::Future<Nothing> _pull(
+ const ContainerID& containerId,
+ const std::string& directory,
+ const std::string& image)
+ {
+ return DockerContainerizerProcess::pull(
+ containerId,
+ directory,
+ image);
+ }
+};
+
+
// Only enable executor launch on linux as other platforms
// requires running linux VM and need special port forwarding
// to get host networking to work.
@@ -2276,3 +2334,217 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
Shutdown();
}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Promise<Nothing> promise;
+ Future<Nothing> fetch;
+
+ // We want to pause the fetch call to simulate a long fetch time.
+ EXPECT_CALL(*process, fetch(_))
+ .WillOnce(DoAll(FutureSatisfy(&fetch),
+ Return(promise.future())));
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ AWAIT_READY(fetch);
+
+ dockerContainerizer.destroy(containerId.get());
+
+ // Resume docker launch.
+ promise.set(Nothing());
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Future<Nothing> fetch;
+ EXPECT_CALL(*process, fetch(_))
+ .WillOnce(DoAll(FutureSatisfy(&fetch),
+ Return(Nothing())));
+
+ Promise<Nothing> promise;
+
+ // We want to pause the pull call to simulate a long pull time.
+ EXPECT_CALL(*process, pull(_, _, _))
+ .WillOnce(Return(promise.future()));
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ // Wait until fetch is finished.
+ AWAIT_READY(fetch);
+
+ dockerContainerizer.destroy(containerId.get());
+
+ // Resume docker launch.
+ promise.set(Nothing());
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}