You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/11/08 16:01:28 UTC

[7/9] mesos git commit: Synchronized agent resource versions via 'UpdateSlaveMessage'.

Synchronized agent resource versions via 'UpdateSlaveMessage'.

This commit introduces agent resource versions to the master and
agents. Agents are responsible for maintaining their resource
versions. The resource versions are synchronized with the master via
'UpdateSlaveMessage'.

Review: https://reviews.apache.org/r/63492/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51a15496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51a15496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51a15496

Branch: refs/heads/master
Commit: 51a1549611fbfcf1ad1a1eb5e3efe0f1221acb36
Parents: b1c9b46
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:08 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp | 46 ++++++++++++++++++++++++++++++++++++++
 src/common/protobuf_utils.hpp | 11 +++++++++
 src/master/master.cpp         | 10 +++++++++
 src/master/master.hpp         |  2 ++
 src/slave/slave.cpp           | 31 +++++++++++++++++++++++++
 src/slave/slave.hpp           |  1 +
 6 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 7a4b87b..5739a63 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -755,6 +755,52 @@ void stripAllocationInfo(Offer::Operation* operation)
 }
 
 
+RepeatedPtrField<ResourceVersionUUID> createResourceVersions(
+    const hashmap<Option<ResourceProviderID>, UUID>& resourceVersions)
+{
+  RepeatedPtrField<ResourceVersionUUID> result;
+
+  foreachpair (
+      const Option<ResourceProviderID>& resourceProviderId,
+      const UUID& uuid,
+      resourceVersions) {
+    ResourceVersionUUID* entry = result.Add();
+
+    if (resourceProviderId.isSome()) {
+      entry->mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+    }
+    entry->set_uuid(uuid.toBytes());
+  }
+
+  return result;
+}
+
+
+hashmap<Option<ResourceProviderID>, UUID> parseResourceVersions(
+    const RepeatedPtrField<ResourceVersionUUID>& resourceVersionUUIDs)
+{
+  hashmap<Option<ResourceProviderID>, UUID> result;
+
+  foreach (
+      const ResourceVersionUUID& resourceVersionUUID,
+      resourceVersionUUIDs) {
+    const Option<ResourceProviderID> resourceProviderId =
+      resourceVersionUUID.has_resource_provider_id()
+        ? resourceVersionUUID.resource_provider_id()
+        : Option<ResourceProviderID>::none();
+
+    CHECK(!result.contains(resourceProviderId));
+
+    const Try<UUID> uuid = UUID::fromBytes(resourceVersionUUID.uuid());
+    CHECK_SOME(uuid);
+
+    result.insert({std::move(resourceProviderId), std::move(uuid.get())});
+  }
+
+  return result;
+}
+
+
 TimeInfo getCurrentTime()
 {
   TimeInfo timeInfo;

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 95f57da..0ca4c6d 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -192,6 +192,17 @@ void injectAllocationInfo(
 void stripAllocationInfo(Offer::Operation* operation);
 
 
+// Helper function to pack a protobuf list of resource versions.
+google::protobuf::RepeatedPtrField<ResourceVersionUUID> createResourceVersions(
+    const hashmap<Option<ResourceProviderID>, UUID>& resourceVersions);
+
+
+// Helper function to unpack a protobuf list of resource versions.
+hashmap<Option<ResourceProviderID>, UUID> parseResourceVersions(
+    const google::protobuf::RepeatedPtrField<ResourceVersionUUID>&
+      resourceVersionUUIDs);
+
+
 // Helper function that fills in a TimeInfo from the current time.
 TimeInfo getCurrentTime();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ee212c1..01675ed 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -126,6 +126,8 @@ using process::http::authentication::Principal;
 
 using process::metrics::Counter;
 
+using google::protobuf::RepeatedPtrField;
+
 namespace mesos {
 namespace internal {
 namespace master {
@@ -140,6 +142,7 @@ using mesos::master::detector::MasterDetector;
 
 static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo);
 
+
 class SlaveObserver : public ProtobufProcess<SlaveObserver>
 {
 public:
@@ -7089,6 +7092,13 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
 
+  // Agents which can support resource providers always update the
+  // master on their resource version UUIDs via `UpdateSlaveMessage`.
+  if (slave->capabilities.resourceProvider) {
+    slave->resourceVersions =
+      protobuf::parseResourceVersions(message.resource_version_uuids());
+  }
+
   if (newSlaveResources == slave->totalResources) {
     LOG(INFO) << "Ignoring update on agent " << *slave
               << " as it reports no changes";

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0c1253a..adabc59 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -267,6 +267,8 @@ struct Slave
 
   SlaveObserver* observer;
 
+  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+
 private:
   Slave(const Slave&);              // No copying.
   Slave& operator=(const Slave&); // No assigning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 494d793..be46ebd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -223,6 +223,7 @@ Slave::Slave(const string& id,
     resourceEstimator(_resourceEstimator),
     qosController(_qosController),
     authorizer(_authorizer),
+    resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}),
     secretGenerator(nullptr) {}
 
 
@@ -1277,6 +1278,8 @@ void Slave::registered(
 
   UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_version_uuids()->CopyFrom(
+      protobuf::createResourceVersions(resourceVersions));
 
   if (capabilities.resourceProvider) {
     LOG(INFO) << "Forwarding total resources " << totalResources;
@@ -1377,6 +1380,9 @@ void Slave::reregistered(
   UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
 
+  message.mutable_resource_version_uuids()->CopyFrom(
+      protobuf::createResourceVersions(resourceVersions));
+
   if (capabilities.resourceProvider) {
     LOG(INFO) << "Forwarding total resources " << totalResources;
 
@@ -6646,11 +6652,23 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
       LOG(INFO) << "Forwarding total oversubscribed resources "
                 << oversubscribed;
 
+      // We do not update the agent's resource version since
+      // oversubscribed resources cannot be used for any operations
+      // but launches. Since oversubscription is run at regular
+      // intervals, updating the version could cause a lot of offer
+      // operation churn.
+      //
+      // TODO(bbannier): Revisit this if we modify the operations
+      // possible on oversubscribed resources.
+
       UpdateSlaveMessage message;
       message.mutable_slave_id()->CopyFrom(info.id());
       message.mutable_resource_categories()->set_oversubscribed(true);
       message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
+      message.mutable_resource_version_uuids()->CopyFrom(
+          protobuf::createResourceVersions(resourceVersions));
+
       CHECK_SOME(master);
       send(master.get(), message);
     }
@@ -6706,6 +6724,15 @@ void Slave::handleResourceProviderMessage(
       totalResources -= oldTotal;
       totalResources += newTotal;
 
+      const UUID& resourceVersionUuid =
+        message->updateTotalResources->resourceVersionUuid;
+
+      if (resourceVersions.contains(resourceProviderId)) {
+        resourceVersions.at(resourceProviderId) = resourceVersionUuid;
+      } else {
+        resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+      }
+
       // Send the updated resources to the master if the agent is running. Note
       // that since we have already updated our copy of the latest resource
       // provider resources, it is safe to consume this message and wait for the
@@ -6725,9 +6752,13 @@ void Slave::handleResourceProviderMessage(
           UpdateSlaveMessage updateSlaveMessage;
           updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
           updateSlaveMessage.mutable_resource_categories()->set_total(true);
+
           updateSlaveMessage.mutable_total_resources()->CopyFrom(
               totalResources);
 
+          updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
+              protobuf::createResourceVersions(resourceVersions));
+
           send(master.get(), updateSlaveMessage);
 
           break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b2dc002..0124df4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -665,6 +665,7 @@ private:
 
   ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
+  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
 
 protected:
   // Made protected for testing purposes.