Posted to commits@mesos.apache.org by ji...@apache.org on 2015/07/25 01:38:03 UTC

[10/12] mesos git commit: Moved containerizer related tests under src/tests/containerizer.

http://git-wip-us.apache.org/repos/asf/mesos/blob/96351372/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
new file mode 100644
index 0000000..80ed60e
--- /dev/null
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -0,0 +1,2955 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/duration.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+#include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
+using namespace mesos::internal::slave::paths;
+using namespace mesos::internal::slave::state;
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::DockerContainerizer;
+using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
+
+using process::Future;
+using process::Message;
+using process::PID;
+using process::UPID;
+
+using std::list;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::DoAll;
+using testing::DoDefault;
+using testing::Eq;
+using testing::Invoke;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+
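+// Wraps the real Docker abstraction with gmock so tests can observe
+// or override 'run', 'pull', 'stop' and 'inspect'. By default each
+// mocked method simply delegates to the real implementation via the
+// '_run', '_pull', '_stop' and '_inspect' helpers below.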
+class MockDocker : public Docker
+{
+public:
+  MockDocker(const string& path) : Docker(path)
+  {
+    EXPECT_CALL(*this, pull(_, _, _))
+      .WillRepeatedly(Invoke(this, &MockDocker::_pull));
+
+    EXPECT_CALL(*this, stop(_, _, _))
+      .WillRepeatedly(Invoke(this, &MockDocker::_stop));
+
+    EXPECT_CALL(*this, run(_, _, _, _, _, _, _, _, _))
+      .WillRepeatedly(Invoke(this, &MockDocker::_run));
+
+    EXPECT_CALL(*this, inspect(_, _))
+      .WillRepeatedly(Invoke(this, &MockDocker::_inspect));
+  }
+
+  MOCK_CONST_METHOD9(
+      run,
+      process::Future<Nothing>(
+          const mesos::ContainerInfo&,
+          const mesos::CommandInfo&,
+          const std::string&,
+          const std::string&,
+          const std::string&,
+          const Option<mesos::Resources>&,
+          const Option<std::map<std::string, std::string>>&,
+          const Option<std::string>&,
+          const Option<std::string>&));
+
+  MOCK_CONST_METHOD3(
+      pull,
+      process::Future<Docker::Image>(
+          const string&,
+          const string&,
+          bool));
+
+  MOCK_CONST_METHOD3(
+      stop,
+      process::Future<Nothing>(
+          const string&,
+          const Duration&,
+          bool));
+
+  MOCK_CONST_METHOD2(
+      inspect,
+      process::Future<Docker::Container>(
+          const string&,
+          const Option<Duration>&));
+
+  process::Future<Nothing> _run(
+      const mesos::ContainerInfo& containerInfo,
+      const mesos::CommandInfo& commandInfo,
+      const std::string& name,
+      const std::string& sandboxDirectory,
+      const std::string& mappedDirectory,
+      const Option<mesos::Resources>& resources,
+      const Option<std::map<std::string, std::string>>& env,
+      const Option<std::string>& stdoutPath,
+      const Option<std::string>& stderrPath) const
+  {
+    return Docker::run(
+        containerInfo,
+        commandInfo,
+        name,
+        sandboxDirectory,
+        mappedDirectory,
+        resources,
+        env,
+        stdoutPath,
+        stderrPath);
+  }
+
+  process::Future<Docker::Image> _pull(
+      const string& directory,
+      const string& image,
+      bool force) const
+  {
+    return Docker::pull(directory, image, force);
+  }
+
+  process::Future<Nothing> _stop(
+      const string& containerName,
+      const Duration& timeout,
+      bool remove) const
+  {
+    return Docker::stop(containerName, timeout, remove);
+  }
+
+  process::Future<Docker::Container> _inspect(
+      const string& containerName,
+      const Option<Duration>& retryInterval)
+  {
+    return Docker::inspect(containerName, retryInterval);
+  }
+};
+
+
+class DockerContainerizerTest : public MesosTest
+{
+public:
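+  // Returns the container name the Docker containerizer generates
+  // for a given slave and container: the Docker name prefix, the
+  // slave ID, a separator, and the container ID.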
+  static string containerName(
+      const SlaveID& slaveId,
+      const ContainerID& containerId)
+  {
+    return slave::DOCKER_NAME_PREFIX + slaveId.value() +
+      slave::DOCKER_NAME_SEPERATOR + containerId.value();
+  }
+
+  enum ContainerState {
+    EXISTS,
+    RUNNING
+  };
+
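+  // Polls 'docker inspect' on the expected container name for up to
+  // 5 seconds, returning true once the container exists (or, when
+  // 'state' is RUNNING, once inspect reports a pid).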
+  static bool exists(
+      const process::Shared<Docker>& docker,
+      const SlaveID& slaveId,
+      const ContainerID& containerId,
+      ContainerState state = ContainerState::EXISTS)
+  {
+    Duration waited = Duration::zero();
+    string expectedName = containerName(slaveId, containerId);
+
+    do {
+      Future<Docker::Container> inspect = docker->inspect(expectedName);
+
+      if (!inspect.await(Seconds(3))) {
+        return false;
+      }
+
+      if (inspect.isReady()) {
+        switch (state) {
+          case ContainerState::RUNNING:
+            if (inspect.get().pid.isSome()) {
+              return true;
+            }
+            // Retry looking for running pid until timeout.
+            break;
+          case ContainerState::EXISTS:
+            return true;
+        }
+      }
+
+      os::sleep(Milliseconds(200));
+      waited += Milliseconds(200);
+    } while (waited < Seconds(5));
+
+    return false;
+  }
+
+  static bool containsLine(
+    const vector<string>& lines,
+    const string& expectedLine)
+  {
+    foreach (const string& line, lines) {
+      if (line == expectedLine) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
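+  // Removes all containers with the Mesos Docker name prefix so
+  // that no test leaks containers into subsequent test cases.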
+  virtual void TearDown()
+  {
+    Try<Docker*> docker = Docker::create(tests::flags.docker, false);
+    ASSERT_SOME(docker);
+    Future<list<Docker::Container>> containers =
+      docker.get()->ps(true, slave::DOCKER_NAME_PREFIX);
+
+    AWAIT_READY(containers);
+
+    // Clean up all Mesos-launched containers.
+    foreach (const Docker::Container& container, containers.get()) {
+      AWAIT_READY_FOR(docker.get()->rm(container.id, true), Seconds(30));
+    }
+
+    delete docker.get();
+  }
+};
+
+
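+// A DockerContainerizer whose 'launch' and 'update' methods are
+// mocked but delegate to the real implementations by default,
+// letting tests capture arguments (e.g. the ContainerID) while
+// preserving normal containerizer behavior.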
+class MockDockerContainerizer : public DockerContainerizer {
+public:
+  MockDockerContainerizer(
+      const slave::Flags& flags,
+      Fetcher* fetcher,
+      Shared<Docker> docker)
+    : DockerContainerizer(flags, fetcher, docker)
+  {
+    initialize();
+  }
+
+  MockDockerContainerizer(const Owned<DockerContainerizerProcess>& process)
+    : DockerContainerizer(process)
+  {
+    initialize();
+  }
+
+  void initialize()
+  {
+    // NOTE: See TestContainerizer::setup for why we use
+    // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+    // 'ON_CALL' and 'WillByDefault'.
+    EXPECT_CALL(*this, launch(_, _, _, _, _, _, _))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launchExecutor));
+
+    EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch));
+
+    EXPECT_CALL(*this, update(_, _))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_update));
+  }
+
+  MOCK_METHOD7(
+      launch,
+      process::Future<bool>(
+          const ContainerID&,
+          const ExecutorInfo&,
+          const std::string&,
+          const Option<std::string>&,
+          const SlaveID&,
+          const process::PID<slave::Slave>&,
+          bool checkpoint));
+
+  MOCK_METHOD8(
+      launch,
+      process::Future<bool>(
+          const ContainerID&,
+          const TaskInfo&,
+          const ExecutorInfo&,
+          const std::string&,
+          const Option<std::string>&,
+          const SlaveID&,
+          const process::PID<slave::Slave>&,
+          bool checkpoint));
+
+  MOCK_METHOD2(
+      update,
+      process::Future<Nothing>(
+          const ContainerID&,
+          const Resources&));
+
+  // Default 'launch' implementation (necessary because we can't just
+  // use &DockerContainerizer::launch with 'Invoke').
+  process::Future<bool> _launch(
+      const ContainerID& containerId,
+      const TaskInfo& taskInfo,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint)
+  {
+    return DockerContainerizer::launch(
+        containerId,
+        taskInfo,
+        executorInfo,
+        directory,
+        user,
+        slaveId,
+        slavePid,
+        checkpoint);
+  }
+
+  process::Future<bool> _launchExecutor(
+      const ContainerID& containerId,
+      const ExecutorInfo& executorInfo,
+      const string& directory,
+      const Option<string>& user,
+      const SlaveID& slaveId,
+      const PID<Slave>& slavePid,
+      bool checkpoint)
+  {
+    return DockerContainerizer::launch(
+        containerId,
+        executorInfo,
+        directory,
+        user,
+        slaveId,
+        slavePid,
+        checkpoint);
+  }
+
+  process::Future<Nothing> _update(
+      const ContainerID& containerId,
+      const Resources& resources)
+  {
+    return DockerContainerizer::update(
+        containerId,
+        resources);
+  }
+};
+
+
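+// Mocks the containerizer process so tests can intercept the
+// internal 'fetch' and 'pull' steps of a launch; both delegate to
+// the real implementations by default.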
+class MockDockerContainerizerProcess : public DockerContainerizerProcess
+{
+public:
+  MockDockerContainerizerProcess(
+      const slave::Flags& flags,
+      Fetcher* fetcher,
+      const Shared<Docker>& docker)
+    : DockerContainerizerProcess(flags, fetcher, docker)
+  {
+    EXPECT_CALL(*this, fetch(_, _))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
+
+    EXPECT_CALL(*this, pull(_))
+      .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
+  }
+
+  MOCK_METHOD2(
+      fetch,
+      process::Future<Nothing>(
+          const ContainerID& containerId,
+          const SlaveID& slaveId));
+
+  MOCK_METHOD1(
+      pull,
+      process::Future<Nothing>(const ContainerID& containerId));
+
+  process::Future<Nothing> _fetch(
+      const ContainerID& containerId,
+      const SlaveID& slaveId)
+  {
+    return DockerContainerizerProcess::fetch(containerId, slaveId);
+  }
+
+  process::Future<Nothing> _pull(const ContainerID& containerId)
+  {
+    return DockerContainerizerProcess::pull(containerId);
+  }
+};
+
+
+// Only enable executor launch tests on Linux, as other platforms
+// require running a Linux VM and need special port forwarding to
+// get host networking to work.
+#ifdef __linux__
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("/bin/test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("tnachen/test-executor");
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
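+  // Capture the ContainerID from the launch call while still
+  // delegating to the default executor launch path.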
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launchExecutor)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
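+  // Grab the termination future before stopping the driver so we
+  // can verify below that the container is also torn down.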
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  ASSERT_FALSE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  Shutdown();
+}
+
+
+// This test verifies that a custom executor can be launched and
+// registered with the slave with docker bridge network enabled.
+// We're assuming that the custom executor registers its public IP
+// (instead of 0.0.0.0 or equivalent) with the slave, as that's the
+// default behavior for libprocess.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("/bin/test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("tnachen/test-executor");
+  dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launchExecutor)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  ASSERT_FALSE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  Shutdown();
+}
+#endif // __linux__
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  ASSERT_TRUE(statusRunning.get().has_data());
+
+  Try<JSON::Array> parse = JSON::parse<JSON::Array>(statusRunning.get().data());
+  ASSERT_SOME(parse);
+
+  // Now verify that the Docker.NetworkSettings.IPAddress label is
+  // present.
+  ASSERT_TRUE(statusRunning.get().has_labels());
+  EXPECT_EQ(1, statusRunning.get().labels().labels().size());
+  EXPECT_EQ("Docker.NetworkSettings.IPAddress",
+            statusRunning.get().labels().labels(0).key());
+
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  ASSERT_FALSE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  ASSERT_TRUE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled.get().state());
+
+  AWAIT_READY(termination);
+
+  ASSERT_FALSE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies DockerContainerizer::usage().
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>("cpus:2;mem:1024");
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  // Run a CPU-intensive command so we can measure utime and stime later.
+  command.set_value("dd if=/dev/zero of=/dev/null");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  // We ignore all update calls to prevent resizing cgroup limits.
+  EXPECT_CALL(dockerContainerizer, update(_, _))
+    .WillRepeatedly(Return(Nothing()));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  // Verify the usage.
+  ResourceStatistics statistics;
+  Duration waited = Duration::zero();
+  do {
+    Future<ResourceStatistics> usage =
+      dockerContainerizer.usage(containerId.get());
+    // TODO(tnachen): Replace await with AWAIT_COMPLETED once
+    // implemented.
+    ASSERT_TRUE(usage.await(Seconds(3)));
+
+    if (usage.isReady()) {
+      statistics = usage.get();
+
+      if (statistics.cpus_user_time_secs() > 0 &&
+          statistics.cpus_system_time_secs() > 0) {
+        break;
+      }
+    }
+
+    os::sleep(Milliseconds(200));
+    waited += Milliseconds(200);
+  } while (waited < Seconds(3));
+
+  // Usage includes the executor resources.
+  EXPECT_EQ(2.0 + slave::DEFAULT_EXECUTOR_CPUS, statistics.cpus_limit());
+  EXPECT_EQ((Gigabytes(1) + slave::DEFAULT_EXECUTOR_MEM).bytes(),
+            statistics.mem_limit_bytes());
+  EXPECT_LT(0, statistics.cpus_user_time_secs());
+  EXPECT_LT(0, statistics.cpus_system_time_secs());
+  EXPECT_GT(statistics.mem_rss_bytes(), 0u);
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  dockerContainerizer.destroy(containerId.get());
+
+  AWAIT_READY(termination);
+
+  // usage() should now fail since the container has been destroyed.
+  Future<ResourceStatistics> usage =
+    dockerContainerizer.usage(containerId.get());
+
+  AWAIT_FAILED(usage);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+#ifdef __linux__
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(containerId);
+
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  ASSERT_TRUE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  string name = containerName(slaveId, containerId.get());
+
+  Future<Docker::Container> inspect = docker->inspect(name);
+
+  AWAIT_READY(inspect);
+
+  Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
+
+  ASSERT_SOME(newResources);
+
+  Future<Nothing> update =
+    dockerContainerizer.update(containerId.get(), newResources.get());
+
+  AWAIT_READY(update);
+
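+  // Verify the update by reading the cgroup values directly: one
+  // CPU maps to 1024 CPU shares, and the memory soft limit should
+  // match the requested 128MB.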
+  Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
+  Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+
+  ASSERT_SOME(cpuHierarchy);
+  ASSERT_SOME(memoryHierarchy);
+
+  Option<pid_t> pid = inspect.get().pid;
+  ASSERT_SOME(pid);
+
+  Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
+  ASSERT_SOME(cpuCgroup);
+
+  Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
+  ASSERT_SOME(memoryCgroup);
+
+  Try<uint64_t> cpu = cgroups::cpu::shares(
+      cpuHierarchy.get(),
+      cpuCgroup.get());
+
+  ASSERT_SOME(cpu);
+
+  Try<Bytes> mem = cgroups::memory::soft_limit_in_bytes(
+      memoryHierarchy.get(),
+      memoryCgroup.get());
+
+  ASSERT_SOME(mem);
+
+  EXPECT_EQ(1024u, cpu.get());
+  EXPECT_EQ(128u, mem.get().megabytes());
+
+  newResources = Resources::parse("cpus:1;mem:144");
+
+  // Issue a second update, which uses the cached pid instead of inspect.
+  update = dockerContainerizer.update(containerId.get(), newResources.get());
+
+  AWAIT_READY(update);
+
+  cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
+
+  ASSERT_SOME(cpu);
+
+  mem = cgroups::memory::soft_limit_in_bytes(
+      memoryHierarchy.get(),
+      memoryCgroup.get());
+
+  ASSERT_SOME(mem);
+
+  EXPECT_EQ(1024u, cpu.get());
+  EXPECT_EQ(144u, mem.get().megabytes());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+#endif // __linux__
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
+{
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Future<string> stoppedContainer;
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&stoppedContainer),
+                    Return(Nothing())));
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  SlaveID slaveId;
+  slaveId.set_value("s1");
+  ContainerID containerId;
+  containerId.set_value("c1");
+  ContainerID reapedContainerId;
+  reapedContainerId.set_value("c2");
+
+  string container1 = containerName(slaveId, containerId);
+  string container2 = containerName(slaveId, reapedContainerId);
+
+  // Clean up artifacts in case the containers still exist.
+  ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
+  ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
+
+  Resources resources = Resources::parse("cpus:1;mem:512").get();
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  CommandInfo commandInfo;
+  commandInfo.set_value("sleep 1000");
+
+  Future<Nothing> d1 =
+    docker->run(
+        containerInfo,
+        commandInfo,
+        container1,
+        flags.work_dir,
+        flags.sandbox_directory,
+        resources);
+
+  Future<Nothing> d2 =
+    docker->run(
+        containerInfo,
+        commandInfo,
+        container2,
+        flags.work_dir,
+        flags.sandbox_directory,
+        resources);
+
+  ASSERT_TRUE(
+    exists(docker, slaveId, containerId, ContainerState::RUNNING));
+  ASSERT_TRUE(
+    exists(docker, slaveId, reapedContainerId, ContainerState::RUNNING));
+
+  Future<Docker::Container> inspect = docker->inspect(container2);
+  AWAIT_READY(inspect);
+
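+  // Construct a synthetic SlaveState that records 'containerId' as
+  // a checkpointed run. After recover(), the containerizer should
+  // reattach to container1 and stop the unknown container2.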
+  SlaveState slaveState;
+  slaveState.id = slaveId;
+  FrameworkState frameworkState;
+
+  ExecutorID execId;
+  execId.set_value("e1");
+
+  ExecutorState execState;
+  ExecutorInfo execInfo;
+  execState.info = execInfo;
+  execState.latest = containerId;
+
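+  // Use a 'docker wait' subprocess as a stand-in for the forked
+  // executor pid: it stays alive exactly as long as container1 runs.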
+  Try<process::Subprocess> wait =
+    process::subprocess(tests::flags.docker + " wait " + container1);
+
+  ASSERT_SOME(wait);
+
+  FrameworkID frameworkId;
+
+  RunState runState;
+  runState.id = containerId;
+  runState.forkedPid = wait.get().pid();
+  execState.runs.put(containerId, runState);
+  frameworkState.executors.put(execId, execState);
+
+  slaveState.frameworks.put(frameworkId, frameworkState);
+
+  Future<Nothing> recover = dockerContainerizer.recover(slaveState);
+
+  AWAIT_READY(recover);
+
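+  // wait() should succeed for the recovered container but fail for
+  // the container the containerizer doesn't know about, which it
+  // should have stopped during recovery.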
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId);
+
+  ASSERT_FALSE(termination.isFailed());
+
+  AWAIT_FAILED(dockerContainerizer.wait(reapedContainerId));
+
+  AWAIT_EQ(inspect.get().id, stoppedContainer);
+
+  Shutdown();
+}
+
+
+// This test checks that the docker containerizer doesn't recover
+// executors that were started by another containerizer (e.g., mesos).
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
+{
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  ContainerID containerId;
+  containerId.set_value("c1");
+  ContainerID reapedContainerId;
+  reapedContainerId.set_value("c2");
+
+  ExecutorID executorId;
+  executorId.set_value(UUID::random().toString());
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_container()->set_type(ContainerInfo::MESOS);
+
+  ExecutorState executorState;
+  executorState.info = executorInfo;
+  executorState.latest = containerId;
+
+  RunState runState;
+  runState.id = containerId;
+  executorState.runs.put(containerId, runState);
+
+  FrameworkState frameworkState;
+  frameworkState.executors.put(executorId, executorState);
+
+  SlaveState slaveState;
+  FrameworkID frameworkId;
+  frameworkId.set_value(UUID::random().toString());
+  slaveState.frameworks.put(frameworkId, frameworkState);
+
+  Future<Nothing> recover = dockerContainerizer.recover(slaveState);
+  AWAIT_READY(recover);
+
+  Future<hashset<ContainerID>> containers = dockerContainerizer.containers();
+  AWAIT_READY(containers);
+
+  // A MesosContainerizer task shouldn't be recovered by
+  // DockerContainerizer.
+  EXPECT_EQ(0u, containers.get().size());
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // We skip stopping the docker container because stopping a
+  // container, even one that has already terminated, might not
+  // flush the logs, and we would end up without stdout/stderr
+  // in our tests.
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillRepeatedly(Return(Nothing()));
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  string uuid = UUID::random().toString();
+
+  CommandInfo command;
+  command.set_value("echo out" + uuid + " ; echo err" + uuid + " 1>&2");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  // Now check that the proper output is in stderr and stdout (which
+  // might also contain other things, hence the use of a UUID).
+  Try<string> read = os::read(path::join(directory.get(), "stderr"));
+  ASSERT_SOME(read);
+
+  vector<string> lines = strings::split(read.get(), "\n");
+
+  EXPECT_TRUE(containsLine(lines, "err" + uuid));
+  EXPECT_FALSE(containsLine(lines, "out" + uuid));
+
+  read = os::read(path::join(directory.get(), "stdout"));
+  ASSERT_SOME(read);
+
+  lines = strings::split(read.get(), "\n");
+
+  EXPECT_TRUE(containsLine(lines, "out" + uuid));
+  EXPECT_FALSE(containsLine(lines, "err" + uuid));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // We skip stopping the docker container because stopping a
+  // container, even one that has already terminated, might not
+  // flush the logs, and we would end up without stdout/stderr
+  // in our tests.
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillRepeatedly(Return(Nothing()));
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_shell(false);
+
+  // NOTE: By not setting CommandInfo::value we're testing that we
+  // will still be able to run the container because it has a default
+  // entrypoint!
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/inky");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  Try<string> read = os::read(path::join(directory.get(), "stdout"));
+  ASSERT_SOME(read);
+
+  vector<string> lines = strings::split(read.get(), "\n");
+
+  // Since we're not passing any command value, we expect the
+  // default entrypoint to run, which is 'echo' with the image's
+  // default command, 'inky'.
+  EXPECT_TRUE(containsLine(lines, "inky"));
+
+  read = os::read(path::join(directory.get(), "stderr"));
+  ASSERT_SOME(read);
+
+  lines = strings::split(read.get(), "\n");
+
+  EXPECT_FALSE(containsLine(lines, "inky"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // We skip stopping the docker container because stopping a
+  // container, even one that has already terminated, might not
+  // flush the logs, and we would end up without stdout/stderr
+  // in our tests.
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillRepeatedly(Return(Nothing()));
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  string uuid = UUID::random().toString();
+
+  CommandInfo command;
+  command.set_shell(false);
+
+  // We can set the value to just the 'uuid' since it should get
+  // passed as an argument to the entrypoint, i.e., 'echo uuid'.
+  command.set_value(uuid);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/inky");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  // Now check that the proper output is in stderr and stdout.
+  Try<string> read = os::read(path::join(directory.get(), "stdout"));
+  ASSERT_SOME(read);
+
+  vector<string> lines = strings::split(read.get(), "\n");
+
+  // We expect the passed in command value to override the image's
+  // default command, thus we should see the value of 'uuid' in the
+  // output instead of the default command which is 'inky'.
+  EXPECT_TRUE(containsLine(lines, uuid));
+  EXPECT_FALSE(containsLine(lines, "inky"));
+
+  read = os::read(path::join(directory.get(), "stderr"));
+  ASSERT_SOME(read);
+
+  lines = strings::split(read.get(), "\n");
+
+  EXPECT_FALSE(containsLine(lines, "inky"));
+  EXPECT_FALSE(containsLine(lines, uuid));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // We skip stopping the docker container because stopping a
+  // container, even one that has already terminated, might not
+  // flush the logs, and we would end up without stdout/stderr
+  // in our tests.
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillRepeatedly(Return(Nothing()));
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  string uuid = UUID::random().toString();
+
+  CommandInfo command;
+  command.set_shell(false);
+
+  // We should also be able to skip setting the command value and
+  // just set the arguments, and those should also get passed
+  // through to the entrypoint!
+  command.add_arguments(uuid);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/inky");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  // Now check that the proper output is in stderr and stdout.
+  Try<string> read = os::read(path::join(directory.get(), "stdout"));
+  ASSERT_SOME(read);
+
+  vector<string> lines = strings::split(read.get(), "\n");
+
+  // We expect the passed in command arguments to override the image's
+  // default command, thus we should see the value of 'uuid' in the
+  // output instead of the default command which is 'inky'.
+  EXPECT_TRUE(containsLine(lines, uuid));
+  EXPECT_FALSE(containsLine(lines, "inky"));
+
+  read = os::read(path::join(directory.get(), "stderr"));
+  ASSERT_SOME(read);
+
+  lines = strings::split(read.get(), "\n");
+
+  EXPECT_FALSE(containsLine(lines, "inky"));
+  EXPECT_FALSE(containsLine(lines, uuid));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // We put the containerizer on the heap so we can more easily
+  // control its lifetime, i.e., when we invoke the destructor.
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launch)));
+
+  // Drop the first update from the executor.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(containerId);
+
+  // Stop the slave before the status update is received.
+  AWAIT_READY(statusUpdateMessage);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // The executor should report the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer2->wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  Shutdown();
+
+  delete dockerContainerizer2;
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+//
+// TODO(benh): This test is currently disabled because the executor
+// inside the image mesosphere/test-executor does not properly set the
+// executor PID that it uses during registration, so when the new
+// slave recovers it can't reconnect and instead destroys that
+// container. In particular, it uses '0' for its IP, which we properly
+// parse and can even properly use for sending other messages, but the
+// current implementation of 'UPID::operator bool ()' fails if the IP
+// component of a PID is '0'.
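+// (Concretely, as an illustrative address: a PID such as
+// executor(1)@0.0.0.0:8080 parses fine but evaluates to false in a
+// boolean context, so the recovering slave concludes it cannot reach
+// the executor.)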
+TEST_F(DockerContainerizerTest,
+       DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/test-executor");
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<ContainerID> containerId;
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<4>(&slaveId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launchExecutor)));
+
+  // We need to wait until the container's pid has been checkpointed
+  // so that when the next slave recovers it won't treat the executor
+  // as lost! We know this has completed after Containerizer::launch
+  // returns and Slave::executorLaunched gets dispatched.
+  Future<Nothing> executorLaunched =
+    FUTURE_DISPATCH(_, &Slave::executorLaunched);
+
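+  // FUTURE_DISPATCH is satisfied once the matching method dispatch
+  // reaches the slave, so awaiting it below guarantees that the
+  // checkpointing described above has completed.
+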
+  // The test-executor in the image immediately sends a TASK_RUNNING
+  // followed by a TASK_FINISHED (no sleep/delay in between), so we
+  // need to drop the first TWO updates that come from the executor,
+  // rather than only the first update like above, where we can
+  // control the length of the task.
+  Future<StatusUpdateMessage> statusUpdateMessage1 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  // Drop another status update from the executor.
+  Future<StatusUpdateMessage> statusUpdateMessage2 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(containerId);
+  AWAIT_READY(slaveId);
+
+  AWAIT_READY(executorLaunched);
+  AWAIT_READY(statusUpdateMessage1);
+  AWAIT_READY(statusUpdateMessage2);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // The executor should report the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  ASSERT_TRUE(exists(docker, slaveId.get(), containerId.get()));
+
+  driver.stop();
+  driver.join();
+
+  delete dockerContainerizer2;
+}
+
+
+// This test verifies that port mapping on the bridge network exposes
+// the host port to the container port: we send data to the host port
+// and receive it in the container by listening on the mapped
+// container port.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  flags.resources = "cpus:1;mem:1024;ports:[10000-10000]";
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  // We skip stopping the docker container because stopping a
+  // container that has already terminated might not flush its logs,
+  // and we would end up not getting stdout/stderr in our tests.
+  EXPECT_CALL(*mockDocker, stop(_, _, _))
+    .WillRepeatedly(Return(Nothing()));
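+
+  // Returning a ready Nothing() makes the mocked stop() a no-op, so
+  // the container exits on its own and Docker flushes its log
+  // streams.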
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_shell(false);
+  command.set_value("nc");
+  command.add_arguments("-l");
+  command.add_arguments("-p");
+  command.add_arguments("1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
+
+  ContainerInfo::DockerInfo::PortMapping portMapping;
+  portMapping.set_host_port(10000);
+  portMapping.set_container_port(1000);
+
+  dockerInfo.add_port_mappings()->CopyFrom(portMapping);
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
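+  // For reference (not asserted by this test): the mapping above is
+  // what the containerizer hands to the Docker CLI, roughly
+  // equivalent to `docker run -p 10000:1000 ...` on the bridge
+  // network.
+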
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  Future<string> directory;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<3>(&directory),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusFinished;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusFinished))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY(directory);
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  ASSERT_TRUE(
+    exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+  string uuid = UUID::random().toString();
+
+  // Write the uuid to the docker-mapped host port.
+  Try<process::Subprocess> s = process::subprocess(
+      "echo " + uuid + " | nc localhost 10000");
+
+  ASSERT_SOME(s);
+  AWAIT_READY_FOR(s.get().status(), Seconds(60));
+
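+  // busybox's `nc -l` exits once the client closes the connection,
+  // which is what lets the task reach TASK_FINISHED below.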
+  AWAIT_READY_FOR(statusFinished, Seconds(60));
+  EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+  // Now check that the proper output is in stdout.
+  Try<string> read = os::read(path::join(directory.get(), "stdout"));
+  ASSERT_SOME(read);
+
+  const vector<string> lines = strings::split(read.get(), "\n");
+
+  // We expect the uuid sent to the host port to be written to stdout
+  // by the docker container running 'nc -l'.
+  EXPECT_TRUE(containsLine(lines, uuid));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  Shutdown();
+}
+
+
+// This test verifies that a sandbox with ':' in its path can still
+// run successfully. This works around a limitation of the Docker CLI,
+// where the volume mapping parameter treats colons (:) as separators
+// and would otherwise incorrectly split the sandbox directory.
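+// As a hypothetical illustration: a volume mapping like
+// `-v /tmp/mesos/.../executors/test:colon/runs/latest:/mnt/mesos/sandbox`
+// would be split by the CLI at the ':' inside the host path rather
+// than at the intended host/container boundary.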
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  SlaveID slaveId = offer.slave_id();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("test:colon");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(DoDefault());
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusRunning, Seconds(60));
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+  Future<containerizer::Termination> termination =
+    dockerContainerizer.wait(containerId.get());
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(termination);
+
+  Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Promise<Nothing> promise;
+  Future<Nothing> fetch;
+
+  // We want to pause the fetch call to simulate a long fetch time.
+  EXPECT_CALL(*process, fetch(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&fetch),
+                    Return(promise.future())));
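+
+  // Returning an unsatisfied future keeps the launch blocked inside
+  // fetch() until we set the promise below, giving the test a window
+  // in which to call destroy().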
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  AWAIT_READY(fetch);
+
+  dockerContainerizer.destroy(containerId.get());
+
+  // Satisfy the fetch promise to resume the launch.
+  promise.set(Nothing());
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Future<Nothing> fetch;
+  EXPECT_CALL(*process, fetch(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&fetch),
+                    Return(Nothing())));
+
+  Promise<Nothing> promise;
+
+  // We want to pause the pull call to simulate a long pull time.
+  EXPECT_CALL(*process, pull(_))
+    .WillOnce(Return(promise.future()));
+
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  // Wait until fetch is finished.
+  AWAIT_READY(fetch);
+
+  dockerContainerizer.destroy(containerId.get());
+
+  // Satisfy the pull promise to resume the launch.
+  promise.set(Nothing());
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test checks that when a docker containerizer update fails and
+// the container fails before the executor starts, the executor is
+// properly killed and cleaned up.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("ls");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  // Fail the update so we don't proceed to send run task to the executor.
+  EXPECT_CALL(dockerContainerizer, update(_, _))
+    .WillRepeatedly(Return(Failure("Fail resource update")));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+  driver.stop();
+  driver.join();
+
+  // We expect the executor to have exited; if it has not, Shutdown
+  // will fail the test because the executor process is still running.
+  Shutdown();
+}
+
+
+// When the fetch fails we should send the scheduler a status update
+// with a message that shows the actual error.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("ls");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  EXPECT_CALL(*process, fetch(_, _))
+    .WillOnce(Return(Failure("some error from fetch")));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+  EXPECT_EQ("Failed to launch container: some error from fetch",
+            statusFailed.get().message());
+
+  // TODO(jaybuff): When MESOS-2035 is addressed we should validate
+  // that statusFailed.get().reason() is correctly set here.
+
+  driver.stop();
+  driver.join();
+
+  // We expect the executor to have exited; if it has not, Shutdown
+  // will fail the test because the executor process is still running.
+  Shutdown();
+}
+
+
+// When the docker pull fails we should send the scheduler a status
+// update with a message that shows the actual error.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerPullFailure)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("ls");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFailed));
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(&dockerContainerizer,
+                           &MockDockerContainerizer::_launch)));
+
+  EXPECT_CALL(*mockDocker, pull(_, _, _))
+    .WillOnce(Return(Failure("some error from docker pull")));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY_FOR(containerId, Seconds(60));
+
+  AWAIT_READY(statusFailed);
+
+  EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+  EXPECT_EQ("Failed to launch container: some error from docker pull",
+            statusFailed.get().message());
+
+  // TODO(jaybuff): When MESOS-2035 is addressed we should validate
+  // that statusFailed.get().reason() is correctly set here.
+
+  driver.stop();
+  driver.join();
+
+  // We expect the executor to have exited; if it has not, Shutdown
+  // will fail the test because the executor process is still running.
+  Shutdown();
+}
+
+
+// When the docker executor container fails to launch, the docker
+// inspect future that is in a retry loop should be discarded.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerInspectDiscard)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+  Shared<Docker> docker(mockDocker);
+
+  Future<Docker::Container> inspect;
+  EXPECT_CALL(*mockDocker, inspect(_, _))
+    .WillOnce(FutureResult(&inspect,
+                           Invoke((MockDocker*) docker.get(),
+                                  &MockDocker::_inspect)));
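+
+  // FutureResult (as used here) invokes the real _inspect and exposes
+  // its result via 'inspect', so the test can later observe that the
+  // retrying inspect future gets discarded.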
+
+  EXPECT_CALL(*mockDocker, run(_, _, _, _, _, _, _, _, _))
+    .WillOnce(Return(Failure("Run failed")));
+
+  Fetcher fetcher;
+
+  // The docker containerizer will free the process, so we must
+  // allocate on the heap.
+  MockDockerContainerizerProcess* process =
+    new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+  MockDockerContainerizer dockerContainerizer(
+      (Owned<DockerContainerizerProcess>(process)));
+
+  Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("/bin/test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  // TODO(tnachen): Use local image to test if possible.
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("tnachen/test-executor");
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  Future<TaskStatus> statusFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusFaile

<TRUNCATED>