You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/09/03 12:58:40 UTC

[mesos] 03/06: Added methods to remove resource providers from provider manager.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4b558e24594b43456f35de697ed8484bbb331fe1
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:10:51 2018 +0200

    Added methods to remove resource providers from provider manager.
    
    This patch adds a method to remove a resource provider from the
    resource provider manager. The resource provider will be marked as
    removed in the manager's registry and disconnected. We also expose a
    new `REMOVE` message whenever a resource provider is removed.
    
    Apart from handling the new `REMOVE` message in the agent, this patch
    does not integrate resource provider removal with other components.
    
    Review: https://reviews.apache.org/r/68144/
---
 src/resource_provider/manager.cpp             |  51 +++++++++++++
 src/resource_provider/manager.hpp             |   6 ++
 src/resource_provider/message.hpp             |  22 +++++-
 src/slave/slave.cpp                           |  11 +++
 src/tests/resource_provider_manager_tests.cpp | 101 ++++++++++++++++++++++++++
 5 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 118c86c..4199e5b 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -63,6 +63,7 @@ using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
 
 using mesos::resource_provider::registry::Registry;
 
@@ -191,6 +192,9 @@ public:
 
   void reconcileOperations(const ReconcileOperationsMessage& message);
 
+  Future<Nothing> removeResourceProvider(
+      const ResourceProviderID& resourceProviderId);
+
   Future<Nothing> publishResources(const Resources& resources);
 
   Queue<ResourceProviderMessage> messages;
@@ -601,6 +605,44 @@ void ResourceProviderManagerProcess::reconcileOperations(
 }
 
 
+Future<Nothing> ResourceProviderManagerProcess::removeResourceProvider(
+    const ResourceProviderID& resourceProviderId)
+{
+  LOG(INFO) << "Removing resource provider " << resourceProviderId;
+
+  Future<bool> removeResourceProvider = registrar
+    ->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+        new RemoveResourceProvider(resourceProviderId)));
+  // Log failed registry updates; the returned future also reports them.
+  removeResourceProvider.onAny(
+      [resourceProviderId](const Future<bool>& registryUpdate) {
+        if (!registryUpdate.isReady()) {
+          LOG(ERROR) << "Not removing resource provider " << resourceProviderId
+                     << " as registry update did not succeed: "
+                     << registryUpdate;
+        }
+      });
+  // Forget the provider only once the registry update has succeeded.
+  return removeResourceProvider
+    .then(defer(
+        self(),
+        [this, resourceProviderId](bool) {
+          resourceProviders.known.erase(resourceProviderId);
+          resourceProviders.subscribed.erase(resourceProviderId);
+
+          ResourceProviderMessage::Remove remove{resourceProviderId};
+
+          ResourceProviderMessage message;
+          message.type = ResourceProviderMessage::Type::REMOVE;
+          message.remove = std::move(remove);
+
+          messages.put(std::move(message));
+
+          return Nothing();
+        }));
+}
+
+
 Future<Nothing> ResourceProviderManagerProcess::publishResources(
     const Resources& resources)
 {
@@ -988,6 +1030,15 @@ void ResourceProviderManager::reconcileOperations(
 }
 
 
+Future<Nothing> ResourceProviderManager::removeResourceProvider(
+    const ResourceProviderID& resourceProviderId) const
+{
+  // The removal itself runs on the manager's process.
+  return dispatch(
+      process.get(), &ResourceProviderManagerProcess::removeResourceProvider,
+      resourceProviderId);
+}
+
 Future<Nothing> ResourceProviderManager::publishResources(
     const Resources& resources)
 {
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 6c57956..33b25e3 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -17,6 +17,8 @@
 #ifndef __RESOURCE_PROVIDER_MANAGER_HPP__
 #define __RESOURCE_PROVIDER_MANAGER_HPP__
 
+#include <stout/nothing.hpp>
+
 #include <process/authenticator.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
@@ -66,6 +68,10 @@ public:
   void reconcileOperations(
       const ReconcileOperationsMessage& message) const;
 
+  // Permanently remove a resource provider and emit a `REMOVE` message.
+  process::Future<Nothing> removeResourceProvider(
+      const ResourceProviderID& resourceProviderId) const;
+
   // Ensure that the resources are ready for use.
   process::Future<Nothing> publishResources(const Resources& resources);
 
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 39d9936..19e1be1 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -43,7 +43,8 @@ struct ResourceProviderMessage
     SUBSCRIBE,
     UPDATE_STATE,
     UPDATE_OPERATION_STATUS,
-    DISCONNECT
+    DISCONNECT,
+    REMOVE
   };
 
   friend std::ostream& operator<<(std::ostream& stream, const Type& type) {
@@ -56,6 +57,8 @@ struct ResourceProviderMessage
         return stream << "UPDATE_OPERATION_STATUS";
       case Type::DISCONNECT:
         return stream << "DISCONNECT";
+      case Type::REMOVE:
+        return stream << "REMOVE";
     }
 
     UNREACHABLE();
@@ -84,12 +87,18 @@ struct ResourceProviderMessage
     ResourceProviderID resourceProviderId;
   };
 
+  struct Remove
+  {
+    ResourceProviderID resourceProviderId;
+  };
+
   Type type;
 
   Option<Subscribe> subscribe;
   Option<UpdateState> updateState;
   Option<UpdateOperationStatus> updateOperationStatus;
   Option<Disconnect> disconnect;
+  Option<Remove> remove;
 };
 
 
@@ -147,6 +156,17 @@ inline std::ostream& operator<<(
           << "resource provider "
           << disconnect->resourceProviderId;
     }
+
+    case ResourceProviderMessage::Type::REMOVE: {
+      const Option<ResourceProviderMessage::Remove>& remove =
+        resourceProviderMessage.remove;
+
+      CHECK_SOME(remove);
+
+      return stream
+          << "resource provider "
+          << remove->resourceProviderId;
+    }
   }
 
   UNREACHABLE();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 735795e..4829f00 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7919,6 +7919,17 @@ void Slave::handleResourceProviderMessage(
       }
       break;
     }
+    case ResourceProviderMessage::Type::REMOVE: {
+      CHECK_SOME(message->remove);
+
+      const ResourceProviderID& resourceProviderId =
+        message->remove->resourceProviderId;
+
+      resourceProviders.erase(resourceProviderId);
+
+      LOG(INFO) << "Removed resource provider '" << resourceProviderId << "'";
+      break;
+    }
   }
 
   // Wait for the next message.
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 433992c..33b1055 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -39,6 +39,7 @@
 #include <stout/error.hpp>
 #include <stout/gtest.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/recordio.hpp>
@@ -52,6 +53,8 @@
 
 #include "internal/devolve.hpp"
 
+#include "master/detector/standalone.hpp"
+
 #include "resource_provider/manager.hpp"
 #include "resource_provider/registrar.hpp"
 
@@ -65,6 +68,7 @@ namespace http = process::http;
 using mesos::internal::slave::Slave;
 
 using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
 
 using mesos::state::InMemoryStorage;
 using mesos::state::State;
@@ -1478,6 +1482,103 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   EXPECT_EQ(1, snapshot.values.at("resource_provider_manager/subscribed"));
 }
 
+
+TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
+{
+  const ContentType contentType = ContentType::PROTOBUF;
+
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+
+  Future<ResourceProviderMessage> message = manager.messages().get();
+
+  Option<ResourceProviderID> resourceProviderId;
+
+  // Subscribe a resource provider and record the ID the manager assigns.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    AWAIT_READY(message);
+    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
+    ASSERT_SOME(message->subscribe);
+    ASSERT_TRUE(message->subscribe->info.has_id());
+    resourceProviderId = message->subscribe->info.id();
+  }
+
+  // Remove the resource provider. We expect a `REMOVE` message for it.
+  message = manager.messages().get();
+
+  Future<Nothing> removeResourceProvider =
+    manager.removeResourceProvider(resourceProviderId.get());
+
+  AWAIT_READY(removeResourceProvider);
+
+  AWAIT_READY(message);
+  ASSERT_EQ(ResourceProviderMessage::Type::REMOVE, message->type);
+  ASSERT_SOME(message->remove);
+  EXPECT_EQ(resourceProviderId.get(), message->remove->resourceProviderId);
+
+  // Attempting to resubscribe the removed resource provider fails.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+    info->mutable_id()->CopyFrom(evolve(resourceProviderId.get()));
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
+
+    recordio::Reader<Event> responseDecoder(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get());
+
+    // We expect the manager to drop the subscribe call and close the stream
+    // since the removed resource provider is no longer known.
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    EXPECT_NONE(event.get());
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {