Posted to commits@mesos.apache.org by ch...@apache.org on 2018/09/22 06:34:12 UTC

[mesos] 08/10: Cleaned up residual containers when removing resource provider configs.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 6366b5c1e5e60dfda5ca0368d6a22da998f0cfa4
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Sep 20 15:06:51 2018 -0700

    Cleaned up residual containers when removing resource provider configs.
    
    When processing `REMOVE_RESOURCE_PROVIDER_CONFIG`, the local resource
    provider daemon now performs a best-effort cleanup by killing all
    standalone containers prefixed by the 'cid_prefix' of the resource
    provider. During the cleanup, no resource provider config with the same
    type and name can be added.
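
    As a side note (not part of this commit), removal is driven through
    the agent's v1 operator API. A minimal sketch of the triggering call
    is shown below, following the same `http::post` pattern used in this
    patch; the provider type and name here are placeholders:

        // Hypothetical client-side sketch: ask the agent to remove a
        // resource provider config, which makes the daemon kill the
        // provider's standalone containers as described above.
        agent::Call call;
        call.set_type(agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG);
        call.mutable_remove_resource_provider_config()
          ->set_type("org.apache.mesos.rp.local.storage");  // Placeholder.
        call.mutable_remove_resource_provider_config()
          ->set_name("test");  // Placeholder.

        // `agentUrl` and `headers` are assumed to be set up as in
        // `cleanupContainers` (v1 API endpoint, optional auth token).
        http::post(
            agentUrl,
            headers,
            serialize(ContentType::PROTOBUF, evolve(call)),
            stringify(ContentType::PROTOBUF));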
    
    Review: https://reviews.apache.org/r/68763
---
 src/resource_provider/daemon.cpp | 239 +++++++++++++++++++++++++++++++++++----
 1 file changed, 220 insertions(+), 19 deletions(-)

diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index f1a941c..2fd82ad 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -22,6 +22,7 @@
 
 #include <mesos/type_utils.hpp>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
@@ -35,25 +36,31 @@
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
 #include <stout/try.hpp>
 
+#include "common/http.hpp"
 #include "common/validation.hpp"
 
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
 #include "resource_provider/local.hpp"
 
 namespace http = process::http;
 
 using std::list;
 using std::string;
+using std::vector;
 
+using process::await;
+using process::defer;
+using process::dispatch;
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Process;
 using process::ProcessBase;
-
-using process::defer;
-using process::dispatch;
 using process::spawn;
 using process::terminate;
 using process::wait;
@@ -109,11 +116,17 @@ private:
 
     const string path;
     ResourceProviderInfo info;
+    Option<string> authToken;
 
     // The `version` is used to check if `provider` holds a resource
     // provider instance that is in sync with the current config.
     id::UUID version;
     Owned<LocalResourceProvider> provider;
+
+    // If set, it means that the resource provider is being removed, and the
+    // future will be completed once the removal is done. Note that this object
+    // will be destructed as soon as the future is ready.
+    Option<Future<Nothing>> removing;
   };
 
   Try<Nothing> load(const string& path);
@@ -133,6 +146,10 @@ private:
 
   Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info);
 
+  Future<Nothing> cleanupContainers(
+      const ResourceProviderInfo& info,
+      const Option<string>& authToken);
+
   const http::URL url;
   const string workDir;
   const Option<string> configDir;
@@ -161,7 +178,13 @@ void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
   slaveId = _slaveId;
 
   foreachkey (const string& type, providers) {
-    foreachkey (const string& name, providers[type]) {
+    foreachpair (const string& name,
+                 const ProviderData& data,
+                 providers[type]) {
+      if (data.removing.isSome()) {
+        continue;
+      }
+
       auto error = [=](const string& message) {
         LOG(ERROR) << "Failed to launch resource provider with type '" << type
                    << "' and name '" << name << "': " << message;
@@ -186,7 +209,15 @@ Future<bool> LocalResourceProviderDaemonProcess::add(
 
   // Return true if the info has been added for idempotency.
   if (providers[info.type()].contains(info.name())) {
-    return providers[info.type()].at(info.name()).info == info;
+    const ProviderData& data = providers[info.type()].at(info.name());
+
+    if (data.removing.isSome()) {
+      return Failure(
+          "Failed to add resource provider with type '" + info.type() +
+          "' and name '" + info.name() + "' as a removal is still in progress");
+    }
+
+    return data.info == info;
   }
 
   // Generate a filename for the config.
@@ -239,6 +270,12 @@ Future<bool> LocalResourceProviderDaemonProcess::update(
 
   ProviderData& data = providers[info.type()].at(info.name());
 
+  if (data.removing.isSome()) {
+    return Failure(
+        "Failed to update resource provider with type '" + info.type() +
+        "' and name '" + info.name() + "' as a removal is still in progress");
+  }
+
   // Return true if the info has been updated for idempotency.
   if (data.info == info) {
     return true;
@@ -285,19 +322,39 @@ Future<Nothing> LocalResourceProviderDaemonProcess::remove(
     return Nothing();
   }
 
-  const string path = providers[type].at(name).path;
+  ProviderData& data = providers[type].at(name);
 
-  Try<Nothing> rm = os::rm(path);
-  if (rm.isError()) {
-    return Failure(
-        "Failed to remove config file '" + path + "': " + rm.error());
+  // For idempotency, return the same future if the provider is being removed.
+  if (data.removing.isSome() && data.removing->isPending()) {
+    return data.removing.get();
   }
 
-  // Removing the provider data from `providers` will cause the resource
-  // provider to be destructed.
-  providers[type].erase(name);
+  // Destruct the resource provider instance to stop its container daemons, then
+  // do a best-effort cleanup of the standalone containers launched by the
+  // removed resource provider.
+  // TODO(chhsiao): This is not ideal since the daemon does not know how to
+  // perform resource-provider-specific cleanups. We should refactor this into a
+  // `LocalResourceProvider::stop` virtual function. However, this also means
+  // that we need to ensure that we have a resource provider instance with
+  // a running actor to invoke `stop`. Given that `launch` is asynchronous, we
+  // need to carefully redesign the state machine, and the semantics of the
+  // `{ADD,UPDATE,REMOVE}_RESOURCE_PROVIDER_CONFIG` API calls might be affected.
+  // In addition, we should consider how to reconcile orphaned containers.
+  data.provider.reset();
+  data.removing = cleanupContainers(data.info, data.authToken)
+    .then(defer(self(), [this, type, name]() -> Future<Nothing> {
+      Try<Nothing> rm = os::rm(providers[type].at(name).path);
+      if (rm.isError()) {
+        return Failure(
+            "Failed to remove config file '" + providers[type].at(name).path +
+            "': " + rm.error());
+      }
 
-  return Nothing();
+      providers[type].erase(name);
+      return Nothing();
+    }));
+
+  return data.removing.get();
 }
 
 
@@ -420,13 +477,10 @@ Future<Nothing> LocalResourceProviderDaemonProcess::launch(
     const string& name)
 {
   CHECK_SOME(slaveId);
-
-  // If the resource provider config is removed, nothing needs to be done.
-  if (!providers[type].contains(name)) {
-    return Nothing();
-  }
+  CHECK(providers[type].contains(name));
 
   ProviderData& data = providers[type].at(name);
+  CHECK(data.removing.isNone());
 
   // Destruct the previous resource provider (which will synchronously
   // terminate its actor and driver) if there is one.
@@ -450,6 +504,11 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
 
   ProviderData& data = providers[type].at(name);
 
+  // If the resource provider config is being removed, nothing needs to be done.
+  if (data.removing.isSome()) {
+    return Nothing();
+  }
+
   // If there is a version mismatch, abort the launch sequence since
   // `authToken` might be outdated. The callback updating the version
   // should have dispatched another launch sequence.
@@ -466,6 +525,7 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
         "' and name '" + name + "': " + provider.error());
   }
 
+  data.authToken = authToken;
   data.provider = std::move(provider.get());
 
   return Nothing();
@@ -501,6 +561,147 @@ Future<Option<string>> LocalResourceProviderDaemonProcess::generateAuthToken(
 }
 
 
+Future<Nothing> LocalResourceProviderDaemonProcess::cleanupContainers(
+    const ResourceProviderInfo& info,
+    const Option<string>& authToken)
+{
+  const Principal principal = LocalResourceProvider::principal(info);
+  CHECK(principal.claims.contains("cid_prefix"));
+
+  const string& cidPrefix = principal.claims.at("cid_prefix");
+
+  LOG(INFO) << "Cleaning up containers prefixed by '" << cidPrefix << "'";
+
+  // TODO(chhsiao): Consider using a more reliable way to get the v1 endpoint.
+  http::URL agentUrl = url;
+  agentUrl.path = Path(url.path).dirname();
+
+  http::Headers headers{{"Accept", stringify(ContentType::PROTOBUF)}};
+  if (authToken.isSome()) {
+    headers["Authorization"] = "Bearer " + authToken.get();
+  }
+
+  agent::Call call;
+  call.set_type(agent::Call::GET_CONTAINERS);
+  call.mutable_get_containers()->set_show_nested(false);
+  call.mutable_get_containers()->set_show_standalone(true);
+
+  return http::post(
+      agentUrl,
+      headers,
+      serialize(ContentType::PROTOBUF, evolve(call)),
+      stringify(ContentType::PROTOBUF))
+    .then(defer(self(), [=](
+        const http::Response& httpResponse) -> Future<Nothing> {
+      if (httpResponse.status != http::OK().status) {
+        return Failure(
+            "Failed to get containers: Unexpected response '" +
+            httpResponse.status + "' (" + httpResponse.body + ")");
+      }
+
+      Try<v1::agent::Response> v1Response = deserialize<v1::agent::Response>(
+          ContentType::PROTOBUF, httpResponse.body);
+      if (v1Response.isError()) {
+        return Failure("Failed to get containers: " + v1Response.error());
+      }
+
+      vector<Future<Nothing>> futures;
+
+      agent::Response response = devolve(v1Response.get());
+      foreach (const agent::Response::GetContainers::Container& container,
+               response.get_containers().containers()) {
+        const ContainerID& containerId = container.container_id();
+
+        if (!strings::startsWith(containerId.value(), cidPrefix)) {
+          continue;
+        }
+
+        // NOTE: We skip containers that are not actually running by checking
+        // their `executor_pid`s to avoid `ESRCH` errors or killing an arbitrary
+        // process. But we might skip the ones that are being launched as well.
+        if (!container.has_container_status() ||
+            !container.container_status().has_executor_pid()) {
+          LOG(WARNING)
+            << "Skipped killing container '" << containerId
+            << "' because it is not running";
+
+          continue;
+        }
+
+        agent::Call call;
+        call.set_type(agent::Call::KILL_CONTAINER);
+        call.mutable_kill_container()->mutable_container_id()
+          ->CopyFrom(containerId);
+
+        LOG(INFO) << "Killing container '" << containerId << "'";
+
+        futures.push_back(http::post(
+            agentUrl,
+            headers,
+            serialize(ContentType::PROTOBUF, evolve(call)),
+            stringify(ContentType::PROTOBUF))
+          .then(defer(self(), [=](
+              const http::Response& response) -> Future<Nothing> {
+            if (response.status == http::NotFound().status) {
+              LOG(WARNING)
+                << "Skipped waiting for container '" << containerId
+                << "' because it no longer exists";
+
+              return Nothing();
+            }
+
+            if (response.status != http::OK().status) {
+              return Failure(
+                  "Failed to kill container '" + stringify(containerId) +
+                  "': Unexpected response '" + response.status + "' (" +
+                  response.body + ")");
+            }
+
+            LOG(INFO) << "Waiting for container '" << containerId << "'";
+
+            agent::Call call;
+            call.set_type(agent::Call::WAIT_CONTAINER);
+            call.mutable_wait_container()->mutable_container_id()
+              ->CopyFrom(containerId);
+
+            return http::post(
+                agentUrl,
+                headers,
+                serialize(ContentType::PROTOBUF, evolve(call)),
+                stringify(ContentType::PROTOBUF))
+              .then([containerId](
+                  const http::Response& response) -> Future<Nothing> {
+                if (response.status != http::OK().status &&
+                    response.status != http::NotFound().status) {
+                  return Failure(
+                      "Failed to wait for container '" +
+                      stringify(containerId) + "': Unexpected response '" +
+                      response.status + "' (" + response.body + ")");
+                }
+
+                return Nothing();
+              });
+          })));
+      }
+
+      // We use `await` rather than `collect` to make the cleanup best-effort.
+      return await(futures)
+        .then([cidPrefix](
+            const vector<Future<Nothing>>& futures) -> Future<Nothing> {
+          foreach (const Future<Nothing>& future, futures) {
+            if (!future.isReady()) {
+              return Failure(
+                  "Failed to clean up containers prefixed by '" + cidPrefix +
+                  "': " + stringify(futures));
+            }
+          }
+
+          return Nothing();
+        });
+    }));
+}
+
+
 Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
     const http::URL& url,
     const slave::Flags& flags,