You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/08 16:02:21 UTC

[3/5] mesos git commit: Explicitly passed resource-provider information in 'UpdateSlaveMessage'.

Explicitly passed resource-provider information in 'UpdateSlaveMessage'.

This patch changes the way resource-provider related information is
passed. Instead of aggregating all information from both the agent and
resource providers into global per-agent lists in
'UpdateSlaveMessage', with this patch we pass resource-provider
related information explicitly. In a subsequent patch we can surface
this information, e.g., in the operator API.

Review: https://reviews.apache.org/r/64423


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7cfa0e92
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7cfa0e92
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7cfa0e92

Branch: refs/heads/master
Commit: 7cfa0e9206647ddd6217cd69090f5eb328a73529
Parents: b5d6441
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 11:05:05 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp     | 151 ++++++++++++++++----------
 src/master/master.hpp     |   1 +
 src/slave/slave.cpp       | 238 ++++++++++++++++++++++-------------------
 src/slave/slave.hpp       |   7 ++
 src/tests/slave_tests.cpp |  66 +++++-------
 5 files changed, 263 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5cba506..8cf699b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7241,9 +7241,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newTotal = totalResources;
   }
 
-  // Since `total` always overwrites an existing total, we apply
-  // `oversubscribed` after updating the total to be able to
-  // independently apply it regardless of whether `total` was sent.
   if (hasOversubscribed) {
     const Resources& oversubscribedResources =
       message_.oversubscribed_resources();
@@ -7254,6 +7251,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newOversubscribed = oversubscribedResources;
   }
 
+  auto agentResources = [](const Resource& resource) {
+    return !resource.has_provider_id();
+  };
+
   const Resources newSlaveResources =
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
@@ -7266,30 +7267,59 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
       protobuf::parseResourceVersions(message.resource_version_uuids());
 
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      if (!resourceProvider.has_info()) {
+        continue;
+      }
+
+      Try<UUID> resourceVersion =
+        UUID::fromBytes(resourceProvider.resource_version_uuid());
+
+      CHECK_SOME(resourceVersion);
+
+      CHECK(resourceProvider.info().has_id());
+
+      const ResourceProviderID& resourceProviderId =
+        resourceProvider.info().id();
+
+      CHECK(!resourceVersions.contains(resourceProviderId));
+      resourceVersions.insert({resourceProviderId, resourceVersion.get()});
+    }
+
     updated = updated || slave->resourceVersions != resourceVersions;
     slave->resourceVersions = resourceVersions;
   }
 
   // Check if the known offer operations for this agent changed.
-  updated =
-    updated ||
-    (slave->offerOperations.empty() && message.has_offer_operations()) ||
-    (!slave->offerOperations.empty() && !message.has_offer_operations());
-  if (!updated) {
-    const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
-    hashset<UUID> receivedOfferOperations;
+  const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
+  hashset<UUID> receivedOfferOperations;
+
+  foreach (
+      const OfferOperation& operation,
+      message.offer_operations().operations()) {
+    Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(operationUuid);
+    receivedOfferOperations.insert(operationUuid.get());
+  }
 
+  if (message.has_resource_providers()) {
     foreach (
-        const OfferOperation& operation,
-        message.offer_operations().operations()) {
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      foreach (
+          const OfferOperation& operation,
+          resourceProvider.operations().operations()) {
         Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
         CHECK_SOME(operationUuid);
         receivedOfferOperations.insert(operationUuid.get());
+      }
     }
-
-    updated = updated || knownOfferOperations != receivedOfferOperations;
   }
 
+  updated = updated || knownOfferOperations != receivedOfferOperations;
+
   if (!updated) {
     LOG(INFO) << "Ignoring update on agent " << *slave
               << " as it reports no changes";
@@ -7302,6 +7332,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     Option<Resources> newTotal;
     Option<hashmap<UUID, OfferOperation>> oldOfferOperations;
     Option<hashmap<UUID, OfferOperation>> newOfferOperations;
+    Option<ResourceProviderInfo> info;
   };
 
   // We store information on the different `ResourceProvider`s on this agent in
@@ -7313,6 +7344,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
 
   // Group the resources and operation updates by resource provider.
   {
+    // Process known resources.
     auto groupResourcesByProviderId = [](const Resources& resources) {
       hashmap<Option<ResourceProviderID>, Resources> result;
 
@@ -7335,14 +7367,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       resourceProviders[providerId].oldTotal = resources;
     }
 
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const Resources& resources,
-        groupResourcesByProviderId(newSlaveResources)) {
-      // Implicitly create a new record if none exists.
-      resourceProviders[providerId].newTotal = resources;
-    }
-
+    // Process known offer operations.
     foreachpair (
         const UUID& uuid,
         OfferOperation* operation,
@@ -7370,36 +7395,55 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
         .oldOfferOperations->emplace(uuid, *operation);
     }
 
-    // Process received offer operations.
+    // Explicitly add an entry for received agent resources.
+    resourceProviders[None()].newTotal =
+      newSlaveResources.filter(agentResources);
+
+    // Process received agent offer operations.
+    resourceProviders[None()].newOfferOperations =
+      hashmap<UUID, OfferOperation>();
+
     foreach (
         const OfferOperation& operation,
         message.offer_operations().operations()) {
-      Result<ResourceProviderID> providerId_ =
-        getResourceProviderId(operation.info());
-
-      CHECK(!providerId_.isError())
-        << "Failed to extract resource provider id from known operation: "
-        << providerId_.error();
-
-      Option<ResourceProviderID> providerId =
-        providerId_.isSome()
-          ? providerId_.get()
-          : Option<ResourceProviderID>::none();
-
-      // Set up an init empty list of new operations. We might
-      // create a record for this resource provider if needed.
-      if (resourceProviders[providerId].newOfferOperations.isNone()) {
-        resourceProviders.at(providerId).newOfferOperations =
-          hashmap<UUID, OfferOperation>();
-      }
-
       Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
       CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling "
                           "offer operations";
 
-      resourceProviders.at(providerId)
+      resourceProviders.at(None())
         .newOfferOperations->emplace(uuid.get(), operation);
     }
+
+    // Process explicitly received resource provider information.
+    if (message.has_resource_providers()) {
+      foreach (
+          const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+          message.resource_providers().providers()) {
+        CHECK(resourceProvider.has_info());
+        CHECK(resourceProvider.info().has_id());
+
+        ResourceProvider& provider =
+          resourceProviders[resourceProvider.info().id()];
+
+        provider.info = resourceProvider.info();
+
+        provider.newTotal = resourceProvider.total_resources();
+        if (provider.newOfferOperations.isNone()) {
+          provider.newOfferOperations = hashmap<UUID, OfferOperation>();
+        }
+
+        foreach (
+            const OfferOperation& operation,
+            resourceProvider.operations().operations()) {
+          Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+          CHECK_SOME(uuid)
+            << "Could not deserialize operation id when reconciling "
+            "offer operations";
+
+          provider.newOfferOperations->emplace(uuid.get(), operation);
+        }
+      }
+    }
   }
 
   // Check invariants of the received update.
@@ -7408,10 +7452,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
         const Option<ResourceProviderID>& providerId,
         const ResourceProvider& provider,
         resourceProviders) {
-      const bool isNewResourceProvider =
-        provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
-      if (!isNewResourceProvider) {
+      if (providerId.isSome() &&
+          slave->resourceProviders.contains(providerId.get())) {
         // For known resource providers the master should always know at least
         // as many non-terminal offer operations as the agent. While an
         // operation might get lost on the way to the agent or resource
@@ -7494,9 +7536,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       const Option<ResourceProviderID>& providerId,
       const ResourceProvider& provider,
       resourceProviders) {
-    const bool isNewResourceProvider =
-      provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
     // Below we only add offer operations to our state from resource providers
     // which are unknown, or possibly remove them for known resource providers.
     // This works since the master should always known more offer operations of
@@ -7517,14 +7556,16 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     // new (terminal) operations when observing messages from status
     // update managers to frameworks.
 
-    if (isNewResourceProvider) {
-      // If this is a not previously seen resource provider with
-      // operations we had a master failover. Add the resources and
-      // operations to our state.
-      CHECK_SOME(providerId);
+    if (providerId.isSome() &&
+        !slave->resourceProviders.contains(providerId.get())) {
+      // If this resource provider has not been seen before, we had a master
+      // failover. Add the resources and operations to our state.
       CHECK_SOME(provider.newTotal);
       CHECK(!slave->totalResources.contains(provider.newTotal.get()));
 
+      CHECK_SOME(provider.info);
+      slave->resourceProviders.insert({providerId.get(), provider.info.get()});
+
       slave->totalResources += provider.newTotal.get();
 
       hashmap<FrameworkID, Resources> usedByOperations;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1f5daae..5c26f20 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -274,6 +274,7 @@ struct Slave
   SlaveObserver* observer;
 
   hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
 
 private:
   Slave(const Slave&);              // No copying.

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 54d8bcc..ee920c8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1321,45 +1321,14 @@ void Slave::registered(
       break;
   }
 
-  // Send the latest total, including resources from resource providers. We send
-  // this message here as a resource provider might have registered with the
-  // agent between recovery completion and agent registration.
-  bool sendUpdateSlaveMessage = false;
+  // If this agent can support resource providers or has had any oversubscribed
+  // resources set, send an `UpdateSlaveMessage` to the master to inform it of
+  // possible changes between completion of recovery and agent registration.
+  if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+    UpdateSlaveMessage message = generateUpdateSlaveMessage();
 
-  UpdateSlaveMessage message;
-  message.mutable_slave_id()->CopyFrom(info.id());
-  message.mutable_resource_version_uuids()->CopyFrom(
-      protobuf::createResourceVersions(resourceVersions));
-
-  if (capabilities.resourceProvider) {
-    LOG(INFO) << "Forwarding total resources " << totalResources;
-
-    message.mutable_resource_categories()->set_total(true);
-    message.mutable_total_resources()->CopyFrom(totalResources);
-
-    UpdateSlaveMessage::OfferOperations* operations =
-      message.mutable_offer_operations();
-
-    foreachvalue (const OfferOperation* operation, offerOperations) {
-      operations->add_operations()->CopyFrom(*operation);
-    }
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  // Send the latest estimate for oversubscribed resources.
-  if (oversubscribedResources.isSome()) {
-    LOG(INFO) << "Forwarding total oversubscribed resources "
-              << oversubscribedResources.get();
+    LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
 
-    message.mutable_resource_categories()->set_oversubscribed(true);
-    message.mutable_oversubscribed_resources()->CopyFrom(
-        oversubscribedResources.get());
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 }
@@ -1433,46 +1402,13 @@ void Slave::reregistered(
       return;
   }
 
-  // Send the latest total, including resources from resource providers. We send
-  // this message here as a resource provider might have registered with the
-  // agent between recovery completion and agent registration.
-  bool sendUpdateSlaveMessage = false;
-
-  UpdateSlaveMessage message;
-  message.mutable_slave_id()->CopyFrom(info.id());
-
-  message.mutable_resource_version_uuids()->CopyFrom(
-      protobuf::createResourceVersions(resourceVersions));
-
-  if (capabilities.resourceProvider) {
-    LOG(INFO) << "Forwarding total resources " << totalResources;
-
-    message.mutable_resource_categories()->set_total(true);
-    message.mutable_total_resources()->CopyFrom(totalResources);
-
-    UpdateSlaveMessage::OfferOperations* operations =
-      message.mutable_offer_operations();
+  // If this agent can support resource providers or has had any oversubscribed
+  // resources set, send an `UpdateSlaveMessage` to the master to inform it of
+  // possible changes between completion of recovery and agent registration.
+  if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+    UpdateSlaveMessage message = generateUpdateSlaveMessage();
 
-    foreachvalue (const OfferOperation* operation, offerOperations) {
-      operations->add_operations()->CopyFrom(*operation);
-    }
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  // Send the latest estimate for oversubscribed resources.
-  if (oversubscribedResources.isSome()) {
-    LOG(INFO) << "Forwarding total oversubscribed resources "
-              << oversubscribedResources.get();
-
-    message.mutable_resource_categories()->set_oversubscribed(true);
-    message.mutable_oversubscribed_resources()->CopyFrom(
-        oversubscribedResources.get());
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  if (sendUpdateSlaveMessage) {
+    LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
     send(master.get(), message);
   }
 
@@ -7020,10 +6956,17 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     // Add oversubscribable resources to the total.
     oversubscribed += oversubscribable.get();
 
+    // Remember the previous amount of oversubscribed resources.
+    const Option<Resources> previousOversubscribedResources =
+      oversubscribedResources;
+
+    // Update the estimate.
+    oversubscribedResources = oversubscribed;
+
     // Only forward the estimate if it's different from the previous
     // estimate. We also send this whenever we get (re-)registered
     // (i.e. whenever we transition into the RUNNING state).
-    if (state == RUNNING && oversubscribedResources != oversubscribed) {
+    if (state == RUNNING && previousOversubscribedResources != oversubscribed) {
       LOG(INFO) << "Forwarding total oversubscribed resources "
                 << oversubscribed;
 
@@ -7033,23 +6976,14 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
       // intervals updating the version could cause a lot of offer
       // operation churn.
       //
-      // TODO(bbannier): Revisit this if  we modify the operations
+      // TODO(bbannier): Revisit this if we modify the operations
       // possible on oversubscribed resources.
 
-      UpdateSlaveMessage message;
-      message.mutable_slave_id()->CopyFrom(info.id());
-      message.mutable_resource_categories()->set_oversubscribed(true);
-      message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
-
-      message.mutable_resource_version_uuids()->CopyFrom(
-          protobuf::createResourceVersions(resourceVersions));
+      UpdateSlaveMessage message = generateOversubscribedUpdate();
 
       CHECK_SOME(master);
       send(master.get(), message);
     }
-
-    // Update the estimate.
-    oversubscribedResources = oversubscribed;
   }
 
   delay(flags.oversubscribed_resources_interval,
@@ -7058,6 +6992,112 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
+UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
+{
+  UpdateSlaveMessage message;
+
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_categories()->set_oversubscribed(true);
+
+  if (oversubscribedResources.isSome()) {
+    message.mutable_oversubscribed_resources()->CopyFrom(
+        oversubscribedResources.get());
+  }
+
+  return message;
+}
+
+
+UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
+{
+  UpdateSlaveMessage message;
+
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  // Agent information (total resources, offer operations, resource
+  // versions) is not passed as part of some `ResourceProvider`, but
+  // globally in `UpdateSlaveMessage`.
+  //
+  // TODO(bbannier): Pass agent information as a resource provider.
+
+  // Process total resources.
+  hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
+    resourceProviders;
+
+  foreach (const Resource& resource, totalResources) {
+    if (resource.has_provider_id()) {
+      resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
+          resource);
+    }
+  }
+
+  // Process offer operations.
+  UpdateSlaveMessage::OfferOperations* operations =
+    message.mutable_offer_operations();
+
+  foreachvalue (const OfferOperation* operation, offerOperations) {
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    if (resourceProviderId.isSome()) {
+      resourceProviders[resourceProviderId.get()]
+        .mutable_operations()
+        ->add_operations()
+        ->CopyFrom(*operation);
+    } else if (resourceProviderId.isNone()) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+  }
+
+  // Make sure 'operations' is always set for resource providers.
+  foreachkey (
+      const ResourceProviderID& resourceProviderId,
+      resourceProviderInfos) {
+    resourceProviders[resourceProviderId].mutable_operations();
+  }
+
+  // Process resource versions.
+  CHECK(resourceVersions.contains(None()));
+  message.add_resource_version_uuids()->set_uuid(
+      resourceVersions.at(None()).toBytes());
+
+  foreachpair (
+      const ResourceProviderID& providerId,
+      UpdateSlaveMessage::ResourceProvider& provider,
+      resourceProviders) {
+    CHECK(resourceVersions.contains(providerId));
+    provider.set_resource_version_uuid(
+        resourceVersions.at(providerId).toBytes());
+
+    CHECK(resourceProviderInfos.contains(providerId));
+    provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
+  }
+
+  // We only actually surface resource-provider related information if
+  // this agent is resource provider-capable.
+  if (capabilities.resourceProvider) {
+    list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
+      resourceProviders.values();
+
+    message.mutable_resource_providers()->mutable_providers()->CopyFrom(
+        {resourceProviders_.begin(), resourceProviders_.end()});
+  }
+
+  return message;
+}
+
+
+UpdateSlaveMessage Slave::generateUpdateSlaveMessage() const
+{
+  UpdateSlaveMessage message;
+
+  message.MergeFrom(generateResourceProviderUpdate());
+  message.MergeFrom(generateOversubscribedUpdate());
+
+  return message;
+}
+
+
 void Slave::handleResourceProviderMessage(
     const Future<ResourceProviderMessage>& message)
 {
@@ -7227,26 +7267,8 @@ void Slave::handleResourceProviderMessage(
           if (updated) {
             LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-            // Inform the master that the total capacity of this agent has
-            // changed.
-            UpdateSlaveMessage updateSlaveMessage;
-            updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
-            updateSlaveMessage.mutable_resource_categories()->set_total(true);
-
-            updateSlaveMessage.mutable_total_resources()->CopyFrom(
-                totalResources);
-
-            updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
-                protobuf::createResourceVersions(resourceVersions));
-
-            UpdateSlaveMessage::OfferOperations* operations =
-              updateSlaveMessage.mutable_offer_operations();
-
-            foreachvalue (const OfferOperation* operation, offerOperations) {
-              operations->add_operations()->CopyFrom(*operation);
-            }
-
-            send(master.get(), updateSlaveMessage);
+            // Inform the master about the update from the resource provider.
+            send(master.get(), generateResourceProviderUpdate());
 
             break;
           }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5cb0d55..b3a1e70 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -555,6 +555,13 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
+  // Helper functions to generate `UpdateSlaveMessage` for either just
+  // updates to oversubscribed resources, resource provider-related
+  // information, or both.
+  UpdateSlaveMessage generateOversubscribedUpdate() const;
+  UpdateSlaveMessage generateResourceProviderUpdate() const;
+  UpdateSlaveMessage generateUpdateSlaveMessage() const;
+
   void handleResourceProviderMessage(
       const process::Future<ResourceProviderMessage>& message);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0714543..3852a57 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -28,6 +28,7 @@
 
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
@@ -8751,54 +8752,39 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
 
   resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
 
+  const string resourceVersionUuid = UUID::random().toBytes();
+
   {
     mesos::v1::resource_provider::Call call;
     call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
-
     call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
 
-    auto updateState = call.mutable_update_state();
+    mesos::v1::resource_provider::Call::UpdateState* updateState =
+      call.mutable_update_state();
+
     updateState->mutable_resources()->CopyFrom(
         v1::Resources(resourceProviderResources));
-    updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+    updateState->set_resource_version_uuid(resourceVersionUuid);
 
     resourceProvider.send(call);
   }
 
   AWAIT_READY(updateSlaveMessage);
 
-  EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
-  EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
-  EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
-
-  // We expect the updated agent total to contain both the resources of the
-  // agent and of the newly subscribed resource provider. The resources from the
-  // resource provider have a matching `ResourceProviderId` set.
-  Resources expectedResources =
-    Resources::parse(slaveFlags.resources.get()).get();
-  expectedResources += devolve(resourceProviderResources);
-
-  EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
-
-  // The update from the agent should now contain both the agent and
-  // resource provider resource versions.
-  ASSERT_EQ(2u, updateSlaveMessage->resource_version_uuids_size());
-
-  hashset<Option<ResourceProviderID>> resourceProviderIds;
-  foreach (
-      const ResourceVersionUUID& resourceVersionUuid,
-      updateSlaveMessage->resource_version_uuids()) {
-    resourceProviderIds.insert(
-        resourceVersionUuid.has_resource_provider_id()
-          ? resourceVersionUuid.resource_provider_id()
-          : Option<ResourceProviderID>::none());
-  }
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
 
-  hashset<Option<ResourceProviderID>> expectedResourceProviderIds;
-  expectedResourceProviderIds.insert(None());
-  expectedResourceProviderIds.insert(devolve(resourceProviderId));
+  const UpdateSlaveMessage::ResourceProvider& receivedResourceProvider =
+    updateSlaveMessage->resource_providers().providers(0);
 
-  EXPECT_EQ(expectedResourceProviderIds, resourceProviderIds);
+  EXPECT_EQ(
+      Resources(devolve(resourceProviderResources)),
+      Resources(receivedResourceProvider.total_resources()));
+
+  EXPECT_EQ(
+      resourceVersionUuid,
+      receivedResourceProvider.resource_version_uuid());
 }
 
 
@@ -9251,6 +9237,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
     v1::Resources reserved = offer.resources();
     reserved = reserved.filter(
         [](const v1::Resource& r) { return r.has_provider_id(); });
+
+    ASSERT_FALSE(reserved.empty());
+
     reserved = reserved.pushReservation(v1::createDynamicReservationInfo(
         frameworkInfo.roles(0), frameworkInfo.principal()));
 
@@ -9320,13 +9309,16 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
   hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
     protobuf::parseResourceVersions(
         updateSlaveMessage->resource_version_uuids());
+
   // The reserve operation will still be reported as pending since no offer
   // operation status update has been received from the resource provider.
-  ASSERT_TRUE(updateSlaveMessage->has_offer_operations());
-  ASSERT_EQ(1, updateSlaveMessage->offer_operations().operations_size());
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+  auto provider = updateSlaveMessage->resource_providers().providers(0);
+  ASSERT_TRUE(provider.has_operations());
+  ASSERT_EQ(1, provider.operations().operations_size());
 
-  const OfferOperation& reserve =
-    updateSlaveMessage->offer_operations().operations(0);
+  const OfferOperation& reserve = provider.operations().operations(0);
 
   EXPECT_EQ(Offer::Operation::RESERVE, reserve.info().type());
   ASSERT_TRUE(reserve.has_latest_status());