You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2014/11/22 00:31:36 UTC

mesos git commit: Allow mesos containerizer to destroy correctly at each state.

Repository: mesos
Updated Branches:
  refs/heads/master 75bf214e6 -> 25489e53e


Allow mesos containerizer to destroy correctly at each state.

Review: https://reviews.apache.org/r/28141


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25489e53
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25489e53
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25489e53

Branch: refs/heads/master
Commit: 25489e53e9f308c5fca3d0293aeceb716b53149d
Parents: 75bf214
Author: Timothy Chen <tn...@apache.org>
Authored: Sat Oct 25 18:52:21 2014 -0700
Committer: Timothy Chen <tn...@apache.org>
Committed: Fri Nov 21 15:31:42 2014 -0800

----------------------------------------------------------------------
 src/slave/containerizer/mesos/containerizer.cpp | 231 +++++++++++++------
 src/slave/containerizer/mesos/containerizer.hpp |  96 +++++---
 src/tests/containerizer_tests.cpp               |  93 ++++++++
 3 files changed, 312 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index 24f90b6..55c2922 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -173,25 +173,31 @@ MesosContainerizer::MesosContainerizer(
     bool local,
     const Owned<Launcher>& launcher,
     const vector<Owned<Isolator>>& isolators)
+  : process(new MesosContainerizerProcess(flags, local, launcher, isolators))
 {
-  process = new MesosContainerizerProcess(
-      flags, local, launcher, isolators);
-  spawn(process);
+  spawn(process.get());
+}
+
+
+MesosContainerizer::MesosContainerizer(
+    const Owned<MesosContainerizerProcess>& _process)
+  : process(_process)
+{
+  spawn(process.get());
 }
 
 
 MesosContainerizer::~MesosContainerizer()
 {
-  terminate(process);
-  process::wait(process);
-  delete process;
+  terminate(process.get());
+  process::wait(process.get());
 }
 
 
 Future<Nothing> MesosContainerizer::recover(
     const Option<state::SlaveState>& state)
 {
-  return dispatch(process, &MesosContainerizerProcess::recover, state);
+  return dispatch(process.get(), &MesosContainerizerProcess::recover, state);
 }
 
 
@@ -204,7 +210,7 @@ Future<bool> MesosContainerizer::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  return dispatch(process,
+  return dispatch(process.get(),
                   &MesosContainerizerProcess::launch,
                   containerId,
                   executorInfo,
@@ -226,7 +232,7 @@ Future<bool> MesosContainerizer::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  return dispatch(process,
+  return dispatch(process.get(),
                   &MesosContainerizerProcess::launch,
                   containerId,
                   taskInfo,
@@ -243,7 +249,7 @@ Future<Nothing> MesosContainerizer::update(
     const ContainerID& containerId,
     const Resources& resources)
 {
-  return dispatch(process,
+  return dispatch(process.get(),
                   &MesosContainerizerProcess::update,
                   containerId,
                   resources);
@@ -253,26 +259,26 @@ Future<Nothing> MesosContainerizer::update(
 Future<ResourceStatistics> MesosContainerizer::usage(
     const ContainerID& containerId)
 {
-  return dispatch(process, &MesosContainerizerProcess::usage, containerId);
+  return dispatch(process.get(), &MesosContainerizerProcess::usage, containerId);
 }
 
 
 Future<containerizer::Termination> MesosContainerizer::wait(
     const ContainerID& containerId)
 {
-  return dispatch(process, &MesosContainerizerProcess::wait, containerId);
+  return dispatch(process.get(), &MesosContainerizerProcess::wait, containerId);
 }
 
 
 void MesosContainerizer::destroy(const ContainerID& containerId)
 {
-  dispatch(process, &MesosContainerizerProcess::destroy, containerId);
+  dispatch(process.get(), &MesosContainerizerProcess::destroy, containerId);
 }
 
 
 Future<hashset<ContainerID>> MesosContainerizer::containers()
 {
-  return dispatch(process, &MesosContainerizerProcess::containers);
+  return dispatch(process.get(), &MesosContainerizerProcess::containers);
 }
 
 
@@ -356,16 +362,20 @@ Future<Nothing> MesosContainerizerProcess::__recover(
 {
   foreach (const RunState& run, recovered) {
     CHECK_SOME(run.id);
-    const ContainerID& containerId = run.id.get();
+    CHECK_SOME(run.forkedPid);
 
-    Owned<Promise<containerizer::Termination>> promise(
-        new Promise<containerizer::Termination>());
-    promises.put(containerId, promise);
+    const ContainerID& containerId = run.id.get();
 
-    CHECK_SOME(run.forkedPid);
+    Container* container = new Container();
     Future<Option<int>> status = process::reap(run.forkedPid.get());
-    statuses[containerId] = status;
     status.onAny(defer(self(), &Self::reaped, containerId));
+    container->status = status;
+    // We only checkpoint the containerizer pid after the container
+    // has successfully launched, therefore we can assume checkpointed
+    // containers should be running after recovery.
+    container->state = RUNNING;
+
+    containers_[containerId] = Owned<Container>(container);
 
     foreach (const Owned<Isolator>& isolator, isolators) {
       isolator->watch(containerId)
@@ -395,7 +405,7 @@ Future<bool> MesosContainerizerProcess::launch(
     const PID<Slave>& slavePid,
     bool checkpoint)
 {
-  if (promises.contains(containerId)) {
+  if (containers_.contains(containerId)) {
     LOG(ERROR) << "Cannot start already running container '"
                << containerId << "'";
     return Failure("Container already started");
@@ -415,12 +425,10 @@ Future<bool> MesosContainerizerProcess::launch(
     return false;
   }
 
-  Owned<Promise<containerizer::Termination>> promise(
-      new Promise<containerizer::Termination>());
-  promises.put(containerId, promise);
-
-  // Store the resources for usage().
-  resources.put(containerId, executorInfo.resources());
+  Container* container = new Container();
+  container->resources = executorInfo.resources();
+  container->state = PREPARING;
+  containers_[containerId] = Owned<Container>(container);
 
   LOG(INFO) << "Starting container '" << containerId
             << "' for executor '" << executorInfo.executor_id()
@@ -525,6 +533,14 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     const string& directory,
     const Option<string>& user)
 {
+  if (!containers_.contains(containerId)) {
+    return Failure("Container is already destroyed");
+  }
+
+  if (commandInfo.uris().size() == 0) {
+    return Nothing();
+  }
+
   Try<Subprocess> fetcher = fetcher::run(
       commandInfo,
       directory,
@@ -535,6 +551,13 @@ Future<Nothing> MesosContainerizerProcess::fetch(
     return Failure("Failed to execute mesos-fetcher: " + fetcher.error());
   }
 
+  // TODO(tnachen): Currently the fetcher won't shut down when the
+  // slave exits. This means the fetcher will still be running when
+  // the slave restarts and recovers. We won't resume the task since
+  // it hasn't been checkpointed yet. Once the fetcher supports
+  // exiting when the slave exits, it will be removed automatically.
+  containers_[containerId]->fetcher = fetcher.get();
+
   return fetcher.get().status()
     .then(lambda::bind(&fetcher::_run, containerId, lambda::_1));
 }
@@ -550,6 +573,14 @@ Future<bool> MesosContainerizerProcess::_launch(
     bool checkpoint,
     const list<Option<CommandInfo>>& commands)
 {
+  if (!containers_.contains(containerId)) {
+    return Failure("Container has been destroyed");
+  }
+
+  if (containers_[containerId]->state == DESTROYING) {
+    return Failure("Container is currently being destroyed");
+  }
+
   // Prepare environment variables for the executor.
   map<string, string> env = executorEnvironment(
       executorInfo,
@@ -643,8 +674,8 @@ Future<bool> MesosContainerizerProcess::_launch(
   // Monitor the executor's pid. We keep the future because we'll
   // refer to it again during container destroy.
   Future<Option<int>> status = process::reap(pid);
-  statuses.put(containerId, status);
   status.onAny(defer(self(), &Self::reaped, containerId));
+  containers_[containerId]->status = status;
 
   return isolate(containerId, pid)
     .then(defer(self(),
@@ -669,6 +700,10 @@ Future<bool> MesosContainerizerProcess::isolate(
     const ContainerID& containerId,
     pid_t _pid)
 {
+  CHECK(containers_.contains(containerId));
+
+  containers_[containerId]->state = ISOLATING;
+
   // Set up callbacks for isolator limitations.
   foreach (const Owned<Isolator>& isolator, isolators) {
     isolator->watch(containerId)
@@ -685,8 +720,11 @@ Future<bool> MesosContainerizerProcess::isolate(
   }
 
   // Wait for all isolators to complete.
-  return collect(futures)
-    .then(lambda::bind(&_isolate));
+  Future<list<Nothing>> future = collect(futures);
+
+  containers_[containerId]->isolation = future;
+
+  return future.then(lambda::bind(&_isolate));
 }
 
 
@@ -696,7 +734,8 @@ Future<bool> MesosContainerizerProcess::exec(
 {
   // The container may be destroyed before we exec the executor so
   // return failure here.
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId) ||
+      containers_[containerId]->state == DESTROYING) {
     return Failure("Container destroyed during launch");
   }
 
@@ -712,6 +751,8 @@ Future<bool> MesosContainerizerProcess::exec(
                    string(strerror(errno)));
   }
 
+  containers_[containerId]->state = RUNNING;
+
   return true;
 }
 
@@ -719,11 +760,11 @@ Future<bool> MesosContainerizerProcess::exec(
 Future<containerizer::Termination> MesosContainerizerProcess::wait(
     const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return Failure("Unknown container: " + stringify(containerId));
   }
 
-  return promises[containerId]->future();
+  return containers_[containerId]->promise.future();
 }
 
 
@@ -731,10 +772,7 @@ Future<Nothing> MesosContainerizerProcess::update(
     const ContainerID& containerId,
     const Resources& _resources)
 {
-  // The resources hashmap won't initially contain the container's
-  // resources after recovery so we must check the promises hashmap
-  // (which is set during recovery).
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     // It is not considered a failure if the container is not known
     // because the slave will attempt to update the container's
     // resources on a task's terminal state change but the executor
@@ -743,8 +781,14 @@ Future<Nothing> MesosContainerizerProcess::update(
     return Nothing();
   }
 
+  if (containers_[containerId]->state == DESTROYING) {
+    LOG(WARNING) << "Ignoring update for currently being destroyed container: "
+                 << containerId;
+    return Nothing();
+  }
+
   // Store the resources for usage().
-  resources.put(containerId, _resources);
+  containers_[containerId]->resources = _resources;
 
   // Update each isolator.
   list<Future<Nothing>> futures;
@@ -802,7 +846,7 @@ Future<ResourceStatistics> _usage(
 Future<ResourceStatistics> MesosContainerizerProcess::usage(
     const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return Failure("Unknown container: " + stringify(containerId));
   }
 
@@ -816,41 +860,81 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage(
   // after an update() because they aren't part of the SlaveState.
   return await(futures)
     .then(lambda::bind(
-          _usage, containerId, resources.get(containerId), lambda::_1));
+          _usage,
+          containerId,
+          containers_[containerId]->resources,
+          lambda::_1));
 }
 
 
 void MesosContainerizerProcess::destroy(const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId;
     return;
   }
 
-  if (destroying.contains(containerId)) {
+  Container* container = containers_[containerId].get();
+
+  if (container->state == DESTROYING) {
     // Destroy has already been initiated.
     return;
   }
-  destroying.insert(containerId);
 
   LOG(INFO) << "Destroying container '" << containerId << "'";
 
-  if (statuses.contains(containerId)) {
-    // Kill all processes then continue destruction.
-    launcher->destroy(containerId)
-      .onAny(defer(self(), &Self::_destroy, containerId, lambda::_1));
-  } else {
-    // The executor never forked so no processes to kill, go straight
-    // to __destroy() with status = None().
-    __destroy(containerId, None());
+  if (container->state == PREPARING) {
+    // We can simply terminate the container if it's preparing
+    // since isolator's prepare doesn't need any cleanup.
+    containerizer::Termination termination;
+    termination.set_killed(true);
+    termination.set_message("Container destroyed while preparing isolators");
+    container->promise.set(termination);
+
+    containers_.erase(containerId);
+
+    return;
+  }
+
+  if (container->state == FETCHING && container->fetcher.isSome()) {
+    VLOG(1) << "Killing the fetcher for container '" << containerId << "'";
+    // Best effort kill the entire fetcher tree.
+    os::killtree(container->fetcher.get().pid(), SIGKILL);
+  }
+
+  if (container->state == ISOLATING) {
+    VLOG(1) << "Waiting for the isolators to complete for container '"
+            << containerId << "'";
+
+    container->state = DESTROYING;
+
+    // Wait for the isolators to finish isolating before we start
+    // to destroy the container.
+    container->isolation
+      .onAny(defer(self(), &Self::_destroy, containerId));
+
+    return;
   }
+
+  container->state = DESTROYING;
+  _destroy(containerId);
 }
 
 
-void MesosContainerizerProcess::_destroy(
+void MesosContainerizerProcess::_destroy(const ContainerID& containerId)
+{
+  // Kill all processes then continue destruction.
+  launcher->destroy(containerId)
+    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+}
+
+
+void MesosContainerizerProcess::__destroy(
     const ContainerID& containerId,
     const Future<Nothing>& future)
 {
+  CHECK(containers_.contains(containerId));
+
   // Something has gone wrong and the launcher wasn't able to kill all
   // the processes in the container. We cannot clean up the isolators
   // because they may require that all processes have exited so just
@@ -858,19 +942,20 @@ void MesosContainerizerProcess::_destroy(
   // TODO(idownes): This is a pretty bad state to be in but we should
   // consider cleaning up here.
   if (!future.isReady()) {
-    promises[containerId]->fail(
+    containers_[containerId]->promise.fail(
         "Failed to destroy container " + stringify(containerId) + ": " +
         (future.isFailed() ? future.failure() : "discarded future"));
 
-    destroying.erase(containerId);
+    containers_.erase(containerId);
+
     return;
   }
 
   // We've successfully killed all processes in the container so get
   // the exit status of the executor when it's ready (it may already
   // be) and continue the destroy.
-  statuses.get(containerId).get()
-    .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1));
+  containers_[containerId]->status
+    .onAny(defer(self(), &Self::___destroy, containerId, lambda::_1));
 }
 
 
@@ -911,7 +996,7 @@ static T reversed(const T& t)
 }
 
 
-void MesosContainerizerProcess::__destroy(
+void MesosContainerizerProcess::___destroy(
     const ContainerID& containerId,
     const Future<Option<int>>& status)
 {
@@ -930,7 +1015,7 @@ void MesosContainerizerProcess::__destroy(
 
   // Continue destroy when we're done trying to clean up.
   f.onAny(defer(self(),
-                &Self::___destroy,
+                &Self::____destroy,
                 containerId,
                 status,
                 lambda::_1));
@@ -939,7 +1024,7 @@ void MesosContainerizerProcess::__destroy(
 }
 
 
-void MesosContainerizerProcess::___destroy(
+void MesosContainerizerProcess::____destroy(
     const ContainerID& containerId,
     const Future<Option<int>>& status,
     const Future<list<Future<Nothing>>>& cleanups)
@@ -947,18 +1032,21 @@ void MesosContainerizerProcess::___destroy(
   // This should not occur because we only use the Future<list> to
   // facilitate chaining.
   CHECK_READY(cleanups);
+  CHECK(containers_.contains(containerId));
+
+  Container* container = containers_[containerId].get();
 
   // Check cleanup succeeded for all isolators. If not, we'll fail the
   // container termination and remove the 'destroying' flag but leave
   // all other state. The container is now in an inconsistent state.
   foreach (const Future<Nothing>& cleanup, cleanups.get()) {
     if (!cleanup.isReady()) {
-      promises[containerId]->fail(
+      container->promise.fail(
         "Failed to clean up an isolator when destroying container '" +
         stringify(containerId) + "' :" +
         (cleanup.isFailed() ? cleanup.failure() : "discarded future"));
 
-      destroying.erase(containerId);
+      containers_.erase(containerId);
 
       return;
     }
@@ -971,9 +1059,9 @@ void MesosContainerizerProcess::___destroy(
   // exit.
   bool killed = false;
   string message;
-  if (limitations.contains(containerId)) {
+  if (container->limitations.size() > 0) {
     killed = true;
-    foreach (const Limitation& limitation, limitations.get(containerId)) {
+    foreach (const Limitation& limitation, container->limitations) {
       message += limitation.message;
     }
     message = strings::trim(message);
@@ -988,19 +1076,15 @@ void MesosContainerizerProcess::___destroy(
     termination.set_status(status.get().get());
   }
 
-  promises[containerId]->set(termination);
+  container->promise.set(termination);
 
-  promises.erase(containerId);
-  statuses.erase(containerId);
-  limitations.erase(containerId);
-  resources.erase(containerId);
-  destroying.erase(containerId);
+  containers_.erase(containerId);
 }
 
 
 void MesosContainerizerProcess::reaped(const ContainerID& containerId)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId)) {
     return;
   }
 
@@ -1015,7 +1099,8 @@ void MesosContainerizerProcess::limited(
     const ContainerID& containerId,
     const Future<Limitation>& future)
 {
-  if (!promises.contains(containerId)) {
+  if (!containers_.contains(containerId) ||
+      containers_[containerId]->state == DESTROYING) {
     return;
   }
 
@@ -1023,7 +1108,7 @@ void MesosContainerizerProcess::limited(
     LOG(INFO) << "Container " << containerId << " has reached its limit for"
               << " resource " << future.get().resource
               << " and will be terminated";
-    limitations.put(containerId, future.get());
+    containers_[containerId]->limitations.push_back(future.get());
   } else {
     // TODO(idownes): A discarded future will not be an error when
     // isolators discard their promises after cleanup.
@@ -1039,7 +1124,7 @@ void MesosContainerizerProcess::limited(
 
 Future<hashset<ContainerID>> MesosContainerizerProcess::containers()
 {
-  return promises.keys();
+  return containers_.keys();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 3baea31..0b635d4 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -47,6 +47,9 @@ public:
       const process::Owned<Launcher>& launcher,
       const std::vector<process::Owned<Isolator>>& isolators);
 
+  // Used for testing.
+  MesosContainerizer(const process::Owned<MesosContainerizerProcess>& _process);
+
   virtual ~MesosContainerizer();
 
   virtual process::Future<Nothing> recover(
@@ -86,7 +89,7 @@ public:
   virtual process::Future<hashset<ContainerID>> containers();
 
 private:
-  MesosContainerizerProcess* process;
+  process::Owned<MesosContainerizerProcess> process;
 };
 
 
@@ -106,10 +109,10 @@ public:
 
   virtual ~MesosContainerizerProcess() {}
 
-  process::Future<Nothing> recover(
+  virtual process::Future<Nothing> recover(
       const Option<state::SlaveState>& state);
 
-  process::Future<bool> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const ExecutorInfo& executorInfo,
       const std::string& directory,
@@ -118,7 +121,7 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint);
 
-  process::Future<bool> launch(
+  virtual process::Future<bool> launch(
       const ContainerID& containerId,
       const TaskInfo& taskInfo,
       const ExecutorInfo& executorInfo,
@@ -128,19 +131,23 @@ public:
       const process::PID<Slave>& slavePid,
       bool checkpoint);
 
-  process::Future<Nothing> update(
+  virtual process::Future<Nothing> update(
       const ContainerID& containerId,
       const Resources& resources);
 
-  process::Future<ResourceStatistics> usage(
+  virtual process::Future<ResourceStatistics> usage(
       const ContainerID& containerId);
 
-  process::Future<containerizer::Termination> wait(
+  virtual process::Future<containerizer::Termination> wait(
       const ContainerID& containerId);
 
-  void destroy(const ContainerID& containerId);
+  virtual process::Future<bool> exec(
+      const ContainerID& containerId,
+      int pipeWrite);
+
+  virtual void destroy(const ContainerID& containerId);
 
-  process::Future<hashset<ContainerID>> containers();
+  virtual process::Future<hashset<ContainerID>> containers();
 
 private:
   process::Future<Nothing> _recover(
@@ -175,23 +182,22 @@ private:
       const ContainerID& containerId,
       pid_t _pid);
 
-  process::Future<bool> exec(
-      const ContainerID& containerId,
-      int pipeWrite);
+  // Continues 'destroy()' once isolators have completed.
+  void _destroy(const ContainerID& containerId);
 
   // Continues 'destroy()' once all processes have been killed by the launcher.
-  void _destroy(
+  void __destroy(
       const ContainerID& containerId,
       const process::Future<Nothing>& future);
 
   // Continues '_destroy()' once we get the exit status of the executor.
-  void __destroy(
+  void ___destroy(
       const ContainerID& containerId,
       const process::Future<Option<int>>& status);
 
   // Continues (and completes) '__destroy()' once all isolators have completed
   // cleanup.
-  void ___destroy(
+  void ____destroy(
       const ContainerID& containerId,
       const process::Future<Option<int>>& status,
       const process::Future<std::list<process::Future<Nothing>>>& cleanups);
@@ -211,26 +217,46 @@ private:
   const process::Owned<Launcher> launcher;
   const std::vector<process::Owned<Isolator>> isolators;
 
-  // TODO(idownes): Consider putting these per-container variables into a
-  // struct.
-  // Promises for futures returned from wait().
-  hashmap<ContainerID,
-    process::Owned<process::Promise<containerizer::Termination>>> promises;
-
-  // We need to keep track of the future exit status for each executor because
-  // we'll only get a single notification when the executor exits.
-  hashmap<ContainerID, process::Future<Option<int>>> statuses;
-
-  // We keep track of any limitations received from each isolator so we can
-  // determine the cause of an executor termination.
-  multihashmap<ContainerID, Limitation> limitations;
-
-  // We keep track of the resources for each container so we can set the
-  // ResourceStatistics limits in usage().
-  hashmap<ContainerID, Resources> resources;
-
-  // Set of containers that are in process of being destroyed.
-  hashset<ContainerID> destroying;
+  enum State
+  {
+    PREPARING,
+    ISOLATING,
+    FETCHING,
+    RUNNING,
+    DESTROYING
+  };
+
+  struct Container
+  {
+    // Promise for futures returned from wait().
+    process::Promise<containerizer::Termination> promise;
+
+    // We need to keep track of the future exit status for each
+    // executor because we'll only get a single notification when
+    // the executor exits.
+    process::Future<Option<int>> status;
+
+    // We keep track of the future that is waiting for all the
+    // isolators' futures, so that destroy will only start calling
+    // cleanup after all isolators have finished isolating.
+    process::Future<std::list<Nothing>> isolation;
+
+    // We keep track of any limitations received from each isolator so we can
+    // determine the cause of an executor termination.
+    std::vector<Limitation> limitations;
+
+    // The mesos-fetcher subprocess, that we keep around so we can
+    // stop the fetcher when the container is destroyed.
+    Option<process::Subprocess> fetcher;
+
+    // We keep track of the resources for each container so we can set the
+    // ResourceStatistics limits in usage().
+    Resources resources;
+
+    State state;
+  };
+
+  hashmap<ContainerID, process::Owned<Container>> containers_;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/25489e53/src/tests/containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer_tests.cpp b/src/tests/containerizer_tests.cpp
index a63897b..02a5f15 100644
--- a/src/tests/containerizer_tests.cpp
+++ b/src/tests/containerizer_tests.cpp
@@ -49,6 +49,11 @@ using std::map;
 using std::string;
 using std::vector;
 
+using testing::_;
+using testing::DoAll;
+using testing::Invoke;
+using testing::Return;
+
 class MesosContainerizerIsolatorPreparationTest :
   public tests::TemporaryDirectoryTest
 {
@@ -291,3 +296,91 @@ TEST_F(MesosContainerizerExecuteTest, IoRedirection)
 
   delete containerizer.get();
 }
+
+
+class MesosContainerizerDestroyTest : public tests::TemporaryDirectoryTest {};
+
+class MockMesosContainerizerProcess : public MesosContainerizerProcess
+{
+public:
+  MockMesosContainerizerProcess(
+      const Flags& flags,
+      bool local,
+      const process::Owned<Launcher>& launcher,
+      const std::vector<process::Owned<Isolator>>& isolators)
+    : MesosContainerizerProcess(flags, local, launcher, isolators)
+  {
+    // NOTE: See TestContainerizer::setup for why we use
+    // 'EXPECT_CALL' and 'WillRepeatedly' here instead of
+    // 'ON_CALL' and 'WillByDefault'.
+    EXPECT_CALL(*this, exec(_, _))
+      .WillRepeatedly(Invoke(this, &MockMesosContainerizerProcess::_exec));
+  }
+
+  MOCK_METHOD2(
+      exec,
+      process::Future<bool>(
+          const ContainerID& containerId,
+          int pipeWrite));
+
+  process::Future<bool> _exec(
+      const ContainerID& containerId,
+      int pipeWrite)
+  {
+    return MesosContainerizerProcess::exec(
+        containerId,
+        pipeWrite);
+  }
+};
+
+
+// Destroying a mesos containerizer while it is fetching should
+// complete without waiting for the fetching to finish.
+TEST_F(MesosContainerizerDestroyTest, DestroyWhileFetching)
+{
+  slave::Flags flags;
+  Try<Launcher*> launcher = PosixLauncher::create(flags);
+  ASSERT_SOME(launcher);
+  std::vector<process::Owned<Isolator>> isolators;
+
+  MockMesosContainerizerProcess* process = new MockMesosContainerizerProcess(
+      flags,
+      true,
+      Owned<Launcher>(launcher.get()),
+      isolators);
+
+  Future<Nothing> exec;
+  Promise<bool> promise;
+  // Letting exec hang to simulate a long fetch.
+  EXPECT_CALL(*process, exec(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&exec),
+                    Return(promise.future())));
+
+  MesosContainerizer containerizer((Owned<MesosContainerizerProcess>(process)));
+
+  ContainerID containerId;
+  containerId.set_value("test_container");
+
+  TaskInfo taskInfo;
+  CommandInfo commandInfo;
+  taskInfo.mutable_command()->MergeFrom(commandInfo);
+
+  containerizer.launch(
+      containerId,
+      taskInfo,
+      CREATE_EXECUTOR_INFO("executor", "exit 0"),
+      os::getcwd(),
+      None(),
+      SlaveID(),
+      process::PID<Slave>(),
+      false);
+
+  Future<containerizer::Termination> wait = containerizer.wait(containerId);
+
+  AWAIT_READY(exec);
+
+  containerizer.destroy(containerId);
+
+  // The container should still exit even if fetch didn't complete.
+  AWAIT_READY(wait);
+}