You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/07/25 01:38:03 UTC
[10/12] mesos git commit: Moved containerizer related tests under
src/tests/containerizer.
http://git-wip-us.apache.org/repos/asf/mesos/blob/96351372/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
new file mode 100644
index 0000000..80ed60e
--- /dev/null
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -0,0 +1,2955 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/duration.hpp>
+
+#include "linux/cgroups.hpp"
+
+#include "messages/messages.hpp"
+
+#include "tests/flags.hpp"
+#include "tests/mesos.hpp"
+
+#include "slave/containerizer/docker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
+#include "slave/paths.hpp"
+#include "slave/slave.hpp"
+#include "slave/state.hpp"
+
+using namespace mesos::internal::slave::paths;
+using namespace mesos::internal::slave::state;
+
+using namespace process;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::DockerContainerizer;
+using mesos::internal::slave::DockerContainerizerProcess;
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::Slave;
+
+using process::Future;
+using process::Message;
+using process::PID;
+using process::UPID;
+
+using std::vector;
+using std::list;
+using std::string;
+
+using testing::_;
+using testing::DoAll;
+using testing::DoDefault;
+using testing::Eq;
+using testing::Invoke;
+using testing::Return;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+
+class MockDocker : public Docker
+{
+public:
+ MockDocker(const string& path) : Docker(path)
+ {
+ EXPECT_CALL(*this, pull(_, _, _))
+ .WillRepeatedly(Invoke(this, &MockDocker::_pull));
+
+ EXPECT_CALL(*this, stop(_, _, _))
+ .WillRepeatedly(Invoke(this, &MockDocker::_stop));
+
+ EXPECT_CALL(*this, run(_, _, _, _, _, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockDocker::_run));
+
+ EXPECT_CALL(*this, inspect(_, _))
+ .WillRepeatedly(Invoke(this, &MockDocker::_inspect));
+ }
+
+ MOCK_CONST_METHOD9(
+ run,
+ process::Future<Nothing>(
+ const mesos::ContainerInfo&,
+ const mesos::CommandInfo&,
+ const std::string&,
+ const std::string&,
+ const std::string&,
+ const Option<mesos::Resources>&,
+ const Option<std::map<std::string, std::string>>&,
+ const Option<std::string>&,
+ const Option<std::string>&));
+
+ MOCK_CONST_METHOD3(
+ pull,
+ process::Future<Docker::Image>(
+ const string&,
+ const string&,
+ bool));
+
+ MOCK_CONST_METHOD3(
+ stop,
+ process::Future<Nothing>(
+ const string&,
+ const Duration&,
+ bool));
+
+ MOCK_CONST_METHOD2(
+ inspect,
+ process::Future<Docker::Container>(
+ const string&,
+ const Option<Duration>&));
+
+ process::Future<Nothing> _run(
+ const mesos::ContainerInfo& containerInfo,
+ const mesos::CommandInfo& commandInfo,
+ const std::string& name,
+ const std::string& sandboxDirectory,
+ const std::string& mappedDirectory,
+ const Option<mesos::Resources>& resources,
+ const Option<std::map<std::string, std::string>>& env,
+ const Option<std::string>& stdoutPath,
+ const Option<std::string>& stderrPath) const
+ {
+ return Docker::run(
+ containerInfo,
+ commandInfo,
+ name,
+ sandboxDirectory,
+ mappedDirectory,
+ resources,
+ env,
+ stdoutPath,
+ stderrPath);
+ }
+
+ process::Future<Docker::Image> _pull(
+ const string& directory,
+ const string& image,
+ bool force) const
+ {
+ return Docker::pull(directory, image, force);
+ }
+
+ process::Future<Nothing> _stop(
+ const string& containerName,
+ const Duration& timeout,
+ bool remove) const
+ {
+ return Docker::stop(containerName, timeout, remove);
+ }
+
+ process::Future<Docker::Container> _inspect(
+ const string& containerName,
+ const Option<Duration>& retryInterval)
+ {
+ return Docker::inspect(containerName, retryInterval);
+ }
+};
+
+
+class DockerContainerizerTest : public MesosTest
+{
+public:
+ static string containerName(
+ const SlaveID& slaveId,
+ const ContainerID& containerId)
+ {
+ return slave::DOCKER_NAME_PREFIX + slaveId.value() +
+ slave::DOCKER_NAME_SEPERATOR + containerId.value();
+ }
+
+ enum ContainerState {
+ EXISTS,
+ RUNNING
+ };
+
+ static bool exists(
+ const process::Shared<Docker>& docker,
+ const SlaveID& slaveId,
+ const ContainerID& containerId,
+ ContainerState state = ContainerState::EXISTS)
+ {
+ Duration waited = Duration::zero();
+ string expectedName = containerName(slaveId, containerId);
+
+ do {
+ Future<Docker::Container> inspect = docker->inspect(expectedName);
+
+ if (!inspect.await(Seconds(3))) {
+ return false;
+ }
+
+ if (inspect.isReady()) {
+ switch (state) {
+ case ContainerState::RUNNING:
+ if (inspect.get().pid.isSome()) {
+ return true;
+ }
+ // Retry looking for running pid until timeout.
+ break;
+ case ContainerState::EXISTS:
+ return true;
+ }
+ }
+
+ os::sleep(Milliseconds(200));
+ waited += Milliseconds(200);
+ } while (waited < Seconds(5));
+
+ return false;
+ }
+
+ static bool containsLine(
+ const vector<string>& lines,
+ const string& expectedLine)
+ {
+ foreach (const string& line, lines) {
+ if (line == expectedLine) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ virtual void TearDown()
+ {
+ Try<Docker*> docker = Docker::create(tests::flags.docker, false);
+ ASSERT_SOME(docker);
+ Future<list<Docker::Container>> containers =
+ docker.get()->ps(true, slave::DOCKER_NAME_PREFIX);
+
+ AWAIT_READY(containers);
+
+ // Cleanup all mesos launched containers.
+ foreach (const Docker::Container& container, containers.get()) {
+ AWAIT_READY_FOR(docker.get()->rm(container.id, true), Seconds(30));
+ }
+
+ delete docker.get();
+ }
+};
+
+
+class MockDockerContainerizer : public DockerContainerizer {
+public:
+ MockDockerContainerizer(
+ const slave::Flags& flags,
+ Fetcher* fetcher,
+ Shared<Docker> docker)
+ : DockerContainerizer(flags, fetcher, docker)
+ {
+ initialize();
+ }
+
+ MockDockerContainerizer(const Owned<DockerContainerizerProcess>& process)
+ : DockerContainerizer(process)
+ {
+ initialize();
+ }
+
+ void initialize()
+ {
+ // NOTE: See TestContainerizer::setup for why we use
+ // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+ // 'ON_CALL' and 'WillByDefault'.
+ EXPECT_CALL(*this, launch(_, _, _, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launchExecutor));
+
+ EXPECT_CALL(*this, launch(_, _, _, _, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_launch));
+
+ EXPECT_CALL(*this, update(_, _))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizer::_update));
+ }
+
+ MOCK_METHOD7(
+ launch,
+ process::Future<bool>(
+ const ContainerID&,
+ const ExecutorInfo&,
+ const std::string&,
+ const Option<std::string>&,
+ const SlaveID&,
+ const process::PID<slave::Slave>&,
+ bool checkpoint));
+
+ MOCK_METHOD8(
+ launch,
+ process::Future<bool>(
+ const ContainerID&,
+ const TaskInfo&,
+ const ExecutorInfo&,
+ const std::string&,
+ const Option<std::string>&,
+ const SlaveID&,
+ const process::PID<slave::Slave>&,
+ bool checkpoint));
+
+ MOCK_METHOD2(
+ update,
+ process::Future<Nothing>(
+ const ContainerID&,
+ const Resources&));
+
+ // Default 'launch' implementation (necessary because we can't just
+ // use &DockerContainerizer::launch with 'Invoke').
+ process::Future<bool> _launch(
+ const ContainerID& containerId,
+ const TaskInfo& taskInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+ {
+ return DockerContainerizer::launch(
+ containerId,
+ taskInfo,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+ }
+
+ process::Future<bool> _launchExecutor(
+ const ContainerID& containerId,
+ const ExecutorInfo& executorInfo,
+ const string& directory,
+ const Option<string>& user,
+ const SlaveID& slaveId,
+ const PID<Slave>& slavePid,
+ bool checkpoint)
+ {
+ return DockerContainerizer::launch(
+ containerId,
+ executorInfo,
+ directory,
+ user,
+ slaveId,
+ slavePid,
+ checkpoint);
+ }
+
+ process::Future<Nothing> _update(
+ const ContainerID& containerId,
+ const Resources& resources)
+ {
+ return DockerContainerizer::update(
+ containerId,
+ resources);
+ }
+};
+
+
+class MockDockerContainerizerProcess : public DockerContainerizerProcess
+{
+public:
+ MockDockerContainerizerProcess(
+ const slave::Flags& flags,
+ Fetcher* fetcher,
+ const Shared<Docker>& docker)
+ : DockerContainerizerProcess(flags, fetcher, docker)
+ {
+ EXPECT_CALL(*this, fetch(_, _))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_fetch));
+
+ EXPECT_CALL(*this, pull(_))
+ .WillRepeatedly(Invoke(this, &MockDockerContainerizerProcess::_pull));
+ }
+
+ MOCK_METHOD2(
+ fetch,
+ process::Future<Nothing>(
+ const ContainerID& containerId,
+ const SlaveID& slaveId));
+
+ MOCK_METHOD1(
+ pull,
+ process::Future<Nothing>(const ContainerID& containerId));
+
+ process::Future<Nothing> _fetch(
+ const ContainerID& containerId,
+ const SlaveID& slaveId)
+ {
+ return DockerContainerizerProcess::fetch(containerId, slaveId);
+ }
+
+ process::Future<Nothing> _pull(const ContainerID& containerId)
+ {
+ return DockerContainerizerProcess::pull(containerId);
+ }
+};
+
+
+// Only enable executor launch on linux as other platforms
+// requires running linux VM and need special port forwarding
+// to get host networking to work.
+#ifdef __linux__
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ ExecutorInfo executorInfo;
+ ExecutorID executorId;
+ executorId.set_value("e1");
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+ CommandInfo command;
+ command.set_value("/bin/test-executor");
+ executorInfo.mutable_command()->CopyFrom(command);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("tnachen/test-executor");
+
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+ executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launchExecutor)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ ASSERT_FALSE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ Shutdown();
+}
+
+
+// This test verifies that a custom executor can be launched and
+// registered with the slave with docker bridge network enabled.
+// We're assuming that the custom executor is registering it's public
+// ip instead of 0.0.0.0 or equivelent to the slave as that's the
+// default behavior for libprocess.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor_Bridged)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ ExecutorInfo executorInfo;
+ ExecutorID executorId;
+ executorId.set_value("e1");
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+ CommandInfo command;
+ command.set_value("/bin/test-executor");
+ executorInfo.mutable_command()->CopyFrom(command);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("tnachen/test-executor");
+ dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
+
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+ executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launchExecutor)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ ASSERT_FALSE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ Shutdown();
+}
+#endif // __linux__
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ ASSERT_TRUE(statusRunning.get().has_data());
+
+ Try<JSON::Array> parse = JSON::parse<JSON::Array>(statusRunning.get().data());
+ ASSERT_SOME(parse);
+
+ // Now verify that the Docker.NetworkSettings.IPAddress label is
+ // present.
+ ASSERT_TRUE(statusRunning.get().has_labels());
+ EXPECT_EQ(1, statusRunning.get().labels().labels().size());
+ EXPECT_EQ("Docker.NetworkSettings.IPAddress",
+ statusRunning.get().labels().labels(0).key());
+
+ ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ ASSERT_FALSE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ ASSERT_TRUE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ Future<TaskStatus> statusKilled;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusKilled));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.killTask(task.task_id());
+
+ AWAIT_READY(statusKilled);
+ EXPECT_EQ(TASK_KILLED, statusKilled.get().state());
+
+ AWAIT_READY(termination);
+
+ ASSERT_FALSE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test tests DockerContainerizer::usage().
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Usage)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resources = Option<string>("cpus:2;mem:1024");
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ // Run a CPU intensive command, so we can measure utime and stime later.
+ command.set_value("dd if=/dev/zero of=/dev/null");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ // We ignore all update calls to prevent resizing cgroup limits.
+ EXPECT_CALL(dockerContainerizer, update(_, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ // Verify the usage.
+ ResourceStatistics statistics;
+ Duration waited = Duration::zero();
+ do {
+ Future<ResourceStatistics> usage =
+ dockerContainerizer.usage(containerId.get());
+ // TODO(tnachen): Replace await with AWAIT_COMPLETED once
+ // implemented.
+ ASSERT_TRUE(usage.await(Seconds(3)));
+
+ if (usage.isReady()) {
+ statistics = usage.get();
+
+ if (statistics.cpus_user_time_secs() > 0 &&
+ statistics.cpus_system_time_secs() > 0) {
+ break;
+ }
+ }
+
+ os::sleep(Milliseconds(200));
+ waited += Milliseconds(200);
+ } while (waited < Seconds(3));
+
+ // Usage includes the executor resources.
+ EXPECT_EQ(2.0 + slave::DEFAULT_EXECUTOR_CPUS, statistics.cpus_limit());
+ EXPECT_EQ((Gigabytes(1) + slave::DEFAULT_EXECUTOR_MEM).bytes(),
+ statistics.mem_limit_bytes());
+ EXPECT_LT(0, statistics.cpus_user_time_secs());
+ EXPECT_LT(0, statistics.cpus_system_time_secs());
+ EXPECT_GT(statistics.mem_rss_bytes(), 0u);
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ dockerContainerizer.destroy(containerId.get());
+
+ AWAIT_READY(termination);
+
+ // Usage() should fail again since the container is destroyed.
+ Future<ResourceStatistics> usage =
+ dockerContainerizer.usage(containerId.get());
+
+ AWAIT_FAILED(usage);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+#ifdef __linux__
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(containerId);
+
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ ASSERT_TRUE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ string name = containerName(slaveId, containerId.get());
+
+ Future<Docker::Container> inspect = docker->inspect(name);
+
+ AWAIT_READY(inspect);
+
+ Try<Resources> newResources = Resources::parse("cpus:1;mem:128");
+
+ ASSERT_SOME(newResources);
+
+ Future<Nothing> update =
+ dockerContainerizer.update(containerId.get(), newResources.get());
+
+ AWAIT_READY(update);
+
+ Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
+ Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+
+ ASSERT_SOME(cpuHierarchy);
+ ASSERT_SOME(memoryHierarchy);
+
+ Option<pid_t> pid = inspect.get().pid;
+ ASSERT_SOME(pid);
+
+ Result<string> cpuCgroup = cgroups::cpu::cgroup(pid.get());
+ ASSERT_SOME(cpuCgroup);
+
+ Result<string> memoryCgroup = cgroups::memory::cgroup(pid.get());
+ ASSERT_SOME(memoryCgroup);
+
+ Try<uint64_t> cpu = cgroups::cpu::shares(
+ cpuHierarchy.get(),
+ cpuCgroup.get());
+
+ ASSERT_SOME(cpu);
+
+ Try<Bytes> mem = cgroups::memory::soft_limit_in_bytes(
+ memoryHierarchy.get(),
+ memoryCgroup.get());
+
+ ASSERT_SOME(mem);
+
+ EXPECT_EQ(1024u, cpu.get());
+ EXPECT_EQ(128u, mem.get().megabytes());
+
+ newResources = Resources::parse("cpus:1;mem:144");
+
+ // Issue second update that uses the cached pid instead of inspect.
+ update = dockerContainerizer.update(containerId.get(), newResources.get());
+
+ AWAIT_READY(update);
+
+ cpu = cgroups::cpu::shares(cpuHierarchy.get(), cpuCgroup.get());
+
+ ASSERT_SOME(cpu);
+
+ mem = cgroups::memory::soft_limit_in_bytes(
+ memoryHierarchy.get(),
+ memoryCgroup.get());
+
+ ASSERT_SOME(mem);
+
+ EXPECT_EQ(1024u, cpu.get());
+ EXPECT_EQ(144u, mem.get().megabytes());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+#endif //__linux__
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Recover)
+{
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Future<string> stoppedContainer;
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&stoppedContainer),
+ Return(Nothing())));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ SlaveID slaveId;
+ slaveId.set_value("s1");
+ ContainerID containerId;
+ containerId.set_value("c1");
+ ContainerID reapedContainerId;
+ reapedContainerId.set_value("c2");
+
+ string container1 = containerName(slaveId, containerId);
+ string container2 = containerName(slaveId, reapedContainerId);
+
+ // Clean up artifacts if containers still exists.
+ ASSERT_TRUE(docker->rm(container1, true).await(Seconds(30)));
+ ASSERT_TRUE(docker->rm(container2, true).await(Seconds(30)));
+
+ Resources resources = Resources::parse("cpus:1;mem:512").get();
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ CommandInfo commandInfo;
+ commandInfo.set_value("sleep 1000");
+
+ Future<Nothing> d1 =
+ docker->run(
+ containerInfo,
+ commandInfo,
+ container1,
+ flags.work_dir,
+ flags.sandbox_directory,
+ resources);
+
+ Future<Nothing> d2 =
+ docker->run(
+ containerInfo,
+ commandInfo,
+ container2,
+ flags.work_dir,
+ flags.sandbox_directory,
+ resources);
+
+ ASSERT_TRUE(
+ exists(docker, slaveId, containerId, ContainerState::RUNNING));
+ ASSERT_TRUE(
+ exists(docker, slaveId, reapedContainerId, ContainerState::RUNNING));
+
+ Future<Docker::Container> inspect = docker->inspect(container2);
+ AWAIT_READY(inspect);
+
+ SlaveState slaveState;
+ slaveState.id = slaveId;
+ FrameworkState frameworkState;
+
+ ExecutorID execId;
+ execId.set_value("e1");
+
+ ExecutorState execState;
+ ExecutorInfo execInfo;
+ execState.info = execInfo;
+ execState.latest = containerId;
+
+ Try<process::Subprocess> wait =
+ process::subprocess(tests::flags.docker + " wait " + container1);
+
+ ASSERT_SOME(wait);
+
+ FrameworkID frameworkId;
+
+ RunState runState;
+ runState.id = containerId;
+ runState.forkedPid = wait.get().pid();
+ execState.runs.put(containerId, runState);
+ frameworkState.executors.put(execId, execState);
+
+ slaveState.frameworks.put(frameworkId, frameworkState);
+
+ Future<Nothing> recover = dockerContainerizer.recover(slaveState);
+
+ AWAIT_READY(recover);
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId);
+
+ ASSERT_FALSE(termination.isFailed());
+
+ AWAIT_FAILED(dockerContainerizer.wait(reapedContainerId));
+
+ AWAIT_EQ(inspect.get().id, stoppedContainer);
+
+ Shutdown();
+}
+
+
+// This test checks the docker containerizer doesn't recover executors
+// that were started by another containerizer (e.g: mesos).
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SkipRecoverNonDocker)
+{
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ ContainerID containerId;
+ containerId.set_value("c1");
+ ContainerID reapedContainerId;
+ reapedContainerId.set_value("c2");
+
+ ExecutorID executorId;
+ executorId.set_value(UUID::random().toString());
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_container()->set_type(ContainerInfo::MESOS);
+
+ ExecutorState executorState;
+ executorState.info = executorInfo;
+ executorState.latest = containerId;
+
+ RunState runState;
+ runState.id = containerId;
+ executorState.runs.put(containerId, runState);
+
+ FrameworkState frameworkState;
+ frameworkState.executors.put(executorId, executorState);
+
+ SlaveState slaveState;
+ FrameworkID frameworkId;
+ frameworkId.set_value(UUID::random().toString());
+ slaveState.frameworks.put(frameworkId, frameworkState);
+
+ Future<Nothing> recover = dockerContainerizer.recover(slaveState);
+ AWAIT_READY(recover);
+
+ Future<hashset<ContainerID>> containers = dockerContainerizer.containers();
+ AWAIT_READY(containers);
+
+ // A MesosContainerizer task shouldn't be recovered by
+ // DockerContainerizer.
+ EXPECT_EQ(0u, containers.get().size());
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // We skip stopping the docker container because stopping a container
+ // even when it terminated might not flush the logs and we end up
+ // not getting stdout/stderr in our tests.
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ string uuid = UUID::random().toString();
+
+ CommandInfo command;
+ command.set_value("echo out" + uuid + " ; echo err" + uuid + " 1>&2");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ // Now check that the proper output is in stderr and stdout (which
+ // might also contain other things, hence the use of a UUID).
+ Try<string> read = os::read(path::join(directory.get(), "stderr"));
+ ASSERT_SOME(read);
+
+ vector<string> lines = strings::split(read.get(), "\n");
+
+ EXPECT_TRUE(containsLine(lines, "err" + uuid));
+ EXPECT_FALSE(containsLine(lines, "out" + uuid));
+
+ read = os::read(path::join(directory.get(), "stdout"));
+ ASSERT_SOME(read);
+
+ lines = strings::split(read.get(), "\n");
+
+ EXPECT_TRUE(containsLine(lines, "out" + uuid));
+ EXPECT_FALSE(containsLine(lines, "err" + uuid));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // We skip stopping the docker container because stopping a container
+ // even when it terminated might not flush the logs and we end up
+ // not getting stdout/stderr in our tests.
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_shell(false);
+
+ // NOTE: By not setting CommandInfo::value we're testing that we
+ // will still be able to run the container because it has a default
+ // entrypoint!
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("mesosphere/inky");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ Try<string> read = os::read(path::join(directory.get(), "stdout"));
+ ASSERT_SOME(read);
+
+ vector<string> lines = strings::split(read.get(), "\n");
+
+ // Since we're not passing any command value, we're expecting the
+ // default entry point to be run which is 'echo' with the default
+ // command from the image which is 'inky'.
+ EXPECT_TRUE(containsLine(lines, "inky"));
+
+ read = os::read(path::join(directory.get(), "stderr"));
+ ASSERT_SOME(read);
+
+ lines = strings::split(read.get(), "\n");
+
+ EXPECT_FALSE(containsLine(lines, "inky"));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // We skip stopping the docker container because stopping a container
+ // even when it terminated might not flush the logs and we end up
+ // not getting stdout/stderr in our tests.
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ string uuid = UUID::random().toString();
+
+ CommandInfo command;
+ command.set_shell(false);
+
+ // We can set the value to just the 'uuid' since it should get
+ // passed as an argument to the entrypoint, i.e., 'echo uuid'.
+ command.set_value(uuid);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("mesosphere/inky");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ // Now check that the proper output is in stderr and stdout.
+ Try<string> read = os::read(path::join(directory.get(), "stdout"));
+ ASSERT_SOME(read);
+
+ vector<string> lines = strings::split(read.get(), "\n");
+
+ // We expect the passed in command value to override the image's
+ // default command, thus we should see the value of 'uuid' in the
+ // output instead of the default command which is 'inky'.
+ EXPECT_TRUE(containsLine(lines, uuid));
+ EXPECT_FALSE(containsLine(lines, "inky"));
+
+ read = os::read(path::join(directory.get(), "stderr"));
+ ASSERT_SOME(read);
+
+ lines = strings::split(read.get(), "\n");
+
+ EXPECT_FALSE(containsLine(lines, "inky"));
+ EXPECT_FALSE(containsLine(lines, uuid));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// The following test uses a Docker image (mesosphere/inky) that has
+// an entrypoint "echo" and a default command "inky".
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // We skip stopping the docker container because stopping a container
+ // even when it terminated might not flush the logs and we end up
+ // not getting stdout/stderr in our tests.
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ string uuid = UUID::random().toString();
+
+ CommandInfo command;
+ command.set_shell(false);
+
+ // We should also be able to skip setting the comamnd value and just
+ // set the arguments and those should also get passed through to the
+ // entrypoint!
+ command.add_arguments(uuid);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("mesosphere/inky");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ // Now check that the proper output is in stderr and stdout.
+ Try<string> read = os::read(path::join(directory.get(), "stdout"));
+ ASSERT_SOME(read);
+
+ vector<string> lines = strings::split(read.get(), "\n");
+
+ // We expect the passed in command arguments to override the image's
+ // default command, thus we should see the value of 'uuid' in the
+ // output instead of the default command which is 'inky'.
+ EXPECT_TRUE(containsLine(lines, uuid));
+ EXPECT_FALSE(containsLine(lines, "inky"));
+
+ read = os::read(path::join(directory.get(), "stderr"));
+ ASSERT_SOME(read);
+
+ lines = strings::split(read.get(), "\n");
+
+ EXPECT_FALSE(containsLine(lines, "inky"));
+ EXPECT_FALSE(containsLine(lines, uuid));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // We put the containerizer on the heap so we can more easily
+ // control it's lifetime, i.e., when we invoke the destructor.
+ MockDockerContainerizer* dockerContainerizer1 =
+ new MockDockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+ ASSERT_SOME(slave1);
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(dockerContainerizer1,
+ &MockDockerContainerizer::_launch)));
+
+ // Drop the first update from the executor.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(containerId);
+
+ // Stop the slave before the status update is received.
+ AWAIT_READY(statusUpdateMessage);
+
+ Stop(slave1.get());
+
+ delete dockerContainerizer1;
+
+ Future<Message> reregisterExecutorMessage =
+ FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ MockDockerContainerizer* dockerContainerizer2 =
+ new MockDockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+ ASSERT_SOME(slave2);
+
+ // Ensure the executor re-registers.
+ AWAIT_READY(reregisterExecutorMessage);
+ UPID executorPid = reregisterExecutorMessage.get().from;
+
+ ReregisterExecutorMessage reregister;
+ reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+ // Executor should inform about the unacknowledged update.
+ ASSERT_EQ(1, reregister.updates_size());
+ const StatusUpdate& update = reregister.updates(0);
+ ASSERT_EQ(task.task_id(), update.status().task_id());
+ ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+ // Scheduler should receive the recovered update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+ ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer2->wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ Shutdown();
+
+ delete dockerContainerizer2;
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up we make sure the executor
+// re-registers and the slave properly sends the update.
+//
+// TODO(benh): This test is currently disabled because the executor
+// inside the image mesosphere/test-executor does not properly set the
+// executor PID that is uses during registration, so when the new
+// slave recovers it can't reconnect and instead destroys that
+// container. In particular, it uses '0' for it's IP which we properly
+// parse and can even properly use for sending other messages, but the
+// current implementation of 'UPID::operator bool ()' fails if the IP
+// component of a PID is '0'.
+TEST_F(DockerContainerizerTest,
+ DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer* dockerContainerizer1 =
+ new MockDockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+ ASSERT_SOME(slave1);
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ ExecutorInfo executorInfo;
+ ExecutorID executorId;
+ executorId.set_value("e1");
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+ CommandInfo command;
+ command.set_value("test-executor");
+ executorInfo.mutable_command()->CopyFrom(command);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("mesosphere/test-executor");
+
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+ executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<ContainerID> containerId;
+ Future<SlaveID> slaveId;
+ EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<4>(&slaveId),
+ Invoke(dockerContainerizer1,
+ &MockDockerContainerizer::_launchExecutor)));
+
+ // We need to wait until the container's pid has been been
+ // checkpointed so that when the next slave recovers it won't treat
+ // the executor as having gone lost! We know this has completed
+ // after Containerizer::launch returns and the
+ // Slave::executorLaunched gets dispatched.
+ Future<Nothing> executorLaunched =
+ FUTURE_DISPATCH(_, &Slave::executorLaunched);
+
+ // The test-executor in the image immediately sends a TASK_RUNNING
+ // followed by TASK_FINISHED (no sleep/delay in between) so we need
+ // to drop the first TWO updates that come from the executor rather
+ // than only the first update like above where we can control how
+ // the length of the task.
+ Future<StatusUpdateMessage> statusUpdateMessage1 =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+ // Drop the first update from the executor.
+ Future<StatusUpdateMessage> statusUpdateMessage2 =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(containerId);
+ AWAIT_READY(slaveId);
+
+ AWAIT_READY(executorLaunched);
+ AWAIT_READY(statusUpdateMessage1);
+ AWAIT_READY(statusUpdateMessage2);
+
+ Stop(slave1.get());
+
+ delete dockerContainerizer1;
+
+ Future<Message> reregisterExecutorMessage =
+ FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ MockDockerContainerizer* dockerContainerizer2 =
+ new MockDockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+ ASSERT_SOME(slave2);
+
+ // Ensure the executor re-registers.
+ AWAIT_READY(reregisterExecutorMessage);
+ UPID executorPid = reregisterExecutorMessage.get().from;
+
+ ReregisterExecutorMessage reregister;
+ reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+ // Executor should inform about the unacknowledged update.
+ ASSERT_EQ(1, reregister.updates_size());
+ const StatusUpdate& update = reregister.updates(0);
+ ASSERT_EQ(task.task_id(), update.status().task_id());
+ ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+ // Scheduler should receive the recovered update.
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+ ASSERT_TRUE(exists(docker, slaveId.get(), containerId.get()));
+
+ driver.stop();
+ driver.join();
+
+ delete dockerContainerizer2;
+}
+
+
+// This test verifies that port mapping with bridge network is
+// exposing the host port to the container port, by sending data
+// to the host port and receiving it in the container by listening
+// to the mapped container port.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_NC_PortMapping)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ flags.resources = "cpus:1;mem:1024;ports:[10000-10000]";
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ // We skip stopping the docker container because stopping a container
+ // even when it terminated might not flush the logs and we end up
+ // not getting stdout/stderr in our tests.
+ EXPECT_CALL(*mockDocker, stop(_, _, _))
+ .WillRepeatedly(Return(Nothing()));
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_shell(false);
+ command.set_value("nc");
+ command.add_arguments("-l");
+ command.add_arguments("-p");
+ command.add_arguments("1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ dockerInfo.set_network(ContainerInfo::DockerInfo::BRIDGE);
+
+ ContainerInfo::DockerInfo::PortMapping portMapping;
+ portMapping.set_host_port(10000);
+ portMapping.set_container_port(1000);
+
+ dockerInfo.add_port_mappings()->CopyFrom(portMapping);
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ Future<string> directory;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ FutureArg<3>(&directory),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ Future<TaskStatus> statusFinished;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillOnce(FutureArg<1>(&statusFinished))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY(directory);
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ ASSERT_TRUE(
+ exists(docker, slaveId, containerId.get(), ContainerState::RUNNING));
+
+ string uuid = UUID::random().toString();
+
+ // Write uuid to docker mapped host port.
+ Try<process::Subprocess> s = process::subprocess(
+ "echo " + uuid + " | nc localhost 10000");
+
+ ASSERT_SOME(s);
+ AWAIT_READY_FOR(s.get().status(), Seconds(60));
+
+ AWAIT_READY_FOR(statusFinished, Seconds(60));
+ EXPECT_EQ(TASK_FINISHED, statusFinished.get().state());
+
+ // Now check that the proper output is in stdout.
+ Try<string> read = os::read(path::join(directory.get(), "stdout"));
+ ASSERT_SOME(read);
+
+ const vector<string> lines = strings::split(read.get(), "\n");
+
+ // We expect the uuid that is sent to host port to be written
+ // to stdout by the docker container running nc -l.
+ EXPECT_TRUE(containsLine(lines, uuid));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ Shutdown();
+}
+
+
+// This test verifies that sandbox with ':' in the path can still
+// run successfully. This a limitation of the Docker CLI where
+// the volume map parameter treats colons (:) as seperators,
+// and incorrectly seperates the sandbox directory.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_LaunchSandboxWithColon)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ MockDockerContainerizer dockerContainerizer(flags, &fetcher, docker);
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ SlaveID slaveId = offer.slave_id();
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("test:colon");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(DoDefault());
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusRunning, Seconds(60));
+ EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+ ASSERT_TRUE(exists(docker, slaveId, containerId.get()));
+
+ Future<containerizer::Termination> termination =
+ dockerContainerizer.wait(containerId.get());
+
+ driver.stop();
+ driver.join();
+
+ AWAIT_READY(termination);
+
+ Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhileFetching)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Promise<Nothing> promise;
+ Future<Nothing> fetch;
+
+ // We want to pause the fetch call to simulate a long fetch time.
+ EXPECT_CALL(*process, fetch(_, _))
+ .WillOnce(DoAll(FutureSatisfy(&fetch),
+ Return(promise.future())));
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ AWAIT_READY(fetch);
+
+ dockerContainerizer.destroy(containerId.get());
+
+ // Resume docker launch.
+ promise.set(Nothing());
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DestroyWhilePulling)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Future<Nothing> fetch;
+ EXPECT_CALL(*process, fetch(_, _))
+ .WillOnce(DoAll(FutureSatisfy(&fetch),
+ Return(Nothing())));
+
+ Promise<Nothing> promise;
+
+ // We want to pause the fetch call to simulate a long fetch time.
+ EXPECT_CALL(*process, pull(_))
+ .WillOnce(Return(promise.future()));
+
+ Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("sleep 1000");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ // Wait until fetch is finished.
+ AWAIT_READY(fetch);
+
+ dockerContainerizer.destroy(containerId.get());
+
+ // Resume docker launch.
+ promise.set(Nothing());
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test checks that when a docker containerizer update failed
+// and the container failed before the executor started, the executor
+// is properly killed and cleaned up.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("ls");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ // Fail the update so we don't proceed to send run task to the executor.
+ EXPECT_CALL(dockerContainerizer, update(_, _))
+ .WillRepeatedly(Return(Failure("Fail resource update")));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+
+ driver.stop();
+ driver.join();
+
+ // We expect the executor to have exited, and if not in Shutdown
+ // the test will fail because of the executor process still running.
+ Shutdown();
+}
+
+
+// When the fetch fails we should send the scheduler a status
+// update with message the shows the actual error.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_FetchFailure)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("ls");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ EXPECT_CALL(*process, fetch(_, _))
+ .WillOnce(Return(Failure("some error from fetch")));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+ EXPECT_EQ("Failed to launch container: some error from fetch",
+ statusFailed.get().message());
+
+ // TODO(jaybuff): When MESOS-2035 is addressed we should validate
+ // that statusFailed.get().reason() is correctly set here.
+
+ driver.stop();
+ driver.join();
+
+ // We expect the executor to have exited, and if not in Shutdown
+ // the test will fail because of the executor process still running.
+ Shutdown();
+}
+
+
+// When the docker pull fails we should send the scheduler a status
+// update with message the shows the actual error.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerPullFailure)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ CommandInfo command;
+ command.set_value("ls");
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("busybox");
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+ task.mutable_command()->CopyFrom(command);
+ task.mutable_container()->CopyFrom(containerInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFailed));
+
+ Future<ContainerID> containerId;
+ EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
+ .WillOnce(DoAll(FutureArg<0>(&containerId),
+ Invoke(&dockerContainerizer,
+ &MockDockerContainerizer::_launch)));
+
+ EXPECT_CALL(*mockDocker, pull(_, _, _))
+ .WillOnce(Return(Failure("some error from docker pull")));
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY_FOR(containerId, Seconds(60));
+
+ AWAIT_READY(statusFailed);
+
+ EXPECT_EQ(TASK_FAILED, statusFailed.get().state());
+ EXPECT_EQ("Failed to launch container: some error from docker pull",
+ statusFailed.get().message());
+
+ // TODO(jaybuff): When MESOS-2035 is addressed we should validate
+ // that statusFailed.get().reason() is correctly set here.
+
+ driver.stop();
+ driver.join();
+
+ // We expect the executor to have exited, and if not in Shutdown
+ // the test will fail because of the executor process still running.
+ Shutdown();
+}
+
+
+// When the docker executor container fails to launch, docker inspect
+// future that is in a retry loop should be discarded.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_DockerInspectDiscard)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+
+ MockDocker* mockDocker = new MockDocker(tests::flags.docker);
+ Shared<Docker> docker(mockDocker);
+
+ Future<Docker::Container> inspect;
+ EXPECT_CALL(*mockDocker, inspect(_, _))
+ .WillOnce(FutureResult(&inspect,
+ Invoke((MockDocker*) docker.get(),
+ &MockDocker::_inspect)));
+
+ EXPECT_CALL(*mockDocker, run(_, _, _, _, _, _, _, _, _))
+ .WillOnce(Return(Failure("Run failed")));
+
+ Fetcher fetcher;
+
+ // The docker containerizer will free the process, so we must
+ // allocate on the heap.
+ MockDockerContainerizerProcess* process =
+ new MockDockerContainerizerProcess(flags, &fetcher, docker);
+
+ MockDockerContainerizer dockerContainerizer(
+ (Owned<DockerContainerizerProcess>(process)));
+
+ Try<PID<Slave>> slave = StartSlave(&dockerContainerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ const Offer& offer = offers.get()[0];
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->CopyFrom(offer.slave_id());
+ task.mutable_resources()->CopyFrom(offer.resources());
+
+ ExecutorInfo executorInfo;
+ ExecutorID executorId;
+ executorId.set_value("e1");
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+ CommandInfo command;
+ command.set_value("/bin/test-executor");
+ executorInfo.mutable_command()->CopyFrom(command);
+
+ ContainerInfo containerInfo;
+ containerInfo.set_type(ContainerInfo::DOCKER);
+
+ // TODO(tnachen): Use local image to test if possible.
+ ContainerInfo::DockerInfo dockerInfo;
+ dockerInfo.set_image("tnachen/test-executor");
+
+ containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+ executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+ task.mutable_executor()->CopyFrom(executorInfo);
+
+ Future<TaskStatus> statusFailed;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusFaile
<TRUNCATED>