Posted to commits@mesos.apache.org by tn...@apache.org on 2016/02/18 23:02:54 UTC
mesos git commit: Fixed persistent volumes with docker tasks.
Repository: mesos
Updated Branches:
refs/heads/master 454cdf42d -> 541b3d963
Fixed persistent volumes with docker tasks.
Review: https://reviews.apache.org/r/43015
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541b3d96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541b3d96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541b3d96
Branch: refs/heads/master
Commit: 541b3d963cccf07e979ce5362cbb6ace0144f31a
Parents: 454cdf4
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri Jan 29 18:09:52 2016 -0500
Committer: Timothy Chen <tn...@apache.org>
Committed: Thu Feb 18 14:02:42 2016 -0800
----------------------------------------------------------------------
src/slave/containerizer/docker.cpp | 270 ++++++++++-
src/slave/containerizer/docker.hpp | 17 +-
.../docker_containerizer_tests.cpp | 480 +++++++++++++++++++
3 files changed, 762 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
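Before the per-file diffs, a note on the mechanism: the fix bind-mounts each
persistent volume's backing directory (under the agent's work_dir) into the
task's sandbox directory, so Docker's existing sandbox mapping carries the
volume into the container. Below is a minimal standalone sketch of that bind
mount, using hypothetical paths and the raw mount(2) call rather than the
stout fs::mount() wrapper the commit actually uses:

    // Sketch only (not commit code): bind-mount a persistent volume's
    // backing directory into a sandbox on Linux. Paths are hypothetical;
    // run as root with both directories already created.
    #include <sys/mount.h>

    #include <cerrno>
    #include <cstring>
    #include <iostream>
    #include <string>

    int main()
    {
      const std::string source = "/tmp/mesos/volumes/roles/role1/id1";
      const std::string target = "/tmp/mesos/sandbox/path1";

      // MS_BIND makes `target` an alias of `source`; this is the same
      // flag the containerizer passes to fs::mount() in this commit.
      if (::mount(source.c_str(), target.c_str(), nullptr, MS_BIND, nullptr) != 0) {
        std::cerr << "mount failed: " << ::strerror(errno) << std::endl;
        return 1;
      }

      return 0;
    }

Unmounting on cleanup is the inverse operation; the commit locates the mounts
to remove by scanning the host mount table (see the sketch after the
docker.cpp diff below).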
http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index ed1c9a5..50248e5 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -22,6 +22,7 @@
#include <mesos/slave/container_logger.hpp>
#include <process/check.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/io.hpp>
@@ -29,6 +30,7 @@
#include <process/reap.hpp>
#include <process/subprocess.hpp>
+#include <stout/adaptor.hpp>
#include <stout/fs.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
@@ -40,6 +42,7 @@
#ifdef __linux__
#include "linux/cgroups.hpp"
+#include "linux/fs.hpp"
#include "linux/systemd.hpp"
#endif // __linux__
@@ -155,6 +158,9 @@ Try<DockerContainerizer*> DockerContainerizer::create(
}
}
+ // TODO(tnachen): We should also mark the work directory as a shared
+ // mount here; for more details, please refer to MESOS-3483.
+
return new DockerContainerizer(
flags,
fetcher,
@@ -387,6 +393,163 @@ Future<Nothing> DockerContainerizerProcess::pull(
}
+Try<Nothing> DockerContainerizerProcess::updatePersistentVolumes(
+ const ContainerID& containerId,
+ const string& directory,
+ const Resources& current,
+ const Resources& updated)
+{
+ // The Docker containerizer is currently only expected to run on Linux.
+#ifdef __linux__
+ // Unmount all persistent volumes that are no longer present.
+ foreach (const Resource& resource, current.persistentVolumes()) {
+ // This is enforced by the master.
+ CHECK(resource.disk().has_volume());
+
+ // Ignore absolute and nested paths.
+ const string& containerPath = resource.disk().volume().container_path();
+ if (strings::contains(containerPath, "/")) {
+ LOG(WARNING) << "Skipping updating mount for persistent volume "
+ << resource << " of container " << containerId
+ << " because the container path '" << containerPath
+ << "' contains slash";
+ continue;
+ }
+
+ if (updated.contains(resource)) {
+ continue;
+ }
+
+ const string target = path::join(
+ directory, resource.disk().volume().container_path());
+
+ Try<Nothing> unmount = fs::unmount(target);
+ if (unmount.isError()) {
+ return Error("Failed to unmount persistent volume at '" + target +
+ "': " + unmount.error());
+ }
+
+ // TODO(tnachen): Remove mount point after unmounting. This requires
+ // making sure the work directory is marked as a shared mount. For
+ // more details, please refer to MESOS-3483.
+ }
+
+ // Set the ownership of the persistent volume to match that of the
+ // sandbox directory.
+ //
+ // NOTE: Currently, persistent volumes in Mesos are exclusive,
+ // meaning that if a persistent volume is used by one task or
+ // executor, it cannot be concurrently used by another task or
+ // executor. But if we allow multiple executors to use the same
+ // persistent volume at the same time in the future, the ownership
+ // of the persistent volume may conflict here.
+ //
+ // TODO(haosdent): Consider letting the frameworks specify the
+ // user/group of the persistent volumes.
+ struct stat s;
+ if (::stat(directory.c_str(), &s) < 0) {
+ return Error("Failed to get ownership for '" + directory + "': " +
+ os::strerror(errno));
+ }
+
+ // Mount all new persistent volumes added.
+ foreach (const Resource& resource, updated.persistentVolumes()) {
+ // This is enforced by the master.
+ CHECK(resource.disk().has_volume());
+
+ if (current.contains(resource)) {
+ continue;
+ }
+
+ const string source =
+ paths::getPersistentVolumePath(flags.work_dir, resource);
+
+ // Ignore absolute and nested paths.
+ const string& containerPath = resource.disk().volume().container_path();
+ if (strings::contains(containerPath, "/")) {
+ LOG(WARNING) << "Skipping updating mount for persistent volume "
+ << resource << " of container " << containerId
+ << " because the container path '" << containerPath
+ << "' contains slash";
+ continue;
+ }
+
+ const string target = path::join(directory, containerPath);
+
+ LOG(INFO) << "Changing the ownership of the persistent volume at '"
+ << source << "' with uid " << s.st_uid
+ << " and gid " << s.st_gid;
+
+ Try<Nothing> chown = os::chown(s.st_uid, s.st_gid, source, true);
+ if (chown.isError()) {
+ return Error(
+ "Failed to change the ownership of the persistent volume at '" +
+ source + "' with uid " + stringify(s.st_uid) +
+ " and gid " + stringify(s.st_gid) + ": " + chown.error());
+ }
+
+ // TODO(tnachen): We should check if the target already exists
+ // when we support updating persistent mounts.
+
+ Try<Nothing> mkdir = os::mkdir(target);
+ if (mkdir.isError()) {
+ return Error("Failed to create persistent mount point at '" + target
+ + "': " + mkdir.error());
+ }
+
+ LOG(INFO) << "Mounting '" << source << "' to '" << target
+ << "' for persistent volume " << resource
+ << " of container " << containerId;
+
+ Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
+ if (mount.isError()) {
+ return Error(
+ "Failed to mount persistent volume from '" +
+ source + "' to '" + target + "': " + mount.error());
+ }
+ }
+#else
+ if (!current.persistentVolumes().empty() ||
+ !updated.persistentVolumes().empty()) {
+ return Error("Persistent volumes are only supported on linux");
+ }
+#endif // __linux__
+
+ return Nothing();
+}
+
+
+Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
+ const ContainerID& containerId)
+{
+ if (!containers_.contains(containerId)) {
+ return Failure("Container is already destroyed");
+ }
+
+ Container* container = containers_[containerId];
+ container->state = Container::MOUNTING;
+
+ if (container->task.isNone() &&
+ !container->resources.persistentVolumes().empty()) {
+ LOG(ERROR) << "Persistent volumes found with container '" << containerId
+ << "' but are not supported with custom executors";
+ return Nothing();
+ }
+
+ Try<Nothing> updateVolumes = updatePersistentVolumes(
+ containerId,
+ container->directory,
+ Resources(),
+ container->resources);
+
+ if (updateVolumes.isError()) {
+ return Failure(updateVolumes.error());
+ }
+
+ return Nothing();
+}
+
+
Try<Nothing> DockerContainerizerProcess::checkpoint(
const ContainerID& containerId,
pid_t pid)
@@ -699,6 +862,7 @@ Future<Nothing> DockerContainerizerProcess::_recover(
framework.id,
executor.id,
containerId);
+ container->directory = sandboxDirectory;
// Pass recovered containers to the container logger.
// NOTE: The current implementation of the container logger only
@@ -721,9 +885,44 @@ Future<Nothing> DockerContainerizerProcess::_recover(
}
+/**
+ * Unmount persistent volumes that are mounted for a container.
+ */
+Try<Nothing> unmountPersistentVolumes(const ContainerID& containerId)
+{
+ // We assume volumes are only supported on Linux, and that the
+ // target path contains the containerId.
+#ifdef __linux__
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return Error("Failed to get mount table: " + table.error());
+ }
+
+ foreach (const fs::MountInfoTable::Entry& entry,
+ adaptor::reverse(table.get().entries)) {
+ // TODO(tnachen): We assume there is only one docker container
+ // running per container ID and that no other mounts will contain
+ // the container ID. We might need to revisit this if it is no
+ // longer true.
+ if (strings::contains(entry.target, containerId.value())) {
+ LOG(INFO) << "Unmounting volume for container '" << containerId
+ << "'";
+ Try<Nothing> unmount = fs::unmount(entry.target);
+ if (unmount.isError()) {
+ return Error("Failed to unmount volume '" + entry.target +
+ "': " + unmount.error());
+ }
+ }
+ }
+#endif // __linux__
+ return Nothing();
+}
+
+
Future<Nothing> DockerContainerizerProcess::__recover(
const list<Docker::Container>& _containers)
{
+ list<Future<ContainerID>> futures;
foreach (const Docker::Container& container, _containers) {
VLOG(1) << "Checking if Docker container named '"
<< container.name << "' was started by Mesos";
@@ -742,11 +941,33 @@ Future<Nothing> DockerContainerizerProcess::__recover(
// if not, rm -f the Docker container.
if (!containers_.contains(id.get())) {
// TODO(tnachen): Consider using executor_shutdown_grace_period.
- docker->stop(container.id, flags.docker_stop_timeout, true);
+ futures.push_back(
+ docker->stop(
+ container.id,
+ flags.docker_stop_timeout,
+ true)
+ .then([id]() { return id.get(); }));
}
}
- return Nothing();
+ return collect(futures)
+ .then([](Future<list<ContainerID>> future) -> Future<Nothing> {
+ if (!future.isReady()) {
+ return Failure("Unable to stop orphaned Docker containers: " +
+ (future.isFailed() ?
+ future.failure() : "future discarded"));
+ }
+
+ foreach (const ContainerID& containerId, future.get()) {
+ Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+ if (unmount.isError()) {
+ return Failure("Unable to unmount volumes for Docker container '" +
+ containerId.value() + "': " + unmount.error());
+ }
+ }
+
+ return Nothing();
+ });
}
@@ -827,6 +1048,9 @@ Future<bool> DockerContainerizerProcess::launch(
// Launching task by forking a subprocess to run docker executor.
return container.get()->launch = fetch(containerId, slaveId)
.then(defer(self(), [=]() { return pull(containerId); }))
+ .then(defer(self(), [=]() {
+ return mountPersistentVolumes(containerId);
+ }))
.then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
.then(defer(self(), [=](pid_t pid) {
return reapExecutor(containerId, pid);
@@ -850,6 +1074,9 @@ Future<bool> DockerContainerizerProcess::launch(
return container.get()->launch = fetch(containerId, slaveId)
.then(defer(self(), [=]() { return pull(containerId); }))
.then(defer(self(), [=]() {
+ return mountPersistentVolumes(containerId);
+ }))
+ .then(defer(self(), [=]() {
return launchExecutorContainer(containerId, containerName);
}))
.then(defer(self(), [=](const Docker::Container& dockerContainer) {
@@ -1080,6 +1307,8 @@ Future<Nothing> DockerContainerizerProcess::update(
return Nothing();
}
+ // TODO(tnachen): Support updating persistent volumes, which requires
+ // Docker mount propagation support.
// Store the resources for usage().
container->resources = _resources;
@@ -1468,6 +1697,9 @@ void DockerContainerizerProcess::destroy(
// cleanup. Just as above, we'll need to deal with the race with
// 'docker pull' returning successfully.
//
+ // If we're MOUNTING, we want to unmount all the persistent volumes
+ // that have been mounted.
+ //
// If we're RUNNING, we want to wait for the status to get set, then
// do a Docker::kill, then wait for the status to complete, then
// cleanup.
@@ -1507,6 +1739,29 @@ void DockerContainerizerProcess::destroy(
return;
}
+ if (container->state == Container::MOUNTING) {
+ LOG(INFO) << "Destroying Container '" << containerId
+ << "' in MOUNTING state";
+
+ // Persistent volumes might already have been mounted; remove them
+ // if necessary.
+ Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+ if (unmount.isError()) {
+ LOG(WARNING) << "Failed to remove persistent volumes on destroy for "
+ << "container '" << containerId << "': "
+ << unmount.error();
+ }
+
+ containerizer::Termination termination;
+ termination.set_message("Container destroyed while mounting volumes");
+ container->termination.set(termination);
+
+ containers_.erase(containerId);
+ delete container;
+
+ return;
+ }
+
CHECK(container->state == Container::RUNNING);
container->state = Container::DESTROYING;
@@ -1613,6 +1868,17 @@ void DockerContainerizerProcess::___destroy(
{
CHECK(containers_.contains(containerId));
+ Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+ if (unmount.isError()) {
+ // TODO(tnachen): Failing to unmount a persistent volume now
+ // leaves the volume mounted on the host, and we won't retry
+ // since the Docker container is removed. We should consider
+ // not removing the container so we can retry.
+ LOG(WARNING) << "Failed to remove persistent volumes on destroy for "
+ << "container '" << containerId << "': "
+ << unmount.error();
+ }
+
Container* container = containers_[containerId];
containerizer::Termination termination;
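The cleanup path above (unmountPersistentVolumes) works by reading the host
mount table and unmounting, in reverse order, every entry whose target path
contains the container ID. A rough standalone equivalent of that scan, with
a hypothetical container ID and the unmount replaced by a print (the real
code uses fs::MountInfoTable::read() and fs::unmount()):

    // Sketch only (not commit code): find sandbox mounts belonging to a
    // container by scanning /proc/self/mountinfo in reverse.
    #include <algorithm>
    #include <fstream>
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    int main()
    {
      const std::string containerId = "f2a73d6e";  // Hypothetical ID.

      std::ifstream mountinfo("/proc/self/mountinfo");
      std::vector<std::string> targets;

      std::string line;
      while (std::getline(mountinfo, line)) {
        std::istringstream fields(line);
        std::string id, parent, device, root, target;
        // The mount point is the fifth whitespace-separated field.
        if (fields >> id >> parent >> device >> root >> target) {
          targets.push_back(target);
        }
      }

      // Walking the table in reverse unmounts nested mounts before
      // their parents, matching adaptor::reverse() in the commit.
      std::for_each(targets.rbegin(), targets.rend(),
                    [&](const std::string& target) {
        if (target.find(containerId) != std::string::npos) {
          std::cout << "would unmount: " << target << std::endl;
        }
      });

      return 0;
    }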
http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 77a50d8..4d70381 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -223,6 +223,15 @@ private:
const Resources& resources,
pid_t pid);
+ process::Future<Nothing> mountPersistentVolumes(
+ const ContainerID& containerId);
+
+ Try<Nothing> updatePersistentVolumes(
+ const ContainerID& containerId,
+ const std::string& directory,
+ const Resources& current,
+ const Resources& updated);
+
Try<ResourceStatistics> cgroupsStatistics(pid_t pid) const;
// Call back for when the executor exits. This will trigger
@@ -387,6 +396,7 @@ private:
//
// FETCHING
// PULLING
+ // MOUNTING
// RUNNING
// DESTROYING
//
@@ -404,8 +414,9 @@ private:
{
FETCHING = 1,
PULLING = 2,
- RUNNING = 3,
- DESTROYING = 4
+ MOUNTING = 3,
+ RUNNING = 4,
+ DESTROYING = 5
} state;
const ContainerID id;
@@ -417,7 +428,7 @@ private:
// The sandbox directory for the container. This holds the
// symlinked path if symlinked boolean is true.
- const std::string directory;
+ std::string directory;
const Option<std::string> user;
SlaveID slaveId;
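The header change above adds a MOUNTING state between PULLING and RUNNING,
which destroy() handles explicitly in docker.cpp. A toy sketch of the
resulting happy-path progression (the enum values mirror this diff; the
driver loop is illustrative, not Mesos code):

    // Sketch only (not commit code): the container lifecycle after this
    // commit, with MOUNTING inserted between PULLING and RUNNING.
    #include <initializer_list>
    #include <iostream>

    enum State
    {
      FETCHING = 1,
      PULLING = 2,
      MOUNTING = 3,
      RUNNING = 4,
      DESTROYING = 5
    };

    const char* name(State state)
    {
      switch (state) {
        case FETCHING: return "FETCHING";
        case PULLING: return "PULLING";
        case MOUNTING: return "MOUNTING";
        case RUNNING: return "RUNNING";
        case DESTROYING: return "DESTROYING";
      }
      return "UNKNOWN";
    }

    int main()
    {
      // Happy-path launch: fetch the executor, pull the image, mount
      // persistent volumes, then run.
      for (State state : {FETCHING, PULLING, MOUNTING, RUNNING}) {
        std::cout << "-> " << name(state) << std::endl;
      }

      return 0;
    }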
http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 645bdcf..8541a9a 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -27,7 +27,10 @@
#include <stout/duration.hpp>
+#ifdef __linux__
#include "linux/cgroups.hpp"
+#include "linux/fs.hpp"
+#endif // __linux__
#include "messages/messages.hpp"
@@ -1081,6 +1084,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
RunState runState;
runState.id = containerId;
runState.forkedPid = wait.get().pid();
+
execState.runs.put(containerId, runState);
frameworkState.executors.put(execId, execState);
@@ -1166,6 +1170,482 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
}
+#ifdef __linux__
+// This test verifies that we can launch a docker container with
+// a persistent volume.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker =
+ new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = "cpus:2;mem:2048;disk(role1):2048";
+
+ Fetcher fetcher;
+
+ Try<ContainerLogger*> logger =
+ ContainerLogger::create(flags.container_logger);
+
+ ASSERT_SOME(logger);
+
+ MockDockerContainerizer dockerContainerizer(
+ flags,
+ &fetcher,
+ Owned<ContainerLogger>(logger.get()),
+ docker);
+
+ Try<PID<Slave>> slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ Offer offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ Resource volume = createPersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(
+ Resources::parse("cpus:1;mem:64;").get() + volume);
+
+ CommandInfo command;
+ command.set_value("echo abc > " +
+ path::join(flags.sandbox_directory, "path1", "file"));
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("alpine");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ // We set the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (the default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CREATE(volume), LAUNCH({task})},
+ filters);
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ AWAIT_READY(statusFinished);
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ ASSERT_FALSE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ const string& volumePath = getPersistentVolumePath(
+ flags.work_dir,
+ volume);
+
+ EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file")));
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ EXPECT_SOME(table);
+
+ // Verify that the persistent volume is unmounted.
+ foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+ EXPECT_FALSE(
+ strings::contains(entry.target, path::join(directory.get(), "path1")));
+ }
+
+ Shutdown();
+}
+
+
+// This test checks that the docker containerizer is able to recover
+// containers with persistent volumes and destroy them properly.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker =
+ new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = "cpus:2;mem:2048;disk(role1):2048";
+
+ Fetcher fetcher;
+
+ Try<ContainerLogger*> logger =
+ ContainerLogger::create(flags.container_logger);
+
+ ASSERT_SOME(logger);
+
+ MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer(
+ flags,
+ &fetcher,
+ Owned<ContainerLogger>(logger.get()),
+ docker);
+
+ Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // NOTE: We set the filter explicitly here so that the resources will
+ // not be filtered for 5 seconds (the default).
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ Offer offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ Resource volume = createPersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(
+ Resources::parse("cpus:1;mem:64;").get() + volume);
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("alpine");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CREATE(volume), LAUNCH({task})},
+ filters);
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ Stop(slave.get());
+
+ // Recreate containerizer and start slave again.
+ delete dockerContainerizer;
+
+ logger = ContainerLogger::create(flags.container_logger);
+ ASSERT_SOME(logger);
+
+ dockerContainerizer = new MockDockerContainerizer(
+ flags,
+ &fetcher,
+ Owned<ContainerLogger>(logger.get()),
+ docker);
+
+ slave = StartSlave(dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+ // Wait until the containerizer's recovery is complete.
+ AWAIT_READY(_recover);
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer->wait(containerId.get());
+
+ dockerContainerizer->destroy(containerId.get());
+
+ AWAIT_READY(termination);
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ EXPECT_SOME(table);
+
+ // Verify that the recovered container's persistent volume is
+ // unmounted.
+ foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+ EXPECT_FALSE(
+ strings::contains(entry.target, path::join(directory.get(), "path1")));
+ }
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+ delete dockerContainerizer;
+}
+
+
+// This test checks that the docker containerizer is able to clean up
+// orphaned containers with persistent volumes.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker =
+ new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = "cpus:2;mem:2048;disk(role1):2048";
+
+ Fetcher fetcher;
+
+ Try<ContainerLogger*> logger =
+ ContainerLogger::create(flags.container_logger);
+
+ ASSERT_SOME(logger);
+
+ MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer(
+ flags,
+ &fetcher,
+ Owned<ContainerLogger>(logger.get()),
+ docker);
+
+ Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // NOTE: We set the filter explicitly here so that the resources will
+ // not be filtered for 5 seconds (the default).
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ Offer offer = offers.get()[0];
+
+ Resource volume = createPersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(
+ Resources::parse("cpus:1;mem:64;").get() + volume);
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("alpine");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CREATE(volume), LAUNCH({task})},
+ filters);
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ Stop(slave.get());
+
+ // Wipe the framework directory so that the slave will treat the
+ // above running task as an orphan. We don't want to wipe the whole
+ // meta directory since the Docker containerizer will skip recovery
+ // if no state is found.
+ ASSERT_SOME(
+ os::rmdir(getFrameworkPath(
+ getMetaRootDir(flags.work_dir),
+ offer.slave_id(),
+ frameworkId.get())));
+
+ // Recreate containerizer and start slave again.
+ delete dockerContainerizer;
+
+ logger = ContainerLogger::create(flags.container_logger);
+ ASSERT_SOME(logger);
+
+ dockerContainerizer = new MockDockerContainerizer(
+ flags,
+ &fetcher,
+ Owned<ContainerLogger>(logger.get()),
+ docker);
+
+ slave = StartSlave(dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+ // Wait until the containerizer's recovery is complete.
+ AWAIT_READY(_recover);
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ EXPECT_SOME(table);
+
+ // Verify that the orphaned container's persistent volume is
+ // unmounted.
+ foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+ EXPECT_FALSE(
+ strings::contains(entry.target, path::join(directory.get(), "path1")));
+ }
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+ delete dockerContainerizer;
+
+ EXPECT_FALSE(exists(docker, offer.slave_id(), containerId.get()));
+}
+#endif // __linux__
+
+
TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
{
Try<PID<Master> > master = StartMaster();