You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/01/18 10:02:46 UTC

[mesos] branch master updated (97703be -> 3de5efb)

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 97703be  Moved logging of resource provider HTTP actions before authorization.
     new c5a11ce  Added agent support to remove local resource providers.
     new 3de5efb  Added an integration test for resource provider removal.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/mesos/agent/agent.proto    |   9 ++
 include/mesos/v1/agent/agent.proto |   9 ++
 src/common/protobuf_utils.cpp      |  12 +-
 src/master/master.cpp              |   6 +-
 src/slave/http.cpp                 |  35 ++++++
 src/slave/http.hpp                 |   4 +
 src/slave/slave.cpp                | 117 +++++++++++++++++-
 src/slave/slave.hpp                |   3 +
 src/slave/validation.cpp           |   8 ++
 src/tests/api_tests.cpp            | 133 ++++++++++++++++++++
 src/tests/slave_tests.cpp          | 247 +++++++++++++++++++++++++++++++++++++
 11 files changed, 571 insertions(+), 12 deletions(-)


[mesos] 02/02: Added an integration test for resource provider removal.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3de5efba936c8b7bd1bf88c2fd05006a93271b73
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Fri Jan 18 09:39:52 2019 +0100

    Added an integration test for resource provider removal.
    
    Review: https://reviews.apache.org/r/69158/
---
 src/tests/slave_tests.cpp | 247 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 247 insertions(+)

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index dc711de..9168e06 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -142,6 +142,7 @@ using testing::_;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
+using testing::Exactly;
 using testing::StrEq;
 using testing::Invoke;
 using testing::InvokeWithoutArgs;
@@ -10700,6 +10701,252 @@ TEST_F(SlaveTest, ResourceProviderPublishAll)
 }
 
 
+// This test checks that a resource provider gets properly disconnected when
+// being marked gone, and is not able to reconnect. We expect pending operations
+// to be transitioned to the correct terminal status.
+TEST_F(SlaveTest, RemoveResourceProvider)
+{
+  Clock::pause();
+
+  // Start an agent and a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(updateSlaveMessage);
+
+  v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  // Register a local resource provider with the agent.
+  v1::Resource disk = v1::createDiskResource(
+      "200",
+      "storage",
+      None(),
+      None(),
+      v1::createDiskSourceRaw(None(), "profile"));
+
+  v1::MockResourceProvider resourceProvider(resourceProviderInfo, disk);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(std::move(endpointDetector), ContentType::PROTOBUF);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a framework and create a pending operation on the
+  // resource provider resources.
+  v1::FrameworkInfo framework = v1::DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, disk.reservations(0).role());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(framework));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Since the resources from the resource provider have already reached the
+  // master at this point, the framework will be offered resource provider
+  // resources.
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::Offer& offer = offers->offers(0);
+
+  Option<v1::Resource> rawDisk;
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (resource.has_provider_id() && resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() ==
+          v1::Resource::DiskInfo::Source::RAW) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  // Create a pending operation.
+  Future<v1::resource_provider::Event::ApplyOperation> applyOperation;
+  EXPECT_CALL(resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&applyOperation));
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_DISK(
+          rawDisk.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
+          operationId)}));
+
+  // Wait for the operation to reach the resource provider.
+  AWAIT_READY(applyOperation);
+
+  // A resource provider cannot be removed while it still has resources.
+  ASSERT_TRUE(resourceProvider.info.has_id());
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    resourceProvider.info.id();
+
+  v1::agent::Call v1Call;
+  v1Call.set_type(v1::agent::Call::MARK_RESOURCE_PROVIDER_GONE);
+  v1Call.mutable_mark_resource_provider_gone()
+    ->mutable_resource_provider_id()
+    ->CopyFrom(resourceProviderId);
+
+  constexpr ContentType contentType = ContentType::PROTOBUF;
+  process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<process::http::Response> response = process::http::post(
+    slave.get()->pid,
+    "api/v1",
+    headers,
+    serialize(contentType, v1Call),
+    stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(InternalServerError().status, response);
+
+  // Remove all resources on the resource provider.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  {
+    using mesos::v1::resource_provider::Call;
+    using mesos::v1::resource_provider::Event;
+
+    Call call;
+    call.set_type(Call::UPDATE_STATE);
+    call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
+
+    Call::UpdateState* update = call.mutable_update_state();
+    update->mutable_resources()->Clear();
+    update->mutable_resource_version_uuid()->set_value(
+        id::UUID::random().toBytes());
+
+    // Still report the operation. This allows us to send an operation
+    // status update later on.
+    mesos::v1::Operation* operation = update->add_operations();
+
+    ASSERT_TRUE(applyOperation->has_framework_id());
+    operation->mutable_framework_id()->CopyFrom(applyOperation->framework_id());
+
+    operation->mutable_info()->CopyFrom(applyOperation->info());
+    operation->mutable_uuid()->CopyFrom(applyOperation->operation_uuid());
+    operation->mutable_latest_status()->CopyFrom(
+        evolve(protobuf::createOperationStatus(OPERATION_PENDING)));
+
+    operation->add_statuses()->CopyFrom(operation->latest_status());
+
+    AWAIT_READY(resourceProvider.send(call));
+  }
+
+  // Once the master has seen that there is no resource managed
+  // by the resource provider it can be removed successfully.
+  AWAIT_READY(updateSlaveMessage);
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> updateOperationStatus;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&updateOperationStatus));
+
+  // The agent will eventually update the master on its resources
+  // after the resource provider disconnected.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // The resource provider will receive a TEARDOWN event on being marked gone.
+  Future<Nothing> teardown;
+  EXPECT_CALL(resourceProvider, teardown())
+    .WillOnce(FutureSatisfy(&teardown));
+
+  // We expect at least two disconnection events: one initially when the
+  // connected resource provider gets removed, and another when the automatic
+  // attempt to resubscribe fails and leads the remote to close the connection.
+  Future<Nothing> disconnected;
+  EXPECT_CALL(resourceProvider, disconnected())
+    .WillOnce(DoDefault())
+    .WillOnce(FutureSatisfy(&disconnected))
+    .WillRepeatedly(Return()); // Ignore additional disconnection events.
+
+  // The resource provider will automatically attempt to reconnect.
+  Future<Nothing> connected;
+  EXPECT_CALL(resourceProvider, connected())
+    .WillOnce(DoDefault())
+    .WillRepeatedly(Return());
+
+  // The resource provider should never successfully resubscribe.
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .Times(Exactly(0));
+
+  response = process::http::post(
+      slave.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  AWAIT_READY(teardown);
+
+  // On resource provider removal the framework should have received
+  // an operation status update.
+  AWAIT_READY(updateOperationStatus);
+
+  EXPECT_EQ(
+    v1::OperationState::OPERATION_GONE_BY_OPERATOR,
+    updateOperationStatus->status().state());
+
+  // The status update should be generated by the agent and have no
+  // associated resource provider ID.
+  EXPECT_TRUE(updateOperationStatus->status().has_agent_id());
+  EXPECT_FALSE(updateOperationStatus->status().has_resource_provider_id());
+
+  // The agent should also report a change to its resources.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Once we have seen the second disconnection event we know that the
+  // attempt to resubscribe was unsuccessful.
+  AWAIT_READY(disconnected);
+
+  // Settle the clock to ensure no more `subscribed` calls to the
+  // resource provider are enqueued.
+  Clock::settle();
+}
+
+
 // This test checks that the agent correctly updates and sends
 // resource version values when it registers or reregisters.
 TEST_F(SlaveTest, ResourceVersions)


[mesos] 01/02: Added agent support to remove local resource providers.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit c5a11ce7adcad2f024b64d5cfc678ac678e1d5df
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Fri Jan 18 09:39:43 2019 +0100

    Added agent support to remove local resource providers.
    
    This patch adds support for triggering permanent removal of local
    resource providers. We also add authorization and tests as part of this
    patch.
    
    Review: https://reviews.apache.org/r/68147/
---
 include/mesos/agent/agent.proto    |   9 +++
 include/mesos/v1/agent/agent.proto |   9 +++
 src/common/protobuf_utils.cpp      |  12 ++--
 src/master/master.cpp              |   6 +-
 src/slave/http.cpp                 |  35 ++++++++++
 src/slave/http.hpp                 |   4 ++
 src/slave/slave.cpp                | 117 +++++++++++++++++++++++++++++++-
 src/slave/slave.hpp                |   3 +
 src/slave/validation.cpp           |   8 +++
 src/tests/api_tests.cpp            | 133 +++++++++++++++++++++++++++++++++++++
 10 files changed, 324 insertions(+), 12 deletions(-)

diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 74488e8..ff408a4 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -97,6 +97,8 @@ message Call {
     UPDATE_RESOURCE_PROVIDER_CONFIG = 28; // See 'UpdateResourceProviderConfig' below. // NOLINT
     REMOVE_RESOURCE_PROVIDER_CONFIG = 29; // See 'RemoveResourceProviderConfig' below. // NOLINT
 
+    MARK_RESOURCE_PROVIDER_GONE = 32; // See 'MarkResourceProviderGone' below.
+
     // Prune unused container images.
     PRUNE_IMAGES = 30;
   }
@@ -363,6 +365,11 @@ message Call {
     required string name = 2;
   }
 
+  // Mark a resource provider as gone.
+  message MarkResourceProviderGone {
+    required ResourceProviderID resource_provider_id = 1;
+  }
+
   // Prune unused container images from image store.
   //
   // Images and layers referenced by active containers as well as
@@ -400,6 +407,8 @@ message Call {
   optional UpdateResourceProviderConfig update_resource_provider_config = 18;
   optional RemoveResourceProviderConfig remove_resource_provider_config = 19;
 
+  optional MarkResourceProviderGone mark_resource_provider_gone = 22;
+
   optional PruneImages prune_images = 21;
 }
 
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 5d1ab6f..19d6c42 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -97,6 +97,8 @@ message Call {
     UPDATE_RESOURCE_PROVIDER_CONFIG = 28; // See 'UpdateResourceProviderConfig' below. // NOLINT
     REMOVE_RESOURCE_PROVIDER_CONFIG = 29; // See 'RemoveResourceProviderConfig' below. // NOLINT
 
+    MARK_RESOURCE_PROVIDER_GONE = 32; // See 'MarkResourceProviderGone' below.
+
     // Prune unused container images.
     PRUNE_IMAGES = 30;
   }
@@ -363,6 +365,11 @@ message Call {
     required string name = 2;
   }
 
+  // Mark a resource provider as gone.
+  message MarkResourceProviderGone {
+    required ResourceProviderID resource_provider_id = 1;
+  }
+
   // Prune unused container images from image store.
   //
   // Images and layers referenced by active containers as well as
@@ -400,6 +407,8 @@ message Call {
   optional UpdateResourceProviderConfig update_resource_provider_config = 18;
   optional RemoveResourceProviderConfig remove_resource_provider_config = 19;
 
+  optional MarkResourceProviderGone mark_resource_provider_gone = 22;
+
   optional PruneImages prune_images = 21;
 }
 
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index a0159fe..9a03c35 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -421,17 +421,17 @@ Option<ContainerStatus> getTaskContainerStatus(const Task& task)
 bool isTerminalState(const OperationState& state)
 {
   switch (state) {
-    case OPERATION_FINISHED:
-    case OPERATION_FAILED:
-    case OPERATION_ERROR:
     case OPERATION_DROPPED:
+    case OPERATION_ERROR:
+    case OPERATION_FAILED:
+    case OPERATION_FINISHED:
+    case OPERATION_GONE_BY_OPERATOR:
       return true;
-    case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
-    case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN:
+    case OPERATION_UNREACHABLE:
+    case OPERATION_UNSUPPORTED:
       return false;
   }
 
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7c8d3ce..2e0a0de 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -11460,9 +11460,10 @@ void Master::updateOperation(
     }
 
     // Terminal state, and the conversion has failed.
-    case OPERATION_FAILED:
+    case OPERATION_DROPPED:
     case OPERATION_ERROR:
-    case OPERATION_DROPPED: {
+    case OPERATION_FAILED:
+    case OPERATION_GONE_BY_OPERATOR: {
       allocator->recoverResources(
           operation->framework_id(),
           operation->slave_id(),
@@ -11476,7 +11477,6 @@ void Master::updateOperation(
     case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
     case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN: {
       LOG(FATAL) << "Unexpected operation state "
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4b48756..d66ae52 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -104,6 +104,7 @@ using mesos::authorization::KILL_STANDALONE_CONTAINER;
 using mesos::authorization::REMOVE_NESTED_CONTAINER;
 using mesos::authorization::REMOVE_STANDALONE_CONTAINER;
 using mesos::authorization::MODIFY_RESOURCE_PROVIDER_CONFIG;
+using mesos::authorization::MARK_RESOURCE_PROVIDER_GONE;
 using mesos::authorization::PRUNE_IMAGES;
 
 using mesos::internal::recordio::Reader;
@@ -655,6 +656,9 @@ Future<Response> Http::_api(
     case mesos::agent::Call::REMOVE_RESOURCE_PROVIDER_CONFIG:
       return removeResourceProviderConfig(call, principal);
 
+    case mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE:
+      return markResourceProviderGone(call, principal);
+
     case mesos::agent::Call::PRUNE_IMAGES:
       return pruneImages(call, mediaTypes.accept, principal);
   }
@@ -3325,6 +3329,37 @@ Future<Response> Http::removeResourceProviderConfig(
 }
 
 
+Future<Response> Http::markResourceProviderGone(
+    const mesos::agent::Call& call,
+    const Option<Principal>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE, call.type());
+  CHECK(call.has_mark_resource_provider_gone());
+
+  const ResourceProviderID& resourceProviderId =
+    call.mark_resource_provider_gone().resource_provider_id();
+
+  LOG(INFO)
+    << "Processing MARK_RESOURCE_PROVIDER_GONE for resource provider "
+    << resourceProviderId;
+
+  return ObjectApprovers::create(
+      slave->authorizer, principal, {MARK_RESOURCE_PROVIDER_GONE})
+    .then(defer(
+        slave->self(),
+        [this, resourceProviderId](
+            const Owned<ObjectApprovers>& approvers) -> Future<Response> {
+          if (!approvers->approved<MARK_RESOURCE_PROVIDER_GONE>()) {
+            return Forbidden();
+          }
+
+          return slave->markResourceProviderGone(resourceProviderId)
+            .then([](const Future<Nothing>&) -> Future<Response> {
+              return OK();
+            });
+        }));
+}
+
 // Helper that reads data from `writer` and writes to `reader`.
 // Returns a failed future if there are any errors reading or writing.
 // The future is satisfied when we get a EOF.
diff --git a/src/slave/http.hpp b/src/slave/http.hpp
index 5b113fa..b8c83f1 100644
--- a/src/slave/http.hpp
+++ b/src/slave/http.hpp
@@ -350,6 +350,10 @@ private:
       const mesos::agent::Call& call,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  process::Future<process::http::Response> markResourceProviderGone(
+      const mesos::agent::Call& call,
+      const Option<process::http::authentication::Principal>& principal) const;
+
   process::Future<process::http::Response> pruneImages(
       const mesos::agent::Call& call,
       ContentType acceptType,
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 10cbc19..ed92f67 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -8000,8 +8000,98 @@ void Slave::handleResourceProviderMessage(
       const ResourceProviderID& resourceProviderId =
         message->remove->resourceProviderId;
 
+      if (!resourceProviders.contains(resourceProviderId)) {
+        break;
+      }
+
+      const ResourceProvider* resourceProvider =
+        resourceProviders.at(resourceProviderId);
+
+      CHECK_NOTNULL(resourceProvider);
+
+      // Transition all non-terminal operations on the resource provider to a
+      // terminal state.
+      //
+      // NOTE: We operate on a copy of the operations container since we trigger
+      // removal of current operation in below loop. This invalidates the loop
+      // iterator so it cannot be safely incremented after the loop body.
+      const hashmap<UUID, Operation*> operations = resourceProvider->operations;
+      foreachpair (const UUID& uuid, Operation * operation, operations) {
+        CHECK_NOTNULL(operation);
+
+        if (protobuf::isTerminalState(operation->latest_status().state())) {
+          continue;
+        }
+
+        // The operation might be from an operator API call, thus the framework
+        // ID here is optional.
+        Option<FrameworkID> frameworkId =
+          operation->has_framework_id()
+            ? operation->framework_id()
+            : Option<FrameworkID>::none();
+
+        Option<OperationID> operationId =
+          operation->info().has_id()
+            ? operation->info().id()
+            : Option<OperationID>::none();
+
+        UpdateOperationStatusMessage update =
+          protobuf::createUpdateOperationStatusMessage(
+              uuid,
+              protobuf::createOperationStatus(
+                  OPERATION_GONE_BY_OPERATOR,
+                  operationId,
+                  "The resource provider was removed before a terminal "
+                  "operation status update was received",
+                  None(),
+                  None(),
+                  info.id()),
+              None(),
+              frameworkId);
+
+        updateOperation(operation, update);
+
+        removeOperation(operation);
+
+        // Forward the operation status update to the master.
+        //
+        // The status update from the resource provider does not
+        // provide the agent ID (because the resource provider doesn't
+        // know it), so we inject it here.
+        UpdateOperationStatusMessage _update;
+        _update.CopyFrom(update);
+        _update.mutable_slave_id()->CopyFrom(info.id());
+        send(master.get(), _update);
+      };
+
+      // TODO(bbannier): Consider transitioning all tasks using resources from
+      // this resource provider to e.g., `TASK_GONE_BY_OPERATOR` and terminating
+      // them.
+
+      // Remove the resources of the resource provider from the agent's total.
+      // This needs to be done after triggering the operation status update so
+      // that master does not receive an operation status update for an unknown
+      // operation (gone from `UpdateSlaveMessage`).
+      totalResources -= resourceProvider->totalResources;
+
       resourceProviders.erase(resourceProviderId);
 
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master about the updated resources.
+          send(master.get(), generateResourceProviderUpdate());
+
+          break;
+        }
+      }
+
       LOG(INFO) << "Removed resource provider '" << resourceProviderId << "'";
       break;
     }
@@ -8112,9 +8202,10 @@ void Slave::updateOperation(
     }
 
     // Terminal state, and the conversion has failed.
-    case OPERATION_FAILED:
+    case OPERATION_DROPPED:
     case OPERATION_ERROR:
-    case OPERATION_DROPPED: {
+    case OPERATION_FAILED:
+    case OPERATION_GONE_BY_OPERATOR: {
       break;
     }
 
@@ -8122,7 +8213,6 @@ void Slave::updateOperation(
     case OPERATION_UNSUPPORTED:
     case OPERATION_PENDING:
     case OPERATION_UNREACHABLE:
-    case OPERATION_GONE_BY_OPERATOR:
     case OPERATION_RECOVERING:
     case OPERATION_UNKNOWN: {
       LOG(FATAL)
@@ -8201,6 +8291,27 @@ ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const
 }
 
 
+Future<Nothing> Slave::markResourceProviderGone(
+    const ResourceProviderID& resourceProviderId) const
+{
+  auto message = [&resourceProviderId](const string& reason) {
+    return
+      "Could not mark resource provider '" + stringify(resourceProviderId) +
+      "' as gone: " + reason;
+  };
+
+  if (!resourceProviderManager.get()) {
+    return Failure(message("Agent has not registered yet"));
+  }
+
+  if (resourceProviders.contains(resourceProviderId) &&
+      !resourceProviders.at(resourceProviderId)->totalResources.empty()) {
+    return Failure(message("Resource provider has resources"));
+  }
+
+  return resourceProviderManager->removeResourceProvider(resourceProviderId);
+}
+
 void Slave::apply(Operation* operation)
 {
   vector<ResourceConversion> conversions;
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2eadf5f..2bcd7a9 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -661,6 +661,9 @@ private:
 
   void removeOperation(Operation* operation);
 
+  process::Future<Nothing> markResourceProviderGone(
+      const ResourceProviderID& resourceProviderId) const;
+
   Operation* getOperation(const UUID& uuid) const;
 
   void addResourceProvider(ResourceProvider* resourceProvider);
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index df5e137..99b17c9 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -533,6 +533,14 @@ Option<Error> validate(
       return None();
     }
 
+    case mesos::agent::Call::MARK_RESOURCE_PROVIDER_GONE: {
+      if (!call.has_mark_resource_provider_gone()) {
+        return Error("Expecting 'mark_resource_provider_gone' to be present");
+      }
+
+      return None();
+    }
+
     case mesos::agent::Call::PRUNE_IMAGES: {
       return None();
     }
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 76ec56b..12ecea1 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -8334,6 +8334,139 @@ TEST_P(AgentAPITest, GetResourceProviders)
 }
 
 
+TEST_P(AgentAPITest, MarkResourceProviderGone)
+{
+  Clock::pause();
+
+  const ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.authenticate_http_readwrite = true;
+
+  {
+    // Default principal 2 is not allowed to mark any resource provider gone.
+    mesos::ACL::MarkResourceProvidersGone* acl =
+      slaveFlags.acls->add_mark_resource_providers_gone();
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL_2.principal());
+    acl->mutable_resource_providers()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  mesos::v1::ResourceProviderInfo info;
+  info.set_type("org.apache.mesos.rp.test");
+  info.set_name("test");
+
+  // Start a resource provider without resources since resource
+  // providers with resources cannot be marked gone.
+  v1::MockResourceProvider resourceProvider(info, v1::Resource());
+
+  // Start and register a resource provider.
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
+
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(DoAll(
+        Invoke(&resourceProvider, &v1::MockResourceProvider::subscribedDefault),
+        FutureArg<0>(&subscribed)))
+    .WillRepeatedly(Return());
+
+  // After the resource provider subscribes it will update its resources,
+  // which triggers an `UpdateSlaveMessage`.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(std::move(endpointDetector), contentType);
+
+  AWAIT_READY(subscribed);
+  AWAIT_READY(updateSlaveMessage);
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    subscribed->provider_id();
+
+  // Removing the resource provider should trigger an `UpdateSlaveMessage`.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Mark the resource provider gone.
+  {
+    // We explicitly track whether the resource provider was disconnected as it
+    // indicates whether the agent has cleaned up the connection to the
+    // provider.
+    //
+    // NOTE: As the resource provider driver will try to resubscribe if the
+    // connection is broken, which does not succeed, we might observe other
+    // `disconnected` events.
+    Future<Nothing> disconnected;
+    EXPECT_CALL(resourceProvider, disconnected())
+      .WillOnce(FutureSatisfy(&disconnected))
+      .WillRepeatedly(Return());
+
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::MARK_RESOURCE_PROVIDER_GONE);
+    v1Call.mutable_mark_resource_provider_gone()
+      ->mutable_resource_provider_id()
+      ->CopyFrom(resourceProviderId);
+
+    // `DEFAULT_CREDENTIAL_2` is not able to mark the resource provider as gone.
+    http::Headers headers2 = createBasicAuthHeaders(DEFAULT_CREDENTIAL_2);
+    headers2["Accept"] = stringify(contentType);
+
+    Future<http::Response> v1Response = http::post(
+        slave.get()->pid,
+        "api/v1",
+        headers2,
+        serialize(contentType, v1Call),
+        stringify(contentType));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, v1Response);
+
+    // Other principals are able to remove the resource provider.
+    http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(contentType);
+
+    v1Response = http::post(
+        slave.get()->pid,
+        "api/v1",
+        headers,
+        serialize(contentType, v1Call),
+        stringify(contentType));
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, v1Response);
+
+    AWAIT_READY(disconnected);
+  }
+
+  // Verify that the resource provider is not there anymore.
+  {
+    AWAIT_READY(updateSlaveMessage);
+
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_RESOURCE_PROVIDERS);
+
+    auto v1Response = post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_RESOURCE_PROVIDERS, v1Response->type());
+
+    EXPECT_TRUE(
+        v1Response->get_resource_providers().resource_providers().empty());
+  }
+}
+
+
 TEST_P(AgentAPITest, GetOperations)
 {
   Clock::pause();