You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/11/08 16:01:22 UTC

[1/9] mesos git commit: Added tests for agent resource version transmission.

Repository: mesos
Updated Branches:
  refs/heads/master 439825f9e -> 72e6cde81


Added tests for agent resource version transmission.

This patch introduces separate tests for clock values communicated
from resource providers and from agents to masters.

Review: https://reviews.apache.org/r/63496/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/72e6cde8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/72e6cde8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/72e6cde8

Branch: refs/heads/master
Commit: 72e6cde810ce4543bc91abebb7cd84d01889ddea
Parents: 06856ec
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:34 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 77 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/72e6cde8/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 9928cfc..cf2fbac 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8657,6 +8657,83 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
   expectedResources += devolve(resourceProviderResources);
 
   EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
+
+  // The update from the agent should now contain both the agent and
+  // resource provider resource versions.
+  ASSERT_EQ(2u, updateSlaveMessage->resource_version_uuids_size());
+
+  hashset<Option<ResourceProviderID>> resourceProviderIds;
+  foreach (
+      const ResourceVersionUUID& resourceVersionUuid,
+      updateSlaveMessage->resource_version_uuids()) {
+    resourceProviderIds.insert(
+        resourceVersionUuid.has_resource_provider_id()
+          ? resourceVersionUuid.resource_provider_id()
+          : Option<ResourceProviderID>::none());
+  }
+
+  hashset<Option<ResourceProviderID>> expectedResourceProviderIds;
+  expectedResourceProviderIds.insert(None());
+  expectedResourceProviderIds.insert(devolve(resourceProviderId));
+
+  EXPECT_EQ(expectedResourceProviderIds, resourceProviderIds);
+}
+
+
+// This test checks that the agent correctly updates and sends
+// resource version values when it registers or reregisters.
+TEST_F(SlaveTest, ResourceVersions)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Check that the agent sends its resource version uuid with
+  // `RegisterSlaveMessage`.
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+    FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::settle();
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(registerSlaveMessage);
+
+  // Since no resource providers registered, the agent only sends its
+  // own resource version uuid. The agent has no resource provider id.
+  ASSERT_EQ(1u, registerSlaveMessage->resource_version_uuids().size());
+  EXPECT_FALSE(registerSlaveMessage->resource_version_uuids(0)
+                 .has_resource_provider_id());
+
+  // Check that the agent sends its resource version uuid in
+  // `ReregisterSlaveMessage`.
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Simulate a new master detected event on the slave,
+  // so that the slave will attempt to re-register.
+  detector.appoint(master.get()->pid);
+
+  Clock::settle();
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(reregisterSlaveMessage);
+
+  // No resource changes occurred on the agent and we expect the
+  // resource version uuids to be unchanged to the ones sent in the
+  // original registration.
+  ASSERT_EQ(
+      registerSlaveMessage->resource_version_uuids_size(),
+      reregisterSlaveMessage->resource_version_uuids_size());
+
+  EXPECT_EQ(
+      registerSlaveMessage->resource_version_uuids(0),
+      reregisterSlaveMessage->resource_version_uuids(0));
 }
 
 } // namespace tests {


[2/9] mesos git commit: Removed unused declaration.

Posted by bb...@apache.org.
Removed unused declaration.

The declaration of this operator was not needed here. It was also
done in a surprising namespace as the 'Task' equality operator lives
in the 'mesos' namespace while this declaration was in
'mesos::internal'.

Review: https://reviews.apache.org/r/63494/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f44cc225
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f44cc225
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f44cc225

Branch: refs/heads/master
Commit: f44cc22509fab77a4d0ae2d773cb857f6714a0e1
Parents: 76e6358
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:26 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/messages/messages.hpp | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f44cc225/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 59e6705..0a2a947 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -35,9 +35,6 @@
 namespace mesos {
 namespace internal {
 
-bool operator==(const Task& left, const Task& right);
-
-
 std::ostream& operator<<(std::ostream& stream, const StatusUpdate& update);
 
 


[4/9] mesos git commit: Added flag protobuf message for agent capabilities.

Posted by bb...@apache.org.
Added flag protobuf message for agent capabilities.

See summary.

Review: https://reviews.apache.org/r/63540/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca495620
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca495620
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca495620

Branch: refs/heads/master
Commit: ca4956206a06026e2fd0ca30bdacaf7e4a8cb5b5
Parents: 439825f
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:51:40 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/messages/flags.hpp   | 21 +++++++++++++++++++++
 src/messages/flags.proto |  8 ++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ca495620/src/messages/flags.hpp
----------------------------------------------------------------------
diff --git a/src/messages/flags.hpp b/src/messages/flags.hpp
index b267677..00e26cd 100644
--- a/src/messages/flags.hpp
+++ b/src/messages/flags.hpp
@@ -58,6 +58,19 @@ inline Try<mesos::internal::ContainerDNSInfo> parse(const std::string& value)
   return protobuf::parse<mesos::internal::ContainerDNSInfo>(json.get());
 }
 
+
+template <>
+inline Try<mesos::internal::SlaveCapabilities> parse(const std::string& value)
+{
+  // Convert from string or file to JSON.
+  Try<JSON::Object> json = parse<JSON::Object>(value);
+  if (json.isError()) {
+    return Error(json.error());
+  }
+
+  return protobuf::parse<mesos::internal::SlaveCapabilities>(json.get());
+}
+
 } // namespace flags {
 
 namespace mesos {
@@ -78,6 +91,14 @@ inline std::ostream& operator<<(
   return stream << dns.DebugString();
 }
 
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const SlaveCapabilities& slaveCapabilities)
+{
+  return stream << slaveCapabilities.DebugString();
+}
+
 } // namespace internal {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ca495620/src/messages/flags.proto
----------------------------------------------------------------------
diff --git a/src/messages/flags.proto b/src/messages/flags.proto
index 077353c..7ae9ef8 100644
--- a/src/messages/flags.proto
+++ b/src/messages/flags.proto
@@ -16,6 +16,8 @@
 
 syntax = "proto2";
 
+import "mesos/mesos.proto";
+
 import "slave/containerizer/mesos/isolators/network/cni/spec.proto";
 
 package mesos.internal;
@@ -97,3 +99,9 @@ message ContainerDNSInfo {
   repeated MesosInfo mesos = 1;
   repeated DockerInfo docker = 2;
 }
+
+
+// Describes a set of agent capabilities.
+message SlaveCapabilities {
+  repeated SlaveInfo.Capability capabilities = 1;
+}


[9/9] mesos git commit: Transmitted agent resource versions in (re)registration.

Posted by bb...@apache.org.
Transmitted agent resource versions in (re)registration.

This commit introduces resource version fields into the agent
registration and reregistration messages. The agent is changed to set
these fields when needed; the master in turn stores the versions for
use in speculated offer operations, which are to be added later.

Review: https://reviews.apache.org/r/63493/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/76e63580
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/76e63580
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/76e63580

Branch: refs/heads/master
Commit: 76e63580cc3f3d0b88d9295f9b02231068d3bc65
Parents: 51a1549
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:19 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/messages/messages.proto | 26 ++++++++++++++++++++++++++
 src/slave/slave.cpp         |  6 ++++++
 2 files changed, 32 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/76e63580/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0cc6b40..33732e2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -470,6 +470,19 @@ message RegisterSlaveMessage {
   // capabilities (e.g., ability to launch tasks of 'multi-role'
   // frameworks).
   repeated SlaveInfo.Capability agent_capabilities = 4;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when
+  // it believes that the resources from this resource provider are
+  // out of sync from the master's view.  The master will keep track
+  // of the last known resource version UUID for each resource
+  // provider, and attach the resource version UUID in each operation
+  // it sends out. The resource provider should reject operations that
+  // have a different resource version UUID than that it maintains,
+  // because this means the operation is operating on resources that
+  // might have already been invalidated.
+  repeated ResourceVersionUUID resource_version_uuids = 5;
 }
 
 
@@ -512,6 +525,19 @@ message ReregisterSlaveMessage {
   // capabilities (e.g., ability to launch tasks of 'multi-role'
   // frameworks).
   repeated SlaveInfo.Capability agent_capabilities = 9;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when
+  // it believes that the resources from this resource provider are
+  // out of sync from the master's view.  The master will keep track
+  // of the last known resource version UUID for each resource
+  // provider, and attach the resource version UUID in each operation
+  // it sends out. The resource provider should reject operations that
+  // have a different resource version UUID than that it maintains,
+  // because this means the operation is operating on resources that
+  // might have already been invalidated.
+  repeated ResourceVersionUUID resource_version_uuids = 10;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/76e63580/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index be46ebd..7cb6661 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1536,6 +1536,9 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
+    message.mutable_resource_version_uuids()->CopyFrom(
+        protobuf::createResourceVersions(resourceVersions));
+
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
 
@@ -1548,6 +1551,9 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
+    message.mutable_resource_version_uuids()->CopyFrom(
+        protobuf::createResourceVersions(resourceVersions));
+
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
 


[5/9] mesos git commit: Added resource version to resource provider UpdateTotalResources call.

Posted by bb...@apache.org.
Added resource version to resource provider UpdateTotalResources call.

This patch surfaces this information to resource provider manager
users like the agent. In a later patch we will modify the agent to
forward this information to the master.

Review: https://reviews.apache.org/r/63491/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1c9b46b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1c9b46b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1c9b46b

Branch: refs/heads/master
Commit: b1c9b46b97e64c333709b73274a5ae3436b7fda7
Parents: e37b133
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:04 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 14 +++++++++++---
 src/resource_provider/message.hpp |  4 +++-
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1c9b46b/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index a878507..bcc833b 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -434,9 +434,17 @@ void ResourceProviderManagerProcess::updateState(
 
   // TODO(chhsiao): Report pending operations.
 
-  ResourceProviderMessage::UpdateTotalResources updateTotalResources;
-  updateTotalResources.id = resourceProvider->info.id();
-  updateTotalResources.total = resourceProvider->resources;
+  Try<UUID> resourceVersionUuid =
+    UUID::fromBytes(update.resource_version_uuid());
+
+  CHECK_SOME(resourceVersionUuid)
+    << "Could not deserialize version of resource provider "
+    << resourceProvider->info.id() << ": " << resourceVersionUuid.error();
+
+  ResourceProviderMessage::UpdateTotalResources updateTotalResources{
+      resourceProvider->info.id(),
+      resourceVersionUuid.get(),
+      resourceProvider->resources};
 
   ResourceProviderMessage message;
   message.type = ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1c9b46b/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 931aab6..a1a84c1 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -36,8 +36,10 @@ struct ResourceProviderMessage
     UPDATE_TOTAL_RESOURCES
   };
 
-  struct UpdateTotalResources {
+  struct UpdateTotalResources
+  {
     ResourceProviderID id;
+    UUID resourceVersionUuid;
     Resources total;
   };
 


[8/9] mesos git commit: Added comparison operators for 'ResourceVersionUUID'.

Posted by bb...@apache.org.
Added comparison operators for 'ResourceVersionUUID'.

See summary.

Review: https://reviews.apache.org/r/63495/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06856eca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06856eca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06856eca

Branch: refs/heads/master
Commit: 06856ecadf30852563ad427474aed02aae2792d9
Parents: f44cc22
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:30 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/messages/messages.cpp | 29 +++++++++++++++++++++++++++++
 src/messages/messages.hpp | 10 ++++++++++
 2 files changed, 39 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/06856eca/src/messages/messages.cpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.cpp b/src/messages/messages.cpp
index 7f1da63..6029502 100644
--- a/src/messages/messages.cpp
+++ b/src/messages/messages.cpp
@@ -26,6 +26,35 @@ using std::ostream;
 namespace mesos {
 namespace internal {
 
+bool operator==(
+    const ResourceVersionUUID& left,
+    const ResourceVersionUUID& right)
+{
+  if (left.has_resource_provider_id() != right.has_resource_provider_id()) {
+    return false;
+  }
+
+  if (left.has_resource_provider_id() &&
+      left.resource_provider_id() != right.resource_provider_id()) {
+    return false;
+  }
+
+  if (left.uuid() != right.uuid()) {
+    return false;
+  }
+
+  return true;
+}
+
+
+bool operator!=(
+    const ResourceVersionUUID& left,
+    const ResourceVersionUUID& right)
+{
+  return !(left == right);
+}
+
+
 ostream& operator<<(ostream& stream, const StatusUpdate& update)
 {
   stream << update.status().state();

http://git-wip-us.apache.org/repos/asf/mesos/blob/06856eca/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 0a2a947..2756eba 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -35,6 +35,16 @@
 namespace mesos {
 namespace internal {
 
+bool operator==(
+    const ResourceVersionUUID& left,
+    const ResourceVersionUUID& right);
+
+
+bool operator!=(
+    const ResourceVersionUUID& left,
+    const ResourceVersionUUID& right);
+
+
 std::ostream& operator<<(std::ostream& stream, const StatusUpdate& update);
 
 


[3/9] mesos git commit: Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

Posted by bb...@apache.org.
Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

The agent's resource provider manager sends a
'ResourceProviderMessage' when its managed resources change. This
commit adds handling in the agent so that an 'UpdateSlaveMessage' is
sent to the master to update the total resources available on the
agent.

In order to provide push-like handling of the resource provider
manager's message queue, we chain recursive calls to the handler for
continuous processing. Initially, processing is kicked off from
'Slave::initialize'. This simple implementation does not yet provide
a direct way to stop processing of messages, but this can be achieved
by, e.g., replacing the manager with a new instance (this would also
require updating routes).

Since the agent can only send an 'UpdateSlaveMessage' when it is
registered with a master, a simple back-off of 5 s is implemented which
will defer processing of a ready message should the agent not yet have
registered.

To facilitate logging we add a stringification function for
'ResourceProviderMessage's.

This patch also adjusts a number of tests to now expect two
'UpdateSlaveMessage's.

Review: https://reviews.apache.org/r/61183/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e37b1336
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e37b1336
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e37b1336

Branch: refs/heads/master
Commit: e37b1336c21d0b2d60fead1d6489122615a8a8c9
Parents: 9850482
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:51:52 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/resource_provider/message.hpp |  25 ++++++
 src/slave/slave.cpp               | 130 ++++++++++++++++++++++++++++-
 src/slave/slave.hpp               |   7 +-
 src/tests/slave_tests.cpp         | 144 +++++++++++++++++++++++++++++++++
 4 files changed, 300 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 3c7c3f2..931aab6 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -17,10 +17,14 @@
 #ifndef __RESOURCE_PROVIDER_MESSAGE_HPP__
 #define __RESOURCE_PROVIDER_MESSAGE_HPP__
 
+#include <ostream>
+
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
+#include <stout/check.hpp>
 #include <stout/option.hpp>
+#include <stout/unreachable.hpp>
 
 namespace mesos {
 namespace internal {
@@ -42,6 +46,27 @@ struct ResourceProviderMessage
   Option<UpdateTotalResources> updateTotalResources;
 };
 
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const ResourceProviderMessage& resourceProviderMessage)
+{
+  switch (resourceProviderMessage.type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES:
+      const Option<ResourceProviderMessage::UpdateTotalResources>&
+        updateTotalResources = resourceProviderMessage.updateTotalResources;
+
+      CHECK_SOME(updateTotalResources);
+
+      return stream
+          << "UPDATE_TOTAL_RESOURCES: "
+          << updateTotalResources->id << " "
+          << updateTotalResources->total;
+  }
+
+  UNREACHABLE();
+}
+
 } // namespace internal {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0e342cd..494d793 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1270,17 +1270,36 @@ void Slave::registered(
       break;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  bool sendUpdateSlaveMessage = false;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  if (capabilities.resourceProvider) {
+    LOG(INFO) << "Forwarding total resources " << totalResources;
+
+    message.mutable_resource_categories()->set_total(true);
+    message.mutable_total_resources()->CopyFrom(totalResources);
+
+    sendUpdateSlaveMessage = true;
+  }
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
+    sendUpdateSlaveMessage = true;
+  }
+
+  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 }
@@ -1350,17 +1369,36 @@ void Slave::reregistered(
       return;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  bool sendUpdateSlaveMessage = false;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+
+  if (capabilities.resourceProvider) {
+    LOG(INFO) << "Forwarding total resources " << totalResources;
+
+    message.mutable_resource_categories()->set_total(true);
+    message.mutable_total_resources()->CopyFrom(totalResources);
+
+    sendUpdateSlaveMessage = true;
+  }
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
+    sendUpdateSlaveMessage = true;
+  }
+
+  if (sendUpdateSlaveMessage) {
     send(master.get(), message);
   }
 
@@ -6429,6 +6467,12 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
+    if (capabilities.resourceProvider) {
+      // Start listening for messages from the resource provider manager.
+      resourceProviderManager.messages().get().onAny(
+          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    }
+
     // Forward oversubscribed resources.
     forwardOversubscribed();
 
@@ -6621,6 +6665,84 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
+void Slave::handleResourceProviderMessage(
+    const Future<ResourceProviderMessage>& message)
+{
+  // Ignore terminal messages which are not ready. These
+  // can arise e.g., if the `Future` was discarded.
+  if (!message.isReady()) {
+    LOG(ERROR) << "Last resource provider message became terminal before "
+                  "becoming ready: "
+               << (message.isFailed() ? message.failure() : "future discarded");
+
+    // Wait for the next message.
+    resourceProviderManager.messages().get()
+      .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+
+    return;
+  }
+
+  LOG(INFO) << "Handling resource provider message '" << message.get() << "'";
+
+  switch(message->type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES: {
+      CHECK_SOME(message->updateTotalResources);
+
+      const Resources& newTotal = message->updateTotalResources->total;
+
+      const ResourceProviderID& resourceProviderId =
+        message->updateTotalResources->id;
+
+      const Resources oldTotal =
+        totalResources.filter([&resourceProviderId](const Resource& resource) {
+          return resource.provider_id() == resourceProviderId;
+        });
+
+      // Ignore the update if it contained no new information.
+      if (newTotal == oldTotal) {
+        break;
+      }
+
+      totalResources -= oldTotal;
+      totalResources += newTotal;
+
+      // Send the updated resources to the master if the agent is running. Note
+      // that since we have already updated our copy of the latest resource
+      // provider resources, it is safe to consume this message and wait for the
+      // next one; even if we do not send the update to the master right now, an
+      // update will be send once the agent reregisters.
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master that the total capacity of this agent has
+          // changed.
+          UpdateSlaveMessage updateSlaveMessage;
+          updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
+          updateSlaveMessage.mutable_resource_categories()->set_total(true);
+          updateSlaveMessage.mutable_total_resources()->CopyFrom(
+              totalResources);
+
+          send(master.get(), updateSlaveMessage);
+
+          break;
+        }
+      }
+      break;
+    }
+  }
+
+  // Wait for the next message.
+  resourceProviderManager.messages().get()
+    .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 79c7f02..b2dc002 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -530,6 +530,9 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
+  void handleResourceProviderMessage(
+      const process::Future<ResourceProviderMessage>& message);
+
   // Gauge methods.
   double _frameworks_active()
   {
@@ -576,8 +579,8 @@ private:
   // Resources that are checkpointed by the slave.
   Resources checkpointedResources;
 
-  // The current total resources of the agent, i.e.,
-  // `info.resources()` with checkpointed resources applied.
+  // The current total resources of the agent, i.e., `info.resources()` with
+  // checkpointed resources applied and resource provider resources.
   Resources totalResources;
 
   Option<process::UPID> master;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e37b1336/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index f9c2e6b..9928cfc 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -31,6 +31,8 @@
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
@@ -41,6 +43,8 @@
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/hashset.hpp>
 #include <stout/json.hpp>
 #include <stout/none.hpp>
@@ -8515,6 +8519,146 @@ TEST_P(DefaultContainerDNSFlagTest, ValidateFlag)
   }
 }
 
+
+// This test checks that when a resource provider subscribes with the
+// agent's resource provider manager, the agent sends an
+// `UpdateSlaveMessage` reflecting the updated capacity.
+//
+// TODO(bbannier): We should also add tests for the agent behavior
+// with resource providers where the agent ultimately resends the
+// previous total when the master fails over, or for the interaction
+// with the usual oversubscription protocol (oversubscribed resources
+// vs. updates of total).
+TEST_F(SlaveTest, ResourceProviderSubscribe)
+{
+  Clock::pause();
+
+  // Start an agent and a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Specify the agent resources so we can check the reported total later.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:2;mem:512;disk:512;ports:[]";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    SlaveInfo::Capability* capability =
+      slaveFlags.agent_features->add_capabilities();
+    capability->set_type(type);
+  }
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a local resource provider with the agent.
+  v1::MockResourceProvider resourceProvider;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(resourceProvider, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(connected);
+
+  Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed));
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  {
+    mesos::v1::resource_provider::Call call;
+    call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE);
+
+    mesos::v1::ResourceProviderInfo* info =
+      call.mutable_subscribe()->mutable_resource_provider_info();
+
+    info->set_type("org.apache.mesos.resource_provider.test");
+    info->set_name("test");
+
+    resourceProvider.send(call);
+  }
+
+  // The subscription event contains the assigned resource provider id.
+  AWAIT_READY(subscribed);
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    subscribed->provider_id();
+
+  v1::Resource resourceProviderResources =
+    v1::Resources::parse("disk", "8096", "*").get();
+
+  resourceProviderResources.mutable_provider_id()->CopyFrom(resourceProviderId);
+
+  {
+    mesos::v1::resource_provider::Call call;
+    call.set_type(mesos::v1::resource_provider::Call::UPDATE_STATE);
+
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+
+    auto updateState = call.mutable_update_state();
+    updateState->mutable_resources()->CopyFrom(
+        v1::Resources(resourceProviderResources));
+    updateState->set_resource_version_uuid(UUID::random().toBytes());
+
+    resourceProvider.send(call);
+  }
+
+  AWAIT_READY(updateSlaveMessage);
+
+  EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
+
+  // We expect the updated agent total to contain both the resources of the
+  // agent and of the newly subscribed resource provider. The resources from the
+  // resource provider have a matching `ResourceProviderID` set.
+  Resources expectedResources =
+    Resources::parse(slaveFlags.resources.get()).get();
+  expectedResources += devolve(resourceProviderResources);
+
+  EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[7/9] mesos git commit: Synchronized agent resource versions via 'UpdateSlaveMessage'.

Posted by bb...@apache.org.
Synchronized agent resource versions via 'UpdateSlaveMessage'.

This commit introduces agent resource versions to the master and
agents. Agents are responsible for maintaining their resource
versions. The resource versions are synchronized with the master via
'UpdateSlaveMessage'.

Review: https://reviews.apache.org/r/63492/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51a15496
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51a15496
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51a15496

Branch: refs/heads/master
Commit: 51a1549611fbfcf1ad1a1eb5e3efe0f1221acb36
Parents: b1c9b46
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:52:08 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp | 46 ++++++++++++++++++++++++++++++++++++++
 src/common/protobuf_utils.hpp | 11 +++++++++
 src/master/master.cpp         | 10 +++++++++
 src/master/master.hpp         |  2 ++
 src/slave/slave.cpp           | 31 +++++++++++++++++++++++++
 src/slave/slave.hpp           |  1 +
 6 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 7a4b87b..5739a63 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -755,6 +755,52 @@ void stripAllocationInfo(Offer::Operation* operation)
 }
 
 
+RepeatedPtrField<ResourceVersionUUID> createResourceVersions(
+    const hashmap<Option<ResourceProviderID>, UUID>& resourceVersions)
+{
+  RepeatedPtrField<ResourceVersionUUID> result;
+
+  foreachpair (
+      const Option<ResourceProviderID>& resourceProviderId,
+      const UUID& uuid,
+      resourceVersions) {
+    ResourceVersionUUID* entry = result.Add();
+
+    if (resourceProviderId.isSome()) {
+      entry->mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+    }
+    entry->set_uuid(uuid.toBytes());
+  }
+
+  return result;
+}
+
+
+hashmap<Option<ResourceProviderID>, UUID> parseResourceVersions(
+    const RepeatedPtrField<ResourceVersionUUID>& resourceVersionUUIDs)
+{
+  hashmap<Option<ResourceProviderID>, UUID> result;
+
+  foreach (
+      const ResourceVersionUUID& resourceVersionUUID,
+      resourceVersionUUIDs) {
+    const Option<ResourceProviderID> resourceProviderId =
+      resourceVersionUUID.has_resource_provider_id()
+        ? resourceVersionUUID.resource_provider_id()
+        : Option<ResourceProviderID>::none();
+
+    CHECK(!result.contains(resourceProviderId));
+
+    const Try<UUID> uuid = UUID::fromBytes(resourceVersionUUID.uuid());
+    CHECK_SOME(uuid);
+
+    result.insert({std::move(resourceProviderId), std::move(uuid.get())});
+  }
+
+  return result;
+}
+
+
 TimeInfo getCurrentTime()
 {
   TimeInfo timeInfo;

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 95f57da..0ca4c6d 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -192,6 +192,17 @@ void injectAllocationInfo(
 void stripAllocationInfo(Offer::Operation* operation);
 
 
+// Helper function to pack a protobuf list of resource versions.
+google::protobuf::RepeatedPtrField<ResourceVersionUUID> createResourceVersions(
+    const hashmap<Option<ResourceProviderID>, UUID>& resourceVersions);
+
+
+// Helper function to unpack a protobuf list of resource versions.
+hashmap<Option<ResourceProviderID>, UUID> parseResourceVersions(
+    const google::protobuf::RepeatedPtrField<ResourceVersionUUID>&
+      resourceVersionUUIDs);
+
+
 // Helper function that fills in a TimeInfo from the current time.
 TimeInfo getCurrentTime();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ee212c1..01675ed 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -126,6 +126,8 @@ using process::http::authentication::Principal;
 
 using process::metrics::Counter;
 
+using google::protobuf::RepeatedPtrField;
+
 namespace mesos {
 namespace internal {
 namespace master {
@@ -140,6 +142,7 @@ using mesos::master::detector::MasterDetector;
 
 static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo);
 
+
 class SlaveObserver : public ProtobufProcess<SlaveObserver>
 {
 public:
@@ -7089,6 +7092,13 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
 
+  // Agents which can support resource providers always update the
+  // master on their resource version uuids via `UpdateSlaveMessage`.
+  if (slave->capabilities.resourceProvider) {
+    slave->resourceVersions =
+      protobuf::parseResourceVersions(message.resource_version_uuids());
+  }
+
   if (newSlaveResources == slave->totalResources) {
     LOG(INFO) << "Ignoring update on agent " << *slave
               << " as it reports no changes";

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0c1253a..adabc59 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -267,6 +267,8 @@ struct Slave
 
   SlaveObserver* observer;
 
+  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+
 private:
   Slave(const Slave&);              // No copying.
   Slave& operator=(const Slave&); // No assigning.

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 494d793..be46ebd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -223,6 +223,7 @@ Slave::Slave(const string& id,
     resourceEstimator(_resourceEstimator),
     qosController(_qosController),
     authorizer(_authorizer),
+    resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}),
     secretGenerator(nullptr) {}
 
 
@@ -1277,6 +1278,8 @@ void Slave::registered(
 
   UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_version_uuids()->CopyFrom(
+      protobuf::createResourceVersions(resourceVersions));
 
   if (capabilities.resourceProvider) {
     LOG(INFO) << "Forwarding total resources " << totalResources;
@@ -1377,6 +1380,9 @@ void Slave::reregistered(
   UpdateSlaveMessage message;
   message.mutable_slave_id()->CopyFrom(info.id());
 
+  message.mutable_resource_version_uuids()->CopyFrom(
+      protobuf::createResourceVersions(resourceVersions));
+
   if (capabilities.resourceProvider) {
     LOG(INFO) << "Forwarding total resources " << totalResources;
 
@@ -6646,11 +6652,23 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
       LOG(INFO) << "Forwarding total oversubscribed resources "
                 << oversubscribed;
 
+      // We do not update the agent's resource version since
+      // oversubscribed resources cannot be used for any operations
+      // but launches. Since oversubscription is run at regular
+      // intervals updating the version could cause a lot of offer
+      // operation churn.
+      //
+      // TODO(bbannier): Revisit this if we modify the operations
+      // possible on oversubscribed resources.
+
       UpdateSlaveMessage message;
       message.mutable_slave_id()->CopyFrom(info.id());
       message.mutable_resource_categories()->set_oversubscribed(true);
       message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
+      message.mutable_resource_version_uuids()->CopyFrom(
+          protobuf::createResourceVersions(resourceVersions));
+
       CHECK_SOME(master);
       send(master.get(), message);
     }
@@ -6706,6 +6724,15 @@ void Slave::handleResourceProviderMessage(
       totalResources -= oldTotal;
       totalResources += newTotal;
 
+      const UUID& resourceVersionUuid =
+        message->updateTotalResources->resourceVersionUuid;
+
+      if (resourceVersions.contains(resourceProviderId)) {
+        resourceVersions.at(resourceProviderId) = resourceVersionUuid;
+      } else {
+        resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+      }
+
       // Send the updated resources to the master if the agent is running. Note
       // that since we have already updated our copy of the latest resource
       // provider resources, it is safe to consume this message and wait for the
@@ -6725,9 +6752,13 @@ void Slave::handleResourceProviderMessage(
           UpdateSlaveMessage updateSlaveMessage;
           updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
           updateSlaveMessage.mutable_resource_categories()->set_total(true);
+
           updateSlaveMessage.mutable_total_resources()->CopyFrom(
               totalResources);
 
+          updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
+              protobuf::createResourceVersions(resourceVersions));
+
           send(master.get(), updateSlaveMessage);
 
           break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b2dc002..0124df4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -665,6 +665,7 @@ private:
 
   ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
+  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
 
 protected:
   // Made protected for testing purposes.


[6/9] mesos git commit: Allowed toggling of agent capabilities via command line flags.

Posted by bb...@apache.org.
Allowed toggling of agent capabilities via command line flags.

This patch introduces a new agent command line flag
'--agent_features' which can be used to whitelist capabilities to
enable. This flag is intended to enable introducing feature flags in
the future so experimental features can be added while still reducing
the risk of breaking existing functionality. Currently only the
'RESOURCE_PROVIDER' capability can be toggled.

Review: https://reviews.apache.org/r/63519/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9850482c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9850482c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9850482c

Branch: refs/heads/master
Commit: 9850482cc5a409762443206a79d186b06be36e4f
Parents: ca49562
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Nov 8 00:51:45 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Nov 8 15:51:20 2017 +0100

----------------------------------------------------------------------
 docs/configuration/agent.md | 22 ++++++++++++++++++++++
 src/slave/flags.cpp         | 32 ++++++++++++++++++++++++++++++++
 src/slave/flags.hpp         |  1 +
 src/slave/slave.cpp         | 21 ++++++++++++++-------
 src/slave/slave.hpp         |  2 ++
 5 files changed, 71 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9850482c/docs/configuration/agent.md
----------------------------------------------------------------------
diff --git a/docs/configuration/agent.md b/docs/configuration/agent.md
index 116e7c6..f1e0681 100644
--- a/docs/configuration/agent.md
+++ b/docs/configuration/agent.md
@@ -85,6 +85,28 @@ Example:
 }</code></pre>
   </td>
 </tr>
+
+<tr>
+  <td>
+    --agent_features=VALUE
+  </td>
+  <td>
+JSON representation of agent features to whitelist. We always require
+'MULTI_ROLE', 'HIERARCHICAL_ROLE', and 'RESERVATION_REFINEMENT'.
+<p/>
+Example:
+<pre><code>
+{
+    "capabilities": [
+        {"type": "MULTI_ROLE"},
+        {"type": "HIERARCHICAL_ROLE"},
+        {"type": "RESERVATION_REFINEMENT"}
+    ]
+}
+</code></pre>
+  </td>
+</tr>
+
 <tr>
   <td>
     --agent_subsystems=VALUE,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9850482c/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 63aaac2..c5078a8 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -27,6 +27,7 @@
 
 #include "common/http.hpp"
 #include "common/parse.hpp"
+#include "common/protobuf_utils.hpp"
 
 #include "slave/constants.hpp"
 
@@ -641,6 +642,37 @@ mesos::internal::slave::Flags::Flags()
       "This flag has the same syntax as `--effective_capabilities`."
      );
 
+  add(&Flags::agent_features,
+      "agent_features",
+      "JSON representation of agent features to whitelist. We always require\n"
+      "'MULTI_ROLE', 'HIERARCHICAL_ROLE', and 'RESERVATION_REFINEMENT'.\n"
+      "\n"
+      "Example:\n"
+      "{\n"
+      "    \"capabilities\": [\n"
+      "        {\"type\": \"MULTI_ROLE\"},\n"
+      "        {\"type\": \"HIERARCHICAL_ROLE\"},\n"
+      "        {\"type\": \"RESERVATION_REFINEMENT\"}\n"
+      "    ]\n"
+      "}\n",
+      [](const Option<SlaveCapabilities>& agentFeatures) -> Option<Error> {
+        // Check all required capabilities are enabled.
+        if (agentFeatures.isSome()) {
+          protobuf::slave::Capabilities capabilities(
+              agentFeatures->capabilities());
+
+          if (!capabilities.multiRole ||
+              !capabilities.hierarchicalRole ||
+              !capabilities.reservationRefinement) {
+            return Error(
+                "At least the following agent features need to be enabled: "
+                "MULTI_ROLE, HIERARCHICAL_ROLE, RESERVATION_REFINEMENT");
+          }
+        }
+
+        return None();
+      });
+
   add(&Flags::disallow_sharing_agent_pid_namespace,
       "disallow_sharing_agent_pid_namespace",
       "If set to `true`, each top-level container will have its own pid\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/9850482c/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index d02edbf..0c02b49 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -176,6 +176,7 @@ public:
   std::string xfs_project_range;
 #endif
   bool http_command_executor;
+  Option<SlaveCapabilities> agent_features;
   Option<DomainInfo> domain;
 
   // The following flags are executable specific (e.g., since we only

http://git-wip-us.apache.org/repos/asf/mesos/blob/9850482c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c108239..0e342cd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -56,17 +56,19 @@
 #include <stout/duration.hpp>
 #include <stout/exit.hpp>
 #include <stout/fs.hpp>
+#include <stout/json.hpp>
 #include <stout/lambda.hpp>
 #include <stout/net.hpp>
 #include <stout/numify.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/protobuf.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
-#include <stout/uuid.hpp>
 #include <stout/utils.hpp>
+#include <stout/uuid.hpp>
 
 #include "authentication/cram_md5/authenticatee.hpp"
 
@@ -196,6 +198,11 @@ Slave::Slave(const string& id,
     state(RECOVERING),
     flags(_flags),
     http(this),
+    capabilities(
+        _flags.agent_features.isNone()
+          ? protobuf::slave::Capabilities(AGENT_CAPABILITIES())
+          : protobuf::slave::Capabilities(
+                _flags.agent_features->capabilities())),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
     detector(_detector),
     containerizer(_containerizer),
@@ -1481,9 +1488,9 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     RegisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
     message.mutable_slave()->CopyFrom(slaveInfo);
-    foreach (const SlaveInfo::Capability& capability, AGENT_CAPABILITIES()) {
-      message.add_agent_capabilities()->CopyFrom(capability);
-    }
+
+    message.mutable_agent_capabilities()->CopyFrom(
+        capabilities.toRepeatedPtrField());
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -1493,9 +1500,9 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
-    foreach (const SlaveInfo::Capability& capability, AGENT_CAPABILITIES()) {
-      message.add_agent_capabilities()->CopyFrom(capability);
-    }
+
+    message.mutable_agent_capabilities()->CopyFrom(
+        capabilities.toRepeatedPtrField());
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9850482c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index df1b020..79c7f02 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -571,6 +571,8 @@ private:
 
   SlaveInfo info;
 
+  protobuf::slave::Capabilities capabilities;
+
   // Resources that are checkpointed by the slave.
   Resources checkpointedResources;