You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/01/04 12:23:58 UTC

[3/3] mesos git commit: Fixed handling of checkpointed resources for RP-capable agents.

Fixed handling of checkpointed resources for RP-capable agents.

The master will not resend checkpointed resources when a resource
provider-capable agent reregisters. Instead, the checkpointed resources
sent as part of the agent reregistration should be evaluated by the
master and used to update its state.

This patch fixes the handling of checkpointed resources sent as part
of the agent reregistration so that the resources are used to update
the master state.

Review: https://reviews.apache.org/r/64889/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1a7c6fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1a7c6fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1a7c6fb

Branch: refs/heads/master
Commit: a1a7c6fb07898d22642ed76ce4068681ec05943e
Parents: 838850f
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Jan 4 11:12:27 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Jan 4 11:12:27 2018 +0100

----------------------------------------------------------------------
 src/master/master.cpp              |  16 +++--
 src/master/master.hpp              |   1 +
 src/tests/slave_recovery_tests.cpp | 112 ++++++++++++++++++++++++++++++++
 3 files changed, 125 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a1a7c6fb/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 282fdf8..28d8be3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6620,9 +6620,6 @@ void Master::_reregisterSlave(
       return;
     }
 
-    // TODO(bevers): Verify that the checkpointed resources sent by the
-    // slave match the ones stored in `slave`.
-
     // Skip updating the registry if `slaveInfo` did not change from its
     // previously known state.
     if (slaveInfo == slave->info) {
@@ -7039,10 +7036,19 @@ void Master::___reregisterSlave(
     resourceVersion = uuid.get();
   }
 
+  // Update our view of checkpointed agent resources for resource
+  // provider-capable agents; for other agents the master will resend
+  // checkpointed resources after reregistration.
+  const Resources checkpointedResources =
+    slave->capabilities.resourceProvider
+      ? Resources(reregisterSlaveMessage.checkpointed_resources())
+      : slave->checkpointedResources;
+
   Try<Nothing> stateUpdated = slave->update(
       slaveInfo,
       version,
       agentCapabilities,
+      checkpointedResources,
       resourceVersion);
 
   // As of now, the only way `slave->update()` can fail is if the agent sent
@@ -11677,11 +11683,12 @@ Try<Nothing> Slave::update(
     const SlaveInfo& _info,
     const string& _version,
     const vector<SlaveInfo::Capability>& _capabilities,
+    const Resources& _checkpointedResources,
     const Option<id::UUID>& resourceVersion)
 {
   Try<Resources> resources = applyCheckpointedResources(
       _info.resources(),
-      checkpointedResources);
+      _checkpointedResources);
 
   // This should be validated during slave recovery.
   if (resources.isError()) {
@@ -11691,6 +11698,7 @@ Try<Nothing> Slave::update(
   version = _version;
   capabilities = _capabilities;
   info = _info;
+  checkpointedResources = _checkpointedResources;
 
   // There is a short window here where `totalResources` can have an old value,
   // but it should be relatively short because the agent will send

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1a7c6fb/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8fe9420..130f6e2 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -180,6 +180,7 @@ Slave(Master* const _master,
       const SlaveInfo& info,
       const std::string& _version,
       const std::vector<SlaveInfo::Capability>& _capabilites,
+      const Resources& _checkpointedResources,
       const Option<id::UUID>& resourceVersion);
 
   Master* const master;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1a7c6fb/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 23a2909..e305d74 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -4881,6 +4881,118 @@ TYPED_TEST(SlaveRecoveryTest, CheckpointedResources)
 }
 
 
+// This test verifies that checkpointed resources sent by resource
+// provider-capable agents during agent reregistration overwrite the
+// master's view of that agent's checkpointed resources.
+TYPED_TEST(SlaveRecoveryTest, CheckpointedResourcesResourceProviderCapable)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = this->CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = this->StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger the agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "foo");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Advance the clock to trigger a batch allocation.
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->empty());
+
+  // Below we send a reserve operation which is applied in the master
+  // speculatively. We drop the operation on its way to the agent so
+  // that the master's and the agent's view of the agent's
+  // checkpointed resources diverge.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  const Offer& offer1 = offers1->at(0);
+
+  const Resources offeredResources = offer1.resources();
+  const Resources dynamicallyReserved =
+    offeredResources.pushReservation(createDynamicReservationInfo(
+        frameworkInfo.roles(0), frameworkInfo.principal()));
+
+  driver.acceptOffers({offer1.id()}, {RESERVE(dynamicallyReserved)}, filters);
+
+  AWAIT_READY(applyOperationMessage);
+
+  // Restart and reregister the agent.
+  slave.get()->terminate();
+  slave = this->StartSlave(&detector, slaveFlags);
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Advance the clock to trigger the agent reregistration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  // Await the agent update, then advance the clock for a batch allocation.
+  AWAIT_READY(updateSlaveMessage);
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->empty());
+
+  const Offer& offer2 = offers2->at(0);
+
+  // The second offer will be identical to the first one
+  // as our operation never made it to the agent.
+  const Resources offeredResources1 = offer1.resources();
+  const Resources offeredResources2 = offer2.resources();
+
+  EXPECT_EQ(offeredResources1, offeredResources2);
+}
+
+
 // We explicitly instantiate a SlaveRecoveryTest for test cases where
 // we assume we'll only have the MesosContainerizer.
 class MesosContainerizerSlaveRecoveryTest