You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/09/22 06:34:12 UTC
[mesos] 08/10: Cleaned up residual containers when removing
resource provider configs.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 6366b5c1e5e60dfda5ca0368d6a22da998f0cfa4
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Sep 20 15:06:51 2018 -0700
Cleaned up residual containers when removing resource provider configs.
When processing `REMOVE_RESOURCE_PROVIDER_CONFIG`, the local resource
provider daemon now performs a best-effort cleanup by killing all
standalone containers whose IDs start with the `cid_prefix` of the
resource provider. While the cleanup is in progress, adding or updating
a resource provider config with the same type and name fails.
Review: https://reviews.apache.org/r/68763
---
src/resource_provider/daemon.cpp | 239 +++++++++++++++++++++++++++++++++++----
1 file changed, 220 insertions(+), 19 deletions(-)
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index f1a941c..2fd82ad 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -22,6 +22,7 @@
#include <mesos/type_utils.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
@@ -35,25 +36,31 @@
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
#include <stout/try.hpp>
+#include "common/http.hpp"
#include "common/validation.hpp"
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
#include "resource_provider/local.hpp"
namespace http = process::http;
using std::list;
using std::string;
+using std::vector;
+using process::await;
+using process::defer;
+using process::dispatch;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
using process::ProcessBase;
-
-using process::defer;
-using process::dispatch;
using process::spawn;
using process::terminate;
using process::wait;
@@ -109,11 +116,17 @@ private:
const string path;
ResourceProviderInfo info;
+ Option<string> authToken;
// The `version` is used to check if `provider` holds a resource
// provider instance that is in sync with the current config.
id::UUID version;
Owned<LocalResourceProvider> provider;
+
+ // If set, it means that the resource provider is being removed, and the
+ // future would be completed once the removal is done. Note that this object
+ // will be destructed as soon as the future is ready.
+ Option<Future<Nothing>> removing;
};
Try<Nothing> load(const string& path);
@@ -133,6 +146,10 @@ private:
Future<Option<string>> generateAuthToken(const ResourceProviderInfo& info);
+ Future<Nothing> cleanupContainers(
+ const ResourceProviderInfo& info,
+ const Option<string>& authToken);
+
const http::URL url;
const string workDir;
const Option<string> configDir;
@@ -161,7 +178,13 @@ void LocalResourceProviderDaemonProcess::start(const SlaveID& _slaveId)
slaveId = _slaveId;
foreachkey (const string& type, providers) {
- foreachkey (const string& name, providers[type]) {
+ foreachpair (const string& name,
+ const ProviderData& data,
+ providers[type]) {
+ if (data.removing.isSome()) {
+ continue;
+ }
+
auto error = [=](const string& message) {
LOG(ERROR) << "Failed to launch resource provider with type '" << type
<< "' and name '" << name << "': " << message;
@@ -186,7 +209,15 @@ Future<bool> LocalResourceProviderDaemonProcess::add(
// Return true if the info has been added for idempotency.
if (providers[info.type()].contains(info.name())) {
- return providers[info.type()].at(info.name()).info == info;
+ const ProviderData& data = providers[info.type()].at(info.name());
+
+ if (data.removing.isSome()) {
+ return Failure(
+ "Failed to add resource provider with type '" + info.type() +
+ "' and name '" + info.name() + "' as a removal is still in progress");
+ }
+
+ return data.info == info;
}
// Generate a filename for the config.
@@ -239,6 +270,12 @@ Future<bool> LocalResourceProviderDaemonProcess::update(
ProviderData& data = providers[info.type()].at(info.name());
+ if (data.removing.isSome()) {
+ return Failure(
+ "Failed to update resource provider with type '" + info.type() +
+ "' and name '" + info.name() + "' as a removal is still in progress");
+ }
+
// Return true if the info has been updated for idempotency.
if (data.info == info) {
return true;
@@ -285,19 +322,39 @@ Future<Nothing> LocalResourceProviderDaemonProcess::remove(
return Nothing();
}
- const string path = providers[type].at(name).path;
+ ProviderData& data = providers[type].at(name);
- Try<Nothing> rm = os::rm(path);
- if (rm.isError()) {
- return Failure(
- "Failed to remove config file '" + path + "': " + rm.error());
+ // Return the same future if it is being removed for idempotency.
+ if (data.removing.isSome() && data.removing->isPending()) {
+ return data.removing.get();
}
- // Removing the provider data from `providers` will cause the resource
- // provider to be destructed.
- providers[type].erase(name);
+ // Destruct the resource provider instance to stop its container daemons, then
+ // do a best-effort cleanup of the standalone containers launched by the
+ // removed resource provider.
+ // TODO(chhsiao): This is not ideal since the daemon does not know how to
+ // perform resource-provider-specific cleanups. We should refactor this into a
+ // `LocalResourceProvider::stop` virtual function. However this also means
+ // that we need to ensure that we must have a resource provider instance with
+ // a running actor to invoke `stop`. Given that `launch` is asynchronous, we
+ // need to carefully redesign the state machine, and the semantics of the
+ // `{ADD,UPDATE,REMOVE}_RESOURCE_PROVIDER_CONFIG` API calls might be affected.
+ // In addition, we should consider how to reconcile orphaned containers.
+ data.provider.reset();
+ data.removing = cleanupContainers(data.info, data.authToken)
+ .then(defer(self(), [this, type, name]() -> Future<Nothing> {
+ Try<Nothing> rm = os::rm(providers[type].at(name).path);
+ if (rm.isError()) {
+ return Failure(
+ "Failed to remove config file '" + providers[type].at(name).path +
+ "': " + rm.error());
+ }
- return Nothing();
+ providers[type].erase(name);
+ return Nothing();
+ }));
+
+ return data.removing.get();
}
@@ -420,13 +477,10 @@ Future<Nothing> LocalResourceProviderDaemonProcess::launch(
const string& name)
{
CHECK_SOME(slaveId);
-
- // If the resource provider config is removed, nothing needs to be done.
- if (!providers[type].contains(name)) {
- return Nothing();
- }
+ CHECK(providers[type].contains(name));
ProviderData& data = providers[type].at(name);
+ CHECK(data.removing.isNone());
// Destruct the previous resource provider (which will synchronously
// terminate its actor and driver) if there is one.
@@ -450,6 +504,11 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
ProviderData& data = providers[type].at(name);
+ // If the resource provider config is being removed, nothing needs to be done.
+ if (data.removing.isSome()) {
+ return Nothing();
+ }
+
// If there is a version mismatch, abort the launch sequence since
// `authToken` might be outdated. The callback updating the version
// should have dispatched another launch sequence.
@@ -466,6 +525,7 @@ Future<Nothing> LocalResourceProviderDaemonProcess::_launch(
"' and name '" + name + "': " + provider.error());
}
+ data.authToken = authToken;
data.provider = std::move(provider.get());
return Nothing();
@@ -501,6 +561,147 @@ Future<Option<string>> LocalResourceProviderDaemonProcess::generateAuthToken(
}
+Future<Nothing> LocalResourceProviderDaemonProcess::cleanupContainers(
+ const ResourceProviderInfo& info,
+ const Option<string>& authToken)
+{
+ const Principal principal = LocalResourceProvider::principal(info);
+ CHECK(principal.claims.contains("cid_prefix"));
+
+ const string& cidPrefix = principal.claims.at("cid_prefix");
+
+ LOG(INFO) << "Cleaning up containers prefixed by '" << cidPrefix << "'";
+
+ // TODO(chhsiao): Consider using a more reliable way to get the v1 endpoint.
+ http::URL agentUrl = url;
+ agentUrl.path = Path(url.path).dirname();
+
+ http::Headers headers{{"Accept", stringify(ContentType::PROTOBUF)}};
+ if (authToken.isSome()) {
+ headers["Authorization"] = "Bearer " + authToken.get();
+ }
+
+ agent::Call call;
+ call.set_type(agent::Call::GET_CONTAINERS);
+ call.mutable_get_containers()->set_show_nested(false);
+ call.mutable_get_containers()->set_show_standalone(true);
+
+ return http::post(
+ agentUrl,
+ headers,
+ serialize(ContentType::PROTOBUF, evolve(call)),
+ stringify(ContentType::PROTOBUF))
+ .then(defer(self(), [=](
+ const http::Response& httpResponse) -> Future<Nothing> {
+ if (httpResponse.status != http::OK().status) {
+ return Failure(
+ "Failed to get containers: Unexpected response '" +
+ httpResponse.status + "' (" + httpResponse.body + ")");
+ }
+
+ Try<v1::agent::Response> v1Response = deserialize<v1::agent::Response>(
+ ContentType::PROTOBUF, httpResponse.body);
+ if (v1Response.isError()) {
+ return Failure("Failed to get containers: " + v1Response.error());
+ }
+
+ vector<Future<Nothing>> futures;
+
+ agent::Response response = devolve(v1Response.get());
+ foreach (const agent::Response::GetContainers::Container& container,
+ response.get_containers().containers()) {
+ const ContainerID& containerId = container.container_id();
+
+ if (!strings::startsWith(containerId.value(), cidPrefix)) {
+ continue;
+ }
+
+ // NOTE: We skip containers that are not actually running by checking
+ // their `executor_pid`s to avoid `ESRCH` errors or killing an arbitrary
+ // process. But we might skip the ones that are being launched as well.
+ if (!container.has_container_status() ||
+ !container.container_status().has_executor_pid()) {
+ LOG(WARNING)
+ << "Skipped killing container '" << containerId
+ << "' because it is not running";
+
+ continue;
+ }
+
+ agent::Call call;
+ call.set_type(agent::Call::KILL_CONTAINER);
+ call.mutable_kill_container()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ LOG(INFO) << "Killing container '" << containerId << "'";
+
+ futures.push_back(http::post(
+ agentUrl,
+ headers,
+ serialize(ContentType::PROTOBUF, evolve(call)),
+ stringify(ContentType::PROTOBUF))
+ .then(defer(self(), [=](
+ const http::Response& response) -> Future<Nothing> {
+ if (response.status == http::NotFound().status) {
+ LOG(WARNING)
+ << "Skipped waiting for container '" << containerId
+ << "' because it no longer exists";
+
+ return Nothing();
+ }
+
+ if (response.status != http::OK().status) {
+ return Failure(
+ "Failed to kill container '" + stringify(containerId) +
+ "': Unexpected response '" + response.status + "' (" +
+ response.body + ")");
+ }
+
+ LOG(INFO) << "Waiting for container '" << containerId << "'";
+
+ agent::Call call;
+ call.set_type(agent::Call::WAIT_CONTAINER);
+ call.mutable_wait_container()->mutable_container_id()
+ ->CopyFrom(containerId);
+
+ return http::post(
+ agentUrl,
+ headers,
+ serialize(ContentType::PROTOBUF, evolve(call)),
+ stringify(ContentType::PROTOBUF))
+ .then([containerId](
+ const http::Response& response) -> Future<Nothing> {
+ if (response.status != http::OK().status &&
+ response.status != http::NotFound().status) {
+ return Failure(
+ "Failed to wait for container '" +
+ stringify(containerId) + "': Unexpected response '" +
+ response.status + "' (" + response.body + ")");
+ }
+
+ return Nothing();
+ });
+ })));
+ }
+
+ // We use await here to do a best-effort cleanup.
+ return await(futures)
+ .then([cidPrefix](
+ const vector<Future<Nothing>>& futures) -> Future<Nothing> {
+ foreach (const Future<Nothing>& future, futures) {
+ if (!future.isReady()) {
+ return Failure(
+ "Failed to clean up containers prefixed by '" + cidPrefix +
+ "': " + stringify(futures));
+ }
+ }
+
+ return Nothing();
+ });
+ }));
+}
+
+
Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
const http::URL& url,
const slave::Flags& flags,