You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/08 16:02:19 UTC
[1/5] mesos git commit: Added explicit resource provider information
to 'UpdateSlaveMessage'.
Repository: mesos
Updated Branches:
refs/heads/master f8771c7f5 -> a345c2a65
Added explicit resource provider information to 'UpdateSlaveMessage'.
The added fields will allow us to explicitly surface resource
provider-related information in the master.
Review: https://reviews.apache.org/r/64422
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5d64414
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5d64414
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5d64414
Branch: refs/heads/master
Commit: b5d64414e0def423aa1beb5552391ea7b975fcc6
Parents: 396b927
Author: Benjamin Bannier <bb...@apache.org>
Authored: Tue Dec 5 13:39:07 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:40 2017 +0100
----------------------------------------------------------------------
src/messages/messages.proto | 22 ++++++++++++++++++++++
1 file changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b5d64414/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index f711784..53db010 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -716,6 +716,13 @@ message ResourceVersionUUID {
message UpdateSlaveMessage {
required SlaveID slave_id = 1;
+ // Top-level fields in this message should only contain information
+ // on the agent itself; information on local resource providers is
+ // passed explicitly in the `resource_providers` message.
+ //
+ // TODO(bbannier): Consider passing agent information inside a
+ // `ResourceProvider` value as well where applicable.
+
// This message can contain `oversubscribed_resources` or
// `total_resources`. Callers are expected to set the respective
// categories in `resource_categories` to denote which fields should
@@ -754,6 +761,21 @@ message UpdateSlaveMessage {
// because this means the operation is operating on resources that
// might have already been invalidated.
repeated ResourceVersionUUID resource_version_uuids = 7;
+
+ // Describes an agent-local resource provider with its resources and operations.
+ message ResourceProvider {
+ optional ResourceProviderInfo info = 1;
+ repeated Resource total_resources = 2;
+ required OfferOperations operations = 3;
+ required bytes resource_version_uuid = 4;
+ }
+ // Wraps the provider list so the enclosing field can be `optional`.
+ message ResourceProviders {
+ repeated ResourceProvider providers = 1;
+ }
+
+ // The list of all resource providers on this agent.
+ optional ResourceProviders resource_providers = 8;
}
[2/5] mesos git commit: Fixed 'getResourceProviderId' for operations
without resources.
Posted by bb...@apache.org.
Fixed 'getResourceProviderId' for operations without resources.
The code in 'getResourceProviderId' was written assuming that any
operation containing a list of resources always contained at least one
resource. This assumption is incorrect, since operations with an empty
resource list pass validation. The code also neglected to check this
crucial precondition, and accessing a non-existent element in this
context leads to a check failure in protobuf and a subsequent abort.
This patch updates 'getResourceProviderId' to check the precondition that
operations hold at least one resource, returning an error otherwise.
Review: https://reviews.apache.org/r/64425
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/396b927c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/396b927c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/396b927c
Branch: refs/heads/master
Commit: 396b927c19bc61a986c8c0fb4068af159fd1e87e
Parents: f8771c7
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 17:14:46 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:40 2017 +0100
----------------------------------------------------------------------
src/common/resources_utils.cpp | 12 ++++++++++++
1 file changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/396b927c/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 8e3d304..1676b72 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -234,15 +234,27 @@ Result<ResourceProviderID> getResourceProviderId(
case Offer::Operation::LAUNCH_GROUP:
return Error("Unexpected LAUNCH_GROUP operation");
case Offer::Operation::RESERVE:
+ if (operation.reserve().resources().empty()) {
+ return Error("Operation contains no resources");
+ }
resource = operation.reserve().resources(0);
break;
case Offer::Operation::UNRESERVE:
+ if (operation.unreserve().resources().empty()) {
+ return Error("Operation contains no resources");
+ }
resource = operation.unreserve().resources(0);
break;
case Offer::Operation::CREATE:
+ if (operation.create().volumes().empty()) {
+ return Error("Operation contains no resources");
+ }
resource = operation.create().volumes(0);
break;
case Offer::Operation::DESTROY:
+ if (operation.destroy().volumes().empty()) {
+ return Error("Operation contains no resources");
+ }
resource = operation.destroy().volumes(0);
break;
case Offer::Operation::CREATE_VOLUME:
[5/5] mesos git commit: Only passed agent's resource version in
top-level 'UpdateSlaveMessage'.
Posted by bb...@apache.org.
Only passed agent's resource version in top-level 'UpdateSlaveMessage'.
Since we have moved all resource provider-related information, e.g.,
resource versions, to an explicit field in 'UpdateSlaveMessage', we
now only pass agent information in top-level fields.
To model that, we changed the top-level resource version field from a
repeated field to a single optional field. This will allow us to pass
agent information in the resource provider data structure as well in
the future.
Review: https://reviews.apache.org/r/64430
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a345c2a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a345c2a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a345c2a6
Branch: refs/heads/master
Commit: a345c2a656598656d6172231e96cf3d5e7be0800
Parents: bd55be1
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 23:03:01 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100
----------------------------------------------------------------------
src/master/master.cpp | 13 ++++++++++---
src/messages/messages.proto | 21 ++++++++++-----------
src/slave/slave.cpp | 3 +--
src/tests/slave_tests.cpp | 6 ------
4 files changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b2fcb8..b3e074c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7256,10 +7256,17 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
message.has_resource_providers();
// Agents which can support resource providers always update the
- // master on their resource versions uuids via `UpdateSlaveMessage`.
+ // master on their resource versions uuid via `UpdateSlaveMessage`.
if (slave->capabilities.resourceProvider) {
- hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
- protobuf::parseResourceVersions(message.resource_version_uuids());
+ CHECK(message.has_resource_version_uuid());
+
+ hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+
+ const Try<UUID> slaveResourceVersion =
+ UUID::fromBytes(message.resource_version_uuid());
+
+ CHECK_SOME(slaveResourceVersion);
+ resourceVersions.insert({None(), slaveResourceVersion.get()});
foreach (
const UpdateSlaveMessage::ResourceProvider& resourceProvider,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0b44ba2..a13a641 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -746,17 +746,16 @@ message UpdateSlaveMessage {
optional OfferOperations offer_operations = 6;
// Used to establish the relationship between the operation and the
- // resources that the operation is operating on. Each resource
- // provider will keep a resource version UUID, and change it when
- // it believes that the resources from this resource provider are
- // out of sync from the master's view. The master will keep track
- // of the last known resource version UUID for each resource
- // provider, and attach the resource version UUID in each operation
- // it sends out. The resource provider should reject operations that
- // have a different resource version UUID than that it maintains,
- // because this means the operation is operating on resources that
- // might have already been invalidated.
- repeated ResourceVersionUUID resource_version_uuids = 7;
+ // resources that the operation is operating on. Each agent will
+ // keep a resource version UUID, and change it when it believes that
+ // its resources are out of sync from the master's view. The master
+ // will keep track of the last known resource version UUID for each
+ // resource provider or agent, and attach the resource version UUID
+ // in each operation it sends out. The resource provider or agent
+ // should reject operations that have a different resource version
+ // UUID than that it maintains, because this means the operation is
+ // operating on resources that might have already been invalidated.
+ optional bytes resource_version_uuid = 7;
// Describes an agent-local resource provider.
message ResourceProvider {
http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ee920c8..373e393 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7058,8 +7058,7 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
// Process resource versions.
CHECK(resourceVersions.contains(None()));
- message.add_resource_version_uuids()->set_uuid(
- resourceVersions.at(None()).toBytes());
+ message.set_resource_version_uuid(resourceVersions.at(None()).toBytes());
foreachpair (
const ResourceProviderID& providerId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 3852a57..0fb2a63 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -9304,12 +9304,6 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
AWAIT_READY(updateSlaveMessage);
- // We expect to see the new resource provider resource version in
- // the `UpdateSlaveMessage`.
- hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
- protobuf::parseResourceVersions(
- updateSlaveMessage->resource_version_uuids());
-
// The reserve operation will still be reported as pending since no offer
// operation status update has been received from the resource provider.
ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
[3/5] mesos git commit: Explicitly passed resource-provider
information in 'UpdateSlaveMessage'.
Posted by bb...@apache.org.
Explicitly passed resource-provider information in 'UpdateSlaveMessage'.
This patch changes the way resource provider-related information is
passed. Instead of aggregating all information from both the agent and
resource providers into global per-agent lists in
'UpdateSlaveMessage', we now pass resource provider-related
information explicitly. In a subsequent patch we can surface this
information in, e.g., the operator API.
Review: https://reviews.apache.org/r/64423
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7cfa0e92
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7cfa0e92
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7cfa0e92
Branch: refs/heads/master
Commit: 7cfa0e9206647ddd6217cd69090f5eb328a73529
Parents: b5d6441
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 11:05:05 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100
----------------------------------------------------------------------
src/master/master.cpp | 151 ++++++++++++++++----------
src/master/master.hpp | 1 +
src/slave/slave.cpp | 238 ++++++++++++++++++++++-------------------
src/slave/slave.hpp | 7 ++
src/tests/slave_tests.cpp | 66 +++++-------
5 files changed, 263 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5cba506..8cf699b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7241,9 +7241,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
newTotal = totalResources;
}
- // Since `total` always overwrites an existing total, we apply
- // `oversubscribed` after updating the total to be able to
- // independently apply it regardless of whether `total` was sent.
if (hasOversubscribed) {
const Resources& oversubscribedResources =
message_.oversubscribed_resources();
@@ -7254,6 +7251,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
newOversubscribed = oversubscribedResources;
}
+ auto agentResources = [](const Resource& resource) {
+ return !resource.has_provider_id();
+ };
+
const Resources newSlaveResources =
newTotal.getOrElse(slave->totalResources.nonRevocable()) +
newOversubscribed.getOrElse(slave->totalResources.revocable());
@@ -7266,30 +7267,59 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
protobuf::parseResourceVersions(message.resource_version_uuids());
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ if (!resourceProvider.has_info()) {
+ continue;
+ }
+
+ Try<UUID> resourceVersion =
+ UUID::fromBytes(resourceProvider.resource_version_uuid());
+
+ CHECK_SOME(resourceVersion);
+
+ CHECK(resourceProvider.info().has_id());
+
+ const ResourceProviderID& resourceProviderId =
+ resourceProvider.info().id();
+
+ CHECK(!resourceVersions.contains(resourceProviderId));
+ resourceVersions.insert({resourceProviderId, resourceVersion.get()});
+ }
+
updated = updated || slave->resourceVersions != resourceVersions;
slave->resourceVersions = resourceVersions;
}
// Check if the known offer operations for this agent changed.
- updated =
- updated ||
- (slave->offerOperations.empty() && message.has_offer_operations()) ||
- (!slave->offerOperations.empty() && !message.has_offer_operations());
- if (!updated) {
- const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
- hashset<UUID> receivedOfferOperations;
+ const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
+ hashset<UUID> receivedOfferOperations;
+
+ foreach (
+ const OfferOperation& operation,
+ message.offer_operations().operations()) {
+ Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(operationUuid);
+ receivedOfferOperations.insert(operationUuid.get());
+ }
+ if (message.has_resource_providers()) {
foreach (
- const OfferOperation& operation,
- message.offer_operations().operations()) {
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ foreach (
+ const OfferOperation& operation,
+ resourceProvider.operations().operations()) {
Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
CHECK_SOME(operationUuid);
receivedOfferOperations.insert(operationUuid.get());
+ }
}
-
- updated = updated || knownOfferOperations != receivedOfferOperations;
}
+ updated = updated || knownOfferOperations != receivedOfferOperations;
+
if (!updated) {
LOG(INFO) << "Ignoring update on agent " << *slave
<< " as it reports no changes";
@@ -7302,6 +7332,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
Option<Resources> newTotal;
Option<hashmap<UUID, OfferOperation>> oldOfferOperations;
Option<hashmap<UUID, OfferOperation>> newOfferOperations;
+ Option<ResourceProviderInfo> info;
};
// We store information on the different `ResourceProvider`s on this agent in
@@ -7313,6 +7344,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
// Group the resources and operation updates by resource provider.
{
+ // Process known resources.
auto groupResourcesByProviderId = [](const Resources& resources) {
hashmap<Option<ResourceProviderID>, Resources> result;
@@ -7335,14 +7367,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
resourceProviders[providerId].oldTotal = resources;
}
- foreachpair (
- const Option<ResourceProviderID>& providerId,
- const Resources& resources,
- groupResourcesByProviderId(newSlaveResources)) {
- // Implicitly create a new record if none exists.
- resourceProviders[providerId].newTotal = resources;
- }
-
+ // Process known offer operations.
foreachpair (
const UUID& uuid,
OfferOperation* operation,
@@ -7370,36 +7395,55 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
.oldOfferOperations->emplace(uuid, *operation);
}
- // Process received offer operations.
+ // Explicitly add an entry for received agent resources.
+ resourceProviders[None()].newTotal =
+ newSlaveResources.filter(agentResources);
+
+ // Process received agent offer operations.
+ resourceProviders[None()].newOfferOperations =
+ hashmap<UUID, OfferOperation>();
+
foreach (
const OfferOperation& operation,
message.offer_operations().operations()) {
- Result<ResourceProviderID> providerId_ =
- getResourceProviderId(operation.info());
-
- CHECK(!providerId_.isError())
- << "Failed to extract resource provider id from known operation: "
- << providerId_.error();
-
- Option<ResourceProviderID> providerId =
- providerId_.isSome()
- ? providerId_.get()
- : Option<ResourceProviderID>::none();
-
- // Set up an init empty list of new operations. We might
- // create a record for this resource provider if needed.
- if (resourceProviders[providerId].newOfferOperations.isNone()) {
- resourceProviders.at(providerId).newOfferOperations =
- hashmap<UUID, OfferOperation>();
- }
-
Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling "
"offer operations";
- resourceProviders.at(providerId)
+ resourceProviders.at(None())
.newOfferOperations->emplace(uuid.get(), operation);
}
+
+ // Process explicitly received resource provider information.
+ if (message.has_resource_providers()) {
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ CHECK(resourceProvider.has_info());
+ CHECK(resourceProvider.info().has_id());
+
+ ResourceProvider& provider =
+ resourceProviders[resourceProvider.info().id()];
+
+ provider.info = resourceProvider.info();
+
+ provider.newTotal = resourceProvider.total_resources();
+ if (provider.newOfferOperations.isNone()) {
+ provider.newOfferOperations = hashmap<UUID, OfferOperation>();
+ }
+
+ foreach (
+ const OfferOperation& operation,
+ resourceProvider.operations().operations()) {
+ Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(uuid)
+ << "Could not deserialize operation id when reconciling "
+ "offer operations";
+
+ provider.newOfferOperations->emplace(uuid.get(), operation);
+ }
+ }
+ }
}
// Check invariants of the received update.
@@ -7408,10 +7452,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
const Option<ResourceProviderID>& providerId,
const ResourceProvider& provider,
resourceProviders) {
- const bool isNewResourceProvider =
- provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
- if (!isNewResourceProvider) {
+ if (providerId.isSome() &&
+ slave->resourceProviders.contains(providerId.get())) {
// For known resource providers the master should always know at least
// as many non-terminal offer operations as the agent. While an
// operation might get lost on the way to the agent or resource
@@ -7494,9 +7536,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
const Option<ResourceProviderID>& providerId,
const ResourceProvider& provider,
resourceProviders) {
- const bool isNewResourceProvider =
- provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
// Below we only add offer operations to our state from resource providers
// which are unknown, or possibly remove them for known resource providers.
// This works since the master should always known more offer operations of
@@ -7517,14 +7556,16 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
// new (terminal) operations when observing messages from status
// update managers to frameworks.
- if (isNewResourceProvider) {
- // If this is a not previously seen resource provider with
- // operations we had a master failover. Add the resources and
- // operations to our state.
- CHECK_SOME(providerId);
+ if (providerId.isSome() &&
+ !slave->resourceProviders.contains(providerId.get())) {
+ // If this is a not previously seen resource provider we had a master
+ // failover. Add the resources and operations to our state.
CHECK_SOME(provider.newTotal);
CHECK(!slave->totalResources.contains(provider.newTotal.get()));
+ CHECK_SOME(provider.info);
+ slave->resourceProviders.insert({providerId.get(), provider.info.get()});
+
slave->totalResources += provider.newTotal.get();
hashmap<FrameworkID, Resources> usedByOperations;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1f5daae..5c26f20 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -274,6 +274,7 @@ struct Slave
SlaveObserver* observer;
hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+ hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
private:
Slave(const Slave&); // No copying.
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 54d8bcc..ee920c8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1321,45 +1321,14 @@ void Slave::registered(
break;
}
- // Send the latest total, including resources from resource providers. We send
- // this message here as a resource provider might have registered with the
- // agent between recovery completion and agent registration.
- bool sendUpdateSlaveMessage = false;
+ // If this agent can support resource providers or has had any oversubscribed
+ // resources set, send an `UpdateSlaveMessage` to the master to inform it of a
+ // possible changes between completion of recovery and agent registration.
+ if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+ UpdateSlaveMessage message = generateUpdateSlaveMessage();
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- if (capabilities.resourceProvider) {
- LOG(INFO) << "Forwarding total resources " << totalResources;
-
- message.mutable_resource_categories()->set_total(true);
- message.mutable_total_resources()->CopyFrom(totalResources);
-
- UpdateSlaveMessage::OfferOperations* operations =
- message.mutable_offer_operations();
-
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- sendUpdateSlaveMessage = true;
- }
-
- // Send the latest estimate for oversubscribed resources.
- if (oversubscribedResources.isSome()) {
- LOG(INFO) << "Forwarding total oversubscribed resources "
- << oversubscribedResources.get();
+ LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(
- oversubscribedResources.get());
-
- sendUpdateSlaveMessage = true;
- }
-
- if (sendUpdateSlaveMessage) {
send(master.get(), message);
}
}
@@ -1433,46 +1402,13 @@ void Slave::reregistered(
return;
}
- // Send the latest total, including resources from resource providers. We send
- // this message here as a resource provider might have registered with the
- // agent between recovery completion and agent registration.
- bool sendUpdateSlaveMessage = false;
-
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
-
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- if (capabilities.resourceProvider) {
- LOG(INFO) << "Forwarding total resources " << totalResources;
-
- message.mutable_resource_categories()->set_total(true);
- message.mutable_total_resources()->CopyFrom(totalResources);
-
- UpdateSlaveMessage::OfferOperations* operations =
- message.mutable_offer_operations();
+ // If this agent can support resource providers or has had any oversubscribed
+ // resources set, send an `UpdateSlaveMessage` to the master to inform it of a
+ // possible changes between completion of recovery and agent registration.
+ if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+ UpdateSlaveMessage message = generateUpdateSlaveMessage();
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- sendUpdateSlaveMessage = true;
- }
-
- // Send the latest estimate for oversubscribed resources.
- if (oversubscribedResources.isSome()) {
- LOG(INFO) << "Forwarding total oversubscribed resources "
- << oversubscribedResources.get();
-
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(
- oversubscribedResources.get());
-
- sendUpdateSlaveMessage = true;
- }
-
- if (sendUpdateSlaveMessage) {
+ LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
send(master.get(), message);
}
@@ -7020,10 +6956,17 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
// Add oversubscribable resources to the total.
oversubscribed += oversubscribable.get();
+ // Remember the previous amount of oversubscribed resources.
+ const Option<Resources> previousOversubscribedResources =
+ oversubscribedResources;
+
+ // Update the estimate.
+ oversubscribedResources = oversubscribed;
+
// Only forward the estimate if it's different from the previous
// estimate. We also send this whenever we get (re-)registered
// (i.e. whenever we transition into the RUNNING state).
- if (state == RUNNING && oversubscribedResources != oversubscribed) {
+ if (state == RUNNING && previousOversubscribedResources != oversubscribed) {
LOG(INFO) << "Forwarding total oversubscribed resources "
<< oversubscribed;
@@ -7033,23 +6976,14 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
// intervals updating the version could cause a lot of offer
// operation churn.
//
- // TODO(bbannier): Revisit this if we modify the operations
+ // TODO(bbannier): Revisit this if we modify the operations
// possible on oversubscribed resources.
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resource_categories()->set_oversubscribed(true);
- message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
-
- message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
+ UpdateSlaveMessage message = generateOversubscribedUpdate();
CHECK_SOME(master);
send(master.get(), message);
}
-
- // Update the estimate.
- oversubscribedResources = oversubscribed;
}
delay(flags.oversubscribed_resources_interval,
@@ -7058,6 +6992,112 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
}
+UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
+{
+ UpdateSlaveMessage message;
+ // The oversubscribed category is always marked, even without an estimate.
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_resource_categories()->set_oversubscribed(true);
+
+ if (oversubscribedResources.isSome()) {
+ message.mutable_oversubscribed_resources()->CopyFrom(
+ oversubscribedResources.get());
+ }
+
+ return message;
+}
+
+
+UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
+{
+ UpdateSlaveMessage message;
+
+ message.mutable_slave_id()->CopyFrom(info.id());
+
+ // Agent information (total resources, offer operations, resource
+ // versions) is not passed as part of some `ResourceProvider`, but
+ // globally in `UpdateSlaveMessage`.
+ //
+ // TODO(bbannier): Pass agent information as a resource provider.
+
+ // Group resources that carry a provider ID by their resource provider.
+ hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
+ resourceProviders;
+
+ foreach (const Resource& resource, totalResources) {
+ if (resource.has_provider_id()) {
+ resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
+ resource);
+ }
+ }
+ // Process offer operations; agent operations (no provider ID) go top-level.
+ // NOTE(review): operations whose provider ID lookup errors are dropped.
+ UpdateSlaveMessage::OfferOperations* operations =
+ message.mutable_offer_operations();
+
+ foreachvalue (const OfferOperation* operation, offerOperations) {
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(operation->info());
+
+ if (resourceProviderId.isSome()) {
+ resourceProviders[resourceProviderId.get()]
+ .mutable_operations()
+ ->add_operations()
+ ->CopyFrom(*operation);
+ } else if (resourceProviderId.isNone()) {
+ operations->add_operations()->CopyFrom(*operation);
+ }
+ }
+
+ // Make sure 'operations' (a required field) is set for every provider.
+ foreachkey (
+ const ResourceProviderID& resourceProviderId,
+ resourceProviderInfos) {
+ resourceProviders[resourceProviderId].mutable_operations();
+ }
+
+ // Process resource versions; the agent's own version is keyed by `None()`.
+ CHECK(resourceVersions.contains(None()));
+ message.add_resource_version_uuids()->set_uuid(
+ resourceVersions.at(None()).toBytes());
+
+ foreachpair (
+ const ResourceProviderID& providerId,
+ UpdateSlaveMessage::ResourceProvider& provider,
+ resourceProviders) {
+ CHECK(resourceVersions.contains(providerId));
+ provider.set_resource_version_uuid(
+ resourceVersions.at(providerId).toBytes());
+
+ CHECK(resourceProviderInfos.contains(providerId));
+ provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
+ }
+
+ // We only actually surface resource-provider related information if
+ // this agent is resource provider-capable.
+ if (capabilities.resourceProvider) {
+ list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
+ resourceProviders.values();
+
+ message.mutable_resource_providers()->mutable_providers()->CopyFrom(
+ {resourceProviders_.begin(), resourceProviders_.end()});
+ }
+
+ return message;
+}
+
+
+UpdateSlaveMessage Slave::generateUpdateSlaveMessage() const
+{
+ UpdateSlaveMessage message;
+ // Combine the resource provider and oversubscribed-resources updates.
+ message.MergeFrom(generateResourceProviderUpdate());
+ message.MergeFrom(generateOversubscribedUpdate());
+
+ return message;
+}
+
+
void Slave::handleResourceProviderMessage(
const Future<ResourceProviderMessage>& message)
{
@@ -7227,26 +7267,8 @@ void Slave::handleResourceProviderMessage(
if (updated) {
LOG(INFO) << "Forwarding new total resources " << totalResources;
- // Inform the master that the total capacity of this agent has
- // changed.
- UpdateSlaveMessage updateSlaveMessage;
- updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
- updateSlaveMessage.mutable_resource_categories()->set_total(true);
-
- updateSlaveMessage.mutable_total_resources()->CopyFrom(
- totalResources);
-
- updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(resourceVersions));
-
- UpdateSlaveMessage::OfferOperations* operations =
- updateSlaveMessage.mutable_offer_operations();
-
- foreachvalue (const OfferOperation* operation, offerOperations) {
- operations->add_operations()->CopyFrom(*operation);
- }
-
- send(master.get(), updateSlaveMessage);
+ // Inform the master about the update from the resource provider.
+ send(master.get(), generateResourceProviderUpdate());
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5cb0d55..b3a1e70 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -555,6 +555,13 @@ private:
void _forwardOversubscribed(
const process::Future<Resources>& oversubscribable);
+ // Helper functions to generate `UpdateSlaveMessage` for either just
+ // updates to oversubscribed resources, resource provider-related
+ // information, or both.
+ UpdateSlaveMessage generateOversubscribedUpdate() const;
+ UpdateSlaveMessage generateResourceProviderUpdate() const;
+ UpdateSlaveMessage generateUpdateSlaveMessage() const;
+
void handleResourceProviderMessage(
const process::Future<ResourceProviderMessage>& message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0714543..3852a57 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -28,6 +28,7 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
#include <mesos/authentication/http/basic_authenticator_factory.hpp>
@@ -8751,54 +8752,39 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
+ const string resourceVersionUuid = UUID::random().toBytes();
+
{
mesos::v1::resource_provider::Call call;
call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
-
call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
- auto updateState = call.mutable_update_state();
+ mesos::v1::resource_provider::Call::UpdateState* updateState =
+ call.mutable_update_state();
+
updateState->mutable_resources()->CopyFrom(
v1::Resources(resourceProviderResources));
- updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+ updateState->set_resource_version_uuid(resourceVersionUuid);
resourceProvider.send(call);
}
AWAIT_READY(updateSlaveMessage);
- EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
- EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
- EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
-
- // We expect the updated agent total to contain both the resources of the
- // agent and of the newly subscribed resource provider. The resources from the
- // resource provider have a matching `ResourceProviderId` set.
- Resources expectedResources =
- Resources::parse(slaveFlags.resources.get()).get();
- expectedResources += devolve(resourceProviderResources);
-
- EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
-
- // The update from the agent should now contain both the agent and
- // resource provider resource versions.
- ASSERT_EQ(2u, updateSlaveMessage->resource_version_uuids_size());
-
- hashset<Option<ResourceProviderID>> resourceProviderIds;
- foreach (
- const ResourceVersionUUID& resourceVersionUuid,
- updateSlaveMessage->resource_version_uuids()) {
- resourceProviderIds.insert(
- resourceVersionUuid.has_resource_provider_id()
- ? resourceVersionUuid.resource_provider_id()
- : Option<ResourceProviderID>::none());
- }
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
- hashset<Option<ResourceProviderID>> expectedResourceProviderIds;
- expectedResourceProviderIds.insert(None());
- expectedResourceProviderIds.insert(devolve(resourceProviderId));
+ const UpdateSlaveMessage::ResourceProvider& receivedResourceProvider =
+ updateSlaveMessage->resource_providers().providers(0);
- EXPECT_EQ(expectedResourceProviderIds, resourceProviderIds);
+ EXPECT_EQ(
+ Resources(devolve(resourceProviderResources)),
+ Resources(receivedResourceProvider.total_resources()));
+
+ EXPECT_EQ(
+ resourceVersionUuid,
+ receivedResourceProvider.resource_version_uuid());
}
@@ -9251,6 +9237,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
v1::Resources reserved = offer.resources();
reserved = reserved.filter(
[](const v1::Resource& r) { return r.has_provider_id(); });
+
+ ASSERT_FALSE(reserved.empty());
+
reserved = reserved.pushReservation(v1::createDynamicReservationInfo(
frameworkInfo.roles(0), frameworkInfo.principal()));
@@ -9320,13 +9309,16 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
protobuf::parseResourceVersions(
updateSlaveMessage->resource_version_uuids());
+
// The reserve operation will still be reported as pending since no offer
// operation status update has been received from the resource provider.
- ASSERT_TRUE(updateSlaveMessage->has_offer_operations());
- ASSERT_EQ(1, updateSlaveMessage->offer_operations().operations_size());
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+ auto provider = updateSlaveMessage->resource_providers().providers(0);
+ ASSERT_TRUE(provider.has_operations());
+ ASSERT_EQ(1, provider.operations().operations_size());
- const OfferOperation& reserve =
- updateSlaveMessage->offer_operations().operations(0);
+ const OfferOperation& reserve = provider.operations().operations(0);
EXPECT_EQ(Offer::Operation::RESERVE, reserve.info().type());
ASSERT_TRUE(reserve.has_latest_status());
[4/5] mesos git commit: Removed 'total' from 'UpdateSlaveMessage'.
Posted by bb...@apache.org.
Removed 'total' from 'UpdateSlaveMessage'.
This field was added during the development leading up to version 1.5
in order to allow updates to agent-total resources in the context of
resource providers. It was never used in a released Mesos version.
Review: https://reviews.apache.org/r/64424
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd55be10
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd55be10
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd55be10
Branch: refs/heads/master
Commit: bd55be1066b8f2553e10e9d5bd35a3582f659b00
Parents: 7cfa0e9
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Dec 6 14:27:34 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100
----------------------------------------------------------------------
src/master/master.cpp | 51 ++++++++++++++++++----------------------
src/messages/messages.proto | 16 +++++--------
2 files changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd55be10/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8cf699b..3b2fcb8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7211,12 +7211,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
message.resource_categories().oversubscribed()) ||
!message.has_resource_categories();
- const bool hasTotal =
- message.has_resource_categories() &&
- message.resource_categories().has_total() &&
- message.resource_categories().total();
-
- Option<Resources> newTotal;
Option<Resources> newOversubscribed;
// Make a copy of the message so we can transform its resources.
@@ -7226,21 +7220,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
message_.mutable_oversubscribed_resources(),
POST_RESERVATION_REFINEMENT);
- convertResourceFormat(
- message_.mutable_total_resources(),
- POST_RESERVATION_REFINEMENT);
-
- // Agents will send a total if a resource provider subscribed or went away.
- // Process resources and operations grouped by resource provider.
- if (hasTotal) {
- const Resources& totalResources = message_.total_resources();
-
- LOG(INFO) << "Received update of agent " << *slave << " with total"
- << " resources " << totalResources;
-
- newTotal = totalResources;
- }
-
if (hasOversubscribed) {
const Resources& oversubscribedResources =
message_.oversubscribed_resources();
@@ -7251,15 +7230,30 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
newOversubscribed = oversubscribedResources;
}
+ Resources newResourceProviderResources;
+ if (message.has_resource_providers()) {
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ newResourceProviderResources += resourceProvider.total_resources();
+ }
+ }
+
auto agentResources = [](const Resource& resource) {
return !resource.has_provider_id();
};
const Resources newSlaveResources =
- newTotal.getOrElse(slave->totalResources.nonRevocable()) +
- newOversubscribed.getOrElse(slave->totalResources.revocable());
+ slave->totalResources.nonRevocable().filter(agentResources) +
+ newOversubscribed.getOrElse(
+ slave->totalResources.revocable().filter(agentResources)) +
+ newResourceProviderResources;
- bool updated = slave->totalResources != newSlaveResources;
+ // TODO(bbannier): We only need to update if any changes from
+ // resource providers are reported.
+ bool updated =
+ slave->totalResources != newSlaveResources ||
+ message.has_resource_providers();
// Agents which can support resource providers always update the
// master on their resource versions uuids via `UpdateSlaveMessage`.
@@ -7657,11 +7651,12 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
rescind = true;
}
- // For updates to the agent's total resources all offers are rescinded.
+ // Updates on resource providers can change the agent total
+ // resources, so we rescind all offers.
//
- // TODO(bbannier): Only rescind offers possibly containing removed
- // resources.
- if (hasTotal) {
+ // TODO(bbannier): Only rescind offers possibly containing
+ // affected resources.
+ if (message.has_resource_providers()) {
LOG(INFO) << "Removing offer " << offer->id() << " with resources "
<< offered << " on agent " << *slave;
http://git-wip-us.apache.org/repos/asf/mesos/blob/bd55be10/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 53db010..0b44ba2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -723,23 +723,19 @@ message UpdateSlaveMessage {
// TODO(bbannier): Consider passing agent information inside a
// `ResourceProvider` value as well where applicable.
- // This message can contain `oversubscribed_resources` or
- // `total_resources`. Callers are expected to set the respective
- // categories in `resource_categories` to denote which fields should
- // be examined. For backwards compatibility we interpret an unset
- // `category` field as if only oversubscribed was set.
- //
- // Oversubscribed resources must be revocable, while total resources
- // must be non-revocable.
+ // This message can contain `oversubscribed_resources` or
+ // `resource_providers`. Callers are expected to set the
+ // `oversubscribed` category in `resource_categories` to denote
+ // whether the `oversubscribed_resources` field should be examined.
+ // For backwards compatibility we interpret an unset
+ // `resource_categories` field as if only oversubscribed was set.
message ResourceCategories {
- optional bool total = 1;
optional bool oversubscribed = 2;
}
optional ResourceCategories resource_categories = 5;
repeated Resource oversubscribed_resources = 2;
- repeated Resource total_resources = 4;
message OfferOperations {
repeated OfferOperation operations = 1;