You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2014/11/22 00:31:36 UTC
mesos git commit: Allow mesos containerizer to destroy correctly at
each state.
Repository: mesos
Updated Branches:
refs/heads/master 75bf214e6 -> 25489e53e
Allow mesos containerizer to destroy correctly at each state.
Review: https://reviews.apache.org/r/28141
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25489e53
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25489e53
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25489e53
Branch: refs/heads/master
Commit: 25489e53e9f308c5fca3d0293aeceb716b53149d
Parents: 75bf214
Author: Timothy Chen <tn...@apache.org>
Authored: Sat Oct 25 18:52:21 2014 -0700
Committer: Timothy Chen <tn...@apache.org>
Committed: Fri Nov 21 15:31:42 2014 -0800
----------------------------------------------------------------------
src/slave/containerizer/mesos/containerizer.cpp | 231 +++++++++++++------
src/slave/containerizer/mesos/containerizer.hpp | 96 +++++---
src/tests/containerizer_tests.cpp | 93 ++++++++
3 files changed, 312 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 24f90b6..55c2922 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -173,25 +173,31 @@ MesosContainerizer::MesosContainerizer(
bool local,
const Owned<Launcher>& launcher,
const vector<Owned<Isolator>>& isolators)
+ : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
{
- process = new MesosContainerizerProcess(
- flags, local, launcher, isolators);
- spawn(process);
+ spawn(process.get());
+}
+
+
+MesosContainerizer::MesosContainerizer(
+ const Owned<MesosContainerizerProcess>& _process)
+ : process(_process)
+{
+ spawn(process.get());
}
MesosContainerizer::~MesosContainerizer()
{
- terminate(process);
- process::wait(process);
- delete process;
+ terminate(process.get());
+ process::wait(process.get());
}
Future<Nothing> MesosContainerizer::recover(
const Option<state::SlaveState>& state)
{
- return dispatch(process, &MesosContainerizerProcess::recover, state);
+ return dispatch(process.get(), &MesosContainerizerProcess::recover, state);
}
@@ -204,7 +210,7 @@ Future<bool> MesosContainerizer::launch(
const PID<Slave>& slavePid,
bool checkpoint)
{
- return dispatch(process,
+ return dispatch(process.get(),
&MesosContainerizerProcess::launch,
containerId,
executorInfo,
@@ -226,7 +232,7 @@ Future<bool> MesosContainerizer::launch(
const PID<Slave>& slavePid,
bool checkpoint)
{
- return dispatch(process,
+ return dispatch(process.get(),
&MesosContainerizerProcess::launch,
containerId,
taskInfo,
@@ -243,7 +249,7 @@ Future<Nothing> MesosContainerizer::update(
const ContainerID& containerId,
const Resources& resources)
{
- return dispatch(process,
+ return dispatch(process.get(),
&MesosContainerizerProcess::update,
containerId,
resources);
@@ -253,26 +259,26 @@ Future<Nothing> MesosContainerizer::update(
Future<ResourceStatistics> MesosContainerizer::usage(
const ContainerID& containerId)
{
- return dispatch(process, &MesosContainerizerProcess::usage, containerId);
+ return dispatch(process.get(), &MesosContainerizerProcess::usage, containerId);
}
Future<containerizer::Termination> MesosContainerizer::wait(
const ContainerID& containerId)
{
- return dispatch(process, &MesosContainerizerProcess::wait, containerId);
+ return dispatch(process.get(), &MesosContainerizerProcess::wait, containerId);
}
void MesosContainerizer::destroy(const ContainerID& containerId)
{
- dispatch(process, &MesosContainerizerProcess::destroy, containerId);
+ dispatch(process.get(), &MesosContainerizerProcess::destroy, containerId);
}
Future<hashset<ContainerID>> MesosContainerizer::containers()
{
- return dispatch(process, &MesosContainerizerProcess::containers);
+ return dispatch(process.get(), &MesosContainerizerProcess::containers);
}
@@ -356,16 +362,20 @@ Future<Nothing> MesosContainerizerProcess::__recover(
{
foreach (const RunState& run, recovered) {
CHECK_SOME(run.id);
- const ContainerID& containerId = run.id.get();
+ CHECK_SOME(run.forkedPid);
- Owned<Promise<containerizer::Termination>> promise(
- new Promise<containerizer::Termination>());
- promises.put(containerId, promise);
+ const ContainerID& containerId = run.id.get();
- CHECK_SOME(run.forkedPid);
+ Container* container = new Container();
Future<Option<int>> status = process::reap(run.forkedPid.get());
- statuses[containerId] = status;
status.onAny(defer(self(), &Self::reaped, containerId));
+ container->status = status;
+ // We only checkpoint the containerizer pid after the container
+ // successfully launched, therefore we can assume checkpointed
+ // containers should be running after recover.
+ container->state = RUNNING;
+
+ containers_[containerId] = Owned<Container>(container);
foreach (const Owned<Isolator>& isolator, isolators) {
isolator->watch(containerId)
@@ -395,7 +405,7 @@ Future<bool> MesosContainerizerProcess::launch(
const PID<Slave>& slavePid,
bool checkpoint)
{
- if (promises.contains(containerId)) {
+ if (containers_.contains(containerId)) {
LOG(ERROR) << "Cannot start already running container '"
<< containerId << "'";
return Failure("Container already started");
@@ -415,12 +425,10 @@ Future<bool> MesosContainerizerProcess::launch(
return false;
}
- Owned<Promise<containerizer::Termination>> promise(
- new Promise<containerizer::Termination>());
- promises.put(containerId, promise);
-
- // Store the resources for usage().
- resources.put(containerId, executorInfo.resources());
+ Container* container = new Container();
+ container->resources = executorInfo.resources();
+ container->state = PREPARING;
+ containers_[containerId] = Owned<Container>(container);
LOG(INFO) << "Starting container '" << containerId
<< "' for executor '" << executorInfo.executor_id()
@@ -525,6 +533,14 @@ Future<Nothing> MesosContainerizerProcess::fetch(
const string& directory,
const Option<string>& user)
{
+ if (!containers_.contains(containerId)) {
+ return Failure("Container is already destroyed");
+ }
+
+ if (commandInfo.uris().size() == 0) {
+ return Nothing();
+ }
+
Try<Subprocess> fetcher = fetcher::run(
commandInfo,
directory,
@@ -535,6 +551,13 @@ Future<Nothing> MesosContainerizerProcess::fetch(
return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
}
+ // TODO(tnachen): Currently the fetcher won't shutdown when slave
+ // exits. This means the fetcher will still be running when slave
+ // restarts and after recovering. We won't resume the task since
+ // it hasn't checkpointed yet. Once the fetcher supports exiting
+ // on slave exit it will be removed automatically.
+ containers_[containerId]->fetcher = fetcher.get();
+
return fetcher.get().status()
.then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
}
@@ -550,6 +573,14 @@ Future<bool> MesosContainerizerProcess::_launch(
bool checkpoint,
const list<Option<CommandInfo>>& commands)
{
+ if (!containers_.contains(containerId)) {
+ return Failure("Container has been destroyed");
+ }
+
+ if (containers_[containerId]->state == DESTROYING) {
+ return Failure("Container is currently being destroyed");
+ }
+
// Prepare environment variables for the executor.
map<string, string> env = executorEnvironment(
executorInfo,
@@ -643,8 +674,8 @@ Future<bool> MesosContainerizerProcess::_launch(
// Monitor the executor's pid. We keep the future because we'll
// refer to it again during container destroy.
Future<Option<int>> status = process::reap(pid);
- statuses.put(containerId, status);
status.onAny(defer(self(), &Self::reaped, containerId));
+ containers_[containerId]->status = status;
return isolate(containerId, pid)
.then(defer(self(),
@@ -669,6 +700,10 @@ Future<bool> MesosContainerizerProcess::isolate(
const ContainerID& containerId,
pid_t _pid)
{
+ CHECK(containers_.contains(containerId));
+
+ containers_[containerId]->state = ISOLATING;
+
// Set up callbacks for isolator limitations.
foreach (const Owned<Isolator>& isolator, isolators) {
isolator->watch(containerId)
@@ -685,8 +720,11 @@ Future<bool> MesosContainerizerProcess::isolate(
}
// Wait for all isolators to complete.
- return collect(futures)
- .then(lambda::bind(&_isolate));
+ Future<list<Nothing>> future = collect(futures);
+
+ containers_[containerId]->isolation = future;
+
+ return future.then(lambda::bind(&_isolate));
}
@@ -696,7 +734,8 @@ Future<bool> MesosContainerizerProcess::exec(
{
// The container may be destroyed before we exec the executor so
// return failure here.
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId) ||
+ containers_[containerId]->state == DESTROYING) {
return Failure("Container destroyed during launch");
}
@@ -712,6 +751,8 @@ Future<bool> MesosContainerizerProcess::exec(
string(strerror(errno)));
}
+ containers_[containerId]->state = RUNNING;
+
return true;
}
@@ -719,11 +760,11 @@ Future<bool> MesosContainerizerProcess::exec(
Future<containerizer::Termination> MesosContainerizerProcess::wait(
const ContainerID& containerId)
{
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId)) {
return Failure("Unknown container: " + stringify(containerId));
}
- return promises[containerId]->future();
+ return containers_[containerId]->promise.future();
}
@@ -731,10 +772,7 @@ Future<Nothing> MesosContainerizerProcess::update(
const ContainerID& containerId,
const Resources& _resources)
{
- // The resources hashmap won't initially contain the container's
- // resources after recovery so we must check the promises hashmap
- // (which is set during recovery).
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId)) {
// It is not considered a failure if the container is not known
// because the slave will attempt to update the container's
// resources on a task's terminal state change but the executor
@@ -743,8 +781,14 @@ Future<Nothing> MesosContainerizerProcess::update(
return Nothing();
}
+ if (containers_[containerId]->state == DESTROYING) {
+ LOG(WARNING) << "Ignoring update for currently being destroyed container: "
+ << containerId;
+ return Nothing();
+ }
+
// Store the resources for usage().
- resources.put(containerId, _resources);
+ containers_[containerId]->resources = _resources;
// Update each isolator.
list<Future<Nothing>> futures;
@@ -802,7 +846,7 @@ Future<ResourceStatistics> _usage(
Future<ResourceStatistics> MesosContainerizerProcess::usage(
const ContainerID& containerId)
{
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId)) {
return Failure("Unknown container: " + stringify(containerId));
}
@@ -816,41 +860,81 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage(
// after an update() because they aren't part of the SlaveState.
return await(futures)
.then(lambda::bind(
- _usage, containerId, resources.get(containerId), lambda::_1));
+ _usage,
+ containerId,
+ containers_[containerId]->resources,
+ lambda::_1));
}
void MesosContainerizerProcess::destroy(const ContainerID& containerId)
{
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId)) {
LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
return;
}
- if (destroying.contains(containerId)) {
+ Container* container = containers_[containerId].get();
+
+ if (container->state == DESTROYING) {
// Destroy has already been initiated.
return;
}
- destroying.insert(containerId);
LOG(INFO) << "Destroying container '" << containerId << "'";
- if (statuses.contains(containerId)) {
- // Kill all processes then continue destruction.
- launcher->destroy(containerId)
- .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
- } else {
- // The executor never forked so no processes to kill, go straight
- // to __destroy() with status = None().
- __destroy(containerId, None());
+ if (container->state == PREPARING) {
+ // We can simply terminate the container if it's preparing
+ // since the isolators' prepare doesn't need any cleanup.
+ containerizer::Termination termination;
+ termination.set_killed(true);
+ termination.set_message("Container destroyed while preparing isolators");
+ container->promise.set(termination);
+
+ containers_.erase(containerId);
+
+ return;
+ }
+
+ if (container->state == FETCHING && container->fetcher.isSome()) {
+ VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+ // Best effort kill the entire fetcher tree.
+ os::killtree(container->fetcher.get().pid(), SIGKILL);
+ }
+
+ if (container->state == ISOLATING) {
+ VLOG(1) << "Waiting for the isolators to complete for container '"
+ << containerId << "'";
+
+ container->state = DESTROYING;
+
+ // Wait for the isolators to finish isolating before we start
+ // to destroy the container.
+ container->isolation
+ .onAny(defer(self(), &Self::_destroy, containerId));
+
+ return;
}
+
+ container->state = DESTROYING;
+ _destroy(containerId);
}
-void MesosContainerizerProcess::_destroy(
+void MesosContainerizerProcess::_destroy(const ContainerID& containerId)
+{
+ // Kill all processes then continue destruction.
+ launcher->destroy(containerId)
+ .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::__destroy(
const ContainerID& containerId,
const Future<Nothing>& future)
{
+ CHECK(containers_.contains(containerId));
+
// Something has gone wrong and the launcher wasn't able to kill all
// the processes in the container. We cannot clean up the isolators
// because they may require that all processes have exited so just
@@ -858,19 +942,20 @@ void MesosContainerizerProcess::_destroy(
// TODO(idownes): This is a pretty bad state to be in but we should
// consider cleaning up here.
if (!future.isReady()) {
- promises[containerId]->fail(
+ containers_[containerId]->promise.fail(
"Failed to destroy container " + stringify(containerId) + ": " +
(future.isFailed() ? future.failure() : "discarded future"));
- destroying.erase(containerId);
+ containers_.erase(containerId);
+
return;
}
// We've successfully killed all processes in the container so get
// the exit status of the executor when it's ready (it may already
// be) and continue the destroy.
- statuses.get(containerId).get()
- .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+ containers_[containerId]->status
+ .onAny(defer(self(), &Self::___destroy, containerId, lambda::_1));
}
@@ -911,7 +996,7 @@ static T reversed(const T& t)
}
-void MesosContainerizerProcess::__destroy(
+void MesosContainerizerProcess::___destroy(
const ContainerID& containerId,
const Future<Option<int>>& status)
{
@@ -930,7 +1015,7 @@ void MesosContainerizerProcess::__destroy(
// Continue destroy when we're done trying to clean up.
f.onAny(defer(self(),
- &Self::___destroy,
+ &Self::____destroy,
containerId,
status,
lambda::_1));
@@ -939,7 +1024,7 @@ void MesosContainerizerProcess::__destroy(
}
-void MesosContainerizerProcess::___destroy(
+void MesosContainerizerProcess::____destroy(
const ContainerID& containerId,
const Future<Option<int>>& status,
const Future<list<Future<Nothing>>>& cleanups)
@@ -947,18 +1032,21 @@ void MesosContainerizerProcess::___destroy(
// This should not occur because we only use the Future<list> to
// facilitate chaining.
CHECK_READY(cleanups);
+ CHECK(containers_.contains(containerId));
+
+ Container* container = containers_[containerId].get();
// Check cleanup succeeded for all isolators. If not, we'll fail the
// container termination and remove the 'destroying' flag but leave
// all other state. The container is now in an inconsistent state.
foreach (const Future<Nothing>& cleanup, cleanups.get()) {
if (!cleanup.isReady()) {
- promises[containerId]->fail(
+ container->promise.fail(
"Failed to clean up an isolator when destroying container '" +
stringify(containerId) + "' :" +
(cleanup.isFailed() ? cleanup.failure() : "discarded future"));
- destroying.erase(containerId);
+ containers_.erase(containerId);
return;
}
@@ -971,9 +1059,9 @@ void MesosContainerizerProcess::___destroy(
// exit.
bool killed = false;
string message;
- if (limitations.contains(containerId)) {
+ if (container->limitations.size() > 0) {
killed = true;
- foreach (const Limitation& limitation, limitations.get(containerId)) {
+ foreach (const Limitation& limitation, container->limitations) {
message += limitation.message;
}
message = strings::trim(message);
@@ -988,19 +1076,15 @@ void MesosContainerizerProcess::___destroy(
termination.set_status(status.get().get());
}
- promises[containerId]->set(termination);
+ container->promise.set(termination);
- promises.erase(containerId);
- statuses.erase(containerId);
- limitations.erase(containerId);
- resources.erase(containerId);
- destroying.erase(containerId);
+ containers_.erase(containerId);
}
void MesosContainerizerProcess::reaped(const ContainerID& containerId)
{
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId)) {
return;
}
@@ -1015,7 +1099,8 @@ void MesosContainerizerProcess::limited(
const ContainerID& containerId,
const Future<Limitation>& future)
{
- if (!promises.contains(containerId)) {
+ if (!containers_.contains(containerId) ||
+ containers_[containerId]->state == DESTROYING) {
return;
}
@@ -1023,7 +1108,7 @@ void MesosContainerizerProcess::limited(
LOG(INFO) << "Container " << containerId << " has reached its limit for"
<< " resource " << future.get().resource
<< " and will be terminated";
- limitations.put(containerId, future.get());
+ containers_[containerId]->limitations.push_back(future.get());
} else {
// TODO(idownes): A discarded future will not be an error when
// isolators discard their promises after cleanup.
@@ -1039,7 +1124,7 @@ void MesosContainerizerProcess::limited(
Future<hashset<ContainerID>> MesosContainerizerProcess::containers()
{
- return promises.keys();
+ return containers_.keys();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 3baea31..0b635d4 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -47,6 +47,9 @@ public:
const process::Owned<Launcher>& launcher,
const std::vector<process::Owned<Isolator>>& isolators);
+ // Used for testing.
+ MesosContainerizer(const process::Owned<MesosContainerizerProcess>& _process);
+
virtual ~MesosContainerizer();
virtual process::Future<Nothing> recover(
@@ -86,7 +89,7 @@ public:
virtual process::Future<hashset<ContainerID>> containers();
private:
- MesosContainerizerProcess* process;
+ process::Owned<MesosContainerizerProcess> process;
};
@@ -106,10 +109,10 @@ public:
virtual ~MesosContainerizerProcess() {}
- process::Future<Nothing> recover(
+ virtual process::Future<Nothing> recover(
const Option<state::SlaveState>& state);
- process::Future<bool> launch(
+ virtual process::Future<bool> launch(
const ContainerID& containerId,
const ExecutorInfo& executorInfo,
const std::string& directory,
@@ -118,7 +121,7 @@ public:
const process::PID<Slave>& slavePid,
bool checkpoint);
- process::Future<bool> launch(
+ virtual process::Future<bool> launch(
const ContainerID& containerId,
const TaskInfo& taskInfo,
const ExecutorInfo& executorInfo,
@@ -128,19 +131,23 @@ public:
const process::PID<Slave>& slavePid,
bool checkpoint);
- process::Future<Nothing> update(
+ virtual process::Future<Nothing> update(
const ContainerID& containerId,
const Resources& resources);
- process::Future<ResourceStatistics> usage(
+ virtual process::Future<ResourceStatistics> usage(
const ContainerID& containerId);
- process::Future<containerizer::Termination> wait(
+ virtual process::Future<containerizer::Termination> wait(
const ContainerID& containerId);
- void destroy(const ContainerID& containerId);
+ virtual process::Future<bool> exec(
+ const ContainerID& containerId,
+ int pipeWrite);
+
+ virtual void destroy(const ContainerID& containerId);
- process::Future<hashset<ContainerID>> containers();
+ virtual process::Future<hashset<ContainerID>> containers();
private:
process::Future<Nothing> _recover(
@@ -175,23 +182,22 @@ private:
const ContainerID& containerId,
pid_t _pid);
- process::Future<bool> exec(
- const ContainerID& containerId,
- int pipeWrite);
+ // Continues 'destroy()' once the isolators have completed.
+ void _destroy(const ContainerID& containerId);
// Continues 'destroy()' once all processes have been killed by the launcher.
- void _destroy(
+ void __destroy(
const ContainerID& containerId,
const process::Future<Nothing>& future);
// Continues '_destroy()' once we get the exit status of the executor.
- void __destroy(
+ void ___destroy(
const ContainerID& containerId,
const process::Future<Option<int>>& status);
// Continues (and completes) '__destroy()' once all isolators have completed
// cleanup.
- void ___destroy(
+ void ____destroy(
const ContainerID& containerId,
const process::Future<Option<int>>& status,
const process::Future<std::list<process::Future<Nothing>>>& cleanups);
@@ -211,26 +217,46 @@ private:
const process::Owned<Launcher> launcher;
const std::vector<process::Owned<Isolator>> isolators;
- // TODO(idownes): Consider putting these per-container variables into a
- // struct.
- // Promises for futures returned from wait().
- hashmap<ContainerID,
- process::Owned<process::Promise<containerizer::Termination>>> promises;
-
- // We need to keep track of the future exit status for each executor because
- // we'll only get a single notification when the executor exits.
- hashmap<ContainerID, process::Future<Option<int>>> statuses;
-
- // We keep track of any limitations received from each isolator so we can
- // determine the cause of an executor termination.
- multihashmap<ContainerID, Limitation> limitations;
-
- // We keep track of the resources for each container so we can set the
- // ResourceStatistics limits in usage().
- hashmap<ContainerID, Resources> resources;
-
- // Set of containers that are in process of being destroyed.
- hashset<ContainerID> destroying;
+ enum State
+ {
+ PREPARING,
+ ISOLATING,
+ FETCHING,
+ RUNNING,
+ DESTROYING
+ };
+
+ struct Container
+ {
+ // Promise for futures returned from wait().
+ process::Promise<containerizer::Termination> promise;
+
+ // We need to keep track of the future exit status for each
+ // executor because we'll only get a single notification when
+ // the executor exits.
+ process::Future<Option<int>> status;
+
+ // We keep track of the future that is waiting for all the
+ // isolators' futures, so that destroy will only start calling
+ // cleanup after all isolators have finished isolating.
+ process::Future<std::list<Nothing>> isolation;
+
+ // We keep track of any limitations received from each isolator so we can
+ // determine the cause of an executor termination.
+ std::vector<Limitation> limitations;
+
+ // The mesos-fetcher subprocess, that we keep around so we can
+ // stop the fetcher when the container is destroyed.
+ Option<process::Subprocess> fetcher;
+
+ // We keep track of the resources for each container so we can set the
+ // ResourceStatistics limits in usage().
+ Resources resources;
+
+ State state;
+ };
+
+ hashmap<ContainerID, process::Owned<Container>> containers_;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index a63897b..02a5f15 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -49,6 +49,11 @@ using std::map;
using std::string;
using std::vector;
+using testing::_;
+using testing::DoAll;
+using testing::Invoke;
+using testing::Return;
+
class MesosContainerizerIsolatorPreparationTest :
public tests::TemporaryDirectoryTest
{
@@ -291,3 +296,91 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
delete containerizer.get();
}
+
+
+class MesosContainerizerDestroyTest : public tests::TemporaryDirectoryTest {};
+
+class MockMesosContainerizerProcess : public MesosContainerizerProcess
+{
+public:
+ MockMesosContainerizerProcess(
+ const Flags& flags,
+ bool local,
+ const process::Owned<Launcher>& launcher,
+ const std::vector<process::Owned<Isolator>>& isolators)
+ : MesosContainerizerProcess(flags, local, launcher, isolators)
+ {
+ // NOTE: See TestContainerizer::setup for why we use
+ // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+ // 'ON_CALL' and 'WillByDefault'.
+ EXPECT_CALL(*this, exec(_, _))
+ .WillRepeatedly(Invoke(this, &MockMesosContainerizerProcess::_exec));
+ }
+
+ MOCK_METHOD2(
+ exec,
+ process::Future<bool>(
+ const ContainerID& containerId,
+ int pipeWrite));
+
+ process::Future<bool> _exec(
+ const ContainerID& containerId,
+ int pipeWrite)
+ {
+ return MesosContainerizerProcess::exec(
+ containerId,
+ pipeWrite);
+ }
+};
+
+
+// Destroying a mesos containerizer while it is fetching should
+// complete without waiting for the fetching to finish.
+TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
+{
+ slave::Flags flags;
+ Try<Launcher*> launcher = PosixLauncher::create(flags);
+ ASSERT_SOME(launcher);
+ std::vector<process::Owned<Isolator>> isolators;
+
+ MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
+ flags,
+ true,
+ Owned<Launcher>(launcher.get()),
+ isolators);
+
+ Future<Nothing> exec;
+ Promise<bool> promise;
+ // Letting exec hang to simulate a long fetch.
+ EXPECT_CALL(*process, exec(_, _))
+ .WillOnce(DoAll(FutureSatisfy(&exec),
+ Return(promise.future())));
+
+ MesosContainerizer containerizer((Owned<MesosContainerizerProcess>(process)));
+
+ ContainerID containerId;
+ containerId.set_value("test_container");
+
+ TaskInfo taskInfo;
+ CommandInfo commandInfo;
+ taskInfo.mutable_command()->MergeFrom(commandInfo);
+
+ containerizer.launch(
+ containerId,
+ taskInfo,
+ CREATE_EXECUTOR_INFO("executor", "exit 0"),
+ os::getcwd(),
+ None(),
+ SlaveID(),
+ process::PID<Slave>(),
+ false);
+
+ Future<containerizer::Termination> wait = containerizer.wait(containerId);
+
+ AWAIT_READY(exec);
+
+ containerizer.destroy(containerId);
+
+ // The container should still exit even if fetch didn't complete.
+ AWAIT_READY(wait);
+}