Posted to commits@mesos.apache.org by tn...@apache.org on 2016/02/18 23:02:54 UTC

mesos git commit: Fixed persistent volumes with docker tasks.

Repository: mesos
Updated Branches:
  refs/heads/master 454cdf42d -> 541b3d963


Fixed persistent volumes with docker tasks.

Review: https://reviews.apache.org/r/43015


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/541b3d96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/541b3d96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/541b3d96

Branch: refs/heads/master
Commit: 541b3d963cccf07e979ce5362cbb6ace0144f31a
Parents: 454cdf4
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri Jan 29 18:09:52 2016 -0500
Committer: Timothy Chen <tn...@apache.org>
Committed: Thu Feb 18 14:02:42 2016 -0800

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp              | 270 ++++++++++-
 src/slave/containerizer/docker.hpp              |  17 +-
 .../docker_containerizer_tests.cpp              | 480 +++++++++++++++++++
 3 files changed, 762 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
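
The gist of the change: before the executor is launched, each persistent
volume claimed by the task is bind-mounted from the agent's work directory
into the task sandbox under the volume's container_path, and the mount is
removed again when the container is destroyed or recovered as an orphan.
Since the Docker containerizer already maps the sandbox into the container,
the task then sees the volume inside its sandbox (the new tests below write
to "path1" under the sandbox directory). The sketch below is not Mesos code;
it shows the underlying mechanism with raw mount(2)/umount(2) instead of the
stout os::/fs:: helpers the patch uses, and the paths are purely
hypothetical.

// Minimal sketch, assuming hypothetical paths; the patch itself uses
// os::mkdir, fs::mount and fs::unmount from stout.
#include <sys/mount.h>
#include <sys/stat.h>

#include <cerrno>
#include <cstring>
#include <iostream>
#include <string>

int main()
{
  // Hypothetical source: where the agent keeps the volume's data.
  const std::string source = "/tmp/mesos/volumes/roles/role1/id1";

  // Hypothetical target: a directory inside the task sandbox named
  // after the volume's container_path (e.g. "path1").
  const std::string target = "/tmp/mesos/sandbox/path1";

  // Create the mount point if it does not already exist.
  if (::mkdir(target.c_str(), 0755) < 0 && errno != EEXIST) {
    std::cerr << "mkdir failed: " << std::strerror(errno) << std::endl;
    return 1;
  }

  // Bind-mount the volume into the sandbox; this mirrors the
  // fs::mount(source, target, None(), MS_BIND, NULL) call in the patch.
  if (::mount(
          source.c_str(), target.c_str(), nullptr, MS_BIND, nullptr) != 0) {
    std::cerr << "mount failed: " << std::strerror(errno) << std::endl;
    return 1;
  }

  // ... the task runs and reads/writes under the mounted "path1" ...

  // Unmount on destroy. The mount point itself is left behind; the
  // patch carries a TODO (MESOS-3483) about removing it safely.
  if (::umount(target.c_str()) != 0) {
    std::cerr << "umount failed: " << std::strerror(errno) << std::endl;
    return 1;
  }

  return 0;
}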


http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index ed1c9a5..50248e5 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -22,6 +22,7 @@
 #include <mesos/slave/container_logger.hpp>
 
 #include <process/check.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/io.hpp>
@@ -29,6 +30,7 @@
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
 
+#include <stout/adaptor.hpp>
 #include <stout/fs.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -40,6 +42,7 @@
 
 #ifdef __linux__
 #include "linux/cgroups.hpp"
+#include "linux/fs.hpp"
 #include "linux/systemd.hpp"
 #endif // __linux__
 
@@ -155,6 +158,9 @@ Try<DockerContainerizer*> DockerContainerizer::create(
     }
   }
 
+  // TODO(tnachen): We should also mark the work directory as a shared
+  // mount here; for more details, please refer to MESOS-3483.
+
   return new DockerContainerizer(
       flags,
       fetcher,
@@ -387,6 +393,163 @@ Future<Nothing> DockerContainerizerProcess::pull(
 }
 
 
+Try<Nothing> DockerContainerizerProcess::updatePersistentVolumes(
+    const ContainerID& containerId,
+    const string& directory,
+    const Resources& current,
+    const Resources& updated)
+{
+  // The Docker containerizer is currently only expected to run on Linux.
+#ifdef __linux__
+  // Unmount all persistent volumes that are no longer present.
+  foreach (const Resource& resource, current.persistentVolumes()) {
+    // This is enforced by the master.
+    CHECK(resource.disk().has_volume());
+
+    // Ignore absolute and nested paths.
+    const string& containerPath = resource.disk().volume().container_path();
+    if (strings::contains(containerPath, "/")) {
+      LOG(WARNING) << "Skipping updating mount for persistent volume "
+                   << resource << " of container " << containerId
+                   << " because the container path '" << containerPath
+                   << "' contains slash";
+      continue;
+    }
+
+    if (updated.contains(resource)) {
+      continue;
+    }
+
+    const string target = path::join(
+        directory, resource.disk().volume().container_path());
+
+    Try<Nothing> unmount = fs::unmount(target);
+    if (unmount.isError()) {
+      return Error("Failed to unmount persistent volume at '" + target +
+                   "': " + unmount.error());
+    }
+
+    // TODO(tnachen): Remove the mount point after unmounting. This
+    // requires making sure the work directory is marked as a shared
+    // mount. For more details, please refer to MESOS-3483.
+  }
+
+  // Set the ownership of the persistent volume to match that of the
+  // sandbox directory.
+  //
+  // NOTE: Currently, persistent volumes in Mesos are exclusive,
+  // meaning that if a persistent volume is used by one task or
+  // executor, it cannot be concurrently used by another task or
+  // executor. But if we allow multiple executors to use the same
+  // persistent volume at the same time in the future, the ownership
+  // of the persistent volume may conflict here.
+  //
+  // TODO(haosdent): Consider letting the frameworks specify the
+  // user/group of the persistent volumes.
+  struct stat s;
+  if (::stat(directory.c_str(), &s) < 0) {
+    return Error("Failed to get ownership for '" + directory + "': " +
+                 os::strerror(errno));
+  }
+
+  // Mount all new persistent volumes added.
+  foreach (const Resource& resource, updated.persistentVolumes()) {
+    // This is enforced by the master.
+    CHECK(resource.disk().has_volume());
+
+    if (current.contains(resource)) {
+      continue;
+    }
+
+    const string source =
+      paths::getPersistentVolumePath(flags.work_dir, resource);
+
+    // Ignore absolute and nested paths.
+    const string& containerPath = resource.disk().volume().container_path();
+    if (strings::contains(containerPath, "/")) {
+      LOG(WARNING) << "Skipping updating mount for persistent volume "
+                   << resource << " of container " << containerId
+                   << " because the container path '" << containerPath
+                   << "' contains slash";
+      continue;
+    }
+
+    const string target = path::join(directory, containerPath);
+
+    LOG(INFO) << "Changing the ownership of the persistent volume at '"
+              << source << "' with uid " << s.st_uid
+              << " and gid " << s.st_gid;
+
+    Try<Nothing> chown = os::chown(s.st_uid, s.st_gid, source, true);
+    if (chown.isError()) {
+      return Error(
+          "Failed to change the ownership of the persistent volume at '" +
+          source + "' with uid " + stringify(s.st_uid) +
+          " and gid " + stringify(s.st_gid) + ": " + chown.error());
+    }
+
+    // TODO(tnachen): We should check if the target already exists
+    // when we support updating persistent mounts.
+
+    Try<Nothing> mkdir = os::mkdir(target);
+    if (mkdir.isError()) {
+      return Error("Failed to create persistent mount point at '" + target
+                     + "': " + mkdir.error());
+    }
+
+    LOG(INFO) << "Mounting '" << source << "' to '" << target
+              << "' for persistent volume " << resource
+              << " of container " << containerId;
+
+    Try<Nothing> mount = fs::mount(source, target, None(), MS_BIND, NULL);
+    if (mount.isError()) {
+      return Error(
+          "Failed to mount persistent volume from '" +
+          source + "' to '" + target + "': " + mount.error());
+    }
+  }
+#else
+  if (!current.persistentVolumes().empty() ||
+      !updated.persistentVolumes().empty()) {
+    return Error("Persistent volumes are only supported on linux");
+  }
+#endif // __linux__
+
+  return Nothing();
+}
+
+
+Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes(
+    const ContainerID& containerId)
+{
+  if (!containers_.contains(containerId)) {
+    return Failure("Container is already destroyed");
+  }
+
+  Container* container = containers_[containerId];
+  container->state = Container::MOUNTING;
+
+  if (container->task.isNone() &&
+      !container->resources.persistentVolumes().empty()) {
+    LOG(ERROR) << "Persistent volumes found with container '" << containerId
+               << "' but are not supported with custom executors";
+    return Nothing();
+  }
+
+  Try<Nothing> updateVolumes = updatePersistentVolumes(
+      containerId,
+      container->directory,
+      Resources(),
+      container->resources);
+
+  if (updateVolumes.isError()) {
+    return Failure(updateVolumes.error());
+  }
+
+  return Nothing();
+}
+
+
 Try<Nothing> DockerContainerizerProcess::checkpoint(
     const ContainerID& containerId,
     pid_t pid)
@@ -699,6 +862,7 @@ Future<Nothing> DockerContainerizerProcess::_recover(
           framework.id,
           executor.id,
           containerId);
+      container->directory = sandboxDirectory;
 
       // Pass recovered containers to the container logger.
       // NOTE: The current implementation of the container logger only
@@ -721,9 +885,44 @@ Future<Nothing> DockerContainerizerProcess::_recover(
 }
 
 
+/**
+ *  Unmount persistent volumes that are mounted for a container.
+ */
+Try<Nothing> unmountPersistentVolumes(const ContainerID& containerId)
+{
+  // We assume volumes are only supported on Linux, and also that
+  // the target path contains the containerId.
+#ifdef __linux__
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return Error("Failed to get mount table: " + table.error());
+  }
+
+  foreach (const fs::MountInfoTable::Entry& entry,
+           adaptor::reverse(table.get().entries)) {
+    // TODO(tnachen): We assume there is only one docker container
+    // running per container Id and that no other mounts will have
+    // the container Id in their name. We might need to revisit this
+    // if it is no longer true.
+    if (strings::contains(entry.target, containerId.value())) {
+      LOG(INFO) << "Unmounting volume for container '" << containerId
+                << "'";
+      Try<Nothing> unmount = fs::unmount(entry.target);
+      if (unmount.isError()) {
+        return Error("Failed to unmount volume '" + entry.target +
+                     "': " + unmount.error());
+      }
+    }
+  }
+#endif // __linux__
+  return Nothing();
+}
+
+
 Future<Nothing> DockerContainerizerProcess::__recover(
     const list<Docker::Container>& _containers)
 {
+  list<Future<ContainerID>> futures;
   foreach (const Docker::Container& container, _containers) {
     VLOG(1) << "Checking if Docker container named '"
             << container.name << "' was started by Mesos";
@@ -742,11 +941,33 @@ Future<Nothing> DockerContainerizerProcess::__recover(
     // if not, rm -f the Docker container.
     if (!containers_.contains(id.get())) {
       // TODO(tnachen): Consider using executor_shutdown_grace_period.
-      docker->stop(container.id, flags.docker_stop_timeout, true);
+      futures.push_back(
+          docker->stop(
+              container.id,
+              flags.docker_stop_timeout,
+              true)
+            .then([id]() { return id.get(); }));
     }
   }
 
-  return Nothing();
+  return collect(futures)
+    .then([](Future<list<ContainerID>> future) -> Future<Nothing> {
+      if (!future.isReady()) {
+        return Failure("Unable to stop orphaned Docker containers: " +
+                       (future.isFailed() ?
+                        future.failure() : "future discarded"));
+      }
+
+      foreach (const ContainerID& containerId, future.get()) {
+        Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+        if (unmount.isError()) {
+          return Failure("Unable to unmount volumes for Docker container '" +
+                         containerId.value() + "': " + unmount.error());
+        }
+      }
+
+      return Nothing();
+    });
 }
 
 
@@ -827,6 +1048,9 @@ Future<bool> DockerContainerizerProcess::launch(
     // Launching task by forking a subprocess to run docker executor.
     return container.get()->launch = fetch(containerId, slaveId)
       .then(defer(self(), [=]() { return pull(containerId); }))
+      .then(defer(self(), [=]() {
+        return mountPersistentVolumes(containerId);
+      }))
       .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
       .then(defer(self(), [=](pid_t pid) {
         return reapExecutor(containerId, pid);
@@ -850,6 +1074,9 @@ Future<bool> DockerContainerizerProcess::launch(
   return container.get()->launch = fetch(containerId, slaveId)
     .then(defer(self(), [=]() { return pull(containerId); }))
     .then(defer(self(), [=]() {
+      return mountPersistentVolumes(containerId);
+    }))
+    .then(defer(self(), [=]() {
       return launchExecutorContainer(containerId, containerName);
     }))
     .then(defer(self(), [=](const Docker::Container& dockerContainer) {
@@ -1080,6 +1307,8 @@ Future<Nothing> DockerContainerizerProcess::update(
     return Nothing();
   }
 
+  // TODO(tnachen): Support updating persistent volumes, which requires
+  // Docker mount propagation support.
 
   // Store the resources for usage().
   container->resources = _resources;
@@ -1468,6 +1697,9 @@ void DockerContainerizerProcess::destroy(
   // cleanup. Just as above, we'll need to deal with the race with
   // 'docker pull' returning successfully.
   //
+  // If we're MOUNTING, we want to unmount all the persistent volumes
+  // that have been mounted.
+  //
   // If we're RUNNING, we want to wait for the status to get set, then
   // do a Docker::kill, then wait for the status to complete, then
   // cleanup.
@@ -1507,6 +1739,29 @@ void DockerContainerizerProcess::destroy(
     return;
   }
 
+  if (container->state == Container::MOUNTING) {
+    LOG(INFO) << "Destroying Container '" << containerId
+              << "' in MOUNTING state";
+
+    // Persistent volumes might already be mounted; remove them
+    // if necessary.
+    Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+    if (unmount.isError()) {
+      LOG(WARNING) << "Failed to remove persistent volumes on destroy for "
+                   << "container '" << containerId << "': "
+                   << unmount.error();
+    }
+
+    containerizer::Termination termination;
+    termination.set_message("Container destroyed while mounting volumes");
+    container->termination.set(termination);
+
+    containers_.erase(containerId);
+    delete container;
+
+    return;
+  }
+
   CHECK(container->state == Container::RUNNING);
 
   container->state = Container::DESTROYING;
@@ -1613,6 +1868,17 @@ void DockerContainerizerProcess::___destroy(
 {
   CHECK(containers_.contains(containerId));
 
+  Try<Nothing> unmount = unmountPersistentVolumes(containerId);
+  if (unmount.isError()) {
+    // TODO(tnachen): Failing to unmount a persistent volume now
+    // leads to leaving the volume on the host, and we won't retry
+    // again since the Docker container is removed. We should consider
+    // not removing the container so we can retry.
+    LOG(WARNING) << "Failed to remove persistent volumes on destroy for "
+                 << "container '" << containerId << "': "
+                 << unmount.error();
+  }
+
   Container* container = containers_[containerId];
 
   containerizer::Termination termination;
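
The cleanup side of the patch (used by __recover, destroy and ___destroy
above) works by scanning the host mount table and unmounting, in reverse
order, every entry whose target path contains the container ID. Below is a
standalone sketch of that idea; it parses /proc/self/mountinfo directly
instead of using stout's fs::MountInfoTable, and the container ID is
hypothetical.

// Sketch only: unmount every mount point whose path contains a given
// container ID, innermost (most recently mounted) first.
#include <sys/mount.h>

#include <cerrno>
#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

int main()
{
  // Hypothetical container ID.
  const std::string containerId = "c2b3400f-4a53-4b8a-9d7c-0123456789ab";

  std::ifstream mountinfo("/proc/self/mountinfo");
  std::vector<std::string> targets;

  std::string line;
  while (std::getline(mountinfo, line)) {
    // The fifth whitespace-separated field of a mountinfo line is the
    // mount point.
    std::istringstream fields(line);
    std::string id, parent, device, root, target;
    if (fields >> id >> parent >> device >> root >> target &&
        target.find(containerId) != std::string::npos) {
      targets.push_back(target);
    }
  }

  // Unmount in reverse order so nested mounts (if any) are removed
  // before their parents, matching the adaptor::reverse() iteration
  // in the patch.
  for (auto it = targets.rbegin(); it != targets.rend(); ++it) {
    if (::umount(it->c_str()) != 0) {
      std::cerr << "Failed to unmount '" << *it << "': "
                << std::strerror(errno) << std::endl;
      return 1;
    }
  }

  return 0;
}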

http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 77a50d8..4d70381 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -223,6 +223,15 @@ private:
       const Resources& resources,
       pid_t pid);
 
+  process::Future<Nothing> mountPersistentVolumes(
+      const ContainerID& containerId);
+
+  Try<Nothing> updatePersistentVolumes(
+    const ContainerID& containerId,
+    const std::string& directory,
+    const Resources& current,
+    const Resources& updated);
+
   Try<ResourceStatistics> cgroupsStatistics(pid_t pid) const;
 
   // Call back for when the executor exits. This will trigger
@@ -387,6 +396,7 @@ private:
     //
     //     FETCHING
     //     PULLING
+    //     MOUNTING
     //     RUNNING
     //     DESTROYING
     //
@@ -404,8 +414,9 @@ private:
     {
       FETCHING = 1,
       PULLING = 2,
-      RUNNING = 3,
-      DESTROYING = 4
+      MOUNTING = 3,
+      RUNNING = 4,
+      DESTROYING = 5
     } state;
 
     const ContainerID id;
@@ -417,7 +428,7 @@ private:
 
     // The sandbox directory for the container. This holds the
     // symlinked path if symlinked boolean is true.
-    const std::string directory;
+    std::string directory;
 
     const Option<std::string> user;
     SlaveID slaveId;
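
For orientation, a rough sketch of the per-container state machine after
this change (the real enum lives in the Container struct above; the comments
map each state to its step in the launch pipeline in docker.cpp). Note that
destroy() now also has to handle a container caught in the new MOUNTING
state.

enum class State
{
  FETCHING,   // Fetching URIs into the sandbox.
  PULLING,    // Pulling the Docker image.
  MOUNTING,   // Bind-mounting persistent volumes into the sandbox.
  RUNNING,    // Executor or container is running.
  DESTROYING  // Tear-down is in progress.
};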

http://git-wip-us.apache.org/repos/asf/mesos/blob/541b3d96/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 645bdcf..8541a9a 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -27,7 +27,10 @@
 
 #include <stout/duration.hpp>
 
+#ifdef __linux__
 #include "linux/cgroups.hpp"
+#include "linux/fs.hpp"
+#endif // __linux__
 
 #include "messages/messages.hpp"
 
@@ -1081,6 +1084,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
   RunState runState;
   runState.id = containerId;
   runState.forkedPid = wait.get().pid();
+
   execState.runs.put(containerId, runState);
   frameworkState.executors.put(execId, execState);
 
@@ -1166,6 +1170,482 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
 }
 
 
+#ifdef __linux__
+// This test verifies that we can launch a docker container with
+// a persistent volume.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchWithPersistentVolumes)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker =
+    new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = "cpu:2;mem:2048;disk(role1):2048";
+
+  Fetcher fetcher;
+
+  Try<ContainerLogger*> logger =
+    ContainerLogger::create(flags.container_logger);
+
+  ASSERT_SOME(logger);
+
+  MockDockerContainerizer dockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  Try<PID<Slave>> slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  Offer offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  Resource volume = createPersistentVolume(
+    Megabytes(64),
+    "role1",
+    "id1",
+    "path1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:64;").get() + volume);
+
+  CommandInfo command;
+  command.set_value("echo abc > " +
+                    path::join(flags.sandbox_directory, "path1", "file"));
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("alpine");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), LAUNCH({task})},
+      filters);
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  AWAIT_READY(statusFinished);
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  ASSERT_FALSE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  const string& volumePath = getPersistentVolumePath(
+      flags.work_dir,
+      volume);
+
+  EXPECT_SOME_EQ("abc\n", os::read(path::join(volumePath, "file")));
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  EXPECT_SOME(table);
+
+  // Verify that the persistent volume is unmounted.
+  foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+    EXPECT_FALSE(
+        strings::contains(entry.target, path::join(directory.get(), "path1")));
+  }
+
+  Shutdown();
+}
+
+
+// This test checks that the docker containerizer is able to recover
+// containers with persistent volumes and destroy them properly.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverPersistentVolumes)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker =
+    new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = "cpu:2;mem:2048;disk(role1):2048";
+
+  Fetcher fetcher;
+
+  Try<ContainerLogger*> logger =
+    ContainerLogger::create(flags.container_logger);
+
+  ASSERT_SOME(logger);
+
+  MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // NOTE: We set filter explicitly here so that the resources will
+  // not be filtered for 5 seconds (the default).
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers(filters));      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  Offer offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  Resource volume = createPersistentVolume(
+    Megabytes(64),
+    "role1",
+    "id1",
+    "path1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:64;").get() + volume);
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("alpine");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), LAUNCH({task})},
+      filters);
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  Stop(slave.get());
+
+  // Recreate containerizer and start slave again.
+  delete dockerContainerizer;
+
+  logger = ContainerLogger::create(flags.container_logger);
+  ASSERT_SOME(logger);
+
+  dockerContainerizer = new MockDockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  slave = StartSlave(dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  // Wait until containerizer recovery is complete.
+  AWAIT_READY(_recover);
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer->wait(containerId.get());
+
+  dockerContainerizer->destroy(containerId.get());
+
+  AWAIT_READY(termination);
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  EXPECT_SOME(table);
+
+  // Verify that the recovered container's persistent volume is
+  // unmounted.
+  foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+    EXPECT_FALSE(
+        strings::contains(entry.target, path::join(directory.get(), "path1")));
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+  delete dockerContainerizer;
+}
+
+
+// This test checks that the docker containerizer is able to clean up
+// orphaned containers with persistent volumes.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_RecoverOrphanedPersistentVolumes)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker =
+    new MockDocker(tests::flags.docker, tests::flags.docker_socket);
+
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = "cpu:2;mem:2048;disk(role1):2048";
+
+  Fetcher fetcher;
+
+  Try<ContainerLogger*> logger =
+    ContainerLogger::create(flags.container_logger);
+
+  ASSERT_SOME(logger);
+
+  MockDockerContainerizer* dockerContainerizer = new MockDockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  Try<PID<Slave>> slave = StartSlave(dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // NOTE: We set filter explicitly here so that the resources will
+  // not be filtered for 5 seconds (the default).
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(DeclineOffers(filters)); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  Offer offer = offers.get()[0];
+
+  Resource volume = createPersistentVolume(
+    Megabytes(64),
+    "role1",
+    "id1",
+    "path1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:1;mem:64;").get() + volume);
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("alpine");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(*dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), LAUNCH({task})},
+      filters);
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  Stop(slave.get());
+
+  // Wipe the framework directory so that the slave will treat the
+  // above running task as an orphan. We don't want to wipe the whole
+  // meta directory since the Docker containerizer will skip recovery
+  // if no state is found.
+  ASSERT_SOME(
+      os::rmdir(getFrameworkPath(
+          getMetaRootDir(flags.work_dir),
+          offer.slave_id(),
+          frameworkId.get())));
+
+  // Recreate containerizer and start slave again.
+  delete dockerContainerizer;
+
+  logger = ContainerLogger::create(flags.container_logger);
+  ASSERT_SOME(logger);
+
+  dockerContainerizer = new MockDockerContainerizer(
+      flags,
+      &fetcher,
+      Owned<ContainerLogger>(logger.get()),
+      docker);
+
+  slave = StartSlave(dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  // Wait until containerizer recovery is complete.
+  AWAIT_READY(_recover);
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  EXPECT_SOME(table);
+
+  // Verify that the orphaned container's persistent volume is
+  // unmounted.
+  foreach (const fs::MountInfoTable::Entry& entry, table.get().entries) {
+    EXPECT_FALSE(
+        strings::contains(entry.target, path::join(directory.get(), "path1")));
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+  delete dockerContainerizer;
+
+  EXPECT_FALSE(exists(docker, offer.slave_id(), containerId.get()));
+}
+#endif // __linux__
+
+
 TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
 {
   Try<PID<Master> > master = StartMaster();