You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/09/03 12:58:37 UTC

[mesos] branch master updated (a8432c9 -> 6a98857)

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from a8432c9  Fixed argument parsing in python 3 support script mesos-gtest-runner.py.
     new 701e014  Allowed tracking of resource providers w/o resource version in agent.
     new 700a623  Made resource provider manager emit an event when provider subscribed.
     new 4b558e2  Added methods to remove resource providers from provider manager.
     new f83b318  Added actions and ACLs to authorize removal of resource providers.
     new 75bc091  Made RP manager only send resource provider ID on state updates.
     new 6a98857  Sent an event to resource providers when they are removed.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/mesos/authorizer/acls.proto                |  13 +-
 include/mesos/authorizer/authorizer.proto          |  12 +-
 .../resource_provider/resource_provider.proto      |   1 +
 .../v1/resource_provider/resource_provider.proto   |   1 +
 src/authorizer/local/authorizer.cpp                |  45 +++--
 src/resource_provider/manager.cpp                  |  83 ++++++++-
 src/resource_provider/manager.hpp                  |   6 +
 src/resource_provider/message.hpp                  |  43 ++++-
 src/resource_provider/storage/provider.cpp         |   4 +
 src/slave/slave.cpp                                | 192 +++++++++++----------
 src/slave/slave.hpp                                |   4 +-
 src/tests/authorization_tests.cpp                  |  60 +++++++
 src/tests/mesos.hpp                                |  22 +++
 src/tests/resource_provider_manager_tests.cpp      | 150 ++++++++++++----
 14 files changed, 489 insertions(+), 147 deletions(-)


[mesos] 05/06: Made RP manager only send resource provider ID on state updates.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 75bc091e123744003f2ef956be54ad7f562c4815
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 15 09:37:12 2018 +0200

    Made RP manager only send resource provider ID on state updates.
    
    With the introduction of the resource provider `SUBSCRIBE` event which
    contains the full `ResourceProviderInfo` of a subscribed resource
    provider, we can avoid sending the `ResourceProviderInfo` in
    `UPDATE_STATE` events and instead only send a `ResourceProviderID`
    like in most other messages.
    
    Review: https://reviews.apache.org/r/68362
---
 src/resource_provider/manager.cpp             |   2 +-
 src/resource_provider/message.hpp             |   4 +-
 src/slave/slave.cpp                           | 153 +++++++++++---------------
 src/tests/resource_provider_manager_tests.cpp |   3 +-
 4 files changed, 70 insertions(+), 92 deletions(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 4199e5b..f4a5090 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -898,7 +898,7 @@ void ResourceProviderManagerProcess::updateState(
     << resourceProvider->info.id();
 
   ResourceProviderMessage::UpdateState updateState{
-      resourceProvider->info,
+      resourceProvider->info.id(),
       update.resource_version_uuid(),
       update.resources(),
       std::move(operations)};
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 19e1be1..0acde65 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -71,7 +71,7 @@ struct ResourceProviderMessage
 
   struct UpdateState
   {
-    ResourceProviderInfo info;
+    ResourceProviderID resourceProviderId;
     UUID resourceVersion;
     Resources totalResources;
     hashmap<UUID, Operation> operations;
@@ -124,7 +124,7 @@ inline std::ostream& operator<<(
       CHECK_SOME(updateState);
 
       return stream
-          << updateState->info.id() << " "
+          << updateState->resourceProviderId << " "
           << updateState->totalResources;
     }
 
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4829f00..3bd808e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7670,105 +7670,84 @@ void Slave::handleResourceProviderMessage(
       const ResourceProviderMessage::UpdateState& updateState =
         message->updateState.get();
 
-      CHECK(updateState.info.has_id());
-      const ResourceProviderID& resourceProviderId = updateState.info.id();
-
       ResourceProvider* resourceProvider =
-        getResourceProvider(resourceProviderId);
-
-      if (resourceProvider == nullptr) {
-        resourceProvider = new ResourceProvider(
-            updateState.info,
-            updateState.totalResources,
-            updateState.resourceVersion);
+        getResourceProvider(updateState.resourceProviderId);
 
-        addResourceProvider(resourceProvider);
-
-        foreachvalue (const Operation& operation,
-                      updateState.operations) {
-          addOperation(new Operation(operation));
-        }
+      CHECK(resourceProvider);
 
+      if (resourceProvider->totalResources != updateState.totalResources) {
         // Update the 'total' in the Slave.
+        CHECK(totalResources.contains(resourceProvider->totalResources));
+        totalResources -= resourceProvider->totalResources;
         totalResources += updateState.totalResources;
-      } else {
-        // Always update the resource provider info.
-        resourceProvider->info = updateState.info;
 
-        if (resourceProvider->totalResources != updateState.totalResources) {
-          // Update the 'total' in the Slave.
-          CHECK(totalResources.contains(resourceProvider->totalResources));
-          totalResources -= resourceProvider->totalResources;
-          totalResources += updateState.totalResources;
-
-          // Update the 'total' in the resource provider.
-          resourceProvider->totalResources = updateState.totalResources;
-        }
+        // Update the 'total' in the resource provider.
+        resourceProvider->totalResources = updateState.totalResources;
+      }
 
-        // Update operation state.
-        //
-        // We only update operations which are not contained in both
-        // the known and just received sets. All other operations will
-        // be updated via relayed operation status updates.
-        const hashset<UUID> knownUuids = resourceProvider->operations.keys();
-        const hashset<UUID> receivedUuids = updateState.operations.keys();
-
-        // Handle operations known to the agent but not reported by
-        // the resource provider. These could be operations where the
-        // agent has started tracking an operation, but the resource
-        // provider failed over before it could bookkeep the
-        // operation.
+      // Update operation state.
+      //
+      // We only update operations which are not contained in both
+      // the known and just received sets. All other operations will
+      // be updated via relayed operation status updates.
+      const hashset<UUID> knownUuids = resourceProvider->operations.keys();
+      const hashset<UUID> receivedUuids = updateState.operations.keys();
+
+      // Handle operations known to the agent but not reported by
+      // the resource provider. These could be operations where the
+      // agent has started tracking an operation, but the resource
+      // provider failed over before it could bookkeep the
+      // operation.
+      //
+      // NOTE: We do not mutate operations statuses here; this would
+      // be the responsibility of an operation status update handler.
+      hashset<UUID> disappearedUuids = knownUuids - receivedUuids;
+      foreach (const UUID& uuid, disappearedUuids) {
+        // TODO(bbannier): Instead of simply dropping an operation
+        // with `removeOperation` here we should instead send a
+        // `Reconcile` message with a failed state to the resource
+        // provider so its status update manager can reliably
+        // deliver the operation status to the framework.
+        removeOperation(resourceProvider->operations.at(uuid));
+      }
+
+      // Handle operations known to the resource provider but not
+      // the agent. This can happen if the agent failed over and the
+      // resource provider reregistered.
+      hashset<UUID> reappearedUuids = receivedUuids - knownUuids;
+      foreach (const UUID& uuid, reappearedUuids) {
+        // Start tracking this operation.
         //
-        // NOTE: We do not mutate operations statuses here; this would
-        // be the responsibility of an operation status update handler.
-        hashset<UUID> disappearedUuids = knownUuids - receivedUuids;
-        foreach (const UUID& uuid, disappearedUuids) {
-          // TODO(bbannier): Instead of simply dropping an operation
-          // with `removeOperation` here we should instead send a
-          // `Reconcile` message with a failed state to the resource
-          // provider so its status update manager can reliably
-          // deliver the operation status to the framework.
-          removeOperation(resourceProvider->operations.at(uuid));
-        }
-
-        // Handle operations known to the resource provider but not
-        // the agent. This can happen if the agent failed over and the
-        // resource provider reregistered.
-        hashset<UUID> reappearedUuids = receivedUuids - knownUuids;
-        foreach (const UUID& uuid, reappearedUuids) {
-          // Start tracking this operation.
-          //
-          // NOTE: We do not need to update total resources here as its
-          // state was sync explicitly with the received total above.
-          addOperation(new Operation(updateState.operations.at(uuid)));
-        }
+        // NOTE: We do not need to update total resources here as its
+        // state was sync explicitly with the received total above.
+        addOperation(new Operation(updateState.operations.at(uuid)));
+      }
 
-        // Handle operations known to both the agent and the resource provider.
-        //
-        // If an operation became terminal, its result is already reflected in
-        // the total resources reported by the resource provider, and thus it
-        // should not be applied again in an operation status update handler
-        // when its terminal status update arrives. So we set the terminal
-        // `latest_status` here to prevent resource convervions elsewhere.
-        //
-        // NOTE: We only update the `latest_status` of a known operation if it
-        // is not terminal yet here; its `statuses` would be updated by an
-        // operation status update handler.
-        hashset<UUID> matchedUuids = knownUuids - disappearedUuids;
-        foreach (const UUID& uuid, matchedUuids) {
-          const Operation& operation = updateState.operations.at(uuid);
-          if (operation.has_latest_status() &&
-              protobuf::isTerminalState(operation.latest_status().state())) {
-            updateOperationLatestStatus(
-                getOperation(uuid),
-                operation.latest_status());
-          }
+      // Handle operations known to both the agent and the resource provider.
+      //
+      // If an operation became terminal, its result is already reflected in
+      // the total resources reported by the resource provider, and thus it
+      // should not be applied again in an operation status update handler
+      // when its terminal status update arrives. So we set the terminal
+      // `latest_status` here to prevent resource convervions elsewhere.
+      //
+      // NOTE: We only update the `latest_status` of a known operation if it
+      // is not terminal yet here; its `statuses` would be updated by an
+      // operation status update handler.
+      hashset<UUID> matchedUuids = knownUuids - disappearedUuids;
+      foreach (const UUID& uuid, matchedUuids) {
+        const Operation& operation = updateState.operations.at(uuid);
+        if (operation.has_latest_status() &&
+            protobuf::isTerminalState(operation.latest_status().state())) {
+          updateOperationLatestStatus(
+              getOperation(uuid),
+              operation.latest_status());
         }
-
-        // Update resource version of this resource provider.
-        resourceProvider->resourceVersion = updateState.resourceVersion;
       }
 
+      // Update resource version of this resource provider.
+      resourceProvider->resourceVersion = updateState.resourceVersion;
+
       // Send the updated resources to the master if the agent is running. Note
       // that since we have already updated our copy of the latest resource
       // provider resources, it is safe to consume this message and wait for the
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 33b1055..5bb740e 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -315,10 +315,9 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     AWAIT_READY(message);
 
     EXPECT_EQ(ResourceProviderMessage::Type::UPDATE_STATE, message->type);
-    ASSERT_TRUE(message->updateState->info.has_id());
     EXPECT_EQ(
         devolve(resourceProviderId.get()),
-        message->updateState->info.id());
+        message->updateState->resourceProviderId);
     EXPECT_EQ(devolve(resources), message->updateState->totalResources);
   }
 }


[mesos] 02/06: Made resource provider manager emit an event when provider subscribed.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 700a62313e8f0fce94f43c9be6f54fc848cf6eb5
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:10:44 2018 +0200

    Made resource provider manager emit an event when provider subscribed.
    
    This patch adds a `SUBSCRIBE` resource provider message which is
    emitted by the resource provider manager when a resource provider
    subscribes. This event can be used, e.g., to detect subscriptions when
    the resource provider never updates any resources.
    
    We currently do not expose this event from the agent outwards.
    
    Review: https://reviews.apache.org/r/68143/
---
 src/resource_provider/manager.cpp             |  8 +++++
 src/resource_provider/message.hpp             | 17 ++++++++++
 src/slave/slave.cpp                           | 21 ++++++++++++
 src/tests/resource_provider_manager_tests.cpp | 46 ++++++---------------------
 4 files changed, 56 insertions(+), 36 deletions(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index abd7e38..118c86c 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -798,10 +798,18 @@ void ResourceProviderManagerProcess::_subscribe(
         std::move(resourceProvider_));
   }
 
+  ResourceProviderMessage::Subscribe subscribe{resourceProvider->info};
+
+  ResourceProviderMessage message;
+  message.type = ResourceProviderMessage::Type::SUBSCRIBE;
+  message.subscribe = std::move(subscribe);
+
   // TODO(jieyu): Start heartbeat for the resource provider.
   resourceProviders.subscribed.put(
       resourceProviderId,
       std::move(resourceProvider));
+
+  messages.put(std::move(message));
 }
 
 
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 9307f88..39d9936 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -40,6 +40,7 @@ struct ResourceProviderMessage
 {
   enum class Type
   {
+    SUBSCRIBE,
     UPDATE_STATE,
     UPDATE_OPERATION_STATUS,
     DISCONNECT
@@ -47,6 +48,8 @@ struct ResourceProviderMessage
 
   friend std::ostream& operator<<(std::ostream& stream, const Type& type) {
     switch (type) {
+      case Type::SUBSCRIBE:
+        return stream << "SUBSCRIBE";
       case Type::UPDATE_STATE:
         return stream << "UPDATE_STATE";
       case Type::UPDATE_OPERATION_STATUS:
@@ -58,6 +61,11 @@ struct ResourceProviderMessage
     UNREACHABLE();
   }
 
+  struct Subscribe
+  {
+    ResourceProviderInfo info;
+  };
+
   struct UpdateState
   {
     ResourceProviderInfo info;
@@ -78,6 +86,7 @@ struct ResourceProviderMessage
 
   Type type;
 
+  Option<Subscribe> subscribe;
   Option<UpdateState> updateState;
   Option<UpdateOperationStatus> updateOperationStatus;
   Option<Disconnect> disconnect;
@@ -91,6 +100,14 @@ inline std::ostream& operator<<(
   stream << stringify(resourceProviderMessage.type) << ": ";
 
   switch (resourceProviderMessage.type) {
+    case ResourceProviderMessage::Type::SUBSCRIBE: {
+      const Option<ResourceProviderMessage::Subscribe>& subscribe =
+        resourceProviderMessage.subscribe;
+
+      CHECK_SOME(subscribe);
+
+      return stream << subscribe->info;
+    }
     case ResourceProviderMessage::Type::UPDATE_STATE: {
       const Option<ResourceProviderMessage::UpdateState>&
         updateState = resourceProviderMessage.updateState;
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7fc90d1..735795e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7643,6 +7643,27 @@ void Slave::handleResourceProviderMessage(
   LOG(INFO) << "Handling resource provider message '" << message.get() << "'";
 
   switch(message->type) {
+    case ResourceProviderMessage::Type::SUBSCRIBE: {
+      CHECK_SOME(message->subscribe);
+
+      const ResourceProviderMessage::Subscribe& subscribe =
+        message->subscribe.get();
+
+      CHECK(subscribe.info.has_id());
+
+      ResourceProvider* resourceProvider =
+        getResourceProvider(subscribe.info.id());
+
+      if (resourceProvider == nullptr) {
+        resourceProvider = new ResourceProvider(subscribe.info, {}, None());
+
+        addResourceProvider(resourceProvider);
+      } else {
+        // Always update the resource provider info.
+        resourceProvider->info = subscribe.info;
+      }
+      break;
+    }
     case ResourceProviderMessage::Type::UPDATE_STATE: {
       CHECK_SOME(message->updateState);
 
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0b9e985..433992c 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -267,24 +267,11 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     CHECK_SOME(uuid);
     streamId = uuid.get();
 
-    Option<http::Pipe::Reader> reader = response->reader;
-    ASSERT_SOME(reader);
-
-    recordio::Reader<Event> responseDecoder(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
-        reader.get());
-
-    Future<Result<Event>> event = responseDecoder.read();
-    AWAIT_READY(event);
-    ASSERT_SOME(event.get());
-
-    // Check event type is subscribed and the resource provider id is set.
-    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
-
-    resourceProviderId = event->get().subscribed().provider_id();
-
-    EXPECT_FALSE(resourceProviderId->value().empty());
+    Future<ResourceProviderMessage> message = manager.messages().get();
+    AWAIT_READY(message);
+    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
+    ASSERT_TRUE(message->subscribe->info.has_id());
+    resourceProviderId = evolve(message->subscribe->info.id());
   }
 
   // Then, update the total resources to the manager.
@@ -375,24 +362,11 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
     CHECK_SOME(uuid);
     streamId = uuid.get();
 
-    Option<http::Pipe::Reader> reader = response->reader;
-    ASSERT_SOME(reader);
-
-    recordio::Reader<Event> responseDecoder(
-        ::recordio::Decoder<Event>(
-            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
-        reader.get());
-
-    Future<Result<Event>> event = responseDecoder.read();
-    AWAIT_READY(event);
-    ASSERT_SOME(event.get());
-
-    // Check event type is subscribed and the resource provider id is set.
-    ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
-
-    resourceProviderId = event->get().subscribed().provider_id();
-
-    EXPECT_FALSE(resourceProviderId->value().empty());
+    Future<ResourceProviderMessage> message = manager.messages().get();
+    AWAIT_READY(message);
+    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
+    ASSERT_TRUE(message->subscribe->info.has_id());
+    resourceProviderId = evolve(message->subscribe->info.id());
   }
 
   // Then, send an operation status update to the manager.


[mesos] 01/06: Allowed tracking of resource providers w/o resource version in agent.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 701e014a69b1f692f011e79169582c701ebf4f3c
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Fri Aug 17 12:00:43 2018 +0200

    Allowed tracking of resource providers w/o resource version in agent.
    
    This will be used in a later patch introducing explicit handling of
    resource provider subscription events in the agent.
    
    Review: https://reviews.apache.org/r/68407
---
 src/slave/slave.cpp | 9 ++++++++-
 src/slave/slave.hpp | 4 ++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e6c7e68..7fc90d1 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7579,6 +7579,13 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
   }
 
   foreachvalue (ResourceProvider* resourceProvider, resourceProviders) {
+    // If the resource provider has not updated its state we do not
+    // need to and cannot include its information in an
+    // `UpdateSlaveMessage` since it requires a resource version.
+    if (resourceProvider->resourceVersion.isNone()) {
+      continue;
+    }
+
     UpdateSlaveMessage::ResourceProvider* provider =
       message.mutable_resource_providers()->add_providers();
 
@@ -7587,7 +7594,7 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
     provider->mutable_total_resources()->CopyFrom(
         resourceProvider->totalResources);
     provider->mutable_resource_version_uuid()->CopyFrom(
-        resourceProvider->resourceVersion);
+        resourceProvider->resourceVersion.get());
 
     provider->mutable_operations();
 
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b1d695b..6757eae 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -1182,7 +1182,7 @@ struct ResourceProvider
   ResourceProvider(
       const ResourceProviderInfo& _info,
       const Resources& _totalResources,
-      const UUID& _resourceVersion)
+      const Option<UUID>& _resourceVersion)
     : info(_info),
       totalResources(_totalResources),
       resourceVersion(_resourceVersion) {}
@@ -1204,7 +1204,7 @@ struct ResourceProvider
   // different resource version UUID than that it maintains, because
   // this means the operation is operating on resources that might
   // have already been invalidated.
-  UUID resourceVersion;
+  Option<UUID> resourceVersion;
 
   // Pending operations or terminal operations that have
   // unacknowledged status updates.


[mesos] 03/06: Added methods to remove resource providers from provider manager.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4b558e24594b43456f35de697ed8484bbb331fe1
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:10:51 2018 +0200

    Added methods to remove resource providers from provider manager.
    
    This patch adds a method to remove a resource provider from the
    resource provider manager. The resource provider will be marked as
    removed in the manager's registry and disconnected. We also expose a
    new `REMOVE` event whenever a resource provider was removed.
    
    This patch does not add integration with e.g., the agent.
    
    Review: https://reviews.apache.org/r/68144/
---
 src/resource_provider/manager.cpp             |  51 +++++++++++++
 src/resource_provider/manager.hpp             |   6 ++
 src/resource_provider/message.hpp             |  22 +++++-
 src/slave/slave.cpp                           |  11 +++
 src/tests/resource_provider_manager_tests.cpp | 101 ++++++++++++++++++++++++++
 5 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 118c86c..4199e5b 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -63,6 +63,7 @@ using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
 
 using mesos::resource_provider::registry::Registry;
 
@@ -191,6 +192,9 @@ public:
 
   void reconcileOperations(const ReconcileOperationsMessage& message);
 
+  Future<Nothing> removeResourceProvider(
+      const ResourceProviderID& resourceProviderId);
+
   Future<Nothing> publishResources(const Resources& resources);
 
   Queue<ResourceProviderMessage> messages;
@@ -601,6 +605,44 @@ void ResourceProviderManagerProcess::reconcileOperations(
 }
 
 
+Future<Nothing> ResourceProviderManagerProcess::removeResourceProvider(
+    const ResourceProviderID& resourceProviderId)
+{
+  LOG(INFO) << "Removing resource provider " << resourceProviderId;
+
+  Future<bool> removeResourceProvider = registrar
+    ->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+        new RemoveResourceProvider(resourceProviderId)));
+
+  removeResourceProvider.onAny(
+      [resourceProviderId](const Future<bool>& removeResourceProvider) {
+        if (!removeResourceProvider.isReady()) {
+          LOG(ERROR) << "Not removing resource provider " << resourceProviderId
+                     << " as registry update did not succeed: "
+                     << removeResourceProvider;
+        }
+      });
+
+  return removeResourceProvider
+    .then(defer(
+        self(),
+        [this, resourceProviderId](const Future<bool>& removeResourceProvider) {
+          resourceProviders.known.erase(resourceProviderId);
+          resourceProviders.subscribed.erase(resourceProviderId);
+
+          ResourceProviderMessage::Remove remove{resourceProviderId};
+
+          ResourceProviderMessage message;
+          message.type = ResourceProviderMessage::Type::REMOVE;
+          message.remove = std::move(remove);
+
+          messages.put(std::move(message));
+
+          return Nothing();
+        }));
+}
+
+
 Future<Nothing> ResourceProviderManagerProcess::publishResources(
     const Resources& resources)
 {
@@ -988,6 +1030,15 @@ void ResourceProviderManager::reconcileOperations(
 }
 
 
+Future<Nothing> ResourceProviderManager::removeResourceProvider(
+    const ResourceProviderID& resourceProviderId) const
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::removeResourceProvider,
+      resourceProviderId);
+}
+
 Future<Nothing> ResourceProviderManager::publishResources(
     const Resources& resources)
 {
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 6c57956..33b25e3 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -17,6 +17,8 @@
 #ifndef __RESOURCE_PROVIDER_MANAGER_HPP__
 #define __RESOURCE_PROVIDER_MANAGER_HPP__
 
+#include <stout/nothing.hpp>
+
 #include <process/authenticator.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
@@ -66,6 +68,10 @@ public:
   void reconcileOperations(
       const ReconcileOperationsMessage& message) const;
 
+  // Permanently remove a resource provider.
+  process::Future<Nothing> removeResourceProvider(
+      const ResourceProviderID& resourceProviderId) const;
+
   // Ensure that the resources are ready for use.
   process::Future<Nothing> publishResources(const Resources& resources);
 
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 39d9936..19e1be1 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -43,7 +43,8 @@ struct ResourceProviderMessage
     SUBSCRIBE,
     UPDATE_STATE,
     UPDATE_OPERATION_STATUS,
-    DISCONNECT
+    DISCONNECT,
+    REMOVE
   };
 
   friend std::ostream& operator<<(std::ostream& stream, const Type& type) {
@@ -56,6 +57,8 @@ struct ResourceProviderMessage
         return stream << "UPDATE_OPERATION_STATUS";
       case Type::DISCONNECT:
         return stream << "DISCONNECT";
+      case Type::REMOVE:
+        return stream << "REMOVE";
     }
 
     UNREACHABLE();
@@ -84,12 +87,18 @@ struct ResourceProviderMessage
     ResourceProviderID resourceProviderId;
   };
 
+  struct Remove
+  {
+    ResourceProviderID resourceProviderId;
+  };
+
   Type type;
 
   Option<Subscribe> subscribe;
   Option<UpdateState> updateState;
   Option<UpdateOperationStatus> updateOperationStatus;
   Option<Disconnect> disconnect;
+  Option<Remove> remove;
 };
 
 
@@ -147,6 +156,17 @@ inline std::ostream& operator<<(
           << "resource provider "
           << disconnect->resourceProviderId;
     }
+
+    case ResourceProviderMessage::Type::REMOVE: {
+      const Option<ResourceProviderMessage::Remove>& remove =
+        resourceProviderMessage.remove;
+
+      CHECK_SOME(remove);
+
+      return stream
+          << "resource provider "
+          << remove->resourceProviderId;
+    }
   }
 
   UNREACHABLE();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 735795e..4829f00 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7919,6 +7919,17 @@ void Slave::handleResourceProviderMessage(
       }
       break;
     }
+    case ResourceProviderMessage::Type::REMOVE: {
+      CHECK_SOME(message->remove);
+
+      const ResourceProviderID& resourceProviderId =
+        message->remove->resourceProviderId;
+
+      resourceProviders.erase(resourceProviderId);
+
+      LOG(INFO) << "Removed resource provider '" << resourceProviderId << "'";
+      break;
+    }
   }
 
   // Wait for the next message.
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 433992c..33b1055 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -39,6 +39,7 @@
 #include <stout/error.hpp>
 #include <stout/gtest.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/recordio.hpp>
@@ -52,6 +53,8 @@
 
 #include "internal/devolve.hpp"
 
+#include "master/detector/standalone.hpp"
+
 #include "resource_provider/manager.hpp"
 #include "resource_provider/registrar.hpp"
 
@@ -65,6 +68,7 @@ namespace http = process::http;
 using mesos::internal::slave::Slave;
 
 using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
 
 using mesos::state::InMemoryStorage;
 using mesos::state::State;
@@ -1478,6 +1482,103 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   EXPECT_EQ(1, snapshot.values.at("resource_provider_manager/subscribed"));
 }
 
+
+TEST_F(ResourceProviderManagerHttpApiTest, RemoveResourceProvider)
+{
+  const ContentType contentType = ContentType::PROTOBUF;
+
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+
+  Future<ResourceProviderMessage> message = manager.messages().get();
+
+  Option<ResourceProviderID> resourceProviderId;
+
+  // Subscribe a resource provider.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+    AWAIT_READY(response);
+
+    AWAIT_READY(message);
+    ASSERT_EQ(ResourceProviderMessage::Type::SUBSCRIBE, message->type);
+    ASSERT_SOME(message->subscribe);
+    ASSERT_TRUE(message->subscribe->info.has_id());
+    resourceProviderId = message->subscribe->info.id();
+  }
+
+  // Remove the resource provider. We expect to receive a notification.
+  message = manager.messages().get();
+
+  Future<Nothing> removeResourceProvider =
+    manager.removeResourceProvider(resourceProviderId.get());
+
+  AWAIT_READY(removeResourceProvider);
+
+  AWAIT_READY(message);
+  ASSERT_EQ(ResourceProviderMessage::Type::REMOVE, message->type);
+  ASSERT_SOME(message->remove);
+  EXPECT_EQ(resourceProviderId.get(), message->remove->resourceProviderId);
+
+  // Attempting to resubscribe this resource provider fails.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    mesos::v1::ResourceProviderInfo* info =
+      subscribe->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.rp.test");
+    info->set_name("test");
+    info->mutable_id()->CopyFrom(evolve(resourceProviderId.get()));
+
+    http::Request request;
+    request.method = "POST";
+    request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    request.headers["Accept"] = stringify(contentType);
+    request.headers["Content-Type"] = stringify(contentType);
+    request.body = serialize(contentType, call);
+
+    Future<http::Response> response = manager.api(request, None());
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    ASSERT_EQ(http::Response::PIPE, response->type);
+
+    Option<http::Pipe::Reader> reader = response->reader;
+    ASSERT_SOME(reader);
+
+    recordio::Reader<Event> responseDecoder(
+        ::recordio::Decoder<Event>(
+            lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+        reader.get());
+
+    // We expect the manager to drop the subscribe call since
+    // the resource provider is not known at this point.
+    Future<Result<Event>> event = responseDecoder.read();
+    AWAIT_READY(event);
+    EXPECT_NONE(event.get());
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[mesos] 04/06: Added actions and ACLs to authorize removal of resource providers.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f83b31867c86e35f38fd538993138768939291f0
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:11:04 2018 +0200

    Added actions and ACLs to authorize removal of resource providers.
    
    Review: https://reviews.apache.org/r/68146/
---
 include/mesos/authorizer/acls.proto       | 13 ++++++-
 include/mesos/authorizer/authorizer.proto | 12 ++++---
 src/authorizer/local/authorizer.cpp       | 45 +++++++++++++++++------
 src/tests/authorization_tests.cpp         | 60 +++++++++++++++++++++++++++++++
 4 files changed, 114 insertions(+), 16 deletions(-)

diff --git a/include/mesos/authorizer/acls.proto b/include/mesos/authorizer/acls.proto
index f5d2580..4c3f290 100644
--- a/include/mesos/authorizer/acls.proto
+++ b/include/mesos/authorizer/acls.proto
@@ -494,6 +494,16 @@ message ACL {
     required Entity users = 2;
   }
 
+  // Which principals are authorized to mark resource providers as gone.
+  message MarkResourceProvidersGone {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: Given implicitly.
+    // Use Entity type ANY or NONE to allow or deny access.
+    required Entity resource_providers = 2;
+  }
+
   // Which principals are authorized to add, update and remove resource
   // provider config files.
   message ModifyResourceProviderConfig {
@@ -643,11 +653,12 @@ message ACLs {
   repeated ACL.RemoveStandaloneContainer remove_standalone_containers = 44;
   repeated ACL.ViewStandaloneContainer view_standalone_containers = 46;
   repeated ACL.ModifyResourceProviderConfig modify_resource_provider_configs = 45;
+  repeated ACL.MarkResourceProvidersGone mark_resource_providers_gone = 54;
+  repeated ACL.ViewResourceProvider view_resource_providers = 53;
   repeated ACL.PruneImages prune_images = 47;
   repeated ACL.ResizeVolume resize_volumes = 48;
   repeated ACL.CreateBlockDisk create_block_disks = 49;
   repeated ACL.DestroyBlockDisk destroy_block_disks = 50;
   repeated ACL.CreateMountDisk create_mount_disks = 51;
   repeated ACL.DestroyMountDisk destroy_mount_disks = 52;
-  repeated ACL.ViewResourceProvider view_resource_providers = 53;
 }
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 7330416..a51d2f2 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -252,6 +252,14 @@ enum Action {
   MODIFY_RESOURCE_PROVIDER_CONFIG = 39;
 
   // This action will not fill in any object fields. A principal is either
+  // allowed to mark a resource provider as gone or is unauthorized.
+  MARK_RESOURCE_PROVIDER_GONE = 48;
+
+  // This action will not fill in any object fields. A principal is either
+  // allowed to view resource provider information or is unauthorized.
+  VIEW_RESOURCE_PROVIDER = 47;
+
+  // This action will not fill in any object fields. A principal is either
   // allowed to prune unused container images or is unauthorized.
   PRUNE_IMAGES = 41;
 
@@ -269,10 +277,6 @@ enum Action {
 
   // `DESTROY_MOUNT_DISK` will have an object with `Resource` set.
   DESTROY_MOUNT_DISK = 46;
-
-  // This action will not fill in any object fields. A principal is either
-  // allowed to view resource provider information or is unauthorized.
-  VIEW_RESOURCE_PROVIDER = 47;
 }
 
 
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index f99b88e..3ab1b3b 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -412,8 +412,9 @@ public:
         case authorization::STOP_MAINTENANCE:
         case authorization::UPDATE_MAINTENANCE_SCHEDULE:
         case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG:
-        case authorization::PRUNE_IMAGES:
+        case authorization::MARK_RESOURCE_PROVIDER_GONE:
         case authorization::VIEW_RESOURCE_PROVIDER:
+        case authorization::PRUNE_IMAGES:
           aclObject.set_type(ACL::Entity::ANY);
 
           break;
@@ -732,6 +733,7 @@ public:
         case authorization::WAIT_NESTED_CONTAINER:
         case authorization::WAIT_STANDALONE_CONTAINER:
         case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG:
+        case authorization::MARK_RESOURCE_PROVIDER_GONE:
         case authorization::VIEW_RESOURCE_PROVIDER:
         case authorization::UNKNOWN:
           UNREACHABLE();
@@ -977,6 +979,7 @@ public:
       case authorization::WAIT_NESTED_CONTAINER:
       case authorization::WAIT_STANDALONE_CONTAINER:
       case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG:
+      case authorization::MARK_RESOURCE_PROVIDER_GONE:
       case authorization::VIEW_RESOURCE_PROVIDER:
         UNREACHABLE();
     }
@@ -1196,6 +1199,7 @@ public:
       case authorization::WAIT_NESTED_CONTAINER:
       case authorization::WAIT_STANDALONE_CONTAINER:
       case authorization::MODIFY_RESOURCE_PROVIDER_CONFIG:
+      case authorization::MARK_RESOURCE_PROVIDER_GONE:
       case authorization::VIEW_RESOURCE_PROVIDER:
       case authorization::UNKNOWN: {
         Result<vector<GenericACL>> genericACLs =
@@ -1552,11 +1556,12 @@ private:
         }
 
         return acls_;
-      case authorization::PRUNE_IMAGES:
-        foreach (const ACL::PruneImages& acl, acls.prune_images()) {
+      case authorization::MARK_RESOURCE_PROVIDER_GONE:
+        foreach (const ACL::MarkResourceProvidersGone& acl,
+                 acls.mark_resource_providers_gone()) {
           GenericACL acl_;
           acl_.subjects = acl.principals();
-          acl_.objects = acl.images();
+          acl_.objects = acl.resource_providers();
 
           acls_.push_back(acl_);
         }
@@ -1574,6 +1579,16 @@ private:
         }
 
         return acls_;
+      case authorization::PRUNE_IMAGES:
+        foreach (const ACL::PruneImages& acl, acls.prune_images()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.images();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
       case authorization::REGISTER_FRAMEWORK:
       case authorization::CREATE_VOLUME:
       case authorization::RESIZE_VOLUME:
@@ -1749,6 +1764,21 @@ Option<Error> LocalAuthorizer::validate(const ACLs& acls)
     }
   }
 
+  foreach (const ACL::MarkResourceProvidersGone& acl,
+           acls.mark_resource_providers_gone()) {
+    if (acl.resource_providers().type() == ACL::Entity::SOME) {
+      return Error(
+          "ACL.MarkResourceProvidersGone type must be either NONE or ANY");
+    }
+  }
+
+  foreach (const ACL::ViewResourceProvider& acl,
+           acls.view_resource_providers()) {
+    if (acl.resource_providers().type() == ACL::Entity::SOME) {
+      return Error("ACL.ViewResourceProvider type must be either NONE or ANY");
+    }
+  }
+
   foreach (const ACL::ModifyResourceProviderConfig& acl,
            acls.modify_resource_provider_configs()) {
     if (acl.resource_providers().type() == ACL::Entity::SOME) {
@@ -1763,13 +1793,6 @@ Option<Error> LocalAuthorizer::validate(const ACLs& acls)
     }
   }
 
-  foreach (const ACL::ViewResourceProvider& acl,
-           acls.view_resource_providers()) {
-    if (acl.resource_providers().type() == ACL::Entity::SOME) {
-      return Error("ACL.ViewResourceProvider type must be either NONE or ANY");
-    }
-  }
-
   // TODO(alexr): Consider validating not only protobuf, but also the original
   // JSON in order to spot misspelled names. A misspelled action may affect
   // authorization result and hence lead to a security issue (e.g. when there
diff --git a/src/tests/authorization_tests.cpp b/src/tests/authorization_tests.cpp
index de57fc9..ac52181 100644
--- a/src/tests/authorization_tests.cpp
+++ b/src/tests/authorization_tests.cpp
@@ -5783,6 +5783,66 @@ TYPED_TEST(AuthorizationTest, ViewStandaloneContainer)
 }
 
 
+// This tests the authorization of requests to MarkResourceProviderGone.
+TYPED_TEST(AuthorizationTest, MarkResourceProviderGone)
+{
+  ACLs acls;
+
+  {
+    // "foo" principal can mark resource providers gone.
+    mesos::ACL::MarkResourceProvidersGone* acl =
+      acls.add_mark_resource_providers_gone();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_resource_providers()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Nobody else can mark resource providers gone.
+    mesos::ACL::MarkResourceProvidersGone* acl =
+      acls.add_mark_resource_providers_gone();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_resource_providers()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  {
+    // "foo" is allowed to mark resource providers gone. The request
+    // should succeed.
+    authorization::Request request;
+    request.set_action(authorization::MARK_RESOURCE_PROVIDER_GONE);
+    request.mutable_subject()->set_value("foo");
+
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    // "bar" is not allowed to mark resource provider gone. The
+    // request should fail.
+    authorization::Request request;
+    request.set_action(authorization::MARK_RESOURCE_PROVIDER_GONE);
+    request.mutable_subject()->set_value("bar");
+
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    // Test that no authorizer is created with invalid ACLs.
+    ACLs invalid;
+
+    mesos::ACL::MarkResourceProvidersGone* acl =
+      invalid.add_mark_resource_providers_gone();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_resource_providers()->add_values("yoda");
+
+    Try<Authorizer*> create = TypeParam::create(parameterize(invalid));
+    EXPECT_ERROR(create);
+  }
+}
+
+
 // This tests the authorization of requests to ModifyResourceProviderConfig.
 TYPED_TEST(AuthorizationTest, ModifyResourceProviderConfig)
 {


[mesos] 06/06: Sent an event to resource providers when they are removed.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 6a98857cf429580b72cd97ddd749a074c3e4524d
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Mon Aug 13 11:10:59 2018 +0200

    Sent an event to resource providers when they are removed.
    
    In order to allow proper cleanup the resource provider manager sends a
    `TEARDOWN` event to a resource provider when it is being removed.
    
    The event is not sent reliably: if, e.g., the resource provider was
    not subscribed when it was removed, we currently never attempt to
    resend the event.
    
    Review: https://reviews.apache.org/r/68145/
---
 .../resource_provider/resource_provider.proto      |  1 +
 .../v1/resource_provider/resource_provider.proto   |  1 +
 src/resource_provider/manager.cpp                  | 22 ++++++++++++++++++++++
 src/resource_provider/storage/provider.cpp         |  4 ++++
 src/tests/mesos.hpp                                | 22 ++++++++++++++++++++++
 5 files changed, 50 insertions(+)

diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index 7c68333..5ea9e20 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -39,6 +39,7 @@ message Event {
     PUBLISH_RESOURCES = 3;            // See 'PublishResources' below.
     ACKNOWLEDGE_OPERATION_STATUS = 4; // See 'AcknowledgeOperationStatus' below.
     RECONCILE_OPERATIONS = 5;         // See 'ReconcileOperations' below.
+    TEARDOWN = 6;                     // Sent on resource provider teardown.
   }
 
   // First event received by the resource provider when it subscribes
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index 535b898..8f67673 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -39,6 +39,7 @@ message Event {
     PUBLISH_RESOURCES = 3;            // See 'PublishResources' below.
     ACKNOWLEDGE_OPERATION_STATUS = 4; // See 'AcknowledgeOperationStatus' below.
     RECONCILE_OPERATIONS = 5;         // See 'ReconcileOperations' below.
+    TEARDOWN = 6;                     // Sent on resource provider teardown.
   }
 
   // First event received by the resource provider when it subscribes
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index f4a5090..6c81c43 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -627,6 +627,28 @@ Future<Nothing> ResourceProviderManagerProcess::removeResourceProvider(
     .then(defer(
         self(),
         [this, resourceProviderId](const Future<bool>& removeResourceProvider) {
+          // TODO(bbannier): We should also on a best effort basis send a
+          // `TEARDOWN` event when a removed resource provider attempts to
+          // resubscribe. This can happen e.g., if the event gets lost on its
+          // way to the resource provider, or if the resource provider is not
+          // connected.
+          if (resourceProviders.subscribed.contains(resourceProviderId)) {
+            const Owned<ResourceProvider>& resourceProvider =
+              resourceProviders.subscribed.at(resourceProviderId);
+            Event event;
+            event.set_type(Event::TEARDOWN);
+
+            if (!resourceProvider->http.send(event)) {
+              LOG(WARNING)
+                << "Failed to send TEARDOWN event to resource provider "
+                << resourceProviderId << ": connection closed";
+            }
+          } else {
+            LOG(WARNING)
+              << "Failed to send TEARDOWN event to resource provider "
+              << resourceProviderId << ": resource provider not subscribed";
+          }
+
           resourceProviders.known.erase(resourceProviderId);
           resourceProviders.subscribed.erase(resourceProviderId);
 
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ab1467c..43a3ffc 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -582,6 +582,10 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
       reconcileOperations(event.reconcile_operations());
       break;
     }
+    case Event::TEARDOWN: {
+      // TODO(bbannier): Clean up state after teardown.
+      break;
+    }
     case Event::UNKNOWN: {
       LOG(WARNING) << "Received an UNKNOWN event and ignored";
       break;
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 75c5fae..40c63b0 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2977,6 +2977,22 @@ public:
               Operation,
               Source>::publishDefault));
     EXPECT_CALL(*this, publishResources(_)).WillRepeatedly(DoDefault());
+
+    ON_CALL(*this, teardown())
+      .WillByDefault(Invoke(
+          this,
+          &MockResourceProvider<
+              Event,
+              Call,
+              Driver,
+              ResourceProviderInfo,
+              Resource,
+              Resources,
+              ResourceProviderID,
+              OperationState,
+              Operation,
+              Source>::teardownDefault));
+    EXPECT_CALL(*this, teardown()).WillRepeatedly(DoDefault());
   }
 
   MOCK_METHOD0_T(connected, void());
@@ -2992,6 +3008,7 @@ public:
   MOCK_METHOD1_T(
       reconcileOperations,
       void(const typename Event::ReconcileOperations&));
+  MOCK_METHOD0_T(teardown, void());
 
   void events(std::queue<Event> events)
   {
@@ -3015,6 +3032,9 @@ public:
         case Event::RECONCILE_OPERATIONS:
           reconcileOperations(event.reconcile_operations());
           break;
+        case Event::TEARDOWN:
+          teardown();
+          break;
         case Event::UNKNOWN:
           LOG(FATAL) << "Received unexpected UNKNOWN event";
           break;
@@ -3214,6 +3234,8 @@ public:
     driver->send(call);
   }
 
+  void teardownDefault() {}
+
   ResourceProviderInfo info;
 
 private: