You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2017/12/05 21:59:06 UTC

[4/8] mesos git commit: Updated master behaviour to update agent state on reregistration.

Updated master behaviour to update agent state on reregistration.

When an agent reregisters, the master will now always update
the agent information it holds in memory, and will write any
changes back to the registry if necessary.

Note that most tests for this are added in a later review, since they
require the capability to actually restart the agent with
a changed state to effectively test the masters response to that.

Review: https://reviews.apache.org/r/64011/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2e8c7663
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2e8c7663
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2e8c7663

Branch: refs/heads/master
Commit: 2e8c76631a8cd0f3a46c9865fa3eb83e2f425de9
Parents: 9015cd3
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:42 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:42 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp |  58 +++-
 src/master/allocator/mesos/hierarchical.hpp |   4 +
 src/master/master.cpp                       | 383 +++++++++++++++--------
 src/master/master.hpp                       |  19 +-
 src/tests/hierarchical_allocator_tests.cpp  |  70 +++++
 5 files changed, 389 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index b22829b..d348516 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 
+#include <mesos/attributes.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
@@ -622,14 +623,32 @@ void HierarchicalAllocatorProcess::updateSlave(
 
   Slave& slave = slaves.at(slaveId);
 
-  // We unconditionally overwrite the old domain and hostname: Even though
-  // the master places some restrictions on this (i.e. agents are not allowed
-  // to re-register with a different hostname) inside the allocator it
-  // doesn't matter, as the algorithm will work correctly either way.
-  slave.info = info;
-
   bool updated = false;
 
+  // Remove all offer filters for this slave if it was restarted with changed
+  // attributes. We do this because schedulers might have decided that they're
+  // not interested in offers from this slave based on the non-presence of some
+  // required attributes, and right now they have no other way of learning
+  // about this change.
+  // TODO(bennoe): Once the agent lifecycle design is implemented, there is a
+  // better way to notify frameworks about such changes and let them make this
+  // decision. We should think about ways to safely remove this check at that
+  // point in time.
+  if (!(Attributes(info.attributes()) == Attributes(slave.info.attributes()))) {
+    updated = true;
+    removeFilters(slaveId);
+  }
+
+  if (!(slave.info == info)) {
+    updated = true;
+
+    // We unconditionally overwrite the old domain and hostname: Even though
+    // the master places some restrictions on this (i.e. agents are not allowed
+    // to re-register with a different hostname) inside the allocator it
+    // doesn't matter, as the algorithm will work correctly either way.
+    slave.info = info;
+  }
+
   // Update agent capabilities.
   if (capabilities.isSome()) {
     protobuf::slave::Capabilities newCapabilities(capabilities.get());
@@ -698,6 +717,33 @@ void HierarchicalAllocatorProcess::addResourceProvider(
 }
 
 
+void HierarchicalAllocatorProcess::removeFilters(const SlaveID& slaveId)
+{
+  CHECK(initialized);
+
+  foreachpair (const FrameworkID& id,
+               Framework& framework,
+               frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+
+    // Need a typedef here, otherwise the preprocessor gets confused
+    // by the comma in the template argument list.
+    typedef hashmap<SlaveID, hashset<OfferFilter*>> Filters;
+    foreachpair(const string& role,
+                Filters& filters,
+                framework.offerFilters) {
+      size_t erased = filters.erase(slaveId);
+      if (erased) {
+        frameworkSorters.at(role)->activate(id.value());
+        framework.suppressedRoles.erase(role);
+      }
+    }
+  }
+
+  LOG(INFO) << "Removed all filters for agent " << slaveId;
+}
+
+
 void HierarchicalAllocatorProcess::activateSlave(
     const SlaveID& slaveId)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 14dcf3d..0622026 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -555,6 +555,10 @@ private:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Resources& allocated);
+
+  // Helper that removes all existing offer filters for the given slave
+  // id.
+  void removeFilters(const SlaveID& slaveId);
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6679e25..6a52aad 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6346,6 +6346,11 @@ void Master::reregisterSlave(
     return;
   }
 
+  // TODO(bevers): Technically this behaviour seems to be incorrect, since we
+  // discard the newer re-registration attempt, which might have additional
+  // capabilities or a higher version (or a changed SlaveInfo, after Mesos 1.5).
+  // However, this should very rarely happen in practice, and nobody seems to
+  // have complained about it so far.
   if (slaves.reregistering.contains(slaveInfo.id())) {
     LOG(INFO)
       << "Ignoring re-register agent message from agent "
@@ -6532,7 +6537,8 @@ void Master::_reregisterSlave(
     // For now, we assume this slave is not nefarious (eventually
     // this will be handled by orthogonal security measures like key
     // based authentication).
-    LOG(INFO) << "Re-registering agent " << *slave;
+    VLOG(1) << "Agent is already marked as registered: " << slaveInfo.id()
+            << " at " << pid << " (" << slaveInfo.hostname() << ")";
 
     // We don't allow re-registering this way with a different IP or
     // hostname. This is because maintenance is scheduled at the
@@ -6556,142 +6562,64 @@ void Master::_reregisterSlave(
       return;
     }
 
-    // Update the slave pid and relink to it.
-    // NOTE: Re-linking the slave here always rather than only when
-    // the slave is disconnected can lead to multiple exited events
-    // in succession for a disconnected slave. As a result, we
-    // ignore duplicate exited events for disconnected slaves.
-    // See: https://issues.apache.org/jira/browse/MESOS-675
-    slave->pid = pid;
-    link(slave->pid);
-
-    // Update slave's version, re-registration timestamp and
-    // agent capabilities after re-registering successfully.
-    slave->version = version;
-    slave->reregisteredTime = Clock::now();
-    slave->capabilities = agentCapabilities;
-    slave->resourceVersions = protobuf::parseResourceVersions(
-        {resourceVersions.begin(), resourceVersions.end()});
-
-    allocator->updateSlave(slave->id, slave->info, None(), agentCapabilities);
-
-    // Reconcile tasks between master and slave, and send the
-    // `SlaveReregisteredMessage`.
-    reconcileKnownSlave(slave, executorInfos, tasks);
-
-    // If this is a disconnected slave, add it back to the allocator.
-    // This is done after reconciliation to ensure the allocator's
-    // offers include the recovered resources initially on this
-    // slave.
-    if (!slave->connected) {
-      CHECK(slave->reregistrationTimer.isSome());
-      Clock::cancel(slave->reregistrationTimer.get());
-
-      slave->connected = true;
-      dispatch(slave->observer, &SlaveObserver::reconnect);
-
-      slave->active = true;
-      allocator->activateSlave(slave->id);
-    }
-
-    CHECK(slave->active)
-      << "Unexpected connected but deactivated agent " << *slave;
-
-    // Inform the agent of the new framework pids for its tasks.
-    ___reregisterSlave(slave, frameworks);
-
-    slaves.reregistering.erase(slaveInfo.id());
-
-    // If the agent is not resource provider capable (legacy agent),
-    // send checkpointed resources to the agent. This is important for
-    // the cases where the master didn't fail over. In that case, the
-    // master might have already applied an operation that the agent
-    // didn't see (e.g., due to a breaking connection). This message
-    // will sync the state between the master and the agent about
-    // checkpointed resources.
-    //
-    // New agents that are resource provider capable will always
-    // update the master with total resources during re-registration.
-    // Therefore, no need to send checkpointed resources to the new
-    // agent in this case.
-    if (!slave->capabilities.resourceProvider) {
-      CheckpointResourcesMessage message;
-
-      message.mutable_resources()->CopyFrom(slave->checkpointedResources);
-
-      if (!slave->capabilities.reservationRefinement) {
-        // If the agent is not refinement-capable, don't send it
-        // checkpointed resources that contain refined reservations. This
-        // might occur if a reservation refinement is created but never
-        // reaches the agent (e.g., due to network partition), and then
-        // the agent is downgraded before the partition heals.
-        //
-        // TODO(neilc): It would probably be better to prevent the agent
-        // from re-registering in this scenario.
-        Try<Nothing> result = downgradeResources(message.mutable_resources());
-        if (result.isError()) {
-          LOG(WARNING) << "Not sending updated checkpointed resouces "
-                       << slave->checkpointedResources
-                       << " with refined reservations, since agent " << *slave
-                       << " is not RESERVATION_REFINEMENT-capable.";
-
-          return;
-        }
-      }
-
-      LOG(INFO) << "Sending updated checkpointed resources "
-                << slave->checkpointedResources
-                << " to agent " << *slave;
-
-      send(slave->pid, message);
-    }
-
-    return;
-  }
-
-  LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << pid
-            << " (" << slaveInfo.hostname() << ")";
-
-  if (slaves.recovered.contains(slaveInfo.id())) {
+    // TODO(bevers): Verify that the checkpointed resources sent by the
+    // slave match the ones stored in `slave`.
+    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+      .onAny(defer(self(),
+          &Self::___reregisterSlave,
+          slaveInfo,
+          pid,
+          executorInfos,
+          tasks,
+          frameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
+
+  } else if (slaves.recovered.contains(slaveInfo.id())) {
     // The agent likely is re-registering after a master failover as it
-    // is in the list recovered from the registry. No need to consult the
-    // registry in this case and we can directly re-admit it.
-    VLOG(1) << "Re-admitting recovered agent " << slaveInfo.id() << " at "
-            << pid << " (" << slaveInfo.hostname() << ")";
-
-    __reregisterSlave(
-        slaveInfo,
-        pid,
-        checkpointedResources,
-        executorInfos,
-        tasks,
-        frameworks,
-        completedFrameworks,
-        version,
-        agentCapabilities,
-        resourceVersions,
-        true);
+    // is in the list recovered from the registry.
+    VLOG(1) << "Re-admitting recovered agent " << slaveInfo.id()
+            << " at " << pid << "(" << slaveInfo.hostname() << ")";
+
+    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+      .onAny(defer(self(),
+          &Self::__reregisterSlave,
+          slaveInfo,
+          pid,
+          checkpointedResources,
+          executorInfos,
+          tasks,
+          frameworks,
+          completedFrameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
   } else {
-    // Consult the registry to determine whether to readmit the
-    // slave. In the common case, the slave has been marked unreachable
+    // In the common case, the slave has been marked unreachable
     // by the master, so we move the slave to the reachable list and
     // readmit it. If the slave isn't in the unreachable list (which
     // might occur if the slave's entry in the unreachable list is
     // GC'd), we admit the slave anyway.
+    VLOG(1) << "Consulting registry about agent " << slaveInfo.id()
+            << " at " << pid << "(" << slaveInfo.hostname() << ")";
+
     registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo)))
       .onAny(defer(self(),
-                   &Self::__reregisterSlave,
-                   slaveInfo,
-                   pid,
-                   checkpointedResources,
-                   executorInfos,
-                   tasks,
-                   frameworks,
-                   completedFrameworks,
-                   version,
-                   agentCapabilities,
-                   resourceVersions,
-                   lambda::_1));
+          &Self::__reregisterSlave,
+          slaveInfo,
+          pid,
+          checkpointedResources,
+          executorInfos,
+          tasks,
+          frameworks,
+          completedFrameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
   }
 }
 
@@ -6707,19 +6635,21 @@ void Master::__reregisterSlave(
     const string& version,
     const vector<SlaveInfo::Capability>& agentCapabilities,
     const vector<ResourceVersionUUID>& resourceVersions,
-    const Future<bool>& readmit)
+    const Future<bool>& future)
 {
   CHECK(slaves.reregistering.contains(slaveInfo.id()));
 
-  if (readmit.isFailed()) {
-    LOG(FATAL) << "Failed to readmit agent " << slaveInfo.id() << " at " << pid
-               << " (" << slaveInfo.hostname() << "): " << readmit.failure();
+  if (future.isFailed()) {
+    LOG(FATAL) << "Failed to update registry for agent " << slaveInfo.id()
+               << " at " << pid << " (" << slaveInfo.hostname() << "): "
+               << future.failure();
   }
 
-  CHECK(!readmit.isDiscarded());
+  CHECK(!future.isDiscarded());
 
-  // `MarkSlaveReachable` registry operation should never fail.
-  CHECK(readmit.get());
+  // Neither the `UpdateSlave` nor `MarkSlaveReachable` registry operations
+  // should ever fail.
+  CHECK(future.get());
 
   VLOG(1) << "Re-admitted agent " << slaveInfo.id() << " at " << pid
           << " (" << slaveInfo.hostname() << ")";
@@ -6896,13 +6826,158 @@ void Master::__reregisterSlave(
     }
   }
 
-  ___reregisterSlave(slave, frameworks);
+  updateSlaveFrameworks(slave, frameworks);
 
   slaves.reregistering.erase(slaveInfo.id());
 }
 
 
 void Master::___reregisterSlave(
+    const SlaveInfo& slaveInfo,
+    const process::UPID& pid,
+    const std::vector<ExecutorInfo>& executorInfos,
+    const std::vector<Task>& tasks,
+    const std::vector<FrameworkInfo>& frameworks,
+    const std::string& version,
+    const std::vector<SlaveInfo::Capability>& agentCapabilities,
+    const vector<ResourceVersionUUID>& resourceVersions,
+    const process::Future<bool>& updated)
+{
+  CHECK(slaves.reregistering.contains(slaveInfo.id()));
+
+  CHECK_READY(updated);
+  CHECK(updated.get());
+
+  VLOG(1) << "Registry updated for slave " << slaveInfo.id() << " at " << pid
+          << "(" << slaveInfo.hostname() << ")";
+
+  if (!slaves.registered.contains(slaveInfo.id())) {
+    LOG(WARNING)
+      << "Dropping ongoing re-registration attempt of slave " << slaveInfo.id()
+      << " at " << pid << "(" << slaveInfo.hostname() << ") "
+      << "because the re-registration timeout was reached.";
+
+    slaves.reregistering.erase(slaveInfo.id());
+    // Don't send a ShutdownMessage here because tasks from partition-aware
+    // frameworks running on this host might still be recovered when the slave
+    // retries the re-registration.
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveInfo.id());
+
+  // Update the slave pid and relink to it.
+  // NOTE: Re-linking the slave here always rather than only when
+  // the slave is disconnected can lead to multiple exited events
+  // in succession for a disconnected slave. As a result, we
+  // ignore duplicate exited events for disconnected slaves.
+  // See: https://issues.apache.org/jira/browse/MESOS-675
+  slave->pid = pid;
+  link(slave->pid);
+
+  Try<Nothing> stateUpdated =
+    slave->update(slaveInfo, version, agentCapabilities, resourceVersions);
+
+  // As of now, the only way `slave->update()` can fail is if the agent sent
+  // different checkpointed resources than it had before. A well-behaving
+  // agent shouldn't do this, so this one is either malicious or buggy. Either
+  // way, we refuse the re-registration attempt.
+  if (stateUpdated.isError()) {
+    LOG(WARNING) << "Refusing re-registration of agent " << slaveInfo.id()
+                 << " at " << pid << " (" << slaveInfo.hostname() << ")"
+                 << " because state update failed: " << stateUpdated.error();
+
+    ShutdownMessage message;
+    message.set_message(stateUpdated.error());
+    send(pid, message);
+
+    slaves.reregistering.erase(slaveInfo.id());
+    return;
+  }
+
+  slave->reregisteredTime = Clock::now();
+
+  allocator->updateSlave(
+    slave->id,
+    slave->info,
+    slave->totalResources,
+    agentCapabilities);
+
+  // Reconcile tasks between master and slave, and send the
+  // `SlaveReregisteredMessage`.
+  reconcileKnownSlave(slave, executorInfos, tasks);
+
+  // If this is a disconnected slave, add it back to the allocator.
+  // This is done after reconciliation to ensure the allocator's
+  // offers include the recovered resources initially on this
+  // slave.
+  if (!slave->connected) {
+    CHECK(slave->reregistrationTimer.isSome());
+    Clock::cancel(slave->reregistrationTimer.get());
+
+    slave->connected = true;
+    dispatch(slave->observer, &SlaveObserver::reconnect);
+
+    slave->active = true;
+    allocator->activateSlave(slave->id);
+  }
+
+  CHECK(slave->active)
+    << "Unexpected connected but deactivated agent " << *slave;
+
+  // Inform the agent of the new framework pids for its tasks, and
+  // recover any unknown frameworks from the slave info.
+  updateSlaveFrameworks(slave, frameworks);
+
+  slaves.reregistering.erase(slaveInfo.id());
+
+  // If the agent is not resource provider capable (legacy agent),
+  // send checkpointed resources to the agent. This is important for
+  // the cases where the master didn't fail over. In that case, the
+  // master might have already applied an operation that the agent
+  // didn't see (e.g., due to a breaking connection). This message
+  // will sync the state between the master and the agent about
+  // checkpointed resources.
+  //
+  // New agents that are resource provider capable will always
+  // update the master with total resources during re-registration.
+  // Therefore, no need to send checkpointed resources to the new
+  // agent in this case.
+  if (!slave->capabilities.resourceProvider) {
+    CheckpointResourcesMessage message;
+
+    message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+    if (!slave->capabilities.reservationRefinement) {
+      // If the agent is not refinement-capable, don't send it
+      // checkpointed resources that contain refined reservations. This
+      // might occur if a reservation refinement is created but never
+      // reaches the agent (e.g., due to network partition), and then
+      // the agent is downgraded before the partition heals.
+      //
+      // TODO(neilc): It would probably be better to prevent the agent
+      // from re-registering in this scenario.
+      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      if (result.isError()) {
+        LOG(WARNING) << "Not sending updated checkpointed resouces "
+                     << slave->checkpointedResources
+                     << " with refined reservations, since agent " << *slave
+                     << " is not RESERVATION_REFINEMENT-capable.";
+
+        return;
+      }
+    }
+
+    LOG(INFO) << "Sending updated checkpointed resources "
+              << slave->checkpointedResources
+              << " to agent " << *slave;
+
+    send(slave->pid, message);
+  }
+}
+
+
+void Master::updateSlaveFrameworks(
     Slave* slave,
     const vector<FrameworkInfo>& frameworks)
 {
@@ -7423,7 +7498,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     }
   }
 
-  // Now update the agent's total resources in the allocator.
+  // Now update the agent's state and total resources in the allocator.
   allocator->updateSlave(slaveId, slave->info, slave->totalResources);
 
   // Then rescind outstanding offers affected by the update.
@@ -11349,6 +11424,38 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
   checkpointedResources = totalResources.filter(needCheckpointing);
 }
 
+
+Try<Nothing> Slave::update(
+  const SlaveInfo& _info,
+  const string& _version,
+  const vector<SlaveInfo::Capability>& _capabilities,
+  const vector<ResourceVersionUUID>& _resourceVersions)
+{
+  Try<Resources> resources = applyCheckpointedResources(
+      _info.resources(),
+      checkpointedResources);
+
+  // This should be validated during slave recovery.
+  if (resources.isError()) {
+    return Error(resources.error());
+  }
+
+  version = _version;
+  capabilities = _capabilities;
+  info = _info;
+
+  // There is a short window here where `totalResources` can have an old value,
+  // but it should be relatively short because the agent will send
+  // an `UpdateSlaveMessage` with the new total resources immediately after
+  // re-registering in this case.
+  totalResources = resources.get();
+
+  resourceVersions = protobuf::parseResourceVersions(
+      {_resourceVersions.begin(), _resourceVersions.end()});
+
+  return Nothing();
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a721131..d42acae 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -178,9 +178,15 @@ struct Slave
 
   void apply(const std::vector<ResourceConversion>& conversions);
 
+  Try<Nothing> update(
+    const SlaveInfo& info,
+    const std::string& _version,
+    const std::vector<SlaveInfo::Capability>& _capabilites,
+    const std::vector<ResourceVersionUUID>& resourceVersions);
+
   Master* const master;
   const SlaveID id;
-  const SlaveInfo info;
+  SlaveInfo info;
 
   const MachineID machineId;
 
@@ -637,6 +643,17 @@ protected:
       const process::Future<bool>& readmit);
 
   void ___reregisterSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks,
+      const std::string& version,
+      const std::vector<SlaveInfo::Capability>& agentCapabilities,
+      const std::vector<ResourceVersionUUID>& resourceVersions,
+      const process::Future<bool>& updated);
+
+  void updateSlaveFrameworks(
       Slave* slave,
       const std::vector<FrameworkInfo>& frameworks);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 2fc5f5e..862f468 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -6269,6 +6269,76 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, Metrics)
 }
 
 
+// Tests that the `updateSlave()` function correctly removes all filters
+// for the specified slave when slave attributes are changed on restart.
+TEST_F(HierarchicalAllocatorTest, RemoveFilters)
+{
+  // We put both frameworks into the same role, but we could also
+  // have had separate roles; this should not influence the test.
+  const string ROLE{"role"};
+
+  // Pause the clock because we want to manually drive the allocations.
+  Clock::pause();
+
+  initialize();
+
+  FrameworkInfo framework = createFrameworkInfo({ROLE});
+  allocator->addFramework(framework.id(), framework, {}, true, {});
+
+  SlaveInfo agent = createSlaveInfo("cpus:1;mem:512;disk:0");
+  allocator->addSlave(
+      agent.id(),
+      agent,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent.resources(),
+      {});
+
+  // `framework` will be offered all of `agent` resources
+  // because it is the only framework in the cluster.
+  Allocation expected = Allocation(
+      framework.id(),
+      {{ROLE, {{agent.id(), agent.resources()}}}});
+
+  Future<Allocation> allocation = allocations.get();
+  AWAIT_EXPECT_EQ(expected, allocation);
+
+  // Now `framework` declines the offer and sets a filter.
+  Duration filterTimeout = flags.allocation_interval*2;
+  Filters offerFilter;
+  offerFilter.set_refuse_seconds(filterTimeout.secs());
+
+  allocator->recoverResources(
+      framework.id(),
+      agent.id(),
+      allocation->resources.at(ROLE).at(agent.id()),
+      offerFilter);
+
+  // There should be no allocation due to the offer filter.
+  Clock::advance(flags.allocation_interval);
+  Clock::settle();
+
+  allocation = allocations.get();
+  EXPECT_TRUE(allocation.isPending());
+
+  // Update the slave with new attributes.
+  Attributes attributes = Attributes::parse("foo:bar;baz:quux");
+  *agent.mutable_attributes() = attributes;
+  allocator->updateSlave(
+      agent.id(),
+      agent,
+      agent.resources(),
+      AGENT_CAPABILITIES());
+
+  // Previously declined resources should be offered to the quota'ed role.
+  expected = Allocation(
+      framework.id(),
+      {{ROLE, {{agent.id(), agent.resources()}}}});
+
+  AWAIT_EXPECT_EQ(expected, allocation);
+}
+
+
 // This test uses `reviveOffers` to add allocation-triggering events
 // to the allocator queue in order to measure the impact of allocation
 // batching (MESOS-6904).