Posted to commits@mesos.apache.org by be...@apache.org on 2014/08/16 03:31:34 UTC

[2/7] git commit: Introduced "states" for Docker containers to transition between.

Introduced "states" for Docker containers to transition between.

The DockerContainerizer needs to be able to properly clean up Docker
containers, regardless of when they are destroyed. For example, if a
container gets destroyed while we are fetching, we must not keep
running the fetch, nor should we try to start the Docker
container. For this reason, we've split out the states into:

  FETCHING
  PULLING
  RUNNING
  DESTROYING

In particular, we made 'PULLING' its own state so that we can easily
destroy and clean up when a user initiates a pull of a really big
image but we time out due to the executor registration timeout. Since
we currently have no way to discard a Docker::run, we need to do the
pull explicitly (which is the part that takes the longest) so that we
can assume we won't have to wait very long for Docker::run to
complete.
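
To make the lifecycle concrete, here is a minimal, self-contained
sketch of the intended transitions (illustrative only, not part of the
patch; 'advance' is a hypothetical helper, and the real enum lives in
the new Container struct below):

  #include <cassert>

  enum State { FETCHING, PULLING, RUNNING, DESTROYING };

  // FETCHING -> PULLING -> RUNNING -> DESTROYING. A destroy request
  // may arrive in any state: FETCHING and PULLING killtree their
  // subprocess (mesos-fetcher or 'docker pull') and clean up right
  // away, while RUNNING first waits for the container's exit status.
  bool advance(State& state)
  {
    switch (state) {
      case FETCHING:   state = PULLING;    return true;
      case PULLING:    state = RUNNING;    return true;
      case RUNNING:    state = DESTROYING; return true;
      case DESTROYING: return false; // Terminal.
    }
    return false;
  }

  int main()
  {
    State state = FETCHING;
    while (advance(state)) {}
    assert(state == DESTROYING);
    return 0;
  }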

Review: https://reviews.apache.org/r/24754


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02a35ab2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02a35ab2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02a35ab2

Branch: refs/heads/master
Commit: 02a35ab213fb074f6c532075cada76f13eb9d552
Parents: f66fa52
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Aug 14 22:46:37 2014 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Aug 15 18:21:59 2014 -0700

----------------------------------------------------------------------
 src/docker/docker.cpp                    |  48 ++
 src/docker/docker.hpp                    |  11 +
 src/slave/containerizer/docker.cpp       | 755 ++++++++++++++++++--------
 src/slave/slave.cpp                      |   2 +-
 src/tests/docker_containerizer_tests.cpp |  11 +-
 5 files changed, 588 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/02a35ab2/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index ebdf226..9ae229a 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -494,6 +494,54 @@ Future<Docker::Container> Docker::__inspect(const string& output)
 }
 
 
+Future<Nothing> Docker::logs(
+    const std::string& container,
+    const std::string& directory)
+{
+  // Redirect the logs into stdout/stderr.
+  //
+  // TODO(benh): This is an intermediate solution for now until we can
+  // reliably stream the logs either from the CLI or from the REST
+  // interface directly. The problem is that it's possible that the
+  // 'docker logs --follow' command will be started AFTER the
+  // container has already terminated, and thus it will continue
+  // running forever because the container has stopped. Unfortunately,
+  // when we later remove the container that still doesn't cause the
+  // 'logs' process to exit. Thus, we wait some period of time until
+  // after the container has terminated in order to let any log data
+  // get flushed, then we kill the 'logs' process ourselves.  A better
+  // solution would be to first "create" the container, then start
+  // following the logs, then finally "start" the container so that
+  // when the container terminates Docker will properly close the log
+  // stream and 'docker logs' will exit. For more information, please
+  // see: https://github.com/docker/docker/issues/7020
+
+  string logs =
+    "logs() {\n"
+    "  " + path + " logs --follow $1 &\n"
+    "  pid=$!\n"
+    "  " + path + " wait $1 >/dev/null 2>&1\n"
+    "  sleep 10" // Sleep 10 seconds to make sure the logs are flushed.
+    "  kill -TERM $pid >/dev/null 2>&1 &\n"
+    "}\n"
+    "logs " + container;
+
+  VLOG(1) << "Running " << logs;
+
+  Try<Subprocess> s = subprocess(
+      logs,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH(path::join(directory, "stdout")),
+      Subprocess::PATH(path::join(directory, "stderr")));
+
+  if (s.isError()) {
+    return Failure("Unable to launch docker logs: " + s.error());
+  }
+
+  return Nothing();
+}
+
+
 Future<list<Docker::Container> > Docker::ps(
     bool all,
     const Option<string>& prefix) const
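
The TODO above proposes a cleaner sequence: "create" the container,
attach the log follower, and only then "start" it, so that Docker
closes the log stream when the container exits and 'docker logs'
terminates on its own. A rough, self-contained sketch of that
ordering (an assumption, not code from this patch: it presumes a
Docker CLI with separate 'create' and 'start' commands that allows
following the logs of a not-yet-started container; the container name
and image are made up):

  #include <cstdlib>
  #include <string>

  int main()
  {
    const std::string name = "mesos-example"; // Hypothetical name.

    // 1. Create (but do not start) the container.
    std::system(
        ("docker create --name=" + name + " busybox echo hello").c_str());

    // 2. Attach the log follower before the container runs.
    std::system(
        ("docker logs --follow " + name + " >stdout 2>stderr &").c_str());

    // 3. Start the container; when it exits Docker closes the log
    //    stream and the 'docker logs' process exits by itself, so no
    //    sleep-and-kill workaround is needed.
    std::system(("docker start " + name).c_str());

    return 0;
  }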

http://git-wip-us.apache.org/repos/asf/mesos/blob/02a35ab2/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index 3270c91..e7adedb 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -34,6 +34,8 @@
 
 
 // Abstraction for working with Docker (modeled on CLI).
+//
+// TODO(benh): Make futures returned by functions be discardable.
 class Docker
 {
 public:
@@ -93,6 +95,15 @@ public:
       bool all = false,
       const Option<std::string>& prefix = None()) const;
 
+  // Performs a 'docker logs --follow' and sends the output into a
+  // 'stderr' and 'stdout' file in the specified directory.
+  //
+  // TODO(benh): Return the file descriptors, or some struct around
+  // them so others can do what they want with stdout/stderr.
+  process::Future<Nothing> logs(
+      const std::string& container,
+      const std::string& directory);
+
 private:
   // Uses the specified path to the Docker CLI tool.
   Docker(const std::string& _path) : path(_path) {};
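
The containerizer changes below leave a TODO to check failures of
Docker::logs; a hedged sketch of what a checking call site could look
like (the 'docker' instance, 'name', and 'directory' are assumptions,
mirroring the usage in the containerizer):

  // Hypothetical call site; logs failures instead of ignoring them.
  docker.logs(name, directory)
    .onFailed([](const std::string& failure) {
      LOG(ERROR) << "Failed to follow container logs: " << failure;
    });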

http://git-wip-us.apache.org/repos/asf/mesos/blob/02a35ab2/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index d5292b6..ced0f92 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -19,7 +19,9 @@
 #include <list>
 #include <map>
 #include <string>
+#include <vector>
 
+#include <process/check.hpp>
 #include <process/defer.hpp>
 #include <process/io.hpp>
 #include <process/reap.hpp>
@@ -52,6 +54,7 @@
 using std::list;
 using std::map;
 using std::string;
+using std::vector;
 
 using namespace process;
 
@@ -120,6 +123,7 @@ public:
   virtual process::Future<hashset<ContainerID> > containers();
 
 private:
+  // Continuations and helpers.
   process::Future<Nothing> fetch(
       const ContainerID& containerId,
       const CommandInfo& commandInfo,
@@ -129,11 +133,18 @@ private:
       const ContainerID& containerId,
       const Option<int>& status);
 
-  process::Future<Nothing> _fetchFailed(
+  process::Future<Nothing> pull(
       const ContainerID& containerId,
-      const std::string& failure);
+      const std::string& directory,
+      const ContainerInfo::DockerInfo& dockerInfo);
+
+  process::Future<Nothing> _pull(
+      const Subprocess& s);
+
+  process::Future<Nothing> __pull(
+      const Subprocess& s,
+      const string& output);
 
-  // Continuations and helpers.
   process::Future<Nothing> _recover(
       const std::list<Docker::Container>& containers);
 
@@ -166,13 +177,32 @@ private:
   process::Future<bool> __launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
+      const string& directory,
       const SlaveID& slaveId,
       const PID<Slave>& slavePid,
       bool checkpoint);
 
   process::Future<bool> ___launch(
       const ContainerID& containerId,
+      const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,
+      const std::string& directory,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<bool> ___launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint);
+
+  process::Future<bool> ____launch(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
       const SlaveID& slaveId,
       const PID<Slave>& slavePid,
       bool checkpoint,
@@ -180,10 +210,14 @@ private:
 
   void _destroy(
       const ContainerID& containerId,
+      bool killed);
+
+  void __destroy(
+      const ContainerID& containerId,
       bool killed,
       const Future<Nothing>& future);
 
-  void __destroy(
+  void ___destroy(
       const ContainerID& containerId,
       bool killed,
       const Future<Option<int> >& status);
@@ -207,22 +241,69 @@ private:
 
   Docker docker;
 
-  // TODO(idownes): Consider putting these per-container variables into a
-  // struct.
-  // Promises for futures returned from wait().
-  hashmap<ContainerID,
-    process::Owned<process::Promise<containerizer::Termination> > > promises;
+  struct Container
+  {
+    Container(const ContainerID& id)
+      : state(FETCHING), id(id) {}
+
+    // The DockerContainerizer needs to be able to properly clean up
+    // Docker containers, regardless of when they are destroyed. For
+    // example, if a container gets destroyed while we are fetching,
+    // we must not keep running the fetch, nor should we try to
+    // start the Docker container. For this reason, we've split out
+    // the states into:
+    //
+    //     FETCHING
+    //     PULLING
+    //     RUNNING
+    //     DESTROYING
+    //
+    // In particular, we made 'PULLING' its own state so that we
+    // can easily destroy and clean up when a user initiates a pull
+    // of a really big image but we time out due to the executor
+    // registration timeout. Since we currently have no way to discard
+    // a Docker::run, we need to do the pull explicitly (which is
+    // the part that takes the longest) so that we can also explicitly
+    // kill it when asked. Once the functions at Docker::* get support
+    // for discarding, we won't need to make pull its own state
+    // anymore, although it doesn't hurt since it gives us better
+    // error messages.
+    enum State {
+      FETCHING,
+      PULLING,
+      RUNNING,
+      DESTROYING
+    } state;
+
+    ContainerID id;
+
+    // Promise for future returned from wait().
+    Promise<containerizer::Termination> termination;
+
+    // Exit status of executor or container (depending on whether or
+    // not we used the command executor). Represented as a promise so
+    // that destroying can chain with it being set.
+    Promise<Future<Option<int> > > status;
+
+    // Future that tells us whether or not the run is still pending or
+    // has failed so we know whether or not to wait for 'status'.
+    Future<Nothing> run;
+
+    // We keep track of the resources for each container so we can set
+    // the ResourceStatistics limits in usage().
+    Resources resources;
 
-  // We need to keep track of the future exit status for each executor because
-  // we'll only get a single notification when the executor exits.
-  hashmap<ContainerID, process::Future<Option<int> > > statuses;
+    // The mesos-fetcher subprocess, kept around so that we can do a
+    // killtree on it if we're asked to destroy a container while we
+    // are fetching.
+    Option<Subprocess> fetcher;
 
-  // We keep track of the resources for each container so we can set the
-  // ResourceStatistics limits in usage().
-  hashmap<ContainerID, Resources> resources;
+    // The docker pull subprocess is stored so we can killtree the
+    // pid when destroy is called while docker is pulling the image.
+    Option<Subprocess> pull;
+  };
 
-  // Set of containers that are in process of being destroyed.
-  hashset<ContainerID> destroying;
+  hashmap<ContainerID, Container*> containers_;
 };
 
 
@@ -289,6 +370,7 @@ Future<Nothing> DockerContainerizerProcess::fetch(
 
   Result<string> realpath = os::realpath(
       path::join(flags.launcher_dir, "mesos-fetcher"));
+
   if (!realpath.isSome()) {
     LOG(ERROR) << "Failed to determine the canonical path "
                << "for the mesos-fetcher '"
@@ -308,9 +390,11 @@ Future<Nothing> DockerContainerizerProcess::fetch(
   VLOG(1) << "Starting to fetch URIs for container: " << containerId
           << ", directory: " << directory;
 
+  // NOTE: It's important that we create a pipe for the mesos-fetcher
+  // stdin so that when the slave exits, the fetcher terminates itself.
   Try<Subprocess> fetcher = subprocess(
       realpath.get(),
-      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
       Subprocess::PATH(path::join(directory, "stdout")),
       Subprocess::PATH(path::join(directory, "stderr")),
       fetcherEnv);
@@ -319,41 +403,109 @@ Future<Nothing> DockerContainerizerProcess::fetch(
     return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
   }
 
-  const Future<Option<int> >& status = fetcher.get().status();
+  containers_[containerId]->fetcher = fetcher.get();
 
-  return status
-    .onFailed(defer(self(), &Self::_fetchFailed, containerId, lambda::_1))
+  return fetcher.get().status()
     .then(defer(self(), &Self::_fetch, containerId, lambda::_1));
 }
 
 
-Future<Nothing> DockerContainerizerProcess::_fetchFailed(
+Future<Nothing> DockerContainerizerProcess::_fetch(
     const ContainerID& containerId,
-    const string& failure)
+    const Option<int>& status)
 {
-  containerizer::Termination termination;
-  termination.set_message("Fetch failed for container: " + failure);
-  promises[containerId]->set(termination);
-  promises.erase(containerId);
+  if (!status.isSome()) {
+    return Failure("No status available from fetch");
+  } else if (status.get() != 0) {
+    return Failure("Fetch exited with status " + WSTRINGIFY(status.get()));
+  }
+
   return Nothing();
 }
 
 
-Future<Nothing> DockerContainerizerProcess::_fetch(
+// TODO(benh): Move this into Docker::pull after we've correctly made
+// the futures returned from Docker::* functions be discardable.
+Future<Nothing> DockerContainerizerProcess::pull(
     const ContainerID& containerId,
-    const Option<int>& status)
+    const string& directory,
+    const ContainerInfo::DockerInfo& dockerInfo)
 {
-  if (!status.isSome()) {
-    return _fetchFailed(
-        containerId,
-        "No status available from fetch");
-  } else if (status.get() != 0) {
-    return _fetchFailed(
-        containerId,
-        "Fetch failed with status " + WSTRINGIFY(status.get()));
+  vector<string> argv;
+  argv.push_back(flags.docker);
+  argv.push_back("pull");
+
+  vector<string> parts = strings::split(dockerInfo.image(), ":");
+
+  if (parts.size() > 2) {
+    return Failure("Not expecting multiple ':' in image: " +
+                   dockerInfo.image());
   }
 
-  return Nothing();
+  if (parts.size() == 2) {
+    argv.push_back(dockerInfo.image());
+  } else {
+    argv.push_back(parts[0] + ":latest");
+  }
+
+  VLOG(1) << "Running " << strings::join(" ", argv);
+
+  map<string, string> environment;
+  environment["HOME"] = directory;
+
+  Try<Subprocess> s = subprocess(
+      flags.docker,
+      argv,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      None(),
+      environment);
+
+  if (s.isError()) {
+    return Failure("Failed to execute 'docker pull': " + s.error());
+  }
+
+  containers_[containerId]->pull = s.get();
+
+  return s.get().status()
+    .then(defer(self(), &Self::_pull, s.get()));
+}
+
+
+Future<Nothing> DockerContainerizerProcess::_pull(
+    const Subprocess& s)
+{
+  CHECK_READY(s.status());
+
+  Option<int> status = s.status().get();
+
+  if (status.isSome() && status.get() == 0) {
+    return Nothing();
+  }
+
+  CHECK_SOME(s.err());
+  return io::read(s.err().get())
+    .then(defer(self(), &Self::__pull, s, lambda::_1));
+}
+
+
+Future<Nothing> DockerContainerizerProcess::__pull(
+    const Subprocess& s,
+    const string& output)
+{
+  CHECK_READY(s.status());
+
+  Option<int> status = s.status().get();
+
+  if (status.isNone()) {
+    return Failure("No exit status available from 'docker pull': \n" + output);
+  }
+
+  CHECK_NE(0, status.get());
+
+  return Failure("Failed to execute 'docker pull', exited with status (" +
+                 WSTRINGIFY(status.get()) + "): \n" + output);
 }
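
As an aside, the image-tag normalization in pull() above is small
enough to model standalone. A self-contained sketch of the same rule
(illustrative only; it uses std::string searches instead of
strings::split, and the ':latest' default presumably keeps 'docker
pull' from fetching every tag of the repository):

  #include <cassert>
  #include <string>

  // Append ":latest" when no tag is given; reject multiple ':'s,
  // mirroring the Failure returned by pull().
  std::string normalize(const std::string& image)
  {
    size_t first = image.find(':');
    if (first == std::string::npos) {
      return image + ":latest";
    }
    if (image.find(':', first + 1) != std::string::npos) {
      return ""; // "Not expecting multiple ':' in image".
    }
    return image;
  }

  int main()
  {
    assert(normalize("busybox") == "busybox:latest");
    assert(normalize("busybox:1.0") == "busybox:1.0");
    assert(normalize("a:b:c").empty());
    return 0;
  }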
 
 
@@ -539,17 +691,17 @@ Future<Nothing> DockerContainerizerProcess::recover(
                   << "' for executor '" << executor.id
                   << "' of framework " << framework.id;
 
-        // Save a termination promise.
-        Owned<Promise<containerizer::Termination> > promise(
-            new Promise<containerizer::Termination>());
+        // Create and store a container.
+        Container* container = new Container(containerId);
+        containers_[containerId] = container;
 
-        promises.put(containerId, promise);
+        container->state = Container::RUNNING;
 
         pid_t pid = run.get().forkedPid.get();
 
-        statuses[containerId] = process::reap(pid);
+        container->status.set(process::reap(pid));
 
-        statuses[containerId]
+        container->status.future().get()
           .onAny(defer(self(), &Self::reaped, containerId));
 
         if (pids.containsValue(pid)) {
@@ -577,9 +729,9 @@ Future<Nothing> DockerContainerizerProcess::recover(
 
 
 Future<Nothing> DockerContainerizerProcess::_recover(
-    const list<Docker::Container>& containers)
+    const list<Docker::Container>& _containers)
 {
-  foreach (const Docker::Container& container, containers) {
+  foreach (const Docker::Container& container, _containers) {
     VLOG(1) << "Checking if Docker container named '"
             << container.name << "' was started by Mesos";
 
@@ -595,7 +747,7 @@ Future<Nothing> DockerContainerizerProcess::_recover(
 
     // Check if we're watching an executor for this container ID and
     // if not, rm -f the Docker container.
-    if (!statuses.keys().contains(id.get())) {
+    if (!containers_.contains(id.get())) {
       // TODO(benh): Retry 'docker rm -f' if it failed but the container
       // still exists (asynchronously).
       docker.kill(container.id, true);
@@ -608,6 +760,7 @@ Future<Nothing> DockerContainerizerProcess::_recover(
 
 Future<bool> DockerContainerizerProcess::launch(
     const ContainerID& containerId,
+    const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
     const string& directory,
     const Option<string>& user,
@@ -615,11 +768,11 @@ Future<bool> DockerContainerizerProcess::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  if (promises.contains(containerId)) {
+  if (containers_.contains(containerId)) {
     return Failure("Container already started");
   }
 
-  if (!executorInfo.has_container()) {
+  if (!taskInfo.has_container()) {
     LOG(INFO) << "No container info found, skipping launch";
     return false;
   }
@@ -632,67 +785,47 @@ Future<bool> DockerContainerizerProcess::launch(
   }
 
   LOG(INFO) << "Starting container '" << containerId
-            << "' for executor '" << executorInfo.executor_id()
-            << "' and framework '" << executorInfo.framework_id() << "'";
-
-  Owned<Promise<containerizer::Termination> > promise(
-    new Promise<containerizer::Termination>());
+            << "' for task '" << taskInfo.task_id()
+            << "' (and executor '" << executorInfo.executor_id()
+            << "') of framework '" << executorInfo.framework_id() << "'";
 
-  promises.put(containerId, promise);
+  Container* container = new Container(containerId);
+  containers_[containerId] = container;
 
-  return fetch(containerId, executorInfo.command(), directory)
+  return fetch(containerId, taskInfo.command(), directory)
     .then(defer(self(),
                 &Self::_launch,
                 containerId,
+                taskInfo,
                 executorInfo,
                 directory,
                 slaveId,
                 slavePid,
-                checkpoint));
+                checkpoint))
+    .onFailed(defer(self(), &Self::destroy, containerId, true));
 }
 
 
-Future<bool> DockerContainerizerProcess::launch(
+Future<bool> DockerContainerizerProcess::_launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
     const string& directory,
-    const Option<string>& user,
     const SlaveID& slaveId,
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  if (promises.contains(containerId)) {
-    return Failure("Container already started");
-  }
-
-  if (!taskInfo.has_container()) {
-    LOG(INFO) << "No container info found, skipping launch";
-    return false;
+  // Doing the fetch might have succeeded but we were actually asked to
+  // destroy the container, which we did, so don't continue.
+  if (!containers_.contains(containerId)) {
+    return Failure("Container was destroyed while launching");
   }
 
-  ContainerInfo containerInfo = executorInfo.container();
+  containers_[containerId]->state = Container::PULLING;
 
-  if (containerInfo.type() != ContainerInfo::DOCKER) {
-    LOG(INFO) << "Skipping non-docker container";
-    return false;
-  }
-
-  LOG(INFO) << "Starting container '" << containerId
-            << "' for task '" << taskInfo.task_id()
-            << "' (and executor '" << executorInfo.executor_id()
-            << "') of framework '" << executorInfo.framework_id() << "'";
-
-  Owned<Promise<containerizer::Termination> > promise(
-    new Promise<containerizer::Termination>());
-
-  promises.put(containerId, promise);
-
-  const CommandInfo& command = taskInfo.command();
-
-  return fetch(containerId, command, directory)
+  return pull(containerId, directory, taskInfo.container().docker())
     .then(defer(self(),
-                &Self::_launch,
+                &Self::__launch,
                 containerId,
                 taskInfo,
                 executorInfo,
@@ -702,8 +835,7 @@ Future<bool> DockerContainerizerProcess::launch(
                 checkpoint));
 }
 
-
-Future<bool> DockerContainerizerProcess::_launch(
+Future<bool> DockerContainerizerProcess::__launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
@@ -712,44 +844,35 @@ Future<bool> DockerContainerizerProcess::_launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  // Store the resources for usage().
-  resources.put(containerId, taskInfo.resources());
-
-  const ContainerInfo& container = taskInfo.container();
-
-  // Extract the Docker image.
-  string image = container.docker().image();
-
-  // Construct the Docker container name.
-  string name = containerName(containerId);
+  if (!containers_.contains(containerId)) {
+    return Failure("Container was destroyed while pulling image");
+  }
 
-  const CommandInfo& command = taskInfo.command();
+  containers_[containerId]->state = Container::RUNNING;
 
-  // Start a docker container and docker logs then launch the executor
-  // (but destroy the Docker container if launching the executor
-  // failed). Docker logs will automatically exit once the container
-  // is stopped.
-  return docker.run(
-      container,
-      command,
-      name,
+  // Try and start the Docker container.
+  containers_[containerId]->run = docker.run(
+      taskInfo.container(),
+      taskInfo.command(),
+      containerName(containerId),
       directory,
       flags.docker_sandbox_directory,
-      taskInfo.resources())
+      taskInfo.resources());
+
+  return containers_[containerId]->run
     .then(defer(self(),
-                &Self::__launch,
+                &Self::___launch,
                 containerId,
                 taskInfo,
                 executorInfo,
                 directory,
                 slaveId,
                 slavePid,
-                checkpoint))
-    .onFailed(defer(self(), &Self::destroy, containerId, false));
+                checkpoint));
 }
 
 
-Future<bool> DockerContainerizerProcess::__launch(
+Future<bool> DockerContainerizerProcess::___launch(
     const ContainerID& containerId,
     const TaskInfo& taskInfo,
     const ExecutorInfo& executorInfo,
@@ -758,6 +881,10 @@ Future<bool> DockerContainerizerProcess::__launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
+  // After we do Docker::run we shouldn't remove a container until
+  // after we set 'status', which we do in this function.
+  CHECK(containers_.contains(containerId));
+
   // Prepare environment variables for the executor.
   map<string, string> env = executorEnvironment(
       executorInfo,
@@ -835,50 +962,67 @@ Future<bool> DockerContainerizerProcess::__launch(
     return Failure("Failed to synchronize with child process: " + error);
   }
 
+  // Store the resources for usage().
+  containers_[containerId]->resources = taskInfo.resources();
+
   // And finally watch for when the executor gets reaped.
-  statuses[containerId] = process::reap(s.get().pid());
+  containers_[containerId]->status.set(process::reap(s.get().pid()));
 
-  statuses[containerId]
+  containers_[containerId]->status.future().get()
     .onAny(defer(self(), &Self::reaped, containerId));
 
-  // Redirect the logs into stdout/stderr.
-  //
-  // TODO(benh): This is an intermediate solution for now until we can
-  // reliably stream the logs either from the CLI or from the REST
-  // interface directly. The problem is that it's possible that the
-  // 'docker logs --follow' command will be started AFTER the
-  // container has already terminated, and thus it will continue
-  // running forever because the container has stopped. Unfortunately,
-  // when we later remove the container that still doesn't cause the
-  // 'logs' process to exit. Thus, we wait some period of time until
-  // after the container has terminated in order to let any log data
-  // get flushed, then we kill the 'logs' process ourselves.  A better
-  // solution would be to first "create" the container, then start
-  // following the logs, then finally "start" the container so that
-  // when the container terminates Docker will properly close the log
-  // stream and 'docker logs' will exit. For more information, please
-  // see: https://github.com/docker/docker/issues/7020
-
-  string logs =
-    "logs() {\n"
-    "  " + flags.docker + " logs --follow $1 &\n"
-    "  pid=$!\n"
-    "  " + flags.docker + " wait $1 >/dev/null 2>&1\n"
-    "  sleep 10" // Sleep 10 seconds to make sure the logs are flushed.
-    "  kill -TERM $pid >/dev/null 2>&1 &\n"
-    "}\n"
-    "logs " + containerName(containerId);
-
-  subprocess(
-      logs,
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH(path::join(directory, "stdout")),
-      Subprocess::PATH(path::join(directory, "stderr")));
+  // TODO(benh): Check failure of Docker::logs.
+  docker.logs(containerName(containerId), directory);
 
   return true;
 }
 
 
+Future<bool> DockerContainerizerProcess::launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const Option<string>& user,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  if (containers_.contains(containerId)) {
+    return Failure("Container already started");
+  }
+
+  if (!executorInfo.has_container()) {
+    LOG(INFO) << "No container info found, skipping launch";
+    return false;
+  }
+
+  ContainerInfo containerInfo = executorInfo.container();
+
+  if (containerInfo.type() != ContainerInfo::DOCKER) {
+    LOG(INFO) << "Skipping non-docker container";
+    return false;
+  }
+
+  LOG(INFO) << "Starting container '" << containerId
+            << "' for executor '" << executorInfo.executor_id()
+            << "' and framework '" << executorInfo.framework_id() << "'";
+
+  Container* container = new Container(containerId);
+  containers_[containerId] = container;
+
+  return fetch(containerId, executorInfo.command(), directory)
+    .then(defer(self(),
+                &Self::_launch,
+                containerId,
+                executorInfo,
+                directory,
+                slaveId,
+                slavePid,
+                checkpoint))
+    .onFailed(defer(self(), &Self::destroy, containerId, true));
+}
+
+
 Future<bool> DockerContainerizerProcess::_launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
@@ -887,13 +1031,39 @@ Future<bool> DockerContainerizerProcess::_launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  Owned<Promise<containerizer::Termination> > promise(
-      new Promise<containerizer::Termination>());
+  // Doing the fetch might have succeeded but we were actually asked to
+  // destroy the container, which we did, so don't continue.
+  if (!containers_.contains(containerId)) {
+    return Failure("Container was destroyed while launching");
+  }
 
-  promises.put(containerId, promise);
+  containers_[containerId]->state = Container::PULLING;
 
-  // Construct the Docker container name.
-  string name = containerName(containerId);
+  return pull(containerId, directory, executorInfo.container().docker())
+    .then(defer(self(),
+                &Self::__launch,
+                containerId,
+                executorInfo,
+                directory,
+                slaveId,
+                slavePid,
+                checkpoint));
+}
+
+
+Future<bool> DockerContainerizerProcess::__launch(
+    const ContainerID& containerId,
+    const ExecutorInfo& executorInfo,
+    const string& directory,
+    const SlaveID& slaveId,
+    const PID<Slave>& slavePid,
+    bool checkpoint)
+{
+  if (!containers_.contains(containerId)) {
+    return Failure("Container was destroyed while pulling image");
+  }
+
+  containers_[containerId]->state = Container::RUNNING;
 
   map<string, string> env = executorEnvironment(
       executorInfo,
@@ -909,43 +1079,44 @@ Future<bool> DockerContainerizerProcess::_launch(
     env[variable.name()] = variable.value();
   }
 
-  const ContainerInfo& container = executorInfo.container();
-  const CommandInfo& command = executorInfo.command();
-
-  // Start a docker container which is an executor and docker logs.
-  // Docker logs will automatically exit once the container is
-  // stopped.
-  return docker.run(
-      container,
-      command,
-      name,
+  // Try and start the Docker container.
+  containers_[containerId]->run = docker.run(
+      executorInfo.container(),
+      executorInfo.command(),
+      containerName(containerId),
       directory,
       flags.docker_sandbox_directory,
       None(),
-      env)
+      env);
+
+  return containers_[containerId]->run
     .then(defer(self(),
-                &Self::__launch,
+                &Self::___launch,
                 containerId,
                 executorInfo,
+                directory,
                 slaveId,
                 slavePid,
-                checkpoint))
-    .onFailed(defer(self(), &Self::destroy, containerId, false));
+                checkpoint));
 }
 
 
-Future<bool> DockerContainerizerProcess::__launch(
+Future<bool> DockerContainerizerProcess::___launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
+    const string& directory,
     const SlaveID& slaveId,
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
+  // We shouldn't remove the container until we set 'status'.
+  CHECK(containers_.contains(containerId));
   return docker.inspect(containerName(containerId))
      .then(defer(self(),
-                &Self::___launch,
+                &Self::____launch,
                 containerId,
                 executorInfo,
+                directory,
                 slaveId,
                 slavePid,
                 checkpoint,
@@ -953,14 +1124,19 @@ Future<bool> DockerContainerizerProcess::__launch(
 }
 
 
-Future<bool> DockerContainerizerProcess::___launch(
+Future<bool> DockerContainerizerProcess::____launch(
     const ContainerID& containerId,
     const ExecutorInfo& executorInfo,
+    const string& directory,
     const SlaveID& slaveId,
     const PID<Slave>& slavePid,
     bool checkpoint,
     const Docker::Container& container)
 {
+  // After we do Docker::run we shouldn't remove a container until
+  // after we set 'status', which we do in this function.
+  CHECK(containers_.contains(containerId));
+
   Option<int> pid = container.pid;
 
   if (!pid.isSome()) {
@@ -988,18 +1164,23 @@ Future<bool> DockerContainerizerProcess::___launch(
       slave::state::checkpoint(path, stringify(pid.get()));
 
     if (checkpointed.isError()) {
-      LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
-                 << path << "': " << checkpointed.error();
-
-      return Failure("Could not checkpoint executor's pid");
+      return Failure("Failed to checkpoint executor's forked pid to '"
+                     + path + "': " + checkpointed.error());
     }
   }
 
-  statuses[containerId] = process::reap(pid.get());
+  // Store the resources for usage().
+  containers_[containerId]->resources = executorInfo.resources();
+
+  // And finally watch for when the container gets reaped.
+  containers_[containerId]->status.set(process::reap(pid.get()));
 
-  statuses[containerId]
+  containers_[containerId]->status.future().get()
     .onAny(defer(self(), &Self::reaped, containerId));
 
+  // TODO(benh): Check failure of Docker::logs.
+  docker.logs(containerName(containerId), directory);
+
   return true;
 }
 
@@ -1008,14 +1189,14 @@ Future<Nothing> DockerContainerizerProcess::update(
     const ContainerID& containerId,
     const Resources& _resources)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     LOG(WARNING) << "Ignoring updating unknown container: "
                  << containerId;
     return Nothing();
   }
 
-  // Store the resources for usage()
-  resources.put(containerId, _resources);
+  // Store the resources for usage().
+  containers_[containerId]->resources = _resources;
 
 #ifdef __linux__
   if (!_resources.cpus().isSome() && !_resources.mem().isSome()) {
@@ -1174,17 +1355,17 @@ Future<ResourceStatistics> DockerContainerizerProcess::usage(
 #ifndef __linux__
   return Failure("Does not support usage() on non-linux platform");
 #else
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return Failure("Unknown container: " + stringify(containerId));
   }
 
-  if (destroying.contains(containerId)) {
+  Container* container = containers_[containerId];
+
+  if (container->state == Container::DESTROYING) {
     return Failure("Container is being removed: " + stringify(containerId));
   }
 
-  // Construct the Docker container name.
-  string name = containerName(containerId);
-  return docker.inspect(name)
+  return docker.inspect(containerName(containerId))
     .then(defer(self(), &Self::_usage, containerId, lambda::_1));
 #endif // __linux__
 }
@@ -1192,9 +1373,19 @@ Future<ResourceStatistics> DockerContainerizerProcess::usage(
 
 Future<ResourceStatistics> DockerContainerizerProcess::_usage(
     const ContainerID& containerId,
-    const Docker::Container& container)
+    const Docker::Container& _container)
 {
-  Option<pid_t> pid = container.pid;
+  if (!containers_.contains(containerId)) {
+    return Failure("Container has been destroyed:" + stringify(containerId));
+  }
+
+  Container* container = containers_[containerId];
+
+  if (container->state == Container::DESTROYING) {
+    return Failure("Container is being removed: " + stringify(containerId));
+  }
+
+  Option<pid_t> pid = _container.pid;
   if (pid.isNone()) {
     return Failure("Container is not running");
   }
@@ -1211,7 +1402,7 @@ Future<ResourceStatistics> DockerContainerizerProcess::_usage(
   ResourceStatistics result = statistics.get();
 
   // Set the resource allocations.
-  Resources resource = resources[containerId];
+  const Resources& resource = container->resources;
   Option<Bytes> mem = resource.mem();
   if (mem.isSome()) {
     result.set_mem_limit_bytes(mem.get().bytes());
@@ -1229,11 +1420,11 @@ Future<ResourceStatistics> DockerContainerizerProcess::_usage(
 Future<containerizer::Termination> DockerContainerizerProcess::wait(
     const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return Failure("Unknown container: " + stringify(containerId));
   }
 
-  return promises[containerId]->future();
+  return containers_[containerId]->termination.future();
 }
 
 
@@ -1241,102 +1432,210 @@ void DockerContainerizerProcess::destroy(
     const ContainerID& containerId,
     bool killed)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
     return;
   }
 
-  if (destroying.contains(containerId)) {
-    // Destroy has already been initiated.
+  Container* container = containers_[containerId];
+
+  if (container->run.isFailed()) {
+    VLOG(1) << "Container '" << containerId << "' run failed";
+
+    // This means we failed to do Docker::run and we're trying to
+    // clean up (or someone happens to have asked to destroy this
+    // container before the destroy that we enqueued has had a chance
+    // to get executed, which when it does, will just be skipped).
+    CHECK_PENDING(container->status.future());
+    containerizer::Termination termination;
+    termination.set_killed(killed);
+    termination.set_message(
+        "Failed to run container: " + container->run.failure());
+    container->termination.set(termination);
+
+    containers_.erase(containerId);
+    delete container;
+
     return;
   }
 
-  destroying.insert(containerId);
+  if (container->state == Container::DESTROYING) {
+    // Destroy has already been initiated.
+    return;
+  }
 
   LOG(INFO) << "Destroying container '" << containerId << "'";
 
-  // Do a 'docker rm -f' which we'll then find out about in '_wait'
+  // It's possible that destroy is getting called before
+  // DockerContainerizer::launch has completed (i.e., after we've
+  // returned a future but before we've completed the fetching of the
+  // URIs, or the Docker::run, or the wait, etc.).
+  //
+  // If we're FETCHING, we want to stop the fetching and then clean
+  // up. Note that we need to deal with the race of terminating the
+  // fetcher so that even if the fetcher returns successfully we won't
+  // try to do a Docker::run.
+  //
+  // If we're PULLING, we want to terminate the 'docker pull' and then
+  // clean up. Just as above, we'll need to deal with the race of
+  // 'docker pull' returning successfully.
+  //
+  // If we're RUNNING, we want to wait for the status to get set, then
+  // do a Docker::kill, then wait for the status to complete, then
+  // clean up.
+
+  if (container->state == Container::FETCHING) {
+    LOG(INFO) << "Destroying Container '"
+              << containerId << "' in FETCHING state";
+
+    if (container->fetcher.isSome()) {
+      // Best effort kill the entire fetcher tree.
+      os::killtree(container->fetcher.get().pid(), SIGKILL);
+    }
+
+    containerizer::Termination termination;
+    termination.set_killed(killed);
+    termination.set_message("Container destroyed while fetching");
+    container->termination.set(termination);
+
+    // Even if the fetch succeeded just before we did the killtree,
+    // removing the container here means that we won't proceed with
+    // the Docker::run.
+    containers_.erase(containerId);
+    delete container;
+
+    return;
+  }
+
+  if (container->state == Container::PULLING) {
+    LOG(INFO) << "Destroying Container '"
+              << containerId << "' in PULLING state";
+
+    if (container->pull.isSome()) {
+      os::killtree(container->pull.get().pid(), SIGKILL);
+    }
+
+    containerizer::Termination termination;
+    termination.set_killed(killed);
+    termination.set_message("Container destroyed while pulling image");
+    container->termination.set(termination);
+
+    containers_.erase(containerId);
+    delete container;
+
+    return;
+  }
+
+  CHECK(container->state == Container::RUNNING);
+
+  container->state = Container::DESTROYING;
+
+  // Otherwise, wait for Docker::run to succeed, in which case we'll
+  // continue in _destroy (calling Docker::kill) or for Docker::run to
+  // fail, in which case we'll re-execute this function and cleanup
+  // above.
+
+  container->status.future()
+    .onAny(defer(self(), &Self::_destroy, containerId, killed));
+}
+
+
+void DockerContainerizerProcess::_destroy(
+    const ContainerID& containerId,
+    bool killed)
+{
+  CHECK(containers_.contains(containerId));
+
+  Container* container = containers_[containerId];
+
+  CHECK(container->state == Container::DESTROYING);
+
+  // Do a 'docker rm -f' which we'll then find out about in '___destroy'
   // after we've reaped either the container's root process (in the
   // event that we had just launched a container for an executor) or
   // the mesos-executor (in the case we launched a container for a
   // task). As a reminder, the mesos-executor exits because it's doing
   // a 'docker wait' on the container using the --override flag of
   // mesos-executor.
-  //
-  // NOTE: We might not actually have a container or mesos-executor
-  // running (which we could check by looking if 'containerId' is a
-  // key in 'statuses'). If that is the case then we're doing a
-  // destroy because we failed to launch (see defer at bottom of
-  // 'launch'). We try and destroy regardless for now, just to be
-  // safe.
 
   // TODO(benh): Retry 'docker rm -f' if it failed but the container
   // still exists (asynchronously).
+
+  LOG(INFO) << "Running docker kill on container '" << containerId << "'";
+
   docker.kill(containerName(containerId), true)
-    .onAny(defer(self(), &Self::_destroy, containerId, killed, lambda::_1));
+    .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1));
 }
 
 
-void DockerContainerizerProcess::_destroy(
+void DockerContainerizerProcess::__destroy(
     const ContainerID& containerId,
     bool killed,
-    const Future<Nothing>& future)
+    const Future<Nothing>& kill)
 {
-  if (!future.isReady()) {
-    promises[containerId]->fail(
-        "Failed to destroy container: " +
-        (future.isFailed() ? future.failure() : "discarded future"));
+  CHECK(containers_.contains(containerId));
 
-    destroying.erase(containerId);
+  Container* container = containers_[containerId];
+
+  if (!kill.isReady()) {
+    // TODO(benh): This means we've failed to do a Docker::kill, which
+    // means it's possible that the container is still going to be
+    // running after we return! We either need to have a periodic
+    // "garbage collector", or we need to retry the Docker::kill
+    // indefinitely until it has been successful.
+    container->termination.fail(
+        "Failed to kill the Docker container: " +
+        (kill.isFailed() ? kill.failure() : "discarded future"));
+
+    containers_.erase(containerId);
+    delete container;
 
     return;
   }
 
-  // It's possible we've been asked to destroy a container that we
-  // aren't actually reaping any status because we failed to start the
-  // container in the first place (e.g., because we returned a Failure
-  // in 'launch' or '_launch'). In this case, we just put a None
-  // status in place so that the rest of the destroy workflow
-  // completes.
-  if (!statuses.contains(containerId)) {
-    statuses[containerId] = None();
-  }
+  // Status must be ready since we did a Docker::kill.
+  CHECK_READY(containers_[containerId]->status.future());
 
-  statuses[containerId]
-    .onAny(defer(self(), &Self::__destroy, containerId, killed, lambda::_1));
+  container->status.future().get()
+    .onAny(defer(self(), &Self::___destroy, containerId, killed, lambda::_1));
 }
 
 
-void DockerContainerizerProcess::__destroy(
+void DockerContainerizerProcess::___destroy(
     const ContainerID& containerId,
     bool killed,
     const Future<Option<int> >& status)
 {
+  CHECK(containers_.contains(containerId));
+
+  Container* container = containers_[containerId];
+
   containerizer::Termination termination;
   termination.set_killed(killed);
+
   if (status.isReady() && status.get().isSome()) {
     termination.set_status(status.get().get());
   }
-  termination.set_message(killed ?
-                          "Docker task killed" : "Docker process terminated");
 
-  promises[containerId]->set(termination);
+  termination.set_message(
+      killed ? "Container killed" : "Container terminated");
+
+  container->termination.set(termination);
 
-  destroying.erase(containerId);
-  promises.erase(containerId);
-  statuses.erase(containerId);
+  containers_.erase(containerId);
+  delete container;
 }
 
 
 Future<hashset<ContainerID> > DockerContainerizerProcess::containers()
 {
-  return promises.keys();
+  return containers_.keys();
 }
 
 
 void DockerContainerizerProcess::reaped(const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/02a35ab2/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1eaab04..6253a5d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2204,7 +2204,7 @@ void Slave::_statusUpdate(
     LOG(ERROR) << "Failed to update resources for container " << containerId
                << " of executor " << executorId
                << " running task " << update.status().task_id()
-               << " on status update for terminal task, destroying container:"
+               << " on status update for terminal task, destroying container: "
                << (future.get().isFailed() ? future.get().failure()
                                            : "discarded");
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/02a35ab2/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index e0fd62f..60d9b2d 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -372,8 +372,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   ASSERT_TRUE(exists(containers.get(), containerId.get()));
 
-  dockerContainerizer.destroy(containerId.get());
-
   driver.stop();
   driver.join();
 
@@ -596,7 +594,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
 
   AWAIT_READY(termination);
 
-  // Usage() should fail again since the container is destroyed
+  // Usage() should fail again since the container is destroyed.
   Future<ResourceStatistics> usage =
     dockerContainerizer.usage(containerId.get());
   AWAIT_FAILED(usage);
@@ -729,13 +727,6 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
   EXPECT_EQ(1024u, cpu.get());
   EXPECT_EQ(128u, mem.get().megabytes());
 
-  Future<containerizer::Termination> termination =
-    dockerContainerizer.wait(containerId.get());
-
-  dockerContainerizer.destroy(containerId.get());
-
-  AWAIT_READY(termination);
-
   driver.stop();
   driver.join();