You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/09/03 12:58:40 UTC
[mesos] 03/06: Added methods to remove resource providers from
provider manager.
This is an automated email from the ASF dual-hosted git repository.
bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4b558e24594b43456f35de697ed8484bbb331fe1
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:10:51 2018 +0200
Added methods to remove resource providers from provider manager.
This patch adds a method to remove a resource provider from the
resource provider manager. The resource provider will be marked as
removed in the manager's registry and disconnected. We also expose a
new `REMOVE` event whenever a resource provider was removed.
This patch does not add integration with e.g., the agent.
Review: https://reviews.apache.org/r/68144/
---
src/resource_provider/manager.cpp | 51 +++++++++++++
src/resource_provider/manager.hpp | 6 ++
src/resource_provider/message.hpp | 22 +++++-
src/slave/slave.cpp | 11 +++
src/tests/resource_provider_manager_tests.cpp | 101 ++++++++++++++++++++++++++
5 files changed, 190 insertions(+), 1 deletion(-)
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 118c86c..4199e5b 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -63,6 +63,7 @@ using mesos::resource_provider::AdmitResourceProvider;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
using mesos::resource_provider::registry::Registry;
@@ -191,6 +192,9 @@ public:
void reconcileOperations(const ReconcileOperationsMessage& message);
+ Future<Nothing> removeResourceProvider(
+ const ResourceProviderID& resourceProviderId);
+
Future<Nothing> publishResources(const Resources& resources);
Queue<ResourceProviderMessage> messages;
@@ -601,6 +605,44 @@ void ResourceProviderManagerProcess::reconcileOperations(
}
+Future<Nothing> ResourceProviderManagerProcess::removeResourceProvider(
+ const ResourceProviderID& resourceProviderId)
+{
+ LOG(INFO) << "Removing resource provider " << resourceProviderId;
+
+ Future<bool> removeResourceProvider = registrar
+ ->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+ new RemoveResourceProvider(resourceProviderId)));
+
+ removeResourceProvider.onAny(
+ [resourceProviderId](const Future<bool>& removeResourceProvider) {
+ if (!removeResourceProvider.isReady()) {
+ LOG(ERROR) << "Not removing resource provider " << resourceProviderId
+ << " as registry update did not succeed: "
+ << removeResourceProvider;
+ }
+ });
+
+ return removeResourceProvider
+ .then(defer(
+ self(),
+ [this, resourceProviderId](const Future<bool>& removeResourceProvider) {
+ resourceProviders.known.erase(resourceProviderId);
+ resourceProviders.subscribed.erase(resourceProviderId);
+
+ ResourceProviderMessage::Remove remove{resourceProviderId};
+
+ ResourceProviderMessage message;
+ message.type = ResourceProviderMessage::Type::REMOVE;
+ message.remove = std::move(remove);
+
+ messages.put(std::move(message));
+
+ return Nothing();
+ }));
+}
+
+
Future<Nothing> ResourceProviderManagerProcess::publishResources(
const Resources& resources)
{
@@ -988,6 +1030,15 @@ void ResourceProviderManager::reconcileOperations(
}
+Future<Nothing> ResourceProviderManager::removeResourceProvider(
+ const ResourceProviderID& resourceProviderId) const
+{
+ return dispatch(
+ process.get(),
+ &ResourceProviderManagerProcess::removeResourceProvider,
+ resourceProviderId);
+}
+
Future<Nothing> ResourceProviderManager::publishResources(
const Resources& resources)
{
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 6c57956..33b25e3 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -17,6 +17,8 @@
#ifndef __RESOURCE_PROVIDER_MANAGER_HPP__
#define __RESOURCE_PROVIDER_MANAGER_HPP__
+#include <stout/nothing.hpp>
+
#include <process/authenticator.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
@@ -66,6 +68,10 @@ public:
void reconcileOperations(
const ReconcileOperationsMessage& message) const;
+ // Permanently remove a resource provider.
+ process::Future<Nothing> removeResourceProvider(
+ const ResourceProviderID& resourceProviderId) const;
+
// Ensure that the resources are ready for use.
process::Future<Nothing> publishResources(const Resources& resources);
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 39d9936..19e1be1 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -43,7 +43,8 @@ struct ResourceProviderMessage
SUBSCRIBE,
UPDATE_STATE,
UPDATE_OPERATION_STATUS,
- DISCONNECT
+ DISCONNECT,
+ REMOVE
};
friend std::ostream& operator<<(std::ostream& stream, const Type& type) {
@@ -56,6 +57,8 @@ struct ResourceProviderMessage
return stream << "UPDATE_OPERATION_STATUS";
case Type::DISCONNECT:
return stream << "DISCONNECT";
+ case Type::REMOVE:
+ return stream << "REMOVE";
}
UNREACHABLE();
@@ -84,12 +87,18 @@ struct ResourceProviderMessage
ResourceProviderID resourceProviderId;
};
+ struct Remove
+ {
+ ResourceProviderID resourceProviderId;
+ };
+
Type type;
Option<Subscribe> subscribe;
Option<UpdateState> updateState;
Option<UpdateOperationStatus> updateOperationStatus;
Option<Disconnect> disconnect;
+ Option<Remove> remove;
};
@@ -147,6 +156,17 @@ inline std::ostream& operator<<(
<< "resource provider "
<< disconnect->resourceProviderId;
}
+
+ case ResourceProviderMessage::Type::REMOVE: {
+ const Option<ResourceProviderMessage::Remove>& remove =
+ resourceProviderMessage.remove;
+
+ CHECK_SOME(remove);
+
+ return stream
+ << "resource provider "
+ << remove->resourceProviderId;
+ }
}
UNREACHABLE();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 735795e..4829f00 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7919,6 +7919,17 @@ void Slave::handleResourceProviderMessage(
}
break;
}
+ case ResourceProviderMessage::Type::REMOVE: {
+ CHECK_SOME(message->remove);
+
+ const ResourceProviderID& resourceProviderId =
+ message->remove->resourceProviderId;
+
+ resourceProviders.erase(resourceProviderId);
+
+ LOG(INFO) << "Removed resource provider '" << resourceProviderId << "'";
+ break;
+ }
}
// Wait for the next message.
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 433992c..33b1055 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -39,6 +39,7 @@
#include <stout/error.hpp>
#include <stout/gtest.hpp>
#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/protobuf.hpp>
#include <stout/recordio.hpp>
@@ -52,6 +53,8 @@
#include "internal/devolve.hpp"
+#include "master/detector/standalone.hpp"
+
#include "resource_provider/manager.hpp"
#include "resource_provider/registrar.hpp"
@@ -65,6 +68,7 @@ namespace http = process::http;
using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
using mesos::state::InMemoryStorage;
using mesos::state::State;
@@ -1478,6 +1482,103 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
EXPECT_EQ(1, snapshot.values.at("resource_provider_manager/subscribed"));
}
+
+TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
+{
+ const ContentType contentType = ContentType::PROTOBUF;
+
+ ResourceProviderManager manager(
+ Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+
+ Future<ResourceProviderMessage> message = manager.messages().get();
+
+ Option<ResourceProviderID> resourceProviderId;
+
+ // Subscribe a resource provider.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+ AWAIT_READY(response);
+
+ AWAIT_READY(message);
+ ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
+ ASSERT_SOME(message->subscribe);
+ ASSERT_TRUE(message->subscribe->info.has_id());
+ resourceProviderId = message->subscribe->info.id();
+ }
+
+ // Remove the resource provider. We expect to receive a notification.
+ message = manager.messages().get();
+
+ Future<Nothing> removeResourceProvider =
+ manager.removeResourceProvider(resourceProviderId.get());
+
+ AWAIT_READY(removeResourceProvider);
+
+ AWAIT_READY(message);
+ ASSERT_EQ(ResourceProviderMessage::Type::REMOVE, message->type);
+ ASSERT_SOME(message->remove);
+ EXPECT_EQ(resourceProviderId.get(), message->remove->resourceProviderId);
+
+ // Attempting to resubscribe this resource provider fails.
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ mesos::v1::ResourceProviderInfo* info =
+ subscribe->mutable_resource_provider_info();
+
+ info->set_type("org.apache.mesos.rp.test");
+ info->set_name("test");
+ info->mutable_id()->CopyFrom(evolve(resourceProviderId.get()));
+
+ http::Request request;
+ request.method = "POST";
+ request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ request.headers["Accept"] = stringify(contentType);
+ request.headers["Content-Type"] = stringify(contentType);
+ request.body = serialize(contentType, call);
+
+ Future<http::Response> response = manager.api(request, None());
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+
+ Option<http::Pipe::Reader> reader = response->reader;
+ ASSERT_SOME(reader);
+
+ recordio::Reader<Event> responseDecoder(
+ ::recordio::Decoder<Event>(
+ lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+ reader.get());
+
+ // We expect the manager to drop the subscribe call since
+ // the resource provider is not known at this point.
+ Future<Result<Event>> event = responseDecoder.read();
+ AWAIT_READY(event);
+ EXPECT_NONE(event.get());
+ }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {