You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/11 22:02:22 UTC

[1/3] mesos git commit: Sent resource version uuid only for agent default resources.

Repository: mesos
Updated Branches:
  refs/heads/master 3f862f332 -> 848767b4f


Sent resource version uuid only for agent default resources.

It's not correct to send resource version uuids for local resource
providers during agent (re)registration because the total resources from
those local resource providers are not sent in the same message.

Consider the following sequence of events:
(1) Agent disconnects
(2) Speculative operation fails in an RP, the RP bumps the version uuid
(3) Agent updates the RP’s resource version uuid
(4) Agent reregisters
(5) Master is informed about the new resource version uuid of that RP
(6) Master still has the old total of the RP
(7) Framework launches an operation assuming the old total, but with the
    new resource version uuid

This patch updates `RegisterSlaveMessage` and
`ReregisterSlaveMessage` to only send resource version uuids for the
agent default resources.

Review: https://reviews.apache.org/r/64494


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/848767b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/848767b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/848767b4

Branch: refs/heads/master
Commit: 848767b4f3b503944936905ca498dc77681cce24
Parents: c3157d7
Author: Jie Yu <yu...@gmail.com>
Authored: Sun Dec 10 11:42:54 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp       | 64 +++++++++++++++++++++++++++++-----------
 src/master/master.hpp       | 16 ++++++----
 src/messages/messages.proto | 46 +++++++++++++++--------------
 src/slave/slave.cpp         | 24 +++------------
 src/tests/slave_tests.cpp   | 16 +++++-----
 5 files changed, 92 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 120cb75..b10d034 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6297,6 +6297,15 @@ void Master::__registerSlave(
   vector<Resource> checkpointedResources = google::protobuf::convert(
       std::move(*registerSlaveMessage.mutable_checkpointed_resources()));
 
+  Option<UUID> resourceVersion;
+  if (registerSlaveMessage.has_resource_version_uuid()) {
+    Try<UUID> uuid = UUID::fromBytes(
+        registerSlaveMessage.resource_version_uuid());
+
+    CHECK_SOME(uuid);
+    resourceVersion = uuid.get();
+  }
+
   Slave* slave = new Slave(
       this,
       slaveInfo,
@@ -6306,8 +6315,7 @@ void Master::__registerSlave(
       std::move(agentCapabilities),
       Clock::now(),
       std::move(checkpointedResources),
-      protobuf::parseResourceVersions(
-          registerSlaveMessage.resource_version_uuids()));
+      resourceVersion);
 
   ++metrics->slave_registrations;
 
@@ -6812,6 +6820,15 @@ void Master::__reregisterSlave(
   vector<ExecutorInfo> executorInfos = google::protobuf::convert(
       std::move(*reregisterSlaveMessage.mutable_executor_infos()));
 
+  Option<UUID> resourceVersion;
+  if (reregisterSlaveMessage.has_resource_version_uuid()) {
+    Try<UUID> uuid = UUID::fromBytes(
+        reregisterSlaveMessage.resource_version_uuid());
+
+    CHECK_SOME(uuid);
+    resourceVersion = uuid.get();
+  }
+
   Slave* slave = new Slave(
       this,
       slaveInfo,
@@ -6821,8 +6838,7 @@ void Master::__reregisterSlave(
       std::move(agentCapabilities),
       Clock::now(),
       std::move(checkpointedResources),
-      protobuf::parseResourceVersions(
-          reregisterSlaveMessage.resource_version_uuids()),
+      resourceVersion,
       std::move(executorInfos),
       std::move(recoveredTasks));
 
@@ -6945,11 +6961,21 @@ void Master::___reregisterSlave(
   const string& version = reregisterSlaveMessage.version();
   const vector<SlaveInfo::Capability> agentCapabilities =
     google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
-  const vector<ResourceVersionUUID> resourceVersions =
-    google::protobuf::convert(reregisterSlaveMessage.resource_version_uuids());
 
-  Try<Nothing> stateUpdated =
-    slave->update(slaveInfo, version, agentCapabilities, resourceVersions);
+  Option<UUID> resourceVersion;
+  if (reregisterSlaveMessage.has_resource_version_uuid()) {
+    Try<UUID> uuid = UUID::fromBytes(
+        reregisterSlaveMessage.resource_version_uuid());
+
+    CHECK_SOME(uuid);
+    resourceVersion = uuid.get();
+  }
+
+  Try<Nothing> stateUpdated = slave->update(
+      slaveInfo,
+      version,
+      agentCapabilities,
+      resourceVersion);
 
   // As of now, the only way `slave->update()` can fail is if the agent sent
   // different checkpointed resources than it had before. A well-behaving
@@ -11274,7 +11300,7 @@ Slave::Slave(
     vector<SlaveInfo::Capability> _capabilites,
     const Time& _registeredTime,
     vector<Resource> _checkpointedResources,
-    hashmap<Option<ResourceProviderID>, UUID> _resourceVersions,
+    const Option<UUID>& resourceVersion,
     vector<ExecutorInfo> executorInfos,
     vector<Task> tasks)
   : master(_master),
@@ -11288,8 +11314,7 @@ Slave::Slave(
     connected(true),
     active(true),
     checkpointedResources(std::move(_checkpointedResources)),
-    observer(nullptr),
-    resourceVersions(std::move(_resourceVersions))
+    observer(nullptr)
 {
   CHECK(info.has_id());
 
@@ -11301,6 +11326,10 @@ Slave::Slave(
   CHECK_SOME(resources);
   totalResources = resources.get();
 
+  if (resourceVersion.isSome()) {
+    resourceVersions.put(None(), resourceVersion.get());
+  }
+
   foreach (ExecutorInfo& executorInfo, executorInfos) {
     CHECK(executorInfo.has_framework_id());
     addExecutor(executorInfo.framework_id(), std::move(executorInfo));
@@ -11578,10 +11607,10 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
 
 
 Try<Nothing> Slave::update(
-  const SlaveInfo& _info,
-  const string& _version,
-  const vector<SlaveInfo::Capability>& _capabilities,
-  const vector<ResourceVersionUUID>& _resourceVersions)
+    const SlaveInfo& _info,
+    const string& _version,
+    const vector<SlaveInfo::Capability>& _capabilities,
+    const Option<UUID>& resourceVersion)
 {
   Try<Resources> resources = applyCheckpointedResources(
       _info.resources(),
@@ -11602,8 +11631,9 @@ Try<Nothing> Slave::update(
   // re-registering in this case.
   totalResources = resources.get();
 
-  resourceVersions = protobuf::parseResourceVersions(
-      {_resourceVersions.begin(), _resourceVersions.end()});
+  if (resourceVersion.isSome()) {
+    resourceVersions.put(None(), resourceVersion.get());
+  }
 
   return Nothing();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 7411e0b..232cc37 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -124,7 +124,7 @@ struct Slave
         std::vector<SlaveInfo::Capability> _capabilites,
         const process::Time& _registeredTime,
         std::vector<Resource> _checkpointedResources,
-        hashmap<Option<ResourceProviderID>, UUID> _resourceVersions,
+        const Option<UUID>& resourceVersion,
         std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
         std::vector<Task> tasks = std::vector<Task>());
 
@@ -177,10 +177,10 @@ struct Slave
   void apply(const std::vector<ResourceConversion>& conversions);
 
   Try<Nothing> update(
-    const SlaveInfo& info,
-    const std::string& _version,
-    const std::vector<SlaveInfo::Capability>& _capabilites,
-    const std::vector<ResourceVersionUUID>& resourceVersions);
+      const SlaveInfo& info,
+      const std::string& _version,
+      const std::vector<SlaveInfo::Capability>& _capabilites,
+      const Option<UUID>& resourceVersion);
 
   Master* const master;
   const SlaveID id;
@@ -263,12 +263,16 @@ struct Slave
   // persistent volumes, dynamic reservations, etc). These are either
   // in use by a task/executor, or are available for use and will be
   // re-offered to the framework.
+  // TODO(jieyu): `checkpointedResources` is only for agent default
+  // resources. Resources from resource providers are not included in
+  // this field. Consider removing this field.
   Resources checkpointedResources;
 
   // The current total resources of the slave. Note that this is
   // different from 'info.resources()' because this also considers
   // operations (e.g., CREATE, RESERVE) that have been applied and
-  // includes revocable resources as well.
+  // includes revocable resources and resources from resource
+  // providers as well.
   Resources totalResources;
 
   SlaveObserver* observer;

http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index a13a641..e680cd5 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -514,18 +514,19 @@ message RegisterSlaveMessage {
   // frameworks).
   repeated SlaveInfo.Capability agent_capabilities = 4;
 
-  // Used to establish the relationship between the operation and the
+  // Resource version UUID for agent default resources. Used to
+  // establish the relationship between the operation and the
   // resources that the operation is operating on. Each resource
-  // provider will keep a resource version UUID, and change it when
-  // it believes that the resources from this resource provider are
-  // out of sync from the master's view.  The master will keep track
-  // of the last known resource version UUID for each resource
-  // provider, and attach the resource version UUID in each operation
-  // it sends out. The resource provider should reject operations that
-  // have a different resource version UUID than that it maintains,
-  // because this means the operation is operating on resources that
-  // might have already been invalidated.
-  repeated ResourceVersionUUID resource_version_uuids = 5;
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  optional bytes resource_version_uuid = 5;
 }
 
 
@@ -572,18 +573,19 @@ message ReregisterSlaveMessage {
   // frameworks).
   repeated SlaveInfo.Capability agent_capabilities = 9;
 
-  // Used to establish the relationship between the operation and the
+  // Resource version UUID for agent default resources. Used to
+  // establish the relationship between the operation and the
   // resources that the operation is operating on. Each resource
-  // provider will keep a resource version UUID, and change it when
-  // it believes that the resources from this resource provider are
-  // out of sync from the master's view.  The master will keep track
-  // of the last known resource version UUID for each resource
-  // provider, and attach the resource version UUID in each operation
-  // it sends out. The resource provider should reject operations that
-  // have a different resource version UUID than that it maintains,
-  // because this means the operation is operating on resources that
-  // might have already been invalidated.
-  repeated ResourceVersionUUID resource_version_uuids = 10;
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  optional bytes resource_version_uuid = 10;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5d4cd6d..302bcd3 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1540,19 +1540,11 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
-    uuid->set_uuid(resourceVersion.toBytes());
-
-    foreachvalue (ResourceProvider* provider, resourceProviders) {
-      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
-      CHECK(provider->info.has_id());
-      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
-      uuid->set_uuid(provider->resourceVersion.toBytes());
-    }
-
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
 
+    message.set_resource_version_uuid(resourceVersion.toBytes());
+
     send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
@@ -1562,19 +1554,11 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
-    uuid->set_uuid(resourceVersion.toBytes());
-
-    foreachvalue (ResourceProvider* provider, resourceProviders) {
-      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
-      CHECK(provider->info.has_id());
-      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
-      uuid->set_uuid(provider->resourceVersion.toBytes());
-    }
-
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
 
+    message.set_resource_version_uuid(resourceVersion.toBytes());
+
     message.mutable_slave()->CopyFrom(slaveInfo);
 
     foreachvalue (Framework* framework, frameworks) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/848767b4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0fb2a63..5228e03 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8961,7 +8961,9 @@ TEST_F(SlaveTest, ResourceVersions)
     FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
 
   StandaloneMasterDetector detector(master.get()->pid);
+
   slave::Flags slaveFlags = CreateSlaveFlags();
+
   Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
   ASSERT_SOME(slave);
 
@@ -8972,9 +8974,7 @@ TEST_F(SlaveTest, ResourceVersions)
 
   // Since no resource providers registered, the agent only sends its
   // own resource version uuid. The agent has no resource provider id.
-  ASSERT_EQ(1u, registerSlaveMessage->resource_version_uuids().size());
-  EXPECT_FALSE(registerSlaveMessage->resource_version_uuids(0)
-                 .has_resource_provider_id());
+  ASSERT_TRUE(registerSlaveMessage->has_resource_version_uuid());
 
   // Check that the agent sends its resource version uuid in
   // `ReregisterSlaveMessage`.
@@ -8991,15 +8991,13 @@ TEST_F(SlaveTest, ResourceVersions)
   AWAIT_READY(reregisterSlaveMessage);
 
   // No resource changes occurred on the agent and we expect the
-  // resource version uuids to be unchanged to the ones sent in the
+  // resource version uuid to be unchanged to the one sent in the
   // original registration.
-  ASSERT_EQ(
-      registerSlaveMessage->resource_version_uuids_size(),
-      reregisterSlaveMessage->resource_version_uuids_size());
+  ASSERT_TRUE(reregisterSlaveMessage->has_resource_version_uuid());
 
   EXPECT_EQ(
-      registerSlaveMessage->resource_version_uuids(0),
-      reregisterSlaveMessage->resource_version_uuids(0));
+      registerSlaveMessage->resource_version_uuid(),
+      reregisterSlaveMessage->resource_version_uuid());
 }
 
 


[3/3] mesos git commit: Refactored agent to keep track of local resource providers.

Posted by ji...@apache.org.
Refactored agent to keep track of local resource providers.

Currently, we don't explicitly keep track of local resource providers.
This causes the logic for a few methods to be quite complex because we
need to reconstruct the resource provider information every time.

Review: https://reviews.apache.org/r/64477


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c9861e1a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c9861e1a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c9861e1a

Branch: refs/heads/master
Commit: c9861e1ae5225b4ee2cb160bbb53c3ea9fafd021
Parents: 3f862f3
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Dec 8 17:31:24 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/validation.cpp                     |   3 +
 src/resource_provider/manager.cpp             |  18 +-
 src/resource_provider/message.hpp             |   9 +-
 src/slave/http.cpp                            |  11 +-
 src/slave/slave.cpp                           | 368 +++++++++++----------
 src/slave/slave.hpp                           |  65 +++-
 src/tests/resource_provider_manager_tests.cpp |   2 +-
 7 files changed, 286 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 38d9a3c..585d8bf 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -1929,6 +1929,9 @@ Option<Error> validateInverseOffers(
 
 namespace operation {
 
+// TODO(jieyu): Validate that resources in an operation is not empty.
+
+
 Option<Error> validate(
     const Offer::Operation::Reserve& reserve,
     const Option<Principal>& principal,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index f98611c..bfc917f 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -646,18 +646,26 @@ void ResourceProviderManagerProcess::updateState(
 
   // TODO(chhsiao): Report pending operations.
 
-  Try<UUID> resourceVersionUuid =
+  Try<UUID> resourceVersion =
     UUID::fromBytes(update.resource_version_uuid());
 
-  CHECK_SOME(resourceVersionUuid)
+  CHECK_SOME(resourceVersion)
     << "Could not deserialize version of resource provider "
-    << resourceProvider->info.id() << ": " << resourceVersionUuid.error();
+    << resourceProvider->info.id() << ": " << resourceVersion.error();
+
+  hashmap<UUID, OfferOperation> offerOperations;
+  foreach (const OfferOperation &operation, update.operations()) {
+    Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(uuid);
+
+    offerOperations.put(uuid.get(), operation);
+  }
 
   ResourceProviderMessage::UpdateState updateState{
       resourceProvider->info,
-      resourceVersionUuid.get(),
+      resourceVersion.get(),
       update.resources(),
-      {update.operations().begin(), update.operations().end()}};
+      std::move(offerOperations)};
 
   ResourceProviderMessage message;
   message.type = ResourceProviderMessage::Type::UPDATE_STATE;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index bbf6bb2..eab90cf 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -24,6 +24,7 @@
 #include <mesos/resources.hpp>
 
 #include <stout/check.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/jsonify.hpp>
 #include <stout/option.hpp>
 #include <stout/protobuf.hpp>
@@ -46,9 +47,9 @@ struct ResourceProviderMessage
   struct UpdateState
   {
     ResourceProviderInfo info;
-    UUID resourceVersionUuid;
-    Resources total;
-    std::vector<OfferOperation> operations;
+    UUID resourceVersion;
+    Resources totalResources;
+    hashmap<UUID, OfferOperation> offerOperations;
   };
 
   struct UpdateOfferOperationStatus
@@ -77,7 +78,7 @@ inline std::ostream& operator<<(
       return stream
           << "UPDATE_STATE: "
           << updateState->info.id() << " "
-          << updateState->total;
+          << updateState->totalResources;
     }
 
     case ResourceProviderMessage::Type::UPDATE_OFFER_OPERATION_STATUS: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 738786f..f71adbc 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1883,14 +1883,13 @@ Future<Response> Http::getResourceProviders(
   agent::Response::GetResourceProviders* resourceProviders =
     response.mutable_get_resource_providers();
 
-  foreachvalue (
-      const ResourceProviderInfo& resourceProviderInfo,
-      slave->resourceProviderInfos) {
-    agent::Response::GetResourceProviders::ResourceProvider* resourceProvider =
+  foreachvalue (ResourceProvider* resourceProvider,
+                slave->resourceProviders) {
+    agent::Response::GetResourceProviders::ResourceProvider* provider =
       resourceProviders->add_resource_providers();
 
-    resourceProvider->mutable_resource_provider_info()->CopyFrom(
-        resourceProviderInfo);
+    provider->mutable_resource_provider_info()
+      ->CopyFrom(resourceProvider->info);
   }
 
   return OK(serialize(acceptType, evolve(response)), stringify(acceptType));

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 373e393..5d4cd6d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -222,7 +222,7 @@ Slave::Slave(const string& id,
     qosController(_qosController),
     secretGenerator(_secretGenerator),
     authorizer(_authorizer),
-    resourceVersions({{Option<ResourceProviderID>::none(), UUID::random()}}) {}
+    resourceVersion(UUID::random()) {}
 
 
 Slave::~Slave()
@@ -1540,8 +1540,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -1555,8 +1562,15 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.mutable_agent_capabilities()->CopyFrom(
         capabilities.toRepeatedPtrField());
 
-    message.mutable_resource_version_uuids()->CopyFrom(
-        protobuf::createResourceVersions(resourceVersions));
+    ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+    uuid->set_uuid(resourceVersion.toBytes());
+
+    foreachvalue (ResourceProvider* provider, resourceProviders) {
+      ResourceVersionUUID* uuid = message.add_resource_version_uuids();
+      CHECK(provider->info.has_id());
+      uuid->mutable_resource_provider_id()->CopyFrom(provider->info.id());
+      uuid->set_uuid(provider->resourceVersion.toBytes());
+    }
 
     // Include checkpointed resources.
     message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources_);
@@ -2258,35 +2272,37 @@ void Slave::__run(
   // TODO(bbannier): Also check executor resources.
   bool kill = false;
   if (!resourceVersionUuids.empty()) {
-    hashset<Option<ResourceProviderID>> usedResourceProviders;
+    hashset<Option<ResourceProviderID>> usedResourceProviderIds;
     foreach (const TaskInfo& _task, tasks) {
       foreach (const Resource& resource, _task.resources()) {
-        if (resource.has_provider_id()) {
-          usedResourceProviders.insert(resource.provider_id());
-        } else {
-          usedResourceProviders.insert(None());
-        }
+        usedResourceProviderIds.insert(resource.has_provider_id()
+           ? Option<ResourceProviderID>(resource.provider_id())
+           : None());
       }
     }
 
     const hashmap<Option<ResourceProviderID>, UUID> receivedResourceVersions =
-      protobuf::parseResourceVersions(
-          {resourceVersionUuids.begin(), resourceVersionUuids.end()});
+      protobuf::parseResourceVersions({
+          resourceVersionUuids.begin(),
+          resourceVersionUuids.end()});
 
-    foreach (auto&& resourceProvider, usedResourceProviders) {
-      Option<Error> error = None();
+    foreach (const Option<ResourceProviderID>& resourceProviderId,
+             usedResourceProviderIds) {
+      if (resourceProviderId.isNone()) {
+        CHECK(receivedResourceVersions.contains(None()));
 
-      if (!resourceVersions.contains(resourceProvider)) {
-        // We do not expect the agent to forget about itself.
-        CHECK_SOME(resourceProvider);
-        kill = true;
-      }
-
-      CHECK(receivedResourceVersions.contains(resourceProvider));
+        if (resourceVersion != receivedResourceVersions.at(None())) {
+          kill = true;
+        }
+      } else {
+        ResourceProvider* resourceProvider =
+          getResourceProvider(resourceProviderId.get());
 
-      if (resourceVersions.at(resourceProvider) !=
-          receivedResourceVersions.at(resourceProvider)) {
-        kill = true;
+        if (resourceProvider == nullptr ||
+            resourceProvider->resourceVersion !=
+              receivedResourceVersions.at(resourceProviderId.get())) {
+          kill = true;
+        }
       }
     }
   }
@@ -7010,76 +7026,44 @@ UpdateSlaveMessage Slave::generateOversubscribedUpdate() const
 
 UpdateSlaveMessage Slave::generateResourceProviderUpdate() const
 {
-  UpdateSlaveMessage message;
-
-  message.mutable_slave_id()->CopyFrom(info.id());
-
   // Agent information (total resources, offer operations, resource
   // versions) is not passed as part of some `ResourceProvider`, but
   // globally in `UpdateStateMessage`.
   //
   // TODO(bbannier): Pass agent information as a resource provider.
-
-  // Process total resources.
-  hashmap<ResourceProviderID, UpdateSlaveMessage::ResourceProvider>
-    resourceProviders;
-
-  foreach (const Resource& resource, totalResources) {
-    if (resource.has_provider_id()) {
-      resourceProviders[resource.provider_id()].add_total_resources()->CopyFrom(
-          resource);
-    }
-  }
-
-  // Process offer operations.
-  UpdateSlaveMessage::OfferOperations* operations =
-    message.mutable_offer_operations();
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.set_resource_version_uuid(resourceVersion.toBytes());
+  message.mutable_offer_operations();
 
   foreachvalue (const OfferOperation* operation, offerOperations) {
     Result<ResourceProviderID> resourceProviderId =
       getResourceProviderId(operation->info());
 
-    if (resourceProviderId.isSome()) {
-      resourceProviders[resourceProviderId.get()]
-        .mutable_operations()
-        ->add_operations()
-        ->CopyFrom(*operation);
-    } else if (resourceProviderId.isNone()) {
-      operations->add_operations()->CopyFrom(*operation);
+    if (resourceProviderId.isNone()) {
+      message.mutable_offer_operations()
+        ->add_operations()->CopyFrom(*operation);
     }
   }
 
-  // Make sure 'offer_operations' is always set for resource providers.
-  foreachkey (
-      const ResourceProviderID& resourceProviderId,
-      resourceProviderInfos) {
-    resourceProviders[resourceProviderId].mutable_operations();
-  }
-
-  // Process resource versions.
-  CHECK(resourceVersions.contains(None()));
-  message.set_resource_version_uuid(resourceVersions.at(None()).toBytes());
-
-  foreachpair (
-      const ResourceProviderID& providerId,
-      UpdateSlaveMessage::ResourceProvider& provider,
-      resourceProviders) {
-    CHECK(resourceVersions.contains(providerId));
-    provider.set_resource_version_uuid(
-        resourceVersions.at(providerId).toBytes());
+  foreachvalue (ResourceProvider* resourceProvider, resourceProviders) {
+    UpdateSlaveMessage::ResourceProvider* provider =
+      message.mutable_resource_providers()->add_providers();
 
-    CHECK(resourceProviderInfos.contains(providerId));
-    provider.mutable_info()->CopyFrom(resourceProviderInfos.at(providerId));
-  }
+    provider->mutable_info()->CopyFrom(
+        resourceProvider->info);
+    provider->mutable_total_resources()->CopyFrom(
+        resourceProvider->totalResources);
+    provider->set_resource_version_uuid(
+        resourceProvider->resourceVersion.toBytes());
 
-  // We only actually surface resource-provider related information if
-  // this agent is resource provider-capable.
-  if (capabilities.resourceProvider) {
-    list<UpdateSlaveMessage::ResourceProvider> resourceProviders_ =
-      resourceProviders.values();
+    provider->mutable_operations();
 
-    message.mutable_resource_providers()->mutable_providers()->CopyFrom(
-        {resourceProviders_.begin(), resourceProviders_.end()});
+    foreachvalue (const OfferOperation* operation,
+                  resourceProvider->offerOperations) {
+      provider->mutable_operations()
+        ->add_operations()->CopyFrom(*operation);
+    }
   }
 
   return message;
@@ -7120,75 +7104,61 @@ void Slave::handleResourceProviderMessage(
     case ResourceProviderMessage::Type::UPDATE_STATE: {
       CHECK_SOME(message->updateState);
 
-      const Resources& newTotal = message->updateState->total;
+      const ResourceProviderMessage::UpdateState& updateState =
+        message->updateState.get();
 
-      CHECK(message->updateState->info.has_id());
+      CHECK(updateState.info.has_id());
+      const ResourceProviderID& resourceProviderId = updateState.info.id();
 
-      const ResourceProviderID& resourceProviderId =
-        message->updateState->info.id();
+      ResourceProvider* resourceProvider =
+        getResourceProvider(resourceProviderId);
 
-      if (resourceProviderInfos.contains(resourceProviderId)) {
-        resourceProviderInfos[resourceProviderId] = message->updateState->info;
-      } else {
-        resourceProviderInfos.put(
-            resourceProviderId,
-            message->updateState->info);
-      }
+      if (resourceProvider == nullptr) {
+        resourceProvider = new ResourceProvider(
+            updateState.info,
+            updateState.totalResources,
+            updateState.resourceVersion);
 
-      const Resources oldTotal =
-        totalResources.filter([&resourceProviderId](const Resource& resource) {
-          return resource.provider_id() == resourceProviderId;
-        });
+        addResourceProvider(resourceProvider);
 
-      bool updated = false;
-
-      if (oldTotal != newTotal) {
-        totalResources -= oldTotal;
-        totalResources += newTotal;
-
-        updated = true;
-      }
-
-      // Update offer operation state.
-      //
-      // We only update offer operations which are not contained in both the
-      // known and just received sets. All other offer operations will be
-      // updated via relayed offer operation status updates.
-      auto isForResourceProvider = [resourceProviderId](
-                                      const OfferOperation& operation) {
-        Result<ResourceProviderID> id = getResourceProviderId(operation.info());
-        return id.isSome() && resourceProviderId == id.get();
-      };
-
-      hashmap<UUID, OfferOperation*> knownOfferOperations;
-      foreachpair(auto&& uuid, auto&& operation, offerOperations) {
-        if (isForResourceProvider(*operation)) {
-          knownOfferOperations.put(uuid, operation);
+        foreachvalue (const OfferOperation& operation,
+                      updateState.offerOperations) {
+          addOfferOperation(new OfferOperation(operation));
         }
-      }
 
-      hashmap<UUID, OfferOperation> receivedOfferOperations;
-      foreach (
-          const OfferOperation& operation,
-          message->updateState->operations) {
-        CHECK(isForResourceProvider(operation))
-          << "Received operation on unexpected resource provider "
-          << "from resource provider " << resourceProviderId;
-
-        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
-        CHECK_SOME(operationUuid);
+        // Update the 'total' in the Slave.
+        totalResources += updateState.totalResources;
+      } else {
+        // Always update the resource provider info.
+        resourceProvider->info = updateState.info;
 
-        receivedOfferOperations.put(operationUuid.get(), operation);
-      }
+        if (resourceProvider->totalResources != updateState.totalResources) {
+          // Update the 'total' in the Slave.
+          CHECK(totalResources.contains(resourceProvider->totalResources));
+          totalResources -= resourceProvider->totalResources;
+          totalResources += updateState.totalResources;
 
-      const hashset<UUID> knownUuids = knownOfferOperations.keys();
-      const hashset<UUID> receivedUuids = receivedOfferOperations.keys();
+          // Update the 'total' in the resource provider.
+          resourceProvider->totalResources = updateState.totalResources;
+        }
 
-      if (knownUuids != receivedUuids) {
-        // Handle offer operations known to the agent but not reported by the
-        // resource provider. These could be operations where the agent has
-        // started tracking an offer operation, but the resource provider failed
-        // over before it could bookkeep the operation.
+        // Update offer operation state.
+        //
+        // We only update offer operations which are not contained in
+        // both the known and just received sets. All other offer
+        // operations will be updated via relayed offer operation
+        // status updates.
+        const hashset<UUID> knownUuids =
+          resourceProvider->offerOperations.keys();
+
+        const hashset<UUID> receivedUuids =
+          updateState.offerOperations.keys();
+
+        // Handle offer operations known to the agent but not reported
+        // by the resource provider. These could be operations where
+        // the agent has started tracking an offer operation, but the
+        // resource provider failed over before it could bookkeep the
+        // operation.
         //
         // NOTE: We do not mutate offer operations statuses here; this
         // would be the responsibility of a offer operation status
@@ -7203,13 +7173,13 @@ void Slave::handleResourceProviderMessage(
                 disappearedOperations, disappearedOperations.begin()));
 
         foreach (const UUID& uuid, disappearedOperations) {
-          // TODO(bbannier): Instead of simply dropping an operation with
-          // `removeOfferOperation` here we should instead send a `Reconcile`
-          // message with a failed state to the resource provider so its status
-          // update manager can reliably deliver the operation status to the
-          // framework.
-          CHECK(offerOperations.contains(uuid));
-          removeOfferOperation(offerOperations.at(uuid));
+          // TODO(bbannier): Instead of simply dropping an operation
+          // with `removeOfferOperation` here we should instead send a
+          // `Reconcile` message with a failed state to the resource
+          // provider so its status update manager can reliably
+          // deliver the operation status to the framework.
+          CHECK(resourceProvider->offerOperations.contains(uuid));
+          removeOfferOperation(resourceProvider->offerOperations.at(uuid));
         }
 
         // Handle offer operations known to the resource provider but
@@ -7228,27 +7198,13 @@ void Slave::handleResourceProviderMessage(
           //
           // NOTE: We do not need to update total resources here as its
           // state was sync explicitly with the received total above.
-          CHECK(receivedOfferOperations.contains(uuid));
+          CHECK(updateState.offerOperations.contains(uuid));
           addOfferOperation(
-              new OfferOperation(receivedOfferOperations.at(uuid)));
-        }
-
-        updated = true;
-      }
-
-      // Update resource version of this resource provider.
-      const UUID& resourceVersionUuid =
-        message->updateState->resourceVersionUuid;
-
-      if (!resourceVersions.contains(resourceProviderId) ||
-          resourceVersions.at(resourceProviderId) != resourceVersionUuid) {
-        if (resourceVersions.contains(resourceProviderId)) {
-          resourceVersions.at(resourceProviderId) = resourceVersionUuid;
-        } else {
-          resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+              new OfferOperation(updateState.offerOperations.at(uuid)));
         }
 
-        updated = true;
+        // Update resource version of this resource provider.
+        resourceProvider->resourceVersion = updateState.resourceVersion;
       }
 
       // Send the updated resources to the master if the agent is running. Note
@@ -7263,14 +7219,12 @@ void Slave::handleResourceProviderMessage(
           break;
         }
         case RUNNING: {
-          if (updated) {
-            LOG(INFO) << "Forwarding new total resources " << totalResources;
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-            // Inform the master about the update from the resource provider.
-            send(master.get(), generateResourceProviderUpdate());
+          // Inform the master about the update from the resource provider.
+          send(master.get(), generateResourceProviderUpdate());
 
-            break;
-          }
+          break;
         }
       }
       break;
@@ -7338,6 +7292,22 @@ void Slave::addOfferOperation(OfferOperation* operation)
   CHECK_SOME(uuid);
 
   offerOperations.put(uuid.get(), operation);
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->addOfferOperation(operation);
+  }
 }
 
 
@@ -7441,6 +7411,22 @@ void Slave::removeOfferOperation(OfferOperation* operation)
   Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
   CHECK_SOME(uuid);
 
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError())
+    << "Failed to get resource provider ID: "
+    << resourceProviderId.error();
+
+  if (resourceProviderId.isSome()) {
+    ResourceProvider* resourceProvider =
+      getResourceProvider(resourceProviderId.get());
+
+    CHECK_NOTNULL(resourceProvider);
+
+    resourceProvider->removeOfferOperation(operation);
+  }
+
   CHECK(offerOperations.contains(uuid.get()))
     << "Unknown offer operation (uuid: " << uuid->toString() << ")";
 
@@ -7458,6 +7444,26 @@ OfferOperation* Slave::getOfferOperation(const UUID& uuid) const
 }
 
 
+void Slave::addResourceProvider(ResourceProvider* resourceProvider)
+{
+  CHECK(resourceProvider->info.has_id());
+  CHECK(!resourceProviders.contains(resourceProvider->info.id()));
+
+  resourceProviders.put(
+      resourceProvider->info.id(),
+      resourceProvider);
+}
+
+
+ResourceProvider* Slave::getResourceProvider(const ResourceProviderID& id) const
+{
+  if (resourceProviders.contains(id)) {
+    return resourceProviders.at(id);
+  }
+  return nullptr;
+}
+
+
 void Slave::apply(const vector<ResourceConversion>& conversions)
 {
   Try<Resources> resources = totalResources.apply(conversions);
@@ -9076,6 +9082,30 @@ Resources Executor::allocatedResources() const
 }
 
 
+void ResourceProvider::addOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(!offerOperations.contains(uuid.get()))
+    << "Offer operation (uuid: " << uuid->toString() << ") already exists";
+
+  offerOperations.put(uuid.get(), operation);
+}
+
+
+void ResourceProvider::removeOfferOperation(OfferOperation* operation)
+{
+  Try<UUID> uuid = UUID::fromBytes(operation->operation_uuid());
+  CHECK_SOME(uuid);
+
+  CHECK(offerOperations.contains(uuid.get()))
+    << "Unknown offer operation (uuid: " << uuid->toString() << ")";
+
+  offerOperations.erase(uuid.get());
+}
+
+
 map<string, string> executorEnvironment(
     const Flags& flags,
     const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b3a1e70..7c40fc7 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -108,6 +108,7 @@ class Executor;
 class Framework;
 
 struct HttpConnection;
+struct ResourceProvider;
 
 
 class Slave : public ProtobufProcess<Slave>
@@ -577,6 +578,9 @@ private:
 
   OfferOperation* getOfferOperation(const UUID& uuid) const;
 
+  void addResourceProvider(ResourceProvider* resourceProvider);
+  ResourceProvider* getResourceProvider(const ResourceProviderID& id) const;
+
   void apply(const std::vector<ResourceConversion>& conversions);
 
   // Publish all resources that are needed to run the current set of
@@ -730,12 +734,28 @@ private:
 
   ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
-  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviderInfos;
 
-  // Pending operations or terminal operations that have
-  // unacknowledged status updates.
+  // Local resource providers known by the agent.
+  hashmap<ResourceProviderID, ResourceProvider*> resourceProviders;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Keeps track of the following:
+  // (1) Pending operations for resources from the agent.
+  // (2) Pending operations or terminal operations that have
+  //     unacknowledged status updates for resource provider
+  //     provided resources.
   hashmap<UUID, OfferOperation*> offerOperations;
 };
 
@@ -1034,6 +1054,41 @@ private:
 };
 
 
+struct ResourceProvider
+{
+  ResourceProvider(
+      const ResourceProviderInfo& _info,
+      const Resources& _totalResources,
+      const UUID& _resourceVersion)
+    : info(_info),
+      totalResources(_totalResources),
+      resourceVersion(_resourceVersion) {}
+
+  void addOfferOperation(OfferOperation* operation);
+  void removeOfferOperation(OfferOperation* operation);
+
+  ResourceProviderInfo info;
+  Resources totalResources;
+
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync from the master's view.  The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID in each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than that it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  UUID resourceVersion;
+
+  // Pending operations or terminal operations that have
+  // unacknowledged status updates.
+  hashmap<UUID, OfferOperation*> offerOperations;
+};
+
+
 /**
  * Returns a map of environment variables necessary in order to launch
  * an executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c9861e1a/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index a6eb4c9..e37a53a 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -343,7 +343,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
     EXPECT_EQ(
         devolve(resourceProviderId.get()),
         message->updateState->info.id());
-    EXPECT_EQ(devolve(resources), message->updateState->total);
+    EXPECT_EQ(devolve(resources), message->updateState->totalResources);
   }
 }
 


[2/3] mesos git commit: Passed the message directly to the Master::registerSlave handler.

Posted by ji...@apache.org.
Passed the message directly to the Master::registerSlave handler.

Some fields in `RegisterSlaveMessage` will become optional. This patch
prepares for that. Also, by passing a message directly to the handler,
it allows us to eliminate some copying by using rvalue references.

Review: https://reviews.apache.org/r/64487


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3157d7e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3157d7e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3157d7e

Branch: refs/heads/master
Commit: c3157d7ea328405aa1c9c05778b8ffc01884ac38
Parents: c9861e1
Author: Jie Yu <yu...@gmail.com>
Authored: Sun Dec 10 08:59:35 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp     | 91 ++++++++++++++++++------------------------
 src/master/master.hpp     | 18 ++-------
 src/master/validation.cpp | 13 +++---
 src/master/validation.hpp |  8 +---
 4 files changed, 49 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 55e9195..120cb75 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -847,12 +847,7 @@ void Master::initialize()
       &FrameworkToExecutorMessage::data);
 
   install<RegisterSlaveMessage>(
-      &Master::registerSlave,
-      &RegisterSlaveMessage::slave,
-      &RegisterSlaveMessage::checkpointed_resources,
-      &RegisterSlaveMessage::version,
-      &RegisterSlaveMessage::agent_capabilities,
-      &RegisterSlaveMessage::resource_version_uuids);
+      &Master::registerSlave);
 
   install<ReregisterSlaveMessage>(
       &Master::reregisterSlave);
@@ -6034,11 +6029,7 @@ void Master::message(
 
 void Master::registerSlave(
     const UPID& from,
-    const SlaveInfo& slaveInfo,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions)
+    RegisterSlaveMessage&& registerSlaveMessage)
 {
   ++metrics->messages_register_slave;
 
@@ -6050,11 +6041,7 @@ void Master::registerSlave(
       .onReady(defer(self(),
                      &Self::registerSlave,
                      from,
-                     slaveInfo,
-                     checkpointedResources,
-                     version,
-                     agentCapabilities,
-                     resourceVersions));
+                     std::move(registerSlaveMessage)));
     return;
   }
 
@@ -6070,8 +6057,8 @@ void Master::registerSlave(
     return;
   }
 
-  Option<Error> error = validation::master::message::registerSlave(
-      slaveInfo, checkpointedResources);
+  Option<Error> error =
+    validation::master::message::registerSlave(registerSlaveMessage);
 
   if (error.isSome()) {
     LOG(WARNING) << "Dropping registration of agent at " << from
@@ -6082,13 +6069,13 @@ void Master::registerSlave(
 
   if (slaves.registering.contains(from)) {
     LOG(INFO) << "Ignoring register agent message from " << from
-              << " (" << slaveInfo.hostname() << ") as registration"
-              << " is already in progress";
+              << " (" << registerSlaveMessage.slave().hostname()
+              << ") as registration is already in progress";
     return;
   }
 
-  LOG(INFO) << "Received register agent message from "
-            << from << " (" << slaveInfo.hostname() << ")";
+  LOG(INFO) << "Received register agent message from " << from
+            << " (" << registerSlaveMessage.slave().hostname() << ")";
 
   slaves.registering.insert(from);
 
@@ -6098,13 +6085,13 @@ void Master::registerSlave(
   // master (e.g. when writing to the registry).
   // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
   // here for consistency.
-  SlaveInfo _slaveInfo(slaveInfo);
   convertResourceFormat(
-      _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
+      registerSlaveMessage.mutable_slave()->mutable_resources(),
+      POST_RESERVATION_REFINEMENT);
 
-  std::vector<Resource> _checkpointedResources(checkpointedResources);
   convertResourceFormat(
-      &_checkpointedResources, POST_RESERVATION_REFINEMENT);
+      registerSlaveMessage.mutable_checkpointed_resources(),
+      POST_RESERVATION_REFINEMENT);
 
   // Note that the principal may be empty if authentication is not
   // required. Also it is passed along because it may be removed from
@@ -6114,30 +6101,24 @@ void Master::registerSlave(
   authorizeSlave(principal)
     .onAny(defer(self(),
                  &Self::_registerSlave,
-                 _slaveInfo,
                  from,
+                 std::move(registerSlaveMessage),
                  principal,
-                 _checkpointedResources,
-                 version,
-                 agentCapabilities,
-                 resourceVersions,
                  lambda::_1));
 }
 
 
 void Master::_registerSlave(
-    const SlaveInfo& slaveInfo,
     const UPID& pid,
+    RegisterSlaveMessage&& registerSlaveMessage,
     const Option<string>& principal,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions,
     const Future<bool>& authorized)
 {
   CHECK(!authorized.isDiscarded());
   CHECK(slaves.registering.contains(pid));
 
+  const SlaveInfo& slaveInfo = registerSlaveMessage.slave();
+
   Option<string> authorizationError = None();
 
   if (authorized.isFailed()) {
@@ -6189,6 +6170,7 @@ void Master::_registerSlave(
   // Ignore registration attempts by agents running old Mesos versions.
   // We expect that the agent's version is in SemVer format; if the
   // version cannot be parsed, the registration attempt is ignored.
+  const string& version = registerSlaveMessage.version();
   Try<Version> parsedVersion = Version::parse(version);
 
   if (parsedVersion.isError()) {
@@ -6253,38 +6235,36 @@ void Master::_registerSlave(
   }
 
   // Create and add the slave id.
-  SlaveInfo slaveInfo_ = slaveInfo;
-  slaveInfo_.mutable_id()->CopyFrom(newSlaveId());
+  SlaveID slaveId = newSlaveId();
 
   LOG(INFO) << "Registering agent at " << pid << " ("
-            << slaveInfo.hostname() << ") with id " << slaveInfo_.id();
+            << slaveInfo.hostname() << ") with id " << slaveId;
+
+  SlaveInfo slaveInfo_ = slaveInfo;
+  slaveInfo_.mutable_id()->CopyFrom(slaveId);
+
+  registerSlaveMessage.mutable_slave()->mutable_id()->CopyFrom(slaveId);
 
   registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_)))
     .onAny(defer(self(),
                  &Self::__registerSlave,
-                 slaveInfo_,
                  pid,
-                 checkpointedResources,
-                 version,
-                 agentCapabilities,
-                 resourceVersions,
+                 std::move(registerSlaveMessage),
                  lambda::_1));
 }
 
 
 void Master::__registerSlave(
-    const SlaveInfo& slaveInfo,
     const UPID& pid,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions,
+    RegisterSlaveMessage&& registerSlaveMessage,
     const Future<bool>& admit)
 {
   CHECK(slaves.registering.contains(pid));
 
   CHECK(!admit.isDiscarded());
 
+  const SlaveInfo& slaveInfo = registerSlaveMessage.slave();
+
   if (admit.isFailed()) {
     LOG(FATAL) << "Failed to admit agent " << slaveInfo.id() << " at " << pid
                << " (" << slaveInfo.hostname() << "): " << admit.failure();
@@ -6312,17 +6292,22 @@ void Master::__registerSlave(
   machineId.set_hostname(slaveInfo.hostname());
   machineId.set_ip(stringify(pid.address.ip));
 
+  vector<SlaveInfo::Capability> agentCapabilities = google::protobuf::convert(
+      std::move(*registerSlaveMessage.mutable_agent_capabilities()));
+  vector<Resource> checkpointedResources = google::protobuf::convert(
+      std::move(*registerSlaveMessage.mutable_checkpointed_resources()));
+
   Slave* slave = new Slave(
       this,
       slaveInfo,
       pid,
       machineId,
-      version,
-      agentCapabilities,
+      registerSlaveMessage.version(),
+      std::move(agentCapabilities),
       Clock::now(),
-      checkpointedResources,
+      std::move(checkpointedResources),
       protobuf::parseResourceVersions(
-          {resourceVersions.begin(), resourceVersions.end()}));
+          registerSlaveMessage.resource_version_uuids()));
 
   ++metrics->slave_registrations;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5c26f20..7411e0b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -466,11 +466,7 @@ public:
 
   void registerSlave(
       const process::UPID& from,
-      const SlaveInfo& slaveInfo,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions);
+      RegisterSlaveMessage&& registerSlaveMessage);
 
   void reregisterSlave(
       const process::UPID& from,
@@ -588,22 +584,14 @@ protected:
   void recoveredSlavesTimeout(const Registry& registry);
 
   void _registerSlave(
-      const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      RegisterSlaveMessage&& registerSlaveMessage,
       const Option<std::string>& principal,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions,
       const process::Future<bool>& authorized);
 
   void __registerSlave(
-      const SlaveInfo& slaveInfo,
       const process::UPID& pid,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions,
+      RegisterSlaveMessage&& registerSlaveMessage,
       const process::Future<bool>& admit);
 
   void _reregisterSlave(

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 585d8bf..a9b0805 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -264,23 +264,23 @@ static Option<Error> validateSlaveInfo(const SlaveInfo& slaveInfo)
 }
 
 
-Option<Error> registerSlave(
-    const SlaveInfo& slaveInfo,
-    const vector<Resource>& checkpointedResources)
+Option<Error> registerSlave(const RegisterSlaveMessage& message)
 {
+  const SlaveInfo& slaveInfo = message.slave();
+
   Option<Error> error = validateSlaveInfo(slaveInfo);
   if (error.isSome()) {
     return error.get();
   }
 
-  if (!checkpointedResources.empty()) {
+  if (!message.checkpointed_resources().empty()) {
     if (!slaveInfo.has_checkpoint() || !slaveInfo.checkpoint()) {
       return Error(
           "Checkpointed resources provided when checkpointing is not enabled");
     }
   }
 
-  foreach (const Resource& resource, checkpointedResources) {
+  foreach (const Resource& resource, message.checkpointed_resources()) {
     error = Resources::validate(resource);
     if (error.isSome()) {
       return error.get();
@@ -291,8 +291,7 @@ Option<Error> registerSlave(
 }
 
 
-Option<Error> reregisterSlave(
-    const ReregisterSlaveMessage& message)
+Option<Error> reregisterSlave(const ReregisterSlaveMessage& message)
 {
   hashset<FrameworkID> frameworkIDs;
   hashset<pair<FrameworkID, ExecutorID>> executorIDs;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 30db3bf..7c129ce 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -66,12 +66,8 @@ namespace message {
 // guarantees at the libprocess level that would prevent arbitrary UPID
 // impersonation (MESOS-7424).
 
-Option<Error> registerSlave(
-    const SlaveInfo& slaveInfo,
-    const std::vector<Resource>& checkpointedResources);
-
-Option<Error> reregisterSlave(
-    const ReregisterSlaveMessage& message);
+Option<Error> registerSlave(const RegisterSlaveMessage& message);
+Option<Error> reregisterSlave(const ReregisterSlaveMessage& message);
 
 } // namespace message {
 } // namespace master {