You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/08 16:02:21 UTC
[3/5] mesos git commit: Explicitly passed resource-provider
information in 'UpdateSlaveMessage'.
Explicitly passed resource-provider information in 'UpdateSlaveMessage'.
This patch changes the way resource-provider related information is
passed. Instead of aggregating all information from both the agent and
resource providers into global per-agent lists in
'UpdateSlaveMessage', with this patch we pass resource-provider
related information explicitly. We can in a subsequent patch surface
this information in e.g., the operator API.
Review: https://reviews.apache.org/r/64423
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7cfa0e92
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7cfa0e92
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7cfa0e92
Branch: refs/heads/master
Commit: 7cfa0e9206647ddd6217cd69090f5eb328a73529
Parents: b5d6441
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 11:05:05 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100
----------------------------------------------------------------------
src/master/master.cpp | 151 ++++++++++++++++----------
src/master/master.hpp | 1 +
src/slave/slave.cpp | 238 ++++++++++++++++++++++-------------------
src/slave/slave.hpp | 7 ++
src/tests/slave_tests.cpp | 66 +++++-------
5 files changed, 263 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5cba506..8cf699b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7241,9 +7241,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
newTotal = totalResources;
}
- // Since `total` always overwrites an existing total, we apply
- // `oversubscribed` after updating the total to be able to
- // independently apply it regardless of whether `total` was sent.
if (hasOversubscribed) {
const Resources& oversubscribedResources =
message_.oversubscribed_resources();
@@ -7254,6 +7251,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
newOversubscribed = oversubscribedResources;
}
+ auto agentResources = [](const Resource& resource) {
+ return !resource.has_provider_id();
+ };
+
const Resources newSlaveResources =
newTotal.getOrElse(slave->totalResources.nonRevocable()) +
newOversubscribed.getOrElse(slave->totalResources.revocable());
@@ -7266,30 +7267,59 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
protobuf::parseResourceVersions(message.resource_version_uuids());
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ if (!resourceProvider.has_info()) {
+ continue;
+ }
+
+ Try<UUID> resourceVersion =
+ UUID::fromBytes(resourceProvider.resource_version_uuid());
+
+ CHECK_SOME(resourceVersion);
+
+ CHECK(resourceProvider.info().has_id());
+
+ const ResourceProviderID& resourceProviderId =
+ resourceProvider.info().id();
+
+ CHECK(!resourceVersions.contains(resourceProviderId));
+ resourceVersions.insert({resourceProviderId, resourceVersion.get()});
+ }
+
updated = updated || slave->resourceVersions != resourceVersions;
slave->resourceVersions = resourceVersions;
}
// Check if the known offer operations for this agent changed.
- updated =
- updated ||
- (slave->offerOperations.empty() && message.has_offer_operations()) ||
- (!slave->offerOperations.empty() && !message.has_offer_operations());
- if (!updated) {
- const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
- hashset<UUID> receivedOfferOperations;
+ const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
+ hashset<UUID> receivedOfferOperations;
+
+ foreach (
+ const OfferOperation& operation,
+ message.offer_operations().operations()) {
+ Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(operationUuid);
+ receivedOfferOperations.insert(operationUuid.get());
+ }
+ if (message.has_resource_providers()) {
foreach (
- const OfferOperation& operation,
- message.offer_operations().operations()) {
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ foreach (
+ const OfferOperation& operation,
+ resourceProvider.operations().operations()) {
Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
CHECK_SOME(operationUuid);
receivedOfferOperations.insert(operationUuid.get());
+ }
}
-
- updated = updated || knownOfferOperations != receivedOfferOperations;
}
+ updated = updated || knownOfferOperations != receivedOfferOperations;
+
if (!updated) {
LOG(INFO) << "Ignoring update on agent " << *slave
<< " as it reports no changes";
@@ -7302,6 +7332,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
Option<Resources> newTotal;
Option<hashmap<UUID, OfferOperation>> oldOfferOperations;
Option<hashmap<UUID, OfferOperation>> newOfferOperations;
+ Option<ResourceProviderInfo> info;
};
// We store information on the different `ResourceProvider`s on this agent in
@@ -7313,6 +7344,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
// Group the resources and operation updates by resource provider.
{
+ // Process known resources.
auto groupResourcesByProviderId = [](const Resources& resources) {
hashmap<Option<ResourceProviderID>, Resources> result;
@@ -7335,14 +7367,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
resourceProviders[providerId].oldTotal = resources;
}
- foreachpair (
- const Option<ResourceProviderID>& providerId,
- const Resources& resources,
- groupResourcesByProviderId(newSlaveResources)) {
- // Implicitly create a new record if none exists.
- resourceProviders[providerId].newTotal = resources;
- }
-
+ // Process known offer operations.
foreachpair (
const UUID& uuid,
OfferOperation* operation,
@@ -7370,36 +7395,55 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
.oldOfferOperations->emplace(uuid, *operation);
}
- // Process received offer operations.
+ // Explicitly add an entry for received agent resources.
+ resourceProviders[None()].newTotal =
+ newSlaveResources.filter(agentResources);
+
+ // Process received agent offer operations.
+ resourceProviders[None()].newOfferOperations =
+ hashmap<UUID, OfferOperation>();
+
foreach (
const OfferOperation& operation,
message.offer_operations().operations()) {
- Result<ResourceProviderID> providerId_ =
- getResourceProviderId(operation.info());
-
- CHECK(!providerId_.isError())
- << "Failed to extract resource provider id from known operation: "
- << providerId_.error();
-
- Option<ResourceProviderID> providerId =
- providerId_.isSome()
- ? providerId_.get()
- : Option<ResourceProviderID>::none();
-
- // Set up an init empty list of new operations. We might
- // create a record for this resource provider if needed.
- if (resourceProviders[providerId].newOfferOperations.isNone()) {
- resourceProviders.at(providerId).newOfferOperations =
- hashmap<UUID, OfferOperation>();
- }
-
Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling "
"offer operations";
- resourceProviders.at(providerId)
+ resourceProviders.at(None())
.newOfferOperations->emplace(uuid.get(), operation);
}
+
+ // Process explicitly received resource provider information.
+ if (message.has_resource_providers()) {
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ CHECK(resourceProvider.has_info());
+ CHECK(resourceProvider.info().has_id());
+
+ ResourceProvider& provider =
+ resourceProviders[resourceProvider.info().id()];
+
+ provider.info = resourceProvider.info();
+
+ provider.newTotal = resourceProvider.total_resources();
+ if (provider.newOfferOperations.isNone()) {
+ provider.newOfferOperations = hashmap<UUID, OfferOperation>();
+ }
+
+ foreach (
+ const OfferOperation& operation,
+ resourceProvider.operations().operations()) {
+ Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(uuid)
+ << "Could not deserialize operation id when reconciling "
+ "offer operations";
+
+ provider.newOfferOperations->emplace(uuid.get(), operation);
+ }
+ }
+ }
}
// Check invariants of the received update.
@@ -7408,10 +7452,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
const Option<ResourceProviderID>& providerId,
const ResourceProvider& provider,
resourceProviders) {
- const bool isNewResourceProvider =
- provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
- if (!isNewResourceProvider) {
+ if (providerId.isSome() &&
+ slave->resourceProviders.contains(providerId.get())) {
// For known resource providers the master should always know at least
// as many non-terminal offer operations as the agent. While an
// operation might get lost on the way to the agent or resource
@@ -7494,9 +7536,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
const Option<ResourceProviderID>& providerId,
const ResourceProvider& provider,
resourceProviders) {
- const bool isNewResourceProvider =
- provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
// Below we only add offer operations to our state from resource providers
// which are unknown, or possibly remove them for known resource providers.
// This works since the master should always known more offer operations of
@@ -7517,14 +7556,16 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
// new (terminal) operations when observing messages from status
// update managers to frameworks.
- if (isNewResourceProvider) {
- // If this is a not previously seen resource provider with
- // operations we had a master failover. Add the resources and
- // operations to our state.
- CHECK_SOME(providerId);
+ if (providerId.isSome() &&
+ !slave->resourceProviders.contains(providerId.get())) {
+ // If this is a not previously seen resource provider we had a master
+ // failover. Add the resources and operations to our state.
CHECK_SOME(provider.newTotal);
CHECK(!slave->totalResources.contains(provider.newTotal.get()));
+ CHECK_SOME(provider.info);
+ slave->resourceProviders.insert({providerId.get(), provider.info.get()});
+
slave->totalResources += provider.newTotal.get();
hashmap<FrameworkID, Resources> usedByOperations;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1f5daae..5c26f20 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -274,6 +274,7 @@ struct Slave
SlaveObserver* observer;
hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+ hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
private:
Slave(const Slave&); // No copying.
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 54d8bcc..ee920c8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1321,45 +1321,14 @@ void Slave::registered(
break;
}
- // Send the latest total, including resources from resource providers. We send
- // this message here as a resource provider might have registered with the
- // agent between recovery completion and agent registration.
- bool sendUpdateSlaveMessage = false;
+ // If this agent can support resource providers or has had any oversubscribed
+ // resources set, send an `UpdateSlaveMessage` to the master to inform it of a
+ // possible changes between completion of recovery and agent registration.
+ if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+ UpdateSlaveMessage message = generateUpdateSlaveMessage();
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- if (capabilities.resourceProvider) {
- LOG(INFO) << "Forwarding total resources " << totalResources;
-
- message.mutable_resource_categories()->set_total(true);
- message.mutable_total_resources()->CopyFrom(totalResources);
-
- UpdateSlaveMessage::OfferOperations* operations =
- message.mutable_offer_operations();
-
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- sendUpdateSlaveMessage = true;
- }
-
- // Send the latest estimate for oversubscribed resources.
- if (oversubscribedResources.isSome()) {
- LOG(INFO) << "Forwarding total oversubscribed resources "
- << oversubscribedResources.get();
+ LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(
- oversubscribedResources.get());
-
- sendUpdateSlaveMessage = true;
- }
-
- if (sendUpdateSlaveMessage) {
send(master.get(), message);
}
}
@@ -1433,46 +1402,13 @@ void Slave::reregistered(
return;
}
- // Send the latest total, including resources from resource providers. We send
- // this message here as a resource provider might have registered with the
- // agent between recovery completion and agent registration.
- bool sendUpdateSlaveMessage = false;
-
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
-
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- if (capabilities.resourceProvider) {
- LOG(INFO) << "Forwarding total resources " << totalResources;
-
- message.mutable_resource_categories()->set_total(true);
- message.mutable_total_resources()->CopyFrom(totalResources);
-
- UpdateSlaveMessage::OfferOperations* operations =
- message.mutable_offer_operations();
+ // If this agent can support resource providers or has had any oversubscribed
+ // resources set, send an `UpdateSlaveMessage` to the master to inform it of a
+ // possible changes between completion of recovery and agent registration.
+ if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+ UpdateSlaveMessage message = generateUpdateSlaveMessage();
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- sendUpdateSlaveMessage = true;
- }
-
- // Send the latest estimate for oversubscribed resources.
- if (oversubscribedResources.isSome()) {
- LOG(INFO) << "Forwarding total oversubscribed resources "
- << oversubscribedResources.get();
-
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(
- oversubscribedResources.get());
-
- sendUpdateSlaveMessage = true;
- }
-
- if (sendUpdateSlaveMessage) {
+ LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
send(master.get(), message);
}
@@ -7020,10 +6956,17 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
// Add oversubscribable resources to the total.
oversubscribed += oversubscribable.get();
+ // Remember the previous amount of oversubscribed resources.
+ const Option<Resources> previousOversubscribedResources =
+ oversubscribedResources;
+
+ // Update the estimate.
+ oversubscribedResources = oversubscribed;
+
// Only forward the estimate if it's different from the previous
// estimate. We also send this whenever we get (re-)registered
// (i.e. whenever we transition into the RUNNING state).
- if (state == RUNNING && oversubscribedResources != oversubscribed) {
+ if (state == RUNNING && previousOversubscribedResources != oversubscribed) {
LOG(INFO) << "Forwarding total oversubscribed resources "
<< oversubscribed;
@@ -7033,23 +6976,14 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
// intervals updating the version could cause a lot of offer
// operation churn.
//
- // TODO(bbannier): Revisit this if we modify the operations
+ // TODO(bbannier): Revisit this if we modify the operations
// possible on oversubscribed resources.
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
-
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
+ UpdateSlaveMessage message = generateOversubscribedUpdate();
CHECK_SOME(master);
send(master.get(), message);
}
-
- // Update the estimate.
- oversubscribedResources = oversubscribed;
}
delay(flags.oversubscribed_resources_interval,
@@ -7058,6 +6992,112 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
}
+UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
+{
+ UpdateSlaveMessage message;
+
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_resource_categories()->set_oversubscribed(true);
+
+ if (oversubscribedResources.isSome()) {
+ message.mutable_oversubscribed_resources()->CopyFrom(
+ oversubscribedResources.get());
+ }
+
+ return message;
+}
+
+
+UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
+{
+ UpdateSlaveMessage message;
+
+ message.mutable_slave_id()->CopyFrom(info.id());
+
+ // Agent information (total resources, offer operations, resource
+ // versions) is not passed as part of some `ResourceProvider`, but
+ // globally in `UpdateStateMessage`.
+ //
+ // TODO(bbannier): Pass agent information as a resource provider.
+
+ // Process total resources.
+ hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
+ resourceProviders;
+
+ foreach (const Resource& resource, totalResources) {
+ if (resource.has_provider_id()) {
+ resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
+ resource);
+ }
+ }
+
+ // Process offer operations.
+ UpdateSlaveMessage::OfferOperations* operations =
+ message.mutable_offer_operations();
+
+ foreachvalue (const OfferOperation* operation, offerOperations) {
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(operation->info());
+
+ if (resourceProviderId.isSome()) {
+ resourceProviders[resourceProviderId.get()]
+ .mutable_operations()
+ ->add_operations()
+ ->CopyFrom(*operation);
+ } else if (resourceProviderId.isNone()) {
+ operations->add_operations()->CopyFrom(*operation);
+ }
+ }
+
+ // Make sure 'offer_operations' is always set for resource providers.
+ foreachkey (
+ const ResourceProviderID& resourceProviderId,
+ resourceProviderInfos) {
+ resourceProviders[resourceProviderId].mutable_operations();
+ }
+
+ // Process resource versions.
+ CHECK(resourceVersions.contains(None()));
+ message.add_resource_version_uuids()->set_uuid(
+ resourceVersions.at(None()).toBytes());
+
+ foreachpair (
+ const ResourceProviderID& providerId,
+ UpdateSlaveMessage::ResourceProvider& provider,
+ resourceProviders) {
+ CHECK(resourceVersions.contains(providerId));
+ provider.set_resource_version_uuid(
+ resourceVersions.at(providerId).toBytes());
+
+ CHECK(resourceProviderInfos.contains(providerId));
+ provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
+ }
+
+ // We only actually surface resource-provider related information if
+ // this agent is resource provider-capable.
+ if (capabilities.resourceProvider) {
+ list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
+ resourceProviders.values();
+
+ message.mutable_resource_providers()->mutable_providers()->CopyFrom(
+ {resourceProviders_.begin(), resourceProviders_.end()});
+ }
+
+ return message;
+}
+
+
+UpdateSlaveMessage Slave::generateUpdateSlaveMessage() const
+{
+ UpdateSlaveMessage message;
+
+ message.MergeFrom(generateResourceProviderUpdate());
+ message.MergeFrom(generateOversubscribedUpdate());
+
+ return message;
+}
+
+
void Slave::handleResourceProviderMessage(
const Future<ResourceProviderMessage>& message)
{
@@ -7227,26 +7267,8 @@ void Slave::handleResourceProviderMessage(
if (updated) {
LOG(INFO) << "Forwarding new total resources " << totalResources;
- // Inform the master that the total capacity of this agent has
- // changed.
- UpdateSlaveMessage updateSlaveMessage;
- updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
- updateSlaveMessage.mutable_resource_categories()->set_total(true);
-
- updateSlaveMessage.mutable_total_resources()->CopyFrom(
- totalResources);
-
- updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- UpdateSlaveMessage::OfferOperations* operations =
- updateSlaveMessage.mutable_offer_operations();
-
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- send(master.get(), updateSlaveMessage);
+ // Inform the master about the update from the resource provider.
+ send(master.get(), generateResourceProviderUpdate());
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5cb0d55..b3a1e70 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -555,6 +555,13 @@ private:
void _forwardOversubscribed(
const process::Future<Resources>& oversubscribable);
+ // Helper functions to generate `UpdateSlaveMessage` for either just
+ // updates to oversubscribed resources, resource provider-related
+ // information, or both.
+ UpdateSlaveMessage generateOversubscribedUpdate() const;
+ UpdateSlaveMessage generateResourceProviderUpdate() const;
+ UpdateSlaveMessage generateUpdateSlaveMessage() const;
+
void handleResourceProviderMessage(
const process::Future<ResourceProviderMessage>& message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0714543..3852a57 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -28,6 +28,7 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
#include <mesos/authentication/http/basic_authenticator_factory.hpp>
@@ -8751,54 +8752,39 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
+ const string resourceVersionUuid = UUID::random().toBytes();
+
{
mesos::v1::resource_provider::Call call;
call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
-
call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
- auto updateState = call.mutable_update_state();
+ mesos::v1::resource_provider::Call::UpdateState* updateState =
+ call.mutable_update_state();
+
updateState->mutable_resources()->CopyFrom(
v1::Resources(resourceProviderResources));
- updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+ updateState->set_resource_version_uuid(resourceVersionUuid);
resourceProvider.send(call);
}
AWAIT_READY(updateSlaveMessage);
- EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
- EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
- EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
-
- // We expect the updated agent total to contain both the resources of the
- // agent and of the newly subscribed resource provider. The resources from the
- // resource provider have a matching `ResourceProviderId` set.
- Resources expectedResources =
- Resources::parse(slaveFlags.resources.get()).get();
- expectedResources += devolve(resourceProviderResources);
-
- EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
-
- // The update from the agent should now contain both the agent and
- // resource provider resource versions.
- ASSERT_EQ(2u, updateSlaveMessage->resource_version_uuids_size());
-
- hashset<Option<ResourceProviderID>> resourceProviderIds;
- foreach (
- const ResourceVersionUUID& resourceVersionUuid,
- updateSlaveMessage->resource_version_uuids()) {
- resourceProviderIds.insert(
- resourceVersionUuid.has_resource_provider_id()
- ? resourceVersionUuid.resource_provider_id()
- : Option<ResourceProviderID>::none());
- }
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
- hashset<Option<ResourceProviderID>> expectedResourceProviderIds;
- expectedResourceProviderIds.insert(None());
- expectedResourceProviderIds.insert(devolve(resourceProviderId));
+ const UpdateSlaveMessage::ResourceProvider& receivedResourceProvider =
+ updateSlaveMessage->resource_providers().providers(0);
- EXPECT_EQ(expectedResourceProviderIds, resourceProviderIds);
+ EXPECT_EQ(
+ Resources(devolve(resourceProviderResources)),
+ Resources(receivedResourceProvider.total_resources()));
+
+ EXPECT_EQ(
+ resourceVersionUuid,
+ receivedResourceProvider.resource_version_uuid());
}
@@ -9251,6 +9237,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
v1::Resources reserved = offer.resources();
reserved = reserved.filter(
[](const v1::Resource& r) { return r.has_provider_id(); });
+
+ ASSERT_FALSE(reserved.empty());
+
reserved = reserved.pushReservation(v1::createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
@@ -9320,13 +9309,16 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
protobuf::parseResourceVersions(
updateSlaveMessage->resource_version_uuids());
+
// The reserve operation will still be reported as pending since no offer
// operation status update has been received from the resource provider.
- ASSERT_TRUE(updateSlaveMessage->has_offer_operations());
- ASSERT_EQ(1, updateSlaveMessage->offer_operations().operations_size());
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+ auto provider = updateSlaveMessage->resource_providers().providers(0);
+ ASSERT_TRUE(provider.has_operations());
+ ASSERT_EQ(1, provider.operations().operations_size());
- const OfferOperation& reserve =
- updateSlaveMessage->offer_operations().operations(0);
+ const OfferOperation& reserve = provider.operations().operations(0);
EXPECT_EQ(Offer::Operation::RESERVE, reserve.info().type());
ASSERT_TRUE(reserve.has_latest_status());