You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/08 16:02:19 UTC

[1/5] mesos git commit: Added explicit resource provider information to 'UpdateSlaveMessage'.

Repository: mesos
Updated Branches:
  refs/heads/master f8771c7f5 -> a345c2a65


Added explicit resource provider information to 'UpdateSlaveMessage'.

The added fields will allow us to explicitly surface resource
provider-related information in the master.

Review: https://reviews.apache.org/r/64422


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5d64414
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5d64414
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5d64414

Branch: refs/heads/master
Commit: b5d64414e0def423aa1beb5552391ea7b975fcc6
Parents: 396b927
Author: Benjamin Bannier <bb...@apache.org>
Authored: Tue Dec 5 13:39:07 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:40 2017 +0100

----------------------------------------------------------------------
 src/messages/messages.proto | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b5d64414/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index f711784..53db010 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -716,6 +716,13 @@ message ResourceVersionUUID {
 message UpdateSlaveMessage {
   required SlaveID slave_id = 1;
 
+  // Top-level fields in this message should only contain information
+  // on the agent itself; information on local resource providers is
+  // passed explicitly in the `resource_providers` field.
+  //
+  // TODO(bbannier): Consider passing agent information inside a
+  // `ResourceProvider` value as well where applicable.
+
   // This message can contain `oversubscribed_resources` or
   // `total_resources`. Callers are expected to set the respective
   // categories in `resource_categories` to denote which fields should
@@ -754,6 +761,21 @@ message UpdateSlaveMessage {
   // because this means the operation is operating on resources that
   // might have already been invalidated.
   repeated ResourceVersionUUID resource_version_uuids = 7;
+
+  // Describes an agent-local resource provider.
+  message ResourceProvider {
+    optional ResourceProviderInfo info = 1;
+    repeated Resource total_resources = 2;
+    required OfferOperations operations = 3;
+    required bytes resource_version_uuid = 4;
+  }
+
+  message ResourceProviders {
+    repeated ResourceProvider providers = 1;
+  }
+
+  // The list of all resource providers on this agent.
+  optional ResourceProviders resource_providers = 8;
 }
 
 


[2/5] mesos git commit: Fixed 'getResourceProviderId' for operations without resources.

Posted by bb...@apache.org.
Fixed 'getResourceProviderId' for operations without resources.

The code in 'getResourceProviderId' was written assuming that any
operation containing a list of resources always contained at least one
resource. This assumption is incorrect, since operations with an empty
list of resources pass validation. The code also neglected to check a
crucial precondition: accessing a non-existent element in this context
leads to a check failure in protobuf and a subsequent abort.

This patch updates 'getResourceProviderId' to check the precondition
that the operation holds at least one resource.

Review: https://reviews.apache.org/r/64425


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/396b927c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/396b927c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/396b927c

Branch: refs/heads/master
Commit: 396b927c19bc61a986c8c0fb4068af159fd1e87e
Parents: f8771c7
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 17:14:46 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:40 2017 +0100

----------------------------------------------------------------------
 src/common/resources_utils.cpp | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/396b927c/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 8e3d304..1676b72 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -234,15 +234,27 @@ Result<ResourceProviderID> getResourceProviderId(
     case Offer::Operation::LAUNCH_GROUP:
       return Error("Unexpected LAUNCH_GROUP operation");
     case Offer::Operation::RESERVE:
+      if (operation.reserve().resources().empty()) {
+        return Error("Operation contains no resources");
+      }
       resource = operation.reserve().resources(0);
       break;
     case Offer::Operation::UNRESERVE:
+      if (operation.unreserve().resources().empty()) {
+        return Error("Operation contains no resources");
+      }
       resource = operation.unreserve().resources(0);
       break;
     case Offer::Operation::CREATE:
+      if (operation.create().volumes().empty()) {
+        return Error("Operation contains no resources");
+      }
       resource = operation.create().volumes(0);
       break;
     case Offer::Operation::DESTROY:
+      if (operation.destroy().volumes().empty()) {
+        return Error("Operation contains no resources");
+      }
       resource = operation.destroy().volumes(0);
       break;
     case Offer::Operation::CREATE_VOLUME:


[5/5] mesos git commit: Only passed agent's resource version in top-level 'UpdateSlaveMessage'.

Posted by bb...@apache.org.
Only passed agent's resource version in top-level 'UpdateSlaveMessage'.

Since we have moved all resource provider-related information (e.g.,
resource versions) to an explicit field in 'UpdateSlaveMessage', we
now only pass agent information in top-level fields.

To model that, we changed the top-level resource versions from a
repeated field to a single optional field. This will allow us to pass
agent information in the resource provider data structure as well in
the future.

Review: https://reviews.apache.org/r/64430


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a345c2a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a345c2a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a345c2a6

Branch: refs/heads/master
Commit: a345c2a656598656d6172231e96cf3d5e7be0800
Parents: bd55be1
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 23:03:01 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp       | 13 ++++++++++---
 src/messages/messages.proto | 21 ++++++++++-----------
 src/slave/slave.cpp         |  3 +--
 src/tests/slave_tests.cpp   |  6 ------
 4 files changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b2fcb8..b3e074c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7256,10 +7256,17 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     message.has_resource_providers();
 
   // Agents which can support resource providers always update the
-  // master on their resource versions uuids via `UpdateSlaveMessage`.
+  // master on their resource versions uuid via `UpdateSlaveMessage`.
   if (slave->capabilities.resourceProvider) {
-    hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
-      protobuf::parseResourceVersions(message.resource_version_uuids());
+    CHECK(message.has_resource_version_uuid());
+
+    hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+
+    const Try<UUID> slaveResourceVersion =
+      UUID::fromBytes(message.resource_version_uuid());
+
+    CHECK_SOME(slaveResourceVersion);
+    resourceVersions.insert({None(), slaveResourceVersion.get()});
 
     foreach (
         const UpdateSlaveMessage::ResourceProvider& resourceProvider,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0b44ba2..a13a641 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -746,17 +746,16 @@ message UpdateSlaveMessage {
   optional OfferOperations offer_operations = 6;
 
   // Used to establish the relationship between the operation and the
-  // resources that the operation is operating on. Each resource
-  // provider will keep a resource version UUID, and change it when
-  // it believes that the resources from this resource provider are
-  // out of sync from the master's view.  The master will keep track
-  // of the last known resource version UUID for each resource
-  // provider, and attach the resource version UUID in each operation
-  // it sends out. The resource provider should reject operations that
-  // have a different resource version UUID than that it maintains,
-  // because this means the operation is operating on resources that
-  // might have already been invalidated.
-  repeated ResourceVersionUUID resource_version_uuids = 7;
+  // resources that the operation is operating on. Each agent will
+  // keep a resource version UUID, and change it when it believes that
+  // its resources are out of sync from the master's view. The master
+  // will keep track of the last known resource version UUID for each
+  // resource provider or agent, and attach the resource version UUID
+  // in each operation it sends out. The resource provider or agent
+  // should reject operations that have a different resource version
+  // UUID than that it maintains, because this means the operation is
+  // operating on resources that might have already been invalidated.
+  optional bytes resource_version_uuid = 7;
 
   // Describes an agent-local resource provider.
   message ResourceProvider {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ee920c8..373e393 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7058,8 +7058,7 @@ UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 
   // Process resource versions.
   CHECK(resourceVersions.contains(None()));
-  message.add_resource_version_uuids()->set_uuid(
-      resourceVersions.at(None()).toBytes());
+  message.set_resource_version_uuid(resourceVersions.at(None()).toBytes());
 
   foreachpair (
       const ResourceProviderID& providerId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a345c2a6/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 3852a57..0fb2a63 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -9304,12 +9304,6 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
 
   AWAIT_READY(updateSlaveMessage);
 
-  // We expect to see the new resource provider resource version in
-  // the `UpdateSlaveMessage`.
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
-    protobuf::parseResourceVersions(
-        updateSlaveMessage->resource_version_uuids());
-
   // The reserve operation will still be reported as pending since no offer
   // operation status update has been received from the resource provider.
   ASSERT_TRUE(updateSlaveMessage->has_resource_providers());


[3/5] mesos git commit: Explicitly passed resource-provider information in 'UpdateSlaveMessage'.

Posted by bb...@apache.org.
Explicitly passed resource-provider information in 'UpdateSlaveMessage'.

This patch changes the way resource provider-related information is
passed. Instead of aggregating all information from both the agent and
resource providers into global per-agent lists in
'UpdateSlaveMessage', we now pass resource provider-related
information explicitly. A subsequent patch can then surface this
information in, e.g., the operator API.

Review: https://reviews.apache.org/r/64423


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7cfa0e92
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7cfa0e92
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7cfa0e92

Branch: refs/heads/master
Commit: 7cfa0e9206647ddd6217cd69090f5eb328a73529
Parents: b5d6441
Author: Benjamin Bannier <bb...@apache.org>
Authored: Thu Dec 7 11:05:05 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp     | 151 ++++++++++++++++----------
 src/master/master.hpp     |   1 +
 src/slave/slave.cpp       | 238 ++++++++++++++++++++++-------------------
 src/slave/slave.hpp       |   7 ++
 src/tests/slave_tests.cpp |  66 +++++-------
 5 files changed, 263 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5cba506..8cf699b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7241,9 +7241,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newTotal = totalResources;
   }
 
-  // Since `total` always overwrites an existing total, we apply
-  // `oversubscribed` after updating the total to be able to
-  // independently apply it regardless of whether `total` was sent.
   if (hasOversubscribed) {
     const Resources& oversubscribedResources =
       message_.oversubscribed_resources();
@@ -7254,6 +7251,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newOversubscribed = oversubscribedResources;
   }
 
+  auto agentResources = [](const Resource& resource) {
+    return !resource.has_provider_id();
+  };
+
   const Resources newSlaveResources =
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
@@ -7266,30 +7267,59 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
       protobuf::parseResourceVersions(message.resource_version_uuids());
 
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      if (!resourceProvider.has_info()) {
+        continue;
+      }
+
+      Try<UUID> resourceVersion =
+        UUID::fromBytes(resourceProvider.resource_version_uuid());
+
+      CHECK_SOME(resourceVersion);
+
+      CHECK(resourceProvider.info().has_id());
+
+      const ResourceProviderID& resourceProviderId =
+        resourceProvider.info().id();
+
+      CHECK(!resourceVersions.contains(resourceProviderId));
+      resourceVersions.insert({resourceProviderId, resourceVersion.get()});
+    }
+
     updated = updated || slave->resourceVersions != resourceVersions;
     slave->resourceVersions = resourceVersions;
   }
 
   // Check if the known offer operations for this agent changed.
-  updated =
-    updated ||
-    (slave->offerOperations.empty() && message.has_offer_operations()) ||
-    (!slave->offerOperations.empty() && !message.has_offer_operations());
-  if (!updated) {
-    const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
-    hashset<UUID> receivedOfferOperations;
+  const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
+  hashset<UUID> receivedOfferOperations;
+
+  foreach (
+      const OfferOperation& operation,
+      message.offer_operations().operations()) {
+    Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(operationUuid);
+    receivedOfferOperations.insert(operationUuid.get());
+  }
 
+  if (message.has_resource_providers()) {
     foreach (
-        const OfferOperation& operation,
-        message.offer_operations().operations()) {
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      foreach (
+          const OfferOperation& operation,
+          resourceProvider.operations().operations()) {
         Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
         CHECK_SOME(operationUuid);
         receivedOfferOperations.insert(operationUuid.get());
+      }
     }
-
-    updated = updated || knownOfferOperations != receivedOfferOperations;
   }
 
+  updated = updated || knownOfferOperations != receivedOfferOperations;
+
   if (!updated) {
     LOG(INFO) << "Ignoring update on agent " << *slave
               << " as it reports no changes";
@@ -7302,6 +7332,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     Option<Resources> newTotal;
     Option<hashmap<UUID, OfferOperation>> oldOfferOperations;
     Option<hashmap<UUID, OfferOperation>> newOfferOperations;
+    Option<ResourceProviderInfo> info;
   };
 
   // We store information on the different `ResourceProvider`s on this agent in
@@ -7313,6 +7344,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
 
   // Group the resources and operation updates by resource provider.
   {
+    // Process known resources.
     auto groupResourcesByProviderId = [](const Resources& resources) {
       hashmap<Option<ResourceProviderID>, Resources> result;
 
@@ -7335,14 +7367,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       resourceProviders[providerId].oldTotal = resources;
     }
 
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const Resources& resources,
-        groupResourcesByProviderId(newSlaveResources)) {
-      // Implicitly create a new record if none exists.
-      resourceProviders[providerId].newTotal = resources;
-    }
-
+    // Process known offer operations.
     foreachpair (
         const UUID& uuid,
         OfferOperation* operation,
@@ -7370,36 +7395,55 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
         .oldOfferOperations->emplace(uuid, *operation);
     }
 
-    // Process received offer operations.
+    // Explicitly add an entry for received agent resources.
+    resourceProviders[None()].newTotal =
+      newSlaveResources.filter(agentResources);
+
+    // Process received agent offer operations.
+    resourceProviders[None()].newOfferOperations =
+      hashmap<UUID, OfferOperation>();
+
     foreach (
         const OfferOperation& operation,
         message.offer_operations().operations()) {
-      Result<ResourceProviderID> providerId_ =
-        getResourceProviderId(operation.info());
-
-      CHECK(!providerId_.isError())
-        << "Failed to extract resource provider id from known operation: "
-        << providerId_.error();
-
-      Option<ResourceProviderID> providerId =
-        providerId_.isSome()
-          ? providerId_.get()
-          : Option<ResourceProviderID>::none();
-
-      // Set up an init empty list of new operations. We might
-      // create a record for this resource provider if needed.
-      if (resourceProviders[providerId].newOfferOperations.isNone()) {
-        resourceProviders.at(providerId).newOfferOperations =
-          hashmap<UUID, OfferOperation>();
-      }
-
       Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
       CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling "
                           "offer operations";
 
-      resourceProviders.at(providerId)
+      resourceProviders.at(None())
         .newOfferOperations->emplace(uuid.get(), operation);
     }
+
+    // Process explicitly received resource provider information.
+    if (message.has_resource_providers()) {
+      foreach (
+          const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+          message.resource_providers().providers()) {
+        CHECK(resourceProvider.has_info());
+        CHECK(resourceProvider.info().has_id());
+
+        ResourceProvider& provider =
+          resourceProviders[resourceProvider.info().id()];
+
+        provider.info = resourceProvider.info();
+
+        provider.newTotal = resourceProvider.total_resources();
+        if (provider.newOfferOperations.isNone()) {
+          provider.newOfferOperations = hashmap<UUID, OfferOperation>();
+        }
+
+        foreach (
+            const OfferOperation& operation,
+            resourceProvider.operations().operations()) {
+          Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+          CHECK_SOME(uuid)
+            << "Could not deserialize operation id when reconciling "
+            "offer operations";
+
+          provider.newOfferOperations->emplace(uuid.get(), operation);
+        }
+      }
+    }
   }
 
   // Check invariants of the received update.
@@ -7408,10 +7452,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
         const Option<ResourceProviderID>& providerId,
         const ResourceProvider& provider,
         resourceProviders) {
-      const bool isNewResourceProvider =
-        provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
-      if (!isNewResourceProvider) {
+      if (providerId.isSome() &&
+          slave->resourceProviders.contains(providerId.get())) {
         // For known resource providers the master should always know at least
         // as many non-terminal offer operations as the agent. While an
         // operation might get lost on the way to the agent or resource
@@ -7494,9 +7536,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       const Option<ResourceProviderID>& providerId,
       const ResourceProvider& provider,
       resourceProviders) {
-    const bool isNewResourceProvider =
-      provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
-
     // Below we only add offer operations to our state from resource providers
     // which are unknown, or possibly remove them for known resource providers.
     // This works since the master should always known more offer operations of
@@ -7517,14 +7556,16 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     // new (terminal) operations when observing messages from status
     // update managers to frameworks.
 
-    if (isNewResourceProvider) {
-      // If this is a not previously seen resource provider with
-      // operations we had a master failover. Add the resources and
-      // operations to our state.
-      CHECK_SOME(providerId);
+    if (providerId.isSome() &&
+        !slave->resourceProviders.contains(providerId.get())) {
+      // If this is a not previously seen resource provider we had a master
+      // failover. Add the resources and operations to our state.
       CHECK_SOME(provider.newTotal);
       CHECK(!slave->totalResources.contains(provider.newTotal.get()));
 
+      CHECK_SOME(provider.info);
+      slave->resourceProviders.insert({providerId.get(), provider.info.get()});
+
       slave->totalResources += provider.newTotal.get();
 
       hashmap<FrameworkID, Resources> usedByOperations;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1f5daae..5c26f20 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -274,6 +274,7 @@ struct Slave
   SlaveObserver* observer;
 
   hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
 
 private:
   Slave(const Slave&);              // No copying.

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 54d8bcc..ee920c8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1321,45 +1321,14 @@ void Slave::registered(
       break;
   }
 
-  // Send the latest total, including resources from resource providers. We send
-  // this message here as a resource provider might have registered with the
-  // agent between recovery completion and agent registration.
-  bool sendUpdateSlaveMessage = false;
+  // If this agent can support resource providers or has had any oversubscribed
+  // resources set, send an `UpdateSlaveMessage` to the master to inform it of
+  // possible changes between completion of recovery and agent registration.
+  if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+    UpdateSlaveMessage message = generateUpdateSlaveMessage();
 
-  UpdateSlaveMessage message;
-  message.mutable_slave_id()->CopyFrom(info.id());
-  message.mutable_resource_version_uuids()->CopyFrom(
-      protobuf::createResourceVersions(resourceVersions));
-
-  if (capabilities.resourceProvider) {
-    LOG(INFO) << "Forwarding total resources " << totalResources;
-
-    message.mutable_resource_categories()->set_total(true);
-    message.mutable_total_resources()->CopyFrom(totalResources);
-
-    UpdateSlaveMessage::OfferOperations* operations =
-      message.mutable_offer_operations();
-
-    foreachvalue (const OfferOperation* operation, offerOperations) {
-      operations->add_operations()->CopyFrom(*operation);
-    }
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  // Send the latest estimate for oversubscribed resources.
-  if (oversubscribedResources.isSome()) {
-    LOG(INFO) << "Forwarding total oversubscribed resources "
-              << oversubscribedResources.get();
+    LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
 
-    message.mutable_resource_categories()->set_oversubscribed(true);
-    message.mutable_oversubscribed_resources()->CopyFrom(
-        oversubscribedResources.get());
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 }
@@ -1433,46 +1402,13 @@ void Slave::reregistered(
       return;
   }
 
-  // Send the latest total, including resources from resource providers. We send
-  // this message here as a resource provider might have registered with the
-  // agent between recovery completion and agent registration.
-  bool sendUpdateSlaveMessage = false;
-
-  UpdateSlaveMessage message;
-  message.mutable_slave_id()->CopyFrom(info.id());
-
-  message.mutable_resource_version_uuids()->CopyFrom(
-      protobuf::createResourceVersions(resourceVersions));
-
-  if (capabilities.resourceProvider) {
-    LOG(INFO) << "Forwarding total resources " << totalResources;
-
-    message.mutable_resource_categories()->set_total(true);
-    message.mutable_total_resources()->CopyFrom(totalResources);
-
-    UpdateSlaveMessage::OfferOperations* operations =
-      message.mutable_offer_operations();
+  // If this agent can support resource providers or has had any oversubscribed
+  // resources set, send an `UpdateSlaveMessage` to the master to inform it of
+  // possible changes between completion of recovery and agent registration.
+  if (capabilities.resourceProvider || oversubscribedResources.isSome()) {
+    UpdateSlaveMessage message = generateUpdateSlaveMessage();
 
-    foreachvalue (const OfferOperation* operation, offerOperations) {
-      operations->add_operations()->CopyFrom(*operation);
-    }
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  // Send the latest estimate for oversubscribed resources.
-  if (oversubscribedResources.isSome()) {
-    LOG(INFO) << "Forwarding total oversubscribed resources "
-              << oversubscribedResources.get();
-
-    message.mutable_resource_categories()->set_oversubscribed(true);
-    message.mutable_oversubscribed_resources()->CopyFrom(
-        oversubscribedResources.get());
-
-    sendUpdateSlaveMessage = true;
-  }
-
-  if (sendUpdateSlaveMessage) {
+    LOG(INFO) << "Forwarding agent update " << JSON::protobuf(message);
     send(master.get(), message);
   }
 
@@ -7020,10 +6956,17 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     // Add oversubscribable resources to the total.
     oversubscribed += oversubscribable.get();
 
+    // Remember the previous amount of oversubscribed resources.
+    const Option<Resources> previousOversubscribedResources =
+      oversubscribedResources;
+
+    // Update the estimate.
+    oversubscribedResources = oversubscribed;
+
     // Only forward the estimate if it's different from the previous
     // estimate. We also send this whenever we get (re-)registered
     // (i.e. whenever we transition into the RUNNING state).
-    if (state == RUNNING && oversubscribedResources != oversubscribed) {
+    if (state == RUNNING && previousOversubscribedResources != oversubscribed) {
       LOG(INFO) << "Forwarding total oversubscribed resources "
                 << oversubscribed;
 
@@ -7033,23 +6976,14 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
       // intervals updating the version could cause a lot of offer
       // operation churn.
       //
-      // TODO(bbannier): Revisit this if  we modify the operations
+      // TODO(bbannier): Revisit this if we modify the operations
       // possible on oversubscribed resources.
 
-      UpdateSlaveMessage message;
-      message.mutable_slave_id()->CopyFrom(info.id());
-      message.mutable_resource_categories()->set_oversubscribed(true);
-      message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
-
-      message.mutable_resource_version_uuids()->CopyFrom(
-          protobuf::createResourceVersions(resourceVersions));
+      UpdateSlaveMessage message = generateOversubscribedUpdate();
 
       CHECK_SOME(master);
       send(master.get(), message);
     }
-
-    // Update the estimate.
-    oversubscribedResources = oversubscribed;
   }
 
   delay(flags.oversubscribed_resources_interval,
@@ -7058,6 +6992,112 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
+UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
+{
+  UpdateSlaveMessage message;
+
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_categories()->set_oversubscribed(true);
+
+  if (oversubscribedResources.isSome()) {
+    message.mutable_oversubscribed_resources()->CopyFrom(
+        oversubscribedResources.get());
+  }
+
+  return message;
+}
+
+
+UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
+{
+  UpdateSlaveMessage message;
+
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  // Agent information (total resources, offer operations, resource
+  // versions) is not passed as part of some `ResourceProvider`, but
+  // globally in `UpdateSlaveMessage`.
+  //
+  // TODO(bbannier): Pass agent information as a resource provider.
+
+  // Process total resources.
+  hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
+    resourceProviders;
+
+  foreach (const Resource& resource, totalResources) {
+    if (resource.has_provider_id()) {
+      resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
+          resource);
+    }
+  }
+
+  // Process offer operations.
+  UpdateSlaveMessage::OfferOperations* operations =
+    message.mutable_offer_operations();
+
+  foreachvalue (const OfferOperation* operation, offerOperations) {
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    if (resourceProviderId.isSome()) {
+      resourceProviders[resourceProviderId.get()]
+        .mutable_operations()
+        ->add_operations()
+        ->CopyFrom(*operation);
+    } else if (resourceProviderId.isNone()) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+  }
+
+  // Make sure 'operations' is always set for resource providers.
+  foreachkey (
+      const ResourceProviderID& resourceProviderId,
+      resourceProviderInfos) {
+    resourceProviders[resourceProviderId].mutable_operations();
+  }
+
+  // Process resource versions.
+  CHECK(resourceVersions.contains(None()));
+  message.add_resource_version_uuids()->set_uuid(
+      resourceVersions.at(None()).toBytes());
+
+  foreachpair (
+      const ResourceProviderID& providerId,
+      UpdateSlaveMessage::ResourceProvider& provider,
+      resourceProviders) {
+    CHECK(resourceVersions.contains(providerId));
+    provider.set_resource_version_uuid(
+        resourceVersions.at(providerId).toBytes());
+
+    CHECK(resourceProviderInfos.contains(providerId));
+    provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
+  }
+
+  // We only actually surface resource-provider related information if
+  // this agent is resource provider-capable.
+  if (capabilities.resourceProvider) {
+    list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
+      resourceProviders.values();
+
+    message.mutable_resource_providers()->mutable_providers()->CopyFrom(
+        {resourceProviders_.begin(), resourceProviders_.end()});
+  }
+
+  return message;
+}
+
+
+UpdateSlaveMessage Slave::generateUpdateSlaveMessage() const
+{
+  UpdateSlaveMessage message;
+
+  message.MergeFrom(generateResourceProviderUpdate());
+  message.MergeFrom(generateOversubscribedUpdate());
+
+  return message;
+}
+
+
 void Slave::handleResourceProviderMessage(
     const Future<ResourceProviderMessage>& message)
 {
@@ -7227,26 +7267,8 @@ void Slave::handleResourceProviderMessage(
           if (updated) {
             LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-            // Inform the master that the total capacity of this agent has
-            // changed.
-            UpdateSlaveMessage updateSlaveMessage;
-            updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
-            updateSlaveMessage.mutable_resource_categories()->set_total(true);
-
-            updateSlaveMessage.mutable_total_resources()->CopyFrom(
-                totalResources);
-
-            updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
-                protobuf::createResourceVersions(resourceVersions));
-
-            UpdateSlaveMessage::OfferOperations* operations =
-              updateSlaveMessage.mutable_offer_operations();
-
-            foreachvalue (const OfferOperation* operation, offerOperations) {
-              operations->add_operations()->CopyFrom(*operation);
-            }
-
-            send(master.get(), updateSlaveMessage);
+            // Inform the master about the update from the resource provider.
+            send(master.get(), generateResourceProviderUpdate());
 
             break;
           }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5cb0d55..b3a1e70 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -555,6 +555,13 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
+  // Helper functions to generate `UpdateSlaveMessage` for either just
+  // updates to oversubscribed resources, resource provider-related
+  // information, or both.
+  UpdateSlaveMessage generateOversubscribedUpdate() const;
+  UpdateSlaveMessage generateResourceProviderUpdate() const;
+  UpdateSlaveMessage generateUpdateSlaveMessage() const;
+
   void handleResourceProviderMessage(
       const process::Future<ResourceProviderMessage>& message);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7cfa0e92/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0714543..3852a57 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -28,6 +28,7 @@
 
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
@@ -8751,54 +8752,39 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
 
   resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
 
+  const string resourceVersionUuid = UUID::random().toBytes();
+
   {
     mesos::v1::resource_provider::Call call;
     call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
-
     call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
 
-    auto updateState = call.mutable_update_state();
+    mesos::v1::resource_provider::Call::UpdateState* updateState =
+      call.mutable_update_state();
+
     updateState->mutable_resources()->CopyFrom(
         v1::Resources(resourceProviderResources));
-    updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+    updateState->set_resource_version_uuid(resourceVersionUuid);
 
     resourceProvider.send(call);
   }
 
   AWAIT_READY(updateSlaveMessage);
 
-  EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
-  EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
-  EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
-
-  // We expect the updated agent total to contain both the resources of the
-  // agent and of the newly subscribed resource provider. The resources from the
-  // resource provider have a matching `ResourceProviderId` set.
-  Resources expectedResources =
-    Resources::parse(slaveFlags.resources.get()).get();
-  expectedResources += devolve(resourceProviderResources);
-
-  EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
-
-  // The update from the agent should now contain both the agent and
-  // resource provider resource versions.
-  ASSERT_EQ(2u, updateSlaveMessage->resource_version_uuids_size());
-
-  hashset<Option<ResourceProviderID>> resourceProviderIds;
-  foreach (
-      const ResourceVersionUUID& resourceVersionUuid,
-      updateSlaveMessage->resource_version_uuids()) {
-    resourceProviderIds.insert(
-        resourceVersionUuid.has_resource_provider_id()
-          ? resourceVersionUuid.resource_provider_id()
-          : Option<ResourceProviderID>::none());
-  }
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
 
-  hashset<Option<ResourceProviderID>> expectedResourceProviderIds;
-  expectedResourceProviderIds.insert(None());
-  expectedResourceProviderIds.insert(devolve(resourceProviderId));
+  const UpdateSlaveMessage::ResourceProvider& receivedResourceProvider =
+    updateSlaveMessage->resource_providers().providers(0);
 
-  EXPECT_EQ(expectedResourceProviderIds, resourceProviderIds);
+  EXPECT_EQ(
+      Resources(devolve(resourceProviderResources)),
+      Resources(receivedResourceProvider.total_resources()));
+
+  EXPECT_EQ(
+      resourceVersionUuid,
+      receivedResourceProvider.resource_version_uuid());
 }
 
 
@@ -9251,6 +9237,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
     v1::Resources reserved = offer.resources();
     reserved = reserved.filter(
         [](const v1::Resource& r) { return r.has_provider_id(); });
+
+    ASSERT_FALSE(reserved.empty());
+
     reserved = reserved.pushReservation(v1::createDynamicReservationInfo(
         frameworkInfo.roles(0), frameworkInfo.principal()));
 
@@ -9320,13 +9309,16 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
   hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
     protobuf::parseResourceVersions(
         updateSlaveMessage->resource_version_uuids());
+
   // The reserve operation will still be reported as pending since no offer
   // operation status update has been received from the resource provider.
-  ASSERT_TRUE(updateSlaveMessage->has_offer_operations());
-  ASSERT_EQ(1, updateSlaveMessage->offer_operations().operations_size());
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+  auto provider = updateSlaveMessage->resource_providers().providers(0);
+  ASSERT_TRUE(provider.has_operations());
+  ASSERT_EQ(1, provider.operations().operations_size());
 
-  const OfferOperation& reserve =
-    updateSlaveMessage->offer_operations().operations(0);
+  const OfferOperation& reserve = provider.operations().operations(0);
 
   EXPECT_EQ(Offer::Operation::RESERVE, reserve.info().type());
   ASSERT_TRUE(reserve.has_latest_status());


[4/5] mesos git commit: Removed 'total' from 'UpdateSlaveMessage'.

Posted by bb...@apache.org.
Removed 'total' from 'UpdateSlaveMessage'.

This field was added during the development leading up to version 1.5
in order to allow updates to agent-total resources in the context of
resource providers. It was never used in a released Mesos version.

Review: https://reviews.apache.org/r/64424


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bd55be10
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bd55be10
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bd55be10

Branch: refs/heads/master
Commit: bd55be1066b8f2553e10e9d5bd35a3582f659b00
Parents: 7cfa0e9
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Dec 6 14:27:34 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 8 13:59:41 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp       | 51 ++++++++++++++++++----------------------
 src/messages/messages.proto | 16 +++++--------
 2 files changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bd55be10/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8cf699b..3b2fcb8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7211,12 +7211,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
      message.resource_categories().oversubscribed()) ||
     !message.has_resource_categories();
 
-  const bool hasTotal =
-    message.has_resource_categories() &&
-    message.resource_categories().has_total() &&
-    message.resource_categories().total();
-
-  Option<Resources> newTotal;
   Option<Resources> newOversubscribed;
 
   // Make a copy of the message so we can transform its resources.
@@ -7226,21 +7220,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       message_.mutable_oversubscribed_resources(),
       POST_RESERVATION_REFINEMENT);
 
-  convertResourceFormat(
-      message_.mutable_total_resources(),
-      POST_RESERVATION_REFINEMENT);
-
-  // Agents will send a total if a resource provider subscribed or went away.
-  // Process resources and operations grouped by resource provider.
-  if (hasTotal) {
-    const Resources& totalResources = message_.total_resources();
-
-    LOG(INFO) << "Received update of agent " << *slave << " with total"
-              << " resources " << totalResources;
-
-    newTotal = totalResources;
-  }
-
   if (hasOversubscribed) {
     const Resources& oversubscribedResources =
       message_.oversubscribed_resources();
@@ -7251,15 +7230,30 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newOversubscribed = oversubscribedResources;
   }
 
+  Resources newResourceProviderResources;
+  if (message.has_resource_providers()) {
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      newResourceProviderResources += resourceProvider.total_resources();
+    }
+  }
+
   auto agentResources = [](const Resource& resource) {
     return !resource.has_provider_id();
   };
 
   const Resources newSlaveResources =
-    newTotal.getOrElse(slave->totalResources.nonRevocable()) +
-    newOversubscribed.getOrElse(slave->totalResources.revocable());
+    slave->totalResources.nonRevocable().filter(agentResources) +
+    newOversubscribed.getOrElse(
+        slave->totalResources.revocable().filter(agentResources)) +
+    newResourceProviderResources;
 
-  bool updated = slave->totalResources != newSlaveResources;
+  // TODO(bbannier): We only need to update if any changes from
+  // resource providers are reported.
+  bool updated =
+    slave->totalResources != newSlaveResources ||
+    message.has_resource_providers();
 
   // Agents which can support resource providers always update the
   // master on their resource versions uuids via `UpdateSlaveMessage`.
@@ -7657,11 +7651,12 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       rescind = true;
     }
 
-    // For updates to the agent's total resources all offers are rescinded.
+    // Updates on resource providers can change the agent total
+    // resources, so we rescind all offers.
     //
-    // TODO(bbannier): Only rescind offers possibly containing removed
-    // resources.
-    if (hasTotal) {
+    // TODO(bbannier): Only rescind offers possibly containing
+    // affected resources.
+    if (message.has_resource_providers()) {
       LOG(INFO) << "Removing offer " << offer->id() << " with resources "
                 << offered << " on agent " << *slave;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bd55be10/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 53db010..0b44ba2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -723,23 +723,19 @@ message UpdateSlaveMessage {
   // TODO(bbannier): Consider passing agent information inside a
   // `ResourceProvider` value as well where applicable.
 
-  // This message can contain `oversubscribed_resources` or
-  // `total_resources`. Callers are expected to set the respective
-  // categories in `resource_categories` to denote which fields should
-  // be examined. For backwards compatibility we interpret an unset
-  // `category` field as if only oversubscribed was set.
-  //
-  // Oversubscribed resources must be revocable, while total resources
-  // must be non-revocable.
+  // This message can contain `oversubscribed_resources` or resource
+  // providers. Callers are expected to set the `oversubscribed`
+  // category in `resource_categories` to denote whether the
+  // `oversubscribed_resources` field should be examined. For
+  // backwards compatibility we interpret an unset
+  // `resource_categories` field as if only oversubscribed was set.
   message ResourceCategories {
-    optional bool total = 1;
     optional bool oversubscribed = 2;
   }
 
   optional ResourceCategories resource_categories = 5;
 
   repeated Resource oversubscribed_resources = 2;
-  repeated Resource total_resources = 4;
 
   message OfferOperations {
     repeated OfferOperation operations = 1;