You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2016/04/08 00:14:50 UTC

[2/3] mesos git commit: Cleaned up orphaned docker containers owned by previous agent instance.

Cleaned up orphaned docker containers owned by previous agent instance.

This change modifies the docker containerizer to cleanup docker
containers left from another agent instance. The containers can
become orphans due to any of the scenarios mentioned here:
http://bit.ly/1RxCpPl

This change modifies the logic to invoke docker `ps` on all
containers on the agent instead of limiting itself to the
current slaveID. This change also means that running multiple
agent instances on the same host might not work well for docker
containers from now on i.e. another agent instance might
cleanup the docker containers that belong to another instance.
The cgroup isolators/linux launcher for the Mesos containerizer
anyways don't recommend running multiple instances of the agent
on the same host.

In case one still wants to run multiple agent instances on a
test cluster using the docker containerizer, we can use the
`--no-docker_kill_orphans` flag and then kill the docker
containers manually using a script.

Review: https://reviews.apache.org/r/45454/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/54926ad1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/54926ad1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/54926ad1

Branch: refs/heads/master
Commit: 54926ad18d3ef90ad452a8e216ff7b1cd465df0a
Parents: ca74740
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Apr 5 09:35:45 2016 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Thu Apr 7 15:00:09 2016 -0700

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 271 ++++++++++++++++----------------
 src/slave/containerizer/docker.hpp |   2 +-
 2 files changed, 135 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/54926ad1/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp
index 9f51e05..5755eff 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -743,162 +743,159 @@ Future<Nothing> DockerContainerizerProcess::recover(
 {
   LOG(INFO) << "Recovering Docker containers";
 
-  if (state.isSome()) {
-    // Get the list of all Docker containers (running and exited) in
-    // order to remove any orphans and reconcile checkpointed executors.
-    // TODO(tnachen): Remove this when we expect users to have already
-    // upgraded to 0.23.
-    return docker->ps(true, DOCKER_NAME_PREFIX + state.get().id.value())
-      .then(defer(self(), &Self::_recover, state.get(), lambda::_1));
-  }
-
-  return Nothing();
+  // Get the list of all Docker containers (running and exited) in
+  // order to remove any orphans and reconcile checkpointed executors.
+  return docker->ps(true, DOCKER_NAME_PREFIX)
+    .then(defer(self(), &Self::_recover, state, lambda::_1));
 }
 
 
 Future<Nothing> DockerContainerizerProcess::_recover(
-    const SlaveState& state,
+    const Option<SlaveState>& state,
     const list<Docker::Container>& _containers)
 {
-  // Although the slave checkpoints executor pids, before 0.23
-  // docker containers without custom executors didn't record the
-  // container type in the executor info, therefore the Docker
-  // containerizer doesn't know if it should recover that container
-  // as it could be launched from another containerizer. The
-  // workaround is to reconcile running Docker containers and see
-  // if we can find an known container that matches the
-  // checkpointed container id.
-  // TODO(tnachen): Remove this explicit reconciliation 0.24.
-  hashset<ContainerID> existingContainers;
-
-  // Tracks all the task containers that launched an executor in
-  // a docker container.
-  hashset<ContainerID> executorContainers;
-
-  foreach (const Docker::Container& container, _containers) {
-    Option<ContainerID> id = parse(container);
-    if (id.isSome()) {
-      existingContainers.insert(id.get());
-      if (strings::contains(container.name, ".executor")) {
-        executorContainers.insert(id.get());
+  if (state.isSome()) {
+    // Although the slave checkpoints executor pids, before 0.23
+    // docker containers without custom executors didn't record the
+    // container type in the executor info, therefore the Docker
+    // containerizer doesn't know if it should recover that container
+    // as it could be launched from another containerizer. The
+    // workaround is to reconcile running Docker containers and see
+    // if we can find an known container that matches the
+    // checkpointed container id.
+    // TODO(tnachen): Remove this explicit reconciliation 0.24.
+    hashset<ContainerID> existingContainers;
+
+    // Tracks all the task containers that launched an executor in
+    // a docker container.
+    hashset<ContainerID> executorContainers;
+
+    foreach (const Docker::Container& container, _containers) {
+      Option<ContainerID> id = parse(container);
+      if (id.isSome()) {
+        existingContainers.insert(id.get());
+        if (strings::contains(container.name, ".executor")) {
+          executorContainers.insert(id.get());
+        }
       }
     }
-  }
 
-  // Collection of pids that we've started reaping in order to
-  // detect very unlikely duplicate scenario (see below).
-  hashmap<ContainerID, pid_t> pids;
+    // Collection of pids that we've started reaping in order to
+    // detect very unlikely duplicate scenario (see below).
+    hashmap<ContainerID, pid_t> pids;
+
+    foreachvalue (const FrameworkState& framework, state->frameworks) {
+      foreachvalue (const ExecutorState& executor, framework.executors) {
+        if (executor.info.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework '" << framework.id
+                       << "' because its info could not be recovered";
+          continue;
+        }
 
-  foreachvalue (const FrameworkState& framework, state.frameworks) {
-    foreachvalue (const ExecutorState& executor, framework.executors) {
-      if (executor.info.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                     << "' of framework '" << framework.id
-                     << "' because its info could not be recovered";
-        continue;
-      }
+        if (executor.latest.isNone()) {
+          LOG(WARNING) << "Skipping recovery of executor '" << executor.id
+                       << "' of framework '" << framework.id
+                       << "' because its latest run could not be recovered";
+          continue;
+        }
 
-      if (executor.latest.isNone()) {
-        LOG(WARNING) << "Skipping recovery of executor '" << executor.id
-                     << "' of framework '" << framework.id
-                     << "' because its latest run could not be recovered";
-        continue;
-      }
+        // We are only interested in the latest run of the executor!
+        const ContainerID& containerId = executor.latest.get();
+        Option<RunState> run = executor.runs.get(containerId);
+        CHECK_SOME(run);
+        CHECK_SOME(run.get().id);
+        CHECK_EQ(containerId, run.get().id.get());
+
+        // We need the pid so the reaper can monitor the executor so
+        // skip this executor if it's not present. This is not an
+        // error because the slave will try to wait on the container
+        // which will return a failed Termination and everything will
+        // get cleaned up.
+        if (!run.get().forkedPid.isSome()) {
+          continue;
+        }
 
-      // We are only interested in the latest run of the executor!
-      const ContainerID& containerId = executor.latest.get();
-      Option<RunState> run = executor.runs.get(containerId);
-      CHECK_SOME(run);
-      CHECK_SOME(run.get().id);
-      CHECK_EQ(containerId, run.get().id.get());
-
-      // We need the pid so the reaper can monitor the executor so
-      // skip this executor if it's not present. This is not an
-      // error because the slave will try to wait on the container
-      // which will return a failed Termination and everything will
-      // get cleaned up.
-      if (!run.get().forkedPid.isSome()) {
-        continue;
-      }
+        if (run.get().completed) {
+          VLOG(1) << "Skipping recovery of executor '" << executor.id
+                  << "' of framework '" << framework.id
+                  << "' because its latest run "
+                  << containerId << " is completed";
+          continue;
+        }
 
-      if (run.get().completed) {
-        VLOG(1) << "Skipping recovery of executor '" << executor.id
-                << "' of framework '" << framework.id
-                << "' because its latest run "
-                << containerId << " is completed";
-        continue;
-      }
+        const ExecutorInfo executorInfo = executor.info.get();
+        if (executorInfo.has_container() &&
+            executorInfo.container().type() != ContainerInfo::DOCKER) {
+          LOG(INFO) << "Skipping recovery of executor '" << executor.id
+                    << "' of framework '" << framework.id
+                    << "' because it was not launched from docker "
+                    << "containerizer";
+          continue;
+        }
 
-      const ExecutorInfo executorInfo = executor.info.get();
-      if (executorInfo.has_container() &&
-          executorInfo.container().type() != ContainerInfo::DOCKER) {
-        LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                  << "' of framework '" << framework.id
-                  << "' because it was not launched from docker containerizer";
-        continue;
-      }
+        if (!executorInfo.has_container() &&
+            !existingContainers.contains(containerId)) {
+          LOG(INFO) << "Skipping recovery of executor '" << executor.id
+                    << "' of framework '" << framework.id
+                    << "' because its executor is not marked as docker "
+                    << "and the docker container doesn't exist";
+          continue;
+        }
 
-      if (!executorInfo.has_container() &&
-          !existingContainers.contains(containerId)) {
-        LOG(INFO) << "Skipping recovery of executor '" << executor.id
-                  << "' of framework '" << framework.id
-                  << "' because its executor is not marked as docker "
-                  << "and the docker container doesn't exist";
-        continue;
-      }
+        LOG(INFO) << "Recovering container '" << containerId
+                  << "' for executor '" << executor.id
+                  << "' of framework '" << framework.id << "'";
 
-      LOG(INFO) << "Recovering container '" << containerId
-                << "' for executor '" << executor.id
-                << "' of framework '" << framework.id << "'";
-
-      // Create and store a container.
-      Container* container = new Container(containerId);
-      containers_[containerId] = container;
-      container->slaveId = state.id;
-      container->state = Container::RUNNING;
-      container->launchesExecutorContainer =
-        executorContainers.contains(containerId);
-
-      pid_t pid = run.get().forkedPid.get();
-
-      container->status.set(process::reap(pid));
-
-      container->status.future().get()
-        .onAny(defer(self(), &Self::reaped, containerId));
-
-      if (pids.containsValue(pid)) {
-        // This should (almost) never occur. There is the
-        // possibility that a new executor is launched with the same
-        // pid as one that just exited (highly unlikely) and the
-        // slave dies after the new executor is launched but before
-        // it hears about the termination of the earlier executor
-        // (also unlikely).
-        return Failure(
-            "Detected duplicate pid " + stringify(pid) +
-            " for container " + stringify(containerId));
-      }
+        // Create and store a container.
+        Container* container = new Container(containerId);
+        containers_[containerId] = container;
+        container->slaveId = state->id;
+        container->state = Container::RUNNING;
+        container->launchesExecutorContainer =
+          executorContainers.contains(containerId);
 
-      pids.put(containerId, pid);
+        pid_t pid = run.get().forkedPid.get();
 
-      const string sandboxDirectory = paths::getExecutorRunPath(
-          flags.work_dir,
-          state.id,
-          framework.id,
-          executor.id,
-          containerId);
+        container->status.set(process::reap(pid));
+
+        container->status.future().get()
+          .onAny(defer(self(), &Self::reaped, containerId));
 
-      container->directory = sandboxDirectory;
-
-      // Pass recovered containers to the container logger.
-      // NOTE: The current implementation of the container logger only
-      // outputs a warning and does not have any other consequences.
-      // See `ContainerLogger::recover` for more information.
-      logger->recover(executorInfo, sandboxDirectory)
-        .onFailed(defer(self(), [executorInfo](const string& message) {
-          LOG(WARNING) << "Container logger failed to recover executor '"
-                       << executorInfo.executor_id() << "': "
-                       << message;
-        }));
+        if (pids.containsValue(pid)) {
+          // This should (almost) never occur. There is the
+          // possibility that a new executor is launched with the same
+          // pid as one that just exited (highly unlikely) and the
+          // slave dies after the new executor is launched but before
+          // it hears about the termination of the earlier executor
+          // (also unlikely).
+          return Failure(
+              "Detected duplicate pid " + stringify(pid) +
+              " for container " + stringify(containerId));
+        }
+
+        pids.put(containerId, pid);
+
+        const string sandboxDirectory = paths::getExecutorRunPath(
+            flags.work_dir,
+            state->id,
+            framework.id,
+            executor.id,
+            containerId);
+
+        container->directory = sandboxDirectory;
+
+        // Pass recovered containers to the container logger.
+        // NOTE: The current implementation of the container logger only
+        // outputs a warning and does not have any other consequences.
+        // See `ContainerLogger::recover` for more information.
+        logger->recover(executorInfo, sandboxDirectory)
+          .onFailed(defer(self(), [executorInfo](const string& message) {
+            LOG(WARNING) << "Container logger failed to recover executor '"
+                         << executorInfo.executor_id() << "': "
+                         << message;
+          }));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/54926ad1/src/slave/containerizer/docker.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp
index 89d450e..3567321 100644
--- a/src/slave/containerizer/docker.hpp
+++ b/src/slave/containerizer/docker.hpp
@@ -178,7 +178,7 @@ private:
       pid_t pid);
 
   process::Future<Nothing> _recover(
-      const state::SlaveState& state,
+      const Option<state::SlaveState>& state,
       const std::list<Docker::Container>& containers);
 
   process::Future<Nothing> __recover(