You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/13 22:13:55 UTC

[1/3] mesos git commit: Passed rvalue reference for UpdateSlaveMessage handler.

Repository: mesos
Updated Branches:
  refs/heads/master da9ca553f -> aaf043382


Passed rvalue reference for UpdateSlaveMessage handler.

This patch also fixed a bug where we didn't consistently use the mutated
message (POST_RESERVATION_REFINEMENT) in the handler.

This patch is split from https://reviews.apache.org/r/64561.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aaf04338
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aaf04338
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aaf04338

Branch: refs/heads/master
Commit: aaf043382ce06a952bb6c9f499e0c11ec429c5d1
Parents: da71f58
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Dec 13 09:34:51 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 13 14:13:36 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 9 +++------
 src/master/master.hpp | 2 +-
 2 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aaf04338/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 34ae82d..2f2608f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7218,7 +7218,7 @@ void Master::updateFramework(
 }
 
 
-void Master::updateSlave(const UpdateSlaveMessage& message)
+void Master::updateSlave(UpdateSlaveMessage&& message)
 {
   ++metrics->messages_update_slave;
 
@@ -7255,16 +7255,13 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
 
   Option<Resources> newOversubscribed;
 
-  // Make a copy of the message so we can transform its resources.
-  UpdateSlaveMessage message_(message);
-
   convertResourceFormat(
-      message_.mutable_oversubscribed_resources(),
+      message.mutable_oversubscribed_resources(),
       POST_RESERVATION_REFINEMENT);
 
   if (hasOversubscribed) {
     const Resources& oversubscribedResources =
-      message_.oversubscribed_resources();
+      message.oversubscribed_resources();
 
     LOG(INFO) << "Received update of agent " << *slave << " with total"
               << " oversubscribed resources " << oversubscribedResources;

http://git-wip-us.apache.org/repos/asf/mesos/blob/aaf04338/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 232cc37..2d74ec4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -499,7 +499,7 @@ public:
       const ExecutorID& executorId,
       int32_t status);
 
-  void updateSlave(const UpdateSlaveMessage& message);
+  void updateSlave(UpdateSlaveMessage&& message);
 
   void updateUnavailability(
       const MachineID& machineId,


[2/3] mesos git commit: Removed resource categories in UpdateSlaveMessage.

Posted by ji...@apache.org.
Removed resource categories in UpdateSlaveMessage.

Given that now we use `UpdateSlaveMessage` to send resource provider
information directly, having resource categories in the message is
unnecessary and misleading.

Instead, this patch introduced a single optional boolean to indicate if
oversubscribed resources need to be updated or not.

Review: https://reviews.apache.org/r/64561


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da71f588
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da71f588
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da71f588

Branch: refs/heads/master
Commit: da71f5881a6b7e06dbc4a64837583f559fb78754
Parents: 00a96a3
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 12 16:53:52 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 13 14:13:36 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp                | 11 ++++----
 src/messages/messages.proto          | 19 +++++---------
 src/slave/slave.cpp                  | 43 +++++++++++--------------------
 src/slave/slave.hpp                  |  8 +++---
 src/tests/oversubscription_tests.cpp | 20 ++++++--------
 5 files changed, 39 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/da71f588/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 806fbc2..34ae82d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7246,13 +7246,12 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
   // updating the agent in the allocator. This would lead us to
   // re-send out the stale oversubscribed resources!
 
-  // If the caller did not specify a resource category we assume we should set
-  // `oversubscribedResources` to be backwards-compatibility with older clients.
+  // If the agent does not specify the `update_oversubscribed_resources`
+  // field, we assume we should set `oversubscribedResources`, for
+  // backwards compatibility with older agents (version < 1.5).
   const bool hasOversubscribed =
-    (message.has_resource_categories() &&
-     message.resource_categories().has_oversubscribed() &&
-     message.resource_categories().oversubscribed()) ||
-    !message.has_resource_categories();
+    !message.has_update_oversubscribed_resources() ||
+     message.update_oversubscribed_resources();
 
   Option<Resources> newOversubscribed;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/da71f588/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index e680cd5..b8eb8fa 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -725,18 +725,13 @@ message UpdateSlaveMessage {
   // TODO(bbannier): Consider passing agent information inside a
   // `ResourceProvider` value as well where applicable.
 
-  // This message can contain `oversubscribed_resources` or resource
-  // providers. Callers are expected to set the `oversubscribed`
-  // category in `resource_categories` to denote whether the
-  // `oversubscribed_resources` field should be examined. For
-  // backwards compatibility we interpret an unset `category` field as
-  // if only oversubscribed was set.
-  message ResourceCategories {
-    optional bool oversubscribed = 2;
-  }
-
-  optional ResourceCategories resource_categories = 5;
-
+  // Whether to update oversubscribed resources or not. If we just use
+  // `oversubscribed_resources`, we don't have a way to tell if the
+  // intention is to update the oversubscribed resources to empty, or
+  // leave it unchanged. For backwards compatibility, if this field is
+  // unset (version < 1.5), we treat that as if only
+  // `oversubscribed_resources` was set.
+  optional bool update_oversubscribed_resources = 5;
   repeated Resource oversubscribed_resources = 2;
 
   message OfferOperations {

http://git-wip-us.apache.org/repos/asf/mesos/blob/da71f588/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 15bdd8f..e8f7591 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -6976,17 +6976,10 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     // Add oversubscribable resources to the total.
     oversubscribed += oversubscribable.get();
 
-    // Remember the previous amount of oversubscribed resources.
-    const Option<Resources> previousOversubscribedResources =
-      oversubscribedResources;
-
-    // Update the estimate.
-    oversubscribedResources = oversubscribed;
-
     // Only forward the estimate if it's different from the previous
     // estimate. We also send this whenever we get (re-)registered
     // (i.e. whenever we transition into the RUNNING state).
-    if (state == RUNNING && previousOversubscribedResources != oversubscribed) {
+    if (state == RUNNING && oversubscribedResources != oversubscribed) {
       LOG(INFO) << "Forwarding total oversubscribed resources "
                 << oversubscribed;
 
@@ -6999,11 +6992,17 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
       // TODO(bbannier): Revisit this if we modify the operations
       // possible on oversubscribed resources.
 
-      UpdateSlaveMessage message = generateOversubscribedUpdate();
+      UpdateSlaveMessage message;
+      message.mutable_slave_id()->CopyFrom(info.id());
+      message.set_update_oversubscribed_resources(true);
+      message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
       CHECK_SOME(master);
       send(master.get(), message);
     }
+
+    // Update the estimate.
+    oversubscribedResources = oversubscribed;
   }
 
   delay(flags.oversubscribed_resources_interval,
@@ -7012,22 +7011,6 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
-UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
-{
-  UpdateSlaveMessage message;
-
-  message.mutable_slave_id()->CopyFrom(info.id());
-  message.mutable_resource_categories()->set_oversubscribed(true);
-
-  if (oversubscribedResources.isSome()) {
-    message.mutable_oversubscribed_resources()->CopyFrom(
-        oversubscribedResources.get());
-  }
-
-  return message;
-}
-
-
 UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 {
   // Agent information (total resources, offer operations, resource
@@ -7037,6 +7020,7 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
   // TODO(bbannier): Pass agent information as a resource provider.
   UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
+  message.set_update_oversubscribed_resources(false);
   message.set_resource_version_uuid(resourceVersion.toBytes());
   message.mutable_offer_operations();
 
@@ -7076,10 +7060,13 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 
 UpdateSlaveMessage Slave::generateUpdateSlaveMessage() const
 {
-  UpdateSlaveMessage message;
+  UpdateSlaveMessage message = generateResourceProviderUpdate();
 
-  message.MergeFrom(generateResourceProviderUpdate());
-  message.MergeFrom(generateOversubscribedUpdate());
+  if (oversubscribedResources.isSome()) {
+    message.set_update_oversubscribed_resources(true);
+    message.mutable_oversubscribed_resources()->CopyFrom(
+        oversubscribedResources.get());
+  }
 
   return message;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/da71f588/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 7c40fc7..de2b2e2 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -556,10 +556,10 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
-  // Helper functions to generate `UpdateSlaveMessage` for either just
-  // updates to oversubscribed resources, resource provider-related
-  // information, or both.
-  UpdateSlaveMessage generateOversubscribedUpdate() const;
+  // Helper functions to generate `UpdateSlaveMessage` for either
+  // just updates to resource provider-related information, or both
+  // resource provider-related information and oversubscribed
+  // resources.
   UpdateSlaveMessage generateResourceProviderUpdate() const;
   UpdateSlaveMessage generateUpdateSlaveMessage() const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/da71f588/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3f57ce1..e6139b0 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -323,9 +323,8 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
 
   AWAIT_READY(update);
 
-  EXPECT_TRUE(update->has_resource_categories());
-  EXPECT_TRUE(update->resource_categories().has_oversubscribed());
-  EXPECT_TRUE(update->resource_categories().oversubscribed());
+  EXPECT_TRUE(update->has_update_oversubscribed_resources());
+  EXPECT_TRUE(update->update_oversubscribed_resources());
   EXPECT_EQ(update->oversubscribed_resources(), resources);
 
   // Ensure the metric is updated.
@@ -698,9 +697,8 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
   Clock::settle();
 
   AWAIT_READY(update);
-  ASSERT_TRUE(update->has_resource_categories());
-  ASSERT_TRUE(update->resource_categories().has_oversubscribed());
-  ASSERT_TRUE(update->resource_categories().oversubscribed());
+  ASSERT_TRUE(update->has_update_oversubscribed_resources());
+  ASSERT_TRUE(update->update_oversubscribed_resources());
 
   Resources resources = update->oversubscribed_resources();
   EXPECT_SOME_EQ(2.0, resources.cpus());
@@ -902,9 +900,8 @@ TEST_F(OversubscriptionTest, Reregistration)
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveRegistered);
   AWAIT_READY(update);
-  ASSERT_TRUE(update->has_resource_categories());
-  ASSERT_TRUE(update->resource_categories().has_oversubscribed());
-  ASSERT_TRUE(update->resource_categories().oversubscribed());
+  ASSERT_TRUE(update->has_update_oversubscribed_resources());
+  ASSERT_TRUE(update->update_oversubscribed_resources());
 
   Resources resources = update->oversubscribed_resources();
   EXPECT_SOME_EQ(2.0, resources.cpus());
@@ -921,9 +918,8 @@ TEST_F(OversubscriptionTest, Reregistration)
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveReregistered);
   AWAIT_READY(update);
-  EXPECT_TRUE(update->has_resource_categories());
-  EXPECT_TRUE(update->resource_categories().has_oversubscribed());
-  EXPECT_TRUE(update->resource_categories().oversubscribed());
+  EXPECT_TRUE(update->has_update_oversubscribed_resources());
+  EXPECT_TRUE(update->update_oversubscribed_resources());
 }
 
 


[3/3] mesos git commit: Handled the RP disconnection case in the agent.

Posted by ji...@apache.org.
Handled the RP disconnection case in the agent.

If an RP is disconnected, we'll shrink its total resources to zero so
that no offer will be made on this RP until it reconnects. This prevents
frameworks from sending operations to the disconnected RP.

Review: https://reviews.apache.org/r/64557


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00a96a3c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00a96a3c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00a96a3c

Branch: refs/heads/master
Commit: 00a96a3c874c2ace0424295c11c8ece3124b6bfc
Parents: da9ca55
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 12 14:52:33 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 13 14:13:36 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             |  8 ++
 src/resource_provider/message.hpp             | 20 ++++-
 src/slave/slave.cpp                           | 50 +++++++++++
 src/tests/resource_provider_manager_tests.cpp | 98 ++++++++++++++++++++++
 4 files changed, 175 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/00a96a3c/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index fd138b9..046dba7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -609,6 +609,14 @@ void ResourceProviderManagerProcess::subscribe(
       // NOTE: All pending futures of publish requests for the resource
       // provider will become failed.
       resourceProviders.subscribed.erase(resourceProviderId);
+
+      ResourceProviderMessage::Disconnect disconnect{resourceProviderId};
+
+      ResourceProviderMessage message;
+      message.type = ResourceProviderMessage::Type::DISCONNECT;
+      message.disconnect = std::move(disconnect);
+
+      messages.put(std::move(message));
     }));
 
   // TODO(jieyu): Start heartbeat for the resource provider.

http://git-wip-us.apache.org/repos/asf/mesos/blob/00a96a3c/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index eab90cf..137554a 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -41,7 +41,8 @@ struct ResourceProviderMessage
   enum class Type
   {
     UPDATE_STATE,
-    UPDATE_OFFER_OPERATION_STATUS
+    UPDATE_OFFER_OPERATION_STATUS,
+    DISCONNECT
   };
 
   struct UpdateState
@@ -57,10 +58,16 @@ struct ResourceProviderMessage
     OfferOperationStatusUpdate update;
   };
 
+  struct Disconnect
+  {
+    ResourceProviderID resourceProviderId;
+  };
+
   Type type;
 
   Option<UpdateState> updateState;
   Option<UpdateOfferOperationStatus> updateOfferOperationStatus;
+  Option<Disconnect> disconnect;
 };
 
 
@@ -101,6 +108,17 @@ inline std::ostream& operator<<(
           << ", status update state: "
           << updateOfferOperationStatus->update.status().state() << ")";
     }
+
+    case ResourceProviderMessage::Type::DISCONNECT: {
+      const Option<ResourceProviderMessage::Disconnect>& disconnect =
+        resourceProviderMessage.disconnect;
+
+      CHECK_SOME(disconnect);
+
+      return stream
+          << "DISCONNECT: resource provider "
+          << disconnect->resourceProviderId;
+    }
   }
 
   UNREACHABLE();

http://git-wip-us.apache.org/repos/asf/mesos/blob/00a96a3c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d997b42..15bdd8f 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7281,6 +7281,56 @@ void Slave::handleResourceProviderMessage(
           break;
         }
       }
+      break;
+    }
+    case ResourceProviderMessage::Type::DISCONNECT: {
+      CHECK_SOME(message->disconnect);
+
+      const ResourceProviderID& resourceProviderId =
+        message->disconnect->resourceProviderId;
+
+      ResourceProvider* resourceProvider =
+        getResourceProvider(resourceProviderId);
+
+      if (resourceProvider == nullptr) {
+        LOG(ERROR) << "Failed to find the disconnected resource provider "
+                   << resourceProviderId << ", ignoring the message";
+        break;
+      }
+
+      // A disconnected resource provider effectively results in its
+      // total resources being set to empty. This will cause offers to
+      // be rescinded so that no operation or task can be launched
+      // using resources from the disconnected resource provider. If
+      // later, the resource provider reconnects, it'll result in
+      // an `UpdateSlaveMessage` so that its resources can be offered
+      // again by the master.
+      CHECK(totalResources.contains(resourceProvider->totalResources));
+      totalResources -= resourceProvider->totalResources;
+
+      resourceProvider->totalResources = Resources();
+
+      // Send the updated resources to the master if the agent is running. Note
+      // that since we have already updated our copy of the latest resource
+      // provider resources, it is safe to consume this message and wait for the
+      // next one; even if we do not send the update to the master right now, an
+      // update will be sent once the agent reregisters.
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master about the update from the resource provider.
+          send(master.get(), generateResourceProviderUpdate());
+
+          break;
+        }
+      }
+      break;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/00a96a3c/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index e37a53a..41fc4ea 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1173,6 +1173,104 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
   EXPECT_EQ(resourceProviderId, subscribed2->provider_id());
 }
 
+
+// This test verifies that a disconnected resource provider will
+// result in an `UpdateSlaveMessage` being sent to the master and the
+// total resources of the disconnected resource provider will be
+// reduced to empty.
+TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
+{
+  Clock::pause();
+
+  // Start master and agent.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(updateSlaveMessage);
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resource disk = v1::createDiskResource(
+      "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+  Owned<v1::MockResourceProvider> resourceProvider(
+      new v1::MockResourceProvider(
+          resourceProviderInfo,
+          v1::Resources(disk)));
+
+  // Start and register a resource provider.
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  http::URL url(
+      scheme,
+      agent.get()->pid.address.ip,
+      agent.get()->pid.address.port,
+      agent.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  const ContentType contentType = GetParam();
+
+  resourceProvider->start(
+      endpointDetector,
+      contentType,
+      v1::DEFAULT_CREDENTIAL);
+
+  {
+    // Wait until the agent's resources have been updated to include
+    // the resource provider resources. At this point the resource
+    // provider will have an ID assigned by the agent.
+    AWAIT_READY(updateSlaveMessage);
+
+    ASSERT_TRUE(resourceProvider->info.has_id());
+    disk.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+
+    const Resources& totalResources =
+      updateSlaveMessage->resource_providers().providers(0).total_resources();
+
+    EXPECT_TRUE(totalResources.contains(devolve(disk)));
+  }
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Simulate a resource provider disconnection.
+  resourceProvider.reset();
+
+  {
+    AWAIT_READY(updateSlaveMessage);
+    ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+    ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+    const Resources& totalResources =
+      updateSlaveMessage->resource_providers().providers(0).total_resources();
+
+    EXPECT_FALSE(totalResources.contains(devolve(disk)));
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {