You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2014/05/29 03:29:49 UTC

git commit: Fixed orphaned container handling in the ExternalContainerizer recover implementation.

Repository: mesos
Updated Branches:
  refs/heads/master aa27d9344 -> 9370001a4


Fixed orphaned container handling in the ExternalContainerizer recover implementation.

An orphaned container is known to the ECP but not to the EC, thus not
recoverable but pending. This patch enforces a call to destroy for any
orphan that has been identified as such during the recovery phase.

Review: https://reviews.apache.org/r/21424


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9370001a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9370001a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9370001a

Branch: refs/heads/master
Commit: 9370001a4f6f0b1187c2910deb65ac474e289d12
Parents: aa27d93
Author: Till Toenshoff <to...@me.com>
Authored: Thu May 29 03:14:56 2014 +0200
Committer: Till Toenshoff <to...@me.com>
Committed: Thu May 29 03:14:56 2014 +0200

----------------------------------------------------------------------
 .../containerizer/external_containerizer.cpp    | 192 ++++++++++++-------
 .../containerizer/external_containerizer.hpp    |  13 +-
 2 files changed, 128 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9370001a/src/slave/containerizer/external_containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp
index b39c845..4850549 100644
--- a/src/slave/containerizer/external_containerizer.cpp
+++ b/src/slave/containerizer/external_containerizer.cpp
@@ -246,13 +246,6 @@ Future<Nothing> ExternalContainerizerProcess::recover(
 {
   LOG(INFO) << "Recovering containerizer";
 
-  // We need a slave state for recovery as otherwise we will not be
-  // able to reconstruct the sandbox of an active container.
-  if (state.isNone()) {
-    LOG(WARNING) << "No slave state available to recover from";
-    return Nothing();
-  }
-
   // Ask the external containerizer to recover its internal state.
   Try<Subprocess> invoked = invoke("recover");
 
@@ -264,13 +257,13 @@ Future<Nothing> ExternalContainerizerProcess::recover(
     .then(defer(
         PID<ExternalContainerizerProcess>(this),
         &ExternalContainerizerProcess::_recover,
-        state.get(),
+        state,
         lambda::_1));
 }
 
 
 Future<Nothing> ExternalContainerizerProcess::_recover(
-    const state::SlaveState& state,
+    const Option<state::SlaveState>& state,
     const Future<Option<int> >& future)
 {
   VLOG(1) << "Recover validation callback triggered";
@@ -292,84 +285,137 @@ Future<Nothing> ExternalContainerizerProcess::_recover(
 
 
 Future<Nothing> ExternalContainerizerProcess::__recover(
-    const state::SlaveState& state,
+    const Option<state::SlaveState>& state,
     const hashset<ContainerID>& containers)
 {
   VLOG(1) << "Recover continuation triggered";
 
-  foreachvalue (const FrameworkState& framework, state.frameworks) {
-    foreachvalue (const ExecutorState& executor, framework.executors) {
-      if (executor.info.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                     << "' of framework " << framework.id
-                     << " because its info could not be recovered";
-        continue;
-      }
+  // An orphaned container is known to the external containerizer but
+  // not to the slave, thus not recoverable but pending.
+  hashset<ContainerID> orphaned = containers;
+
+  if (state.isSome()) {
+    foreachvalue (const FrameworkState& framework, state.get().frameworks) {
+      foreachvalue (const ExecutorState& executor, framework.executors) {
+        if (executor.info.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its info could not be recovered";
+          continue;
+        }
 
-      if (executor.latest.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                     << "' of framework " << framework.id
-                     << " because its latest run could not be recovered";
-        continue;
-      }
+        if (executor.latest.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because its latest run could not be recovered";
+          continue;
+        }
 
-      // We are only interested in the latest run of the executor!
-      const ContainerID& containerId = executor.latest.get();
-      Option<RunState> run = executor.runs.get(containerId);
-      CHECK_SOME(run);
-
-      if (run.get().completed) {
-        VLOG(1) << "Skipping recovery of executor '" << executor.id
-                << "' of framework " << framework.id
-                << " because its latest run "
-                << containerId << " is completed";
-        continue;
-      }
+        // We are only interested in the latest run of the executor!
+        const ContainerID& containerId = executor.latest.get();
+        Option<RunState> run = executor.runs.get(containerId);
+        CHECK_SOME(run);
+
+        if (run.get().completed) {
+          VLOG(1) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework " << framework.id
+                  << " because its latest run "
+                  << containerId << " is completed";
+          continue;
+        }
 
-      // Containers the external containerizer does not have
-      // information on, should be skipped as their state is not
-      // recoverable.
-      if (!containers.contains(containerId)) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                     << "' of framework " << framework.id
-                     << " because the external containerizer has not "
-                     << " identified " << containerId << " as active";
-        continue;
-      }
+        // Containers the external containerizer does not have
+        // information on, should be skipped as their state is not
+        // recoverable.
+        if (!containers.contains(containerId)) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework " << framework.id
+                       << " because the external containerizer has not "
+                       << " identified " << containerId << " as active";
+          continue;
+        }
 
-      // Re-create the sandbox for this container.
-      const string& directory = paths::createExecutorDirectory(
-          flags.work_dir,
-          state.id,
-          framework.id,
-          executor.id,
-          containerId);
-
-      Option<string> user = None();
-      if (flags.switch_user) {
-        // The command (either in form of task or executor command)
-        // can define a specific user to run as. If present, this
-        // precedes the framework user value.
-        if (executor.info.isSome() &&
-            executor.info.get().command().has_user()) {
-          user = executor.info.get().command().user();
-        } else if (framework.info.isSome()) {
-          user = framework.info.get().user();
+        LOG(INFO) << "Recovering container '" << containerId
+                  << "' for executor '" << executor.id
+                  << "' of framework " << framework.id;
+
+        // Re-create the sandbox for this container.
+        const string& directory = paths::createExecutorDirectory(
+            flags.work_dir,
+            state.get().id,
+            framework.id,
+            executor.id,
+            containerId);
+
+        Option<string> user = None();
+        if (flags.switch_user) {
+          // The command (either in form of task or executor command)
+          // can define a specific user to run as. If present, this
+          // precedes the framework user value.
+          if (executor.info.isSome() &&
+              executor.info.get().command().has_user()) {
+            user = executor.info.get().command().user();
+          } else if (framework.info.isSome()) {
+            user = framework.info.get().user();
+          }
         }
-      }
 
-      Sandbox sandbox(directory, user);
+        Sandbox sandbox(directory, user);
+
+        // Collect this container as being active.
+        actives.put(containerId, Owned<Container>(new Container(sandbox)));
 
-      // Collect this container as being active.
-      actives.put(containerId, Owned<Container>(new Container(sandbox)));
+        // Assume that this container had been launched, if this proves
+        // to be wrong, the containerizer::Termination delivered by the
+        // subsequent wait invocation will tell us.
+        actives[containerId]->launched.set(Nothing());
 
-      // Assume that this container had been launched, if this proves
-      // to be wrong, the containerizer::Termination delivered by the
-      // subsequent wait invocation will tell us.
-      actives[containerId]->launched.set(Nothing());
+        // Remove this container from the orphan collection as it is not
+        // orphaned.
+        orphaned.erase(containerId);
+      }
     }
   }
 
+  // Done when we got no orphans to take care of.
+  if (orphaned.empty()) {
+    VLOG(1) << "Recovery done";
+    return Nothing();
+  }
+
+  list<Future<containerizer::Termination> > futures;
+
+  // Enforce a 'destroy' on all orphaned containers.
+  foreach (const ContainerID& containerId, orphaned) {
+    LOG(INFO) << "Destroying container '" << containerId << "' as it "
+              << "is in an orphaned state.";
+    // For being able to wait on an orphan, we need to create an
+    // internal Container state - we just can not have a sandbox for
+    // it.
+    actives.put(containerId, Owned<Container>(new Container(None())));
+    actives[containerId]->launched.set(Nothing());
+
+    // Wrap the orphan destruction by a wait so we know when it is
+    // finally gone.
+    futures.push_back(_wait(containerId));
+
+    destroy(containerId);
+  }
+
+  VLOG(1) << "Awaiting all orphans to get destructed";
+
+  // Orphan destruction needs to complete before we satisfy the
+  // returned future.
+  return collect(futures)
+    .then(defer(
+        PID<ExternalContainerizerProcess>(this),
+        &ExternalContainerizerProcess::___recover));
+}
+
+
+Future<Nothing> ExternalContainerizerProcess::___recover()
+{
+  VLOG(1) << "Recovery done";
   return Nothing();
 }
 
@@ -879,7 +925,7 @@ void ExternalContainerizerProcess::__destroy(
   VLOG(1) << "Destroy callback triggered on container '" << containerId << "'";
 
   if (!actives.contains(containerId)) {
-    LOG(ERROR) << "Container '" << containerId.value() << "' not running";
+    LOG(ERROR) << "Container '" << containerId << "' not running ";
     return;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9370001a/src/slave/containerizer/external_containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/external_containerizer.hpp b/src/slave/containerizer/external_containerizer.hpp
index 7e5474c..94dffbb 100644
--- a/src/slave/containerizer/external_containerizer.hpp
+++ b/src/slave/containerizer/external_containerizer.hpp
@@ -176,12 +176,12 @@ private:
   // Information describing a running container.
   struct Container
   {
-    Container(const Sandbox& sandbox)
+    Container(const Option<Sandbox>& sandbox)
       : sandbox(sandbox), pid(None()), destroying(false) {}
 
     // Keep sandbox information available for subsequent containerizer
     // invocations.
-    Sandbox sandbox;
+    Option<Sandbox> sandbox;
 
     // External containerizer pid as per wait-invocation.
     // Wait should block on the external containerizer side, hence we
@@ -198,6 +198,9 @@ private:
     // queued until then to reduce complexity within external
     // containerizer program implementations. To achieve that, we
     // simply queue all events onto this promise.
+    // TODO(tillt): Consider adding a timeout when queuing onto this
+    // promise to account for external containerizer launch
+    // invocations that got stuck.
     process::Promise<Nothing> launched;
 
     Resources resources;
@@ -207,13 +210,15 @@ private:
   hashmap<ContainerID, process::Owned<Container> > actives;
 
   process::Future<Nothing> _recover(
-      const state::SlaveState& state,
+      const Option<state::SlaveState>& state,
       const process::Future<Option<int> >& future);
 
   process::Future<Nothing> __recover(
-      const state::SlaveState& state,
+      const Option<state::SlaveState>& state,
       const hashset<ContainerID>& containers);
 
+  process::Future<Nothing> ___recover();
+
   process::Future<Nothing> _launch(
       const ContainerID& containerId,
       const process::Future<Option<int> >& future);