You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2014/11/10 08:27:20 UTC

mesos git commit: Add destroy tests for docker containerizer.

Repository: mesos
Updated Branches:
  refs/heads/master 4a1bbf7d3 -> 3bfb136e9


Add destroy tests for docker containerizer.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bfb136e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bfb136e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bfb136e

Branch: refs/heads/master
Commit: 3bfb136e9dcc83715657cf7862000987fc0699b6
Parents: 4a1bbf7
Author: Timothy Chen <tn...@apache.org>
Authored: Mon Nov 3 13:59:34 2014 -0800
Committer: Timothy Chen <tn...@gmail.com>
Committed: Sun Nov 9 23:26:14 2014 -0800

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp       | 408 ++++----------------------
 src/slave/containerizer/docker.hpp       | 332 ++++++++++++++++++++-
 src/tests/docker_containerizer_tests.cpp | 272 +++++++++++++++++
 3 files changed, 654 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 37f422a..5978ec2 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -79,329 +79,6 @@ const string DOCKER_NAME_PREFIX = "mesos-";
 // Declared in header, see explanation there.
 const string DOCKER_SYMLINK_DIRECTORY = "docker/links";
 
-
-class DockerContainerizerProcess
-  : public process::Process<DockerContainerizerProcess>
-{
-public:
-  DockerContainerizerProcess(
-      const Flags& _flags,
-      Shared<Docker> _docker)
-    : flags(_flags),
-      docker(_docker) {}
-
-  virtual process::Future<Nothing> recover(
-      const Option<state::SlaveState>& state);
-
-  virtual process::Future<bool> launch(
-      const ContainerID& containerId,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<string>& user,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<bool> launch(
-      const ContainerID& containerId,
-      const TaskInfo& taskInfo,
-      const ExecutorInfo& executorInfo,
-      const std::string& directory,
-      const Option<string>& user,
-      const SlaveID& slaveId,
-      const PID<Slave>& slavePid,
-      bool checkpoint);
-
-  virtual process::Future<Nothing> update(
-      const ContainerID& containerId,
-      const Resources& resources);
-
-  virtual process::Future<ResourceStatistics> usage(
-      const ContainerID& containerId);
-
-  virtual Future<containerizer::Termination> wait(
-      const ContainerID& containerId);
-
-  virtual void destroy(
-      const ContainerID& containerId,
-      bool killed = true); // process is either killed or reaped.
-
-  virtual process::Future<hashset<ContainerID> > containers();
-
-private:
-  // Continuations and helpers.
-  process::Future<Nothing> fetch(const ContainerID& containerId);
-
-  process::Future<Nothing> _fetch(
-      const ContainerID& containerId,
-      const Option<int>& status);
-
-  process::Future<Nothing> pull(
-      const ContainerID& containerId,
-      const std::string& directory,
-      const std::string& image);
-
-  process::Future<Nothing> _pull(const std::string& image);
-
-  Try<Nothing> checkpoint(
-      const ContainerID& containerId,
-      pid_t pid);
-
-  process::Future<Nothing> _recover(
-      const std::list<Docker::Container>& containers);
-
-  process::Future<Nothing> _launch(
-      const ContainerID& containerId);
-
-  process::Future<Nothing> __launch(
-      const ContainerID& containerId);
-
-  // NOTE: This continuation is only applicable when launching a
-  // container for a task.
-  process::Future<pid_t> ___launch(
-      const ContainerID& containerId);
-
-  // NOTE: This continuation is only applicable when launching a
-  // container for an executor.
-  process::Future<Docker::Container> ____launch(
-      const ContainerID& containerId);
-
-  // NOTE: This continuation is only applicable when launching a
-  // container for an executor.
-  process::Future<pid_t> _____launch(
-      const ContainerID& containerId,
-      const Docker::Container& container);
-
-  process::Future<bool> ______launch(
-    const ContainerID& containerId,
-    pid_t pid);
-
-  void _destroy(
-      const ContainerID& containerId,
-      bool killed);
-
-  void __destroy(
-      const ContainerID& containerId,
-      bool killed,
-      const Future<Nothing>& future);
-
-  void ___destroy(
-      const ContainerID& containerId,
-      bool killed,
-      const Future<Option<int> >& status);
-
-  process::Future<Nothing> _update(
-      const ContainerID& containerId,
-      const Resources& resources,
-      const Docker::Container& container);
-
-  process::Future<Nothing> __update(
-      const ContainerID& containerId,
-      const Resources& resources,
-      pid_t pid);
-
-  Future<ResourceStatistics> _usage(
-      const ContainerID& containerId,
-      const Docker::Container& container);
-
-  Future<ResourceStatistics> __usage(
-      const ContainerID& containerId,
-      pid_t pid);
-
-  // Call back for when the executor exits. This will trigger
-  // container destroy.
-  void reaped(const ContainerID& containerId);
-
-  // Removes the docker container.
-  void remove(const std::string& container);
-
-  const Flags flags;
-
-  Shared<Docker> docker;
-
-  struct Container
-  {
-    static Try<Container*> create(
-        const ContainerID& id,
-        const Option<TaskInfo>& taskInfo,
-        const ExecutorInfo& executorInfo,
-        const std::string& directory,
-        const Option<std::string>& user,
-        const SlaveID& slaveId,
-        const PID<Slave>& slavePid,
-        bool checkpoint,
-        const Flags& flags);
-
-    Container(const ContainerID& id)
-      : state(FETCHING), id(id) {}
-
-    Container(const ContainerID& id,
-              const Option<TaskInfo>& taskInfo,
-              const ExecutorInfo& executorInfo,
-              const std::string& directory,
-              const Option<std::string>& user,
-              const SlaveID& slaveId,
-              const PID<Slave>& slavePid,
-              bool checkpoint,
-              bool symlinked,
-              const Flags& flags)
-      : state(FETCHING),
-        id(id),
-        task(taskInfo),
-        executor(executorInfo),
-        directory(directory),
-        user(user),
-        slaveId(slaveId),
-        slavePid(slavePid),
-        checkpoint(checkpoint),
-        symlinked(symlinked),
-        flags(flags)
-    {
-      if (task.isSome()) {
-        resources = task.get().resources();
-      } else {
-        resources = executor.resources();
-      }
-    }
-
-    ~Container()
-    {
-      if (symlinked) {
-        // The sandbox directory is a symlink, remove it at container
-        // destroy.
-        os::rm(directory);
-      }
-    }
-
-    std::string name()
-    {
-      return DOCKER_NAME_PREFIX + stringify(id);
-    }
-
-    std::string image() const
-    {
-      if (task.isSome()) {
-        return task.get().container().docker().image();
-      }
-
-      return executor.container().docker().image();
-    }
-
-    ContainerInfo container() const
-    {
-      if (task.isSome()) {
-        return task.get().container();
-      }
-
-      return executor.container();
-    }
-
-    CommandInfo command() const
-    {
-      if (task.isSome()) {
-        return task.get().command();
-      }
-
-      return executor.command();
-    }
-
-    // Returns any extra environment varaibles to set when launching
-    // the Docker container (beyond the those found in CommandInfo).
-    std::map<std::string, std::string> environment() const
-    {
-      if (task.isNone()) {
-        return executorEnvironment(
-            executor,
-            directory,
-            slaveId,
-            slavePid,
-            checkpoint,
-            flags.recovery_timeout);
-      }
-
-      return std::map<std::string, std::string>();
-    }
-
-    // The DockerContainerier needs to be able to properly clean up
-    // Docker containers, regardless of when they are destroyed. For
-    // example, if a container gets destroyed while we are fetching,
-    // we need to not keep running the fetch, nor should we try and
-    // start the Docker container. For this reason, we've split out
-    // the states into:
-    //
-    //     FETCHING
-    //     PULLING
-    //     RUNNING
-    //     DESTROYING
-    //
-    // In particular, we made 'PULLING' be it's own state so that we
-    // could easily destroy and cleanup when a user initiated pulling
-    // a really big image but we timeout due to the executor
-    // registration timeout. Since we curently have no way to discard
-    // a Docker::run, we needed to explicitely do the pull (which is
-    // the part that takes the longest) so that we can also explicitly
-    // kill it when asked. Once the functions at Docker::* get support
-    // for discarding, then we won't need to make pull be it's own
-    // state anymore, although it doesn't hurt since it gives us
-    // better error messages.
-    enum State {
-      FETCHING = 1,
-      PULLING = 2,
-      RUNNING = 3,
-      DESTROYING = 4
-    } state;
-
-    ContainerID id;
-    Option<TaskInfo> task;
-    ExecutorInfo executor;
-
-    // The sandbox directory for the container. This holds the
-    // symlinked path if symlinked boolean is true.
-    std::string directory;
-
-    Option<std::string> user;
-    SlaveID slaveId;
-    PID<Slave> slavePid;
-    bool checkpoint;
-    bool symlinked;
-    Flags flags;
-
-    // Promise for future returned from wait().
-    Promise<containerizer::Termination> termination;
-
-    // Exit status of executor or container (depending on whether or
-    // not we used the command executor). Represented as a promise so
-    // that destroying can chain with it being set.
-    Promise<Future<Option<int> > > status;
-
-    // Future that tells us whether or not the run is still pending or
-    // has failed so we know whether or not to wait for 'status'.
-    Future<Nothing> run;
-
-    // We keep track of the resources for each container so we can set
-    // the ResourceStatistics limits in usage(). Note that this is
-    // different than just what we might get from TaskInfo::resources
-    // or ExecutorInfo::resources because they can change dynamically.
-    Resources resources;
-
-    // The mesos-fetcher subprocess, kept around so that we can do a
-    // killtree on it if we're asked to destroy a container while we
-    // are fetching.
-    Option<Subprocess> fetcher;
-
-    // The docker pull future is stored so we can discard when
-    // destroy is called while docker is pulling the image.
-    Future<Docker::Image> pull;
-
-    // Once the container is running, this saves the pid of the
-    // running container.
-    Option<pid_t> pid;
-  };
-
-  hashmap<ContainerID, Container*> containers_;
-};
-
-
 // Parse the ContainerID from a Docker container and return None if
 // the container was not launched from Mesos.
 Option<ContainerID> parse(const Docker::Container& container)
@@ -438,19 +115,26 @@ Try<DockerContainerizer*> DockerContainerizer::create(const Flags& flags)
 
 
 DockerContainerizer::DockerContainerizer(
+    const Owned<DockerContainerizerProcess>& _process)
+  : process(_process)
+{
+  spawn(process.get());
+}
+
+
+DockerContainerizer::DockerContainerizer(
     const Flags& flags,
     Shared<Docker> docker)
+  : process(new DockerContainerizerProcess(flags, docker))
 {
-  process = new DockerContainerizerProcess(flags, docker);
-  spawn(process);
+  spawn(process.get());
 }
 
 
 DockerContainerizer::~DockerContainerizer()
 {
-  terminate(process);
-  process::wait(process);
-  delete process;
+  terminate(process.get());
+  process::wait(process.get());
 }
 
 
@@ -644,7 +328,7 @@ Try<Nothing> DockerContainerizerProcess::checkpoint(
 Future<Nothing> DockerContainerizer::recover(
     const Option<SlaveState>& state)
 {
-  return dispatch(process, &DockerContainerizerProcess::recover, state);
+  return dispatch(process.get(), &DockerContainerizerProcess::recover, state);
 }
 
 
@@ -657,15 +341,16 @@ Future<bool> DockerContainerizer::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  return dispatch(process,
-                  &DockerContainerizerProcess::launch,
-                  containerId,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
+  return dispatch(
+      process.get(),
+      &DockerContainerizerProcess::launch,
+      containerId,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
 }
 
 
@@ -679,16 +364,17 @@ Future<bool> DockerContainerizer::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  return dispatch(process,
-                  &DockerContainerizerProcess::launch,
-                  containerId,
-                  taskInfo,
-                  executorInfo,
-                  directory,
-                  user,
-                  slaveId,
-                  slavePid,
-                  checkpoint);
+  return dispatch(
+      process.get(),
+      &DockerContainerizerProcess::launch,
+      containerId,
+      taskInfo,
+      executorInfo,
+      directory,
+      user,
+      slaveId,
+      slavePid,
+      checkpoint);
 }
 
 
@@ -696,36 +382,46 @@ Future<Nothing> DockerContainerizer::update(
     const ContainerID& containerId,
     const Resources& resources)
 {
-  return dispatch(process,
-                  &DockerContainerizerProcess::update,
-                  containerId,
-                  resources);
+  return dispatch(
+      process.get(),
+      &DockerContainerizerProcess::update,
+      containerId,
+      resources);
 }
 
 
 Future<ResourceStatistics> DockerContainerizer::usage(
     const ContainerID& containerId)
 {
-  return dispatch(process, &DockerContainerizerProcess::usage, containerId);
+  return dispatch(
+      process.get(),
+      &DockerContainerizerProcess::usage,
+      containerId);
 }
 
 
 Future<containerizer::Termination> DockerContainerizer::wait(
     const ContainerID& containerId)
 {
-  return dispatch(process, &DockerContainerizerProcess::wait, containerId);
+  return dispatch(
+      process.get(),
+      &DockerContainerizerProcess::wait,
+      containerId);
 }
 
 
 void DockerContainerizer::destroy(const ContainerID& containerId)
 {
-  dispatch(process, &DockerContainerizerProcess::destroy, containerId, true);
+  dispatch(
+      process.get(),
+      &DockerContainerizerProcess::destroy,
+      containerId, true);
 }
 
 
 Future<hashset<ContainerID> > DockerContainerizer::containers()
 {
-  return dispatch(process, &DockerContainerizerProcess::containers);
+  return dispatch(process.get(), &DockerContainerizerProcess::containers);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index ec6b9cd..f9f3ffb 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -56,6 +56,10 @@ public:
       const Flags& flags,
       process::Shared<Docker> docker);
 
+  // This is only public for tests.
+  DockerContainerizer(
+      const process::Owned<DockerContainerizerProcess>& _process);
+
   virtual ~DockerContainerizer();
 
   virtual process::Future<Nothing> recover(
@@ -92,12 +96,336 @@ public:
 
   virtual void destroy(const ContainerID& containerId);
 
-  virtual process::Future<hashset<ContainerID> > containers();
+  virtual process::Future<hashset<ContainerID>> containers();
 
 private:
-  DockerContainerizerProcess* process;
+  process::Owned<DockerContainerizerProcess> process;
 };
 
+
+
+class DockerContainerizerProcess
+  : public process::Process<DockerContainerizerProcess>
+{
+public:
+  DockerContainerizerProcess(
+      const Flags& _flags,
+      process::Shared<Docker> _docker)
+    : flags(_flags),
+      docker(_docker) {}
+
+  virtual process::Future<Nothing> recover(
+      const Option<state::SlaveState>& state);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<bool> launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const Option<std::string>& user,
+      const SlaveID& slaveId,
+      const process::PID<Slave>& slavePid,
+      bool checkpoint);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId);
+
+  virtual Future<containerizer::Termination> wait(
+      const ContainerID& containerId);
+
+  virtual void destroy(
+      const ContainerID& containerId,
+      bool killed = true); // process is either killed or reaped.
+
+  virtual process::Future<Nothing> fetch(const ContainerID& containerId);
+
+  virtual process::Future<Nothing> pull(
+      const ContainerID& containerId,
+      const std::string& directory,
+      const std::string& image);
+
+  virtual process::Future<hashset<ContainerID>> containers();
+
+private:
+  // Continuations and helpers.
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const Option<int>& status);
+
+  process::Future<Nothing> _pull(const std::string& image);
+
+  Try<Nothing> checkpoint(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  process::Future<Nothing> _recover(
+      const std::list<Docker::Container>& containers);
+
+  process::Future<Nothing> _launch(
+      const ContainerID& containerId);
+
+  process::Future<Nothing> __launch(
+      const ContainerID& containerId);
+
+  // NOTE: This continuation is only applicable when launching a
+  // container for a task.
+  process::Future<pid_t> ___launch(
+      const ContainerID& containerId);
+
+  // NOTE: This continuation is only applicable when launching a
+  // container for an executor.
+  process::Future<Docker::Container> ____launch(
+      const ContainerID& containerId);
+
+  // NOTE: This continuation is only applicable when launching a
+  // container for an executor.
+  process::Future<pid_t> _____launch(
+      const ContainerID& containerId,
+      const Docker::Container& container);
+
+  process::Future<bool> ______launch(
+    const ContainerID& containerId,
+    pid_t pid);
+
+  void _destroy(
+      const ContainerID& containerId,
+      bool killed);
+
+  void __destroy(
+      const ContainerID& containerId,
+      bool killed,
+      const Future<Nothing>& future);
+
+  void ___destroy(
+      const ContainerID& containerId,
+      bool killed,
+      const Future<Option<int>>& status);
+
+  process::Future<Nothing> _update(
+      const ContainerID& containerId,
+      const Resources& resources,
+      const Docker::Container& container);
+
+  process::Future<Nothing> __update(
+      const ContainerID& containerId,
+      const Resources& resources,
+      pid_t pid);
+
+  Future<ResourceStatistics> _usage(
+      const ContainerID& containerId,
+      const Docker::Container& container);
+
+  Future<ResourceStatistics> __usage(
+      const ContainerID& containerId,
+      pid_t pid);
+
+  // Call back for when the executor exits. This will trigger
+  // container destroy.
+  void reaped(const ContainerID& containerId);
+
+  // Removes the docker container.
+  void remove(const std::string& container);
+
+  const Flags flags;
+
+  process::Shared<Docker> docker;
+
+  struct Container
+  {
+    static Try<Container*> create(
+        const ContainerID& id,
+        const Option<TaskInfo>& taskInfo,
+        const ExecutorInfo& executorInfo,
+        const std::string& directory,
+        const Option<std::string>& user,
+        const SlaveID& slaveId,
+        const process::PID<Slave>& slavePid,
+        bool checkpoint,
+        const Flags& flags);
+
+    Container(const ContainerID& id)
+      : state(FETCHING), id(id) {}
+
+    Container(const ContainerID& id,
+              const Option<TaskInfo>& taskInfo,
+              const ExecutorInfo& executorInfo,
+              const std::string& directory,
+              const Option<std::string>& user,
+              const SlaveID& slaveId,
+              const process::PID<Slave>& slavePid,
+              bool checkpoint,
+              bool symlinked,
+              const Flags& flags)
+      : state(FETCHING),
+        id(id),
+        task(taskInfo),
+        executor(executorInfo),
+        directory(directory),
+        user(user),
+        slaveId(slaveId),
+        slavePid(slavePid),
+        checkpoint(checkpoint),
+        symlinked(symlinked),
+        flags(flags)
+    {
+      if (task.isSome()) {
+        resources = task.get().resources();
+      } else {
+        resources = executor.resources();
+      }
+    }
+
+    ~Container()
+    {
+      if (symlinked) {
+        // The sandbox directory is a symlink, remove it at container
+        // destroy.
+        os::rm(directory);
+      }
+    }
+
+    std::string name()
+    {
+      return DOCKER_NAME_PREFIX + stringify(id);
+    }
+
+    std::string image() const
+    {
+      if (task.isSome()) {
+        return task.get().container().docker().image();
+      }
+
+      return executor.container().docker().image();
+    }
+
+    ContainerInfo container() const
+    {
+      if (task.isSome()) {
+        return task.get().container();
+      }
+
+      return executor.container();
+    }
+
+    CommandInfo command() const
+    {
+      if (task.isSome()) {
+        return task.get().command();
+      }
+
+      return executor.command();
+    }
+
+    // Returns any extra environment varaibles to set when launching
+    // the Docker container (beyond the those found in CommandInfo).
+    std::map<std::string, std::string> environment() const
+    {
+      if (task.isNone()) {
+        return executorEnvironment(
+            executor,
+            directory,
+            slaveId,
+            slavePid,
+            checkpoint,
+            flags.recovery_timeout);
+      }
+
+      return std::map<std::string, std::string>();
+    }
+
+    // The DockerContainerier needs to be able to properly clean up
+    // Docker containers, regardless of when they are destroyed. For
+    // example, if a container gets destroyed while we are fetching,
+    // we need to not keep running the fetch, nor should we try and
+    // start the Docker container. For this reason, we've split out
+    // the states into:
+    //
+    //     FETCHING
+    //     PULLING
+    //     RUNNING
+    //     DESTROYING
+    //
+    // In particular, we made 'PULLING' be it's own state so that we
+    // could easily destroy and cleanup when a user initiated pulling
+    // a really big image but we timeout due to the executor
+    // registration timeout. Since we curently have no way to discard
+    // a Docker::run, we needed to explicitely do the pull (which is
+    // the part that takes the longest) so that we can also explicitly
+    // kill it when asked. Once the functions at Docker::* get support
+    // for discarding, then we won't need to make pull be it's own
+    // state anymore, although it doesn't hurt since it gives us
+    // better error messages.
+    enum State {
+      FETCHING = 1,
+      PULLING = 2,
+      RUNNING = 3,
+      DESTROYING = 4
+    } state;
+
+    ContainerID id;
+    Option<TaskInfo> task;
+    ExecutorInfo executor;
+
+    // The sandbox directory for the container. This holds the
+    // symlinked path if symlinked boolean is true.
+    std::string directory;
+
+    Option<std::string> user;
+    SlaveID slaveId;
+    process::PID<Slave> slavePid;
+    bool checkpoint;
+    bool symlinked;
+    Flags flags;
+
+    // Promise for future returned from wait().
+    Promise<containerizer::Termination> termination;
+
+    // Exit status of executor or container (depending on whether or
+    // not we used the command executor). Represented as a promise so
+    // that destroying can chain with it being set.
+    Promise<Future<Option<int>>> status;
+
+    // Future that tells us whether or not the run is still pending or
+    // has failed so we know whether or not to wait for 'status'.
+    Future<Nothing> run;
+
+    // We keep track of the resources for each container so we can set
+    // the ResourceStatistics limits in usage(). Note that this is
+    // different than just what we might get from TaskInfo::resources
+    // or ExecutorInfo::resources because they can change dynamically.
+    Resources resources;
+
+    // The mesos-fetcher subprocess, kept around so that we can do a
+    // killtree on it if we're asked to destroy a container while we
+    // are fetching.
+    Option<Subprocess> fetcher;
+
+    // The docker pull future is stored so we can discard when
+    // destroy is called while docker is pulling the image.
+    Future<Docker::Image> pull;
+
+    // Once the container is running, this saves the pid of the
+    // running container.
+    Option<pid_t> pid;
+  };
+
+  hashmap<ContainerID, Container*> containers_;
+};
+
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3bfb136e/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 9d4ccc5..66552ad 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -21,6 +21,7 @@
 
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/owned.hpp>
 #include <process/subprocess.hpp>
 
 #include "linux/cgroups.hpp"
@@ -48,6 +49,7 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::DockerContainerizer;
+using mesos::internal::slave::DockerContainerizerProcess;
 
 using process::Future;
 using process::Message;
@@ -59,6 +61,7 @@ using std::list;
 using std::string;
 
 using testing::_;
+using testing::DoAll;
 using testing::DoDefault;
 using testing::Eq;
 using testing::Invoke;
@@ -164,6 +167,17 @@ public:
       Shared<Docker> docker)
     : DockerContainerizer(flags, docker)
   {
+    initialize();
+  }
+
+  MockDockerContainerizer(const Owned<DockerContainerizerProcess>& process)
+    : DockerContainerizer(process)
+  {
+    initialize();
+  }
+
+  void initialize()
+  {
     // NOTE: See TestContainerizer::setup for why we use
     // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
     // 'ON_CALL' and 'WillByDefault'.
@@ -259,6 +273,50 @@ public:
 };
 
 
+class MockDockerContainerizerProcess : public DockerContainerizerProcess
+{
+public:
+  MockDockerContainerizerProcess(
+      const slave::Flags& flags,
+      const Shared<Docker>& docker)
+    : DockerContainerizerProcess(flags, docker)
+  {
+    EXPECT_CALL(*this, fetch(_))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
+
+    EXPECT_CALL(*this, pull(_, _, _))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
+  }
+
+  MOCK_METHOD1(
+      fetch,
+      process::Future<Nothing>(const ContainerID& containerId));
+
+  MOCK_METHOD3(
+      pull,
+      process::Future<Nothing>(
+          const ContainerID& containerId,
+          const std::string& directory,
+          const std::string& image));
+
+  process::Future<Nothing> _fetch(const ContainerID& containerId)
+  {
+    return DockerContainerizerProcess::fetch(containerId);
+  }
+
+  process::Future<Nothing> _pull(
+      const ContainerID& containerId,
+      const std::string& directory,
+      const std::string& image)
+  {
+    return DockerContainerizerProcess::pull(
+        containerId,
+        directory,
+        image);
+  }
+};
+
+
 // Only enable executor launch on linux as other platforms
 // requires running linux VM and need special port forwarding
 // to get host networking to work.
@@ -2276,3 +2334,217 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
 
   Shutdown();
 }
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Promise<Nothing> promise;
+  Future<Nothing> fetch;
+
+  // We want to pause the fetch call to simulate a long fetch time.
+  EXPECT_CALL(*process, fetch(_))
+    .WillOnce(DoAll(FutureSatisfy(&fetch),
+                    Return(promise.future())));
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  AWAIT_READY(fetch);
+
+  dockerContainerizer.destroy(containerId.get());
+
+  // Resume docker launch.
+  promise.set(Nothing());
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Future<Nothing> fetch;
+  EXPECT_CALL(*process, fetch(_))
+    .WillOnce(DoAll(FutureSatisfy(&fetch),
+                    Return(Nothing())));
+
+  Promise<Nothing> promise;
+
+  // We want to pause the fetch call to simulate a long fetch time.
+  EXPECT_CALL(*process, pull(_, _, _))
+    .WillOnce(Return(promise.future()));
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  // Wait until fetch is finished.
+  AWAIT_READY(fetch);
+
+  dockerContainerizer.destroy(containerId.get());
+
+  // Resume docker launch.
+  promise.set(Nothing());
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}