You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2017/12/05 21:59:03 UTC

[1/8] mesos git commit: Convert resource format of messages entering master.

Repository: mesos
Updated Branches:
  refs/heads/master 1d24d42ea -> 7b0812e9b


Convert resource format of messages entering master.

Review: https://reviews.apache.org/r/64252/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/114519a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/114519a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/114519a4

Branch: refs/heads/master
Commit: 114519a46366b0e19f1ae0c64dcd2493c51a068a
Parents: 1d24d42
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:22 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:22 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp              | 48 +++++++++++++++++++++++----------
 src/master/registry_operations.cpp | 22 ++++++++++++---
 2 files changed, 52 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/114519a4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e8257e7..f77a1ed 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6044,6 +6044,20 @@ void Master::registerSlave(
 
   slaves.registering.insert(from);
 
+  // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT`
+  // format. We do this as early as possible so that we only use a single
+  // format inside master, and downgrade again if necessary when they leave the
+  // master (e.g. when writing to the registry).
+  // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
+  // here for consistency.
+  SlaveInfo _slaveInfo(slaveInfo);
+  convertResourceFormat(
+      _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
+
+  std::vector<Resource> _checkpointedResources(checkpointedResources);
+  convertResourceFormat(
+      &_checkpointedResources, POST_RESERVATION_REFINEMENT);
+
   // Note that the principal may be empty if authentication is not
   // required. Also it is passed along because it may be removed from
   // `authenticated` while the authorization is pending.
@@ -6052,10 +6066,10 @@ void Master::registerSlave(
   authorizeSlave(principal)
     .onAny(defer(self(),
                  &Self::_registerSlave,
-                 slaveInfo,
+                 _slaveInfo,
                  from,
                  principal,
-                 checkpointedResources,
+                 _checkpointedResources,
                  version,
                  agentCapabilities,
                  resourceVersions,
@@ -6361,6 +6375,20 @@ void Master::reregisterSlave(
   Option<Error> error = validation::master::message::reregisterSlave(
       slaveInfo, tasks, checkpointedResources, executorInfos, frameworks);
 
+  // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT`
+  // format. We do this as early as possible so that we only use a single
+  // format inside master, and downgrade again if necessary when they leave the
+  // master (e.g. when writing to the registry).
+  // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
+  // here for consistency.
+  SlaveInfo _slaveInfo(slaveInfo);
+  convertResourceFormat(
+      _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
+
+  std::vector<Resource> _checkpointedResources(checkpointedResources);
+  convertResourceFormat(
+      &_checkpointedResources, POST_RESERVATION_REFINEMENT);
+
   if (error.isSome()) {
     LOG(WARNING) << "Dropping re-registration of agent at " << from
                  << " because it sent an invalid re-registration: "
@@ -6382,10 +6410,10 @@ void Master::reregisterSlave(
   authorizeSlave(principal)
     .onAny(defer(self(),
                  &Self::_reregisterSlave,
-                 slaveInfo,
+                 _slaveInfo,
                  from,
                  principal,
-                 checkpointedResources,
+                 _checkpointedResources,
                  executorInfos,
                  tasks,
                  frameworks,
@@ -11024,11 +11052,7 @@ Slave::Slave(
     const vector<Task>& tasks)
   : master(_master),
     id(_info.id()),
-    info([&_info]() {
-      convertResourceFormat(
-          _info.mutable_resources(), POST_RESERVATION_REFINEMENT);
-      return _info;
-    }()),
+    info(_info),
     machineId(_machineId),
     pid(_pid),
     version(_version),
@@ -11036,11 +11060,7 @@ Slave::Slave(
     registeredTime(_registeredTime),
     connected(true),
     active(true),
-    checkpointedResources([&_checkpointedResources]() {
-      convertResourceFormat(
-          &_checkpointedResources, POST_RESERVATION_REFINEMENT);
-      return _checkpointedResources;
-    }()),
+    checkpointedResources(_checkpointedResources),
     observer(nullptr),
     resourceVersions(_resourceVersions)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/114519a4/src/master/registry_operations.cpp
----------------------------------------------------------------------
diff --git a/src/master/registry_operations.cpp b/src/master/registry_operations.cpp
index 1e1eadb..f9c2162 100644
--- a/src/master/registry_operations.cpp
+++ b/src/master/registry_operations.cpp
@@ -16,6 +16,8 @@
 
 #include "master/registry_operations.hpp"
 
+#include "common/resources_utils.hpp"
+
 namespace mesos {
 namespace internal {
 namespace master {
@@ -36,9 +38,15 @@ Try<bool> AdmitSlave::perform(Registry* registry, hashset<SlaveID>* slaveIDs)
     return Error("Agent already admitted");
   }
 
+  // Convert the resource format back to `PRE_RESERVATION_REFINEMENT` so
+  // the data stored in the registry can be read by older master versions.
+  SlaveInfo _info(info);
+  convertResourceFormat(_info.mutable_resources(),
+      PRE_RESERVATION_REFINEMENT);
+
   Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
-  slave->mutable_info()->CopyFrom(info);
-  slaveIDs->insert(info.id());
+  slave->mutable_info()->CopyFrom(_info);
+  slaveIDs->insert(_info.id());
   return true; // Mutation.
 }
 
@@ -133,13 +141,19 @@ Try<bool> MarkSlaveReachable::perform(
     LOG(WARNING) << "Allowing UNKNOWN agent to reregister: " << info;
   }
 
+  // Convert the resource format back to `PRE_RESERVATION_REFINEMENT` so
+  // the data stored in the registry can be read by older master versions.
+  SlaveInfo _info(info);
+  convertResourceFormat(_info.mutable_resources(),
+    PRE_RESERVATION_REFINEMENT);
+
   // Add the slave to the admitted list, even if we didn't find it
   // in the unreachable list. This accounts for when the slave was
   // unreachable for a long time, was GC'd from the unreachable
   // list, but then eventually reregistered.
   Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
-  slave->mutable_info()->CopyFrom(info);
-  slaveIDs->insert(info.id());
+  slave->mutable_info()->CopyFrom(_info);
+  slaveIDs->insert(_info.id());
 
   return true; // Mutation.
 }


[2/8] mesos git commit: Added new UpdateSlave registry operation.

Posted by vi...@apache.org.
Added new UpdateSlave registry operation.

This operation can be used to update the stored state
of an existing, admitted slave.

Review: https://reviews.apache.org/r/64009/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/054bd5ab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/054bd5ab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/054bd5ab

Branch: refs/heads/master
Commit: 054bd5abc6b9788553cd5beb285bd46bbe2dec0d
Parents: 114519a
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:30 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:30 2017 -0800

----------------------------------------------------------------------
 src/master/registry_operations.cpp | 44 ++++++++++++++++++++++++++++
 src/master/registry_operations.hpp | 14 +++++++++
 src/tests/registrar_tests.cpp      | 51 +++++++++++++++++++++++++++++++++
 3 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/054bd5ab/src/master/registry_operations.cpp
----------------------------------------------------------------------
diff --git a/src/master/registry_operations.cpp b/src/master/registry_operations.cpp
index f9c2162..149009b 100644
--- a/src/master/registry_operations.cpp
+++ b/src/master/registry_operations.cpp
@@ -51,6 +51,50 @@ Try<bool> AdmitSlave::perform(Registry* registry, hashset<SlaveID>* slaveIDs)
 }
 
 
+UpdateSlave::UpdateSlave(const SlaveInfo& _info) : info(_info)
+{
+  CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+}
+
+
+Try<bool> UpdateSlave::perform(Registry* registry, hashset<SlaveID>* slaveIDs)
+{
+  if (!slaveIDs->contains(info.id())) {
+    return Error("Agent not yet admitted.");
+  }
+
+  for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+    Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i);
+
+    if (slave->info().id() == info.id()) {
+      // The SlaveInfo in the registry is stored in the
+      // `PRE_RESERVATION_REFINEMENT` format, but the equality operator
+      // asserts that resources are in `POST_RESERVATION_REFINEMENT` format,
+      // so we have to upgrade before we can do the comparison.
+      SlaveInfo _previousInfo(slave->info());
+      convertResourceFormat(_previousInfo.mutable_resources(),
+          POST_RESERVATION_REFINEMENT);
+
+      if (info == _previousInfo) {
+        return false; // No mutation.
+      }
+
+      // Convert the resource format back to `PRE_RESERVATION_REFINEMENT` so
+      // the data stored in the registry can be read by older master versions.
+      SlaveInfo _info(info);
+      convertResourceFormat(_info.mutable_resources(),
+          PRE_RESERVATION_REFINEMENT);
+
+      slave->mutable_info()->CopyFrom(_info);
+      return true; // Mutation.
+    }
+  }
+
+  // Shouldn't happen
+  return Error("Failed to find agent " + stringify(info.id()));
+}
+
+
 // Move a slave from the list of admitted slaves to the list of
 // unreachable slaves.
 MarkSlaveUnreachable::MarkSlaveUnreachable(

http://git-wip-us.apache.org/repos/asf/mesos/blob/054bd5ab/src/master/registry_operations.hpp
----------------------------------------------------------------------
diff --git a/src/master/registry_operations.hpp b/src/master/registry_operations.hpp
index 5b78306..06f68a3 100644
--- a/src/master/registry_operations.hpp
+++ b/src/master/registry_operations.hpp
@@ -41,6 +41,20 @@ private:
 };
 
 
+// Update the SlaveInfo of an existing admitted slave.
+class UpdateSlave : public Operation
+{
+public:
+  explicit UpdateSlave(const SlaveInfo& _info);
+
+protected:
+  virtual Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs);
+
+private:
+  const SlaveInfo info;
+};
+
+
 // Move a slave from the list of admitted slaves to the list of
 // unreachable slaves.
 class MarkSlaveUnreachable : public Operation

http://git-wip-us.apache.org/repos/asf/mesos/blob/054bd5ab/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 21d78e9..f93129f 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -265,6 +265,57 @@ TEST_F(RegistrarTest, Admit)
 }
 
 
+TEST_F(RegistrarTest, UpdateSlave)
+{
+  // Add a new slave to the registry.
+  {
+    Registrar registrar(flags, state);
+    AWAIT_READY(registrar.recover(master));
+
+    slave.set_hostname("original");
+    AWAIT_TRUE(
+        registrar.apply(Owned<Operation>(new AdmitSlave(slave))));
+  }
+
+
+  // Verify that the slave is present, and update its hostname.
+  {
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ("original", registry->slaves().slaves(0).info().hostname());
+
+    slave.set_hostname("changed");
+    AWAIT_TRUE(registrar.apply(Owned<Operation>(new UpdateSlave(slave))));
+  }
+
+  // Verify that the hostname indeed changed, and do one additional update
+  // to check that the operation is idempotent.
+  {
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname());
+
+    AWAIT_TRUE(registrar.apply(Owned<Operation>(new UpdateSlave(slave))));
+  }
+
+  // Verify that nothing changed from the second update.
+  {
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ("changed", registry->slaves().slaves(0).info().hostname());
+  }
+}
+
+
 TEST_F(RegistrarTest, MarkReachable)
 {
   Registrar registrar(flags, state);


[7/8] mesos git commit: Check if slave is marked gone during reregistration.

Posted by vi...@apache.org.
Check if slave is marked gone during reregistration.

This commit adds checks that test whether a
re-registering agent is being marked gone or has been
marked gone during the re-registration attempt.

Review: https://reviews.apache.org/r/64169/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b612e9d5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b612e9d5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b612e9d5

Branch: refs/heads/master
Commit: b612e9d511daa2a5f99ef984c7eb9aeeda79c4db
Parents: 63e3336
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:56:14 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:58:52 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b612e9d5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6a52aad..f9740e0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6472,6 +6472,24 @@ void Master::_reregisterSlave(
     return;
   }
 
+  if (slaves.markingGone.contains(slaveInfo.id())) {
+    LOG(INFO)
+      << "Ignoring re-register agent message from agent "
+      << slaveInfo.id() << " at " << pid << " ("
+      << slaveInfo.hostname() << ") as a gone operation is already in progress";
+    return;
+  }
+
+  if (slaves.gone.contains(slaveInfo.id())) {
+    LOG(WARNING) << "Refusing re-registration of agent at " << pid
+                 << " because it is already marked gone";
+
+    ShutdownMessage message;
+    message.set_message("Agent has been marked gone");
+    send(pid, message);
+    return;
+  }
+
   VLOG(1) << "Authorized re-registration of agent " << slaveInfo.id()
           << " at " << pid << " (" << slaveInfo.hostname() << ")";
 
@@ -6651,6 +6669,24 @@ void Master::__reregisterSlave(
   // should ever fail.
   CHECK(future.get());
 
+  if (slaves.markingGone.contains(slaveInfo.id())) {
+    LOG(INFO)
+      << "Ignoring re-register agent message from agent "
+      << slaveInfo.id() << " at " << pid << " ("
+      << slaveInfo.hostname() << ") as a gone operation is already in progress";
+    return;
+  }
+
+  if (slaves.gone.contains(slaveInfo.id())) {
+    LOG(WARNING) << "Refusing re-registration of agent at " << pid
+                 << " because it is already marked gone";
+
+    ShutdownMessage message;
+    message.set_message("Agent has been marked gone");
+    send(pid, message);
+    return;
+  }
+
   VLOG(1) << "Re-admitted agent " << slaveInfo.id() << " at " << pid
           << " (" << slaveInfo.hostname() << ")";
 
@@ -6851,6 +6887,24 @@ void Master::___reregisterSlave(
   VLOG(1) << "Registry updated for slave " << slaveInfo.id() << " at " << pid
           << "(" << slaveInfo.hostname() << ")";
 
+  if (slaves.markingGone.contains(slaveInfo.id())) {
+    LOG(INFO)
+      << "Ignoring re-register agent message from agent "
+      << slaveInfo.id() << " at " << pid << " ("
+      << slaveInfo.hostname() << ") as a gone operation is already in progress";
+    return;
+  }
+
+  if (slaves.gone.contains(slaveInfo.id())) {
+    LOG(WARNING) << "Refusing re-registration of agent at " << pid
+                 << " because it is already marked gone";
+
+    ShutdownMessage message;
+    message.set_message("Agent has been marked gone");
+    send(pid, message);
+    return;
+  }
+
   if (!slaves.registered.contains(slaveInfo.id())) {
     LOG(WARNING)
       << "Dropping ongoing re-registration attempt of slave " << slaveInfo.id()


[6/8] mesos git commit: Added new `--reconfiguration_policy` slave flag and implementation.

Posted by vi...@apache.org.
Added new `--reconfiguration_policy` slave flag and implementation.

This flag allows operators to select a set of permitted
configuration changes that the slave will tolerate during
recovery while still recovering running tasks and keeping
the same agent id.

The previous behaviour of Mesos is reproduced exactly by setting
this flag to "equal". For now only one additional policy is
provided, "additive".

Review: https://reviews.apache.org/r/64012/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/63e3336b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/63e3336b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/63e3336b

Branch: refs/heads/master
Commit: 63e3336b508f5fe3936ed701404cde7d70eee754
Parents: f4d2c8f
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:56:00 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:58:39 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt                      |   1 +
 src/Makefile.am                         |   2 +
 src/slave/compatibility.cpp             | 207 +++++++++++++++++++++
 src/slave/compatibility.hpp             |  66 +++++++
 src/slave/flags.cpp                     |  14 ++
 src/slave/flags.hpp                     |   1 +
 src/slave/slave.cpp                     | 119 +++++++++----
 src/slave/slave.hpp                     |   9 +
 src/tests/CMakeLists.txt                |   1 +
 src/tests/master_tests.cpp              | 257 ++++++++++++++++++++++++++-
 src/tests/slave_compatibility_tests.cpp | 175 ++++++++++++++++++
 src/tests/slave_recovery_tests.cpp      | 135 ++++++++++++++
 src/tests/slave_tests.cpp               |  98 ++++++++++
 13 files changed, 1052 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 592489d..a76ba1e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -246,6 +246,7 @@ configure_file(
 # SOURCE FILES FOR THE MESOS LIBRARY.
 #####################################
 set(AGENT_SRC
+  slave/compatibility.cpp
   slave/constants.cpp
   slave/container_daemon.cpp
   slave/container_logger.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index d5ca797..05e8b95 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1015,6 +1015,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   sched/sched.cpp							\
   scheduler/scheduler.cpp						\
   secret/resolver.cpp							\
+  slave/compatibility.cpp						\
   slave/constants.cpp							\
   slave/container_daemon.cpp						\
   slave/container_logger.cpp						\
@@ -2494,6 +2495,7 @@ mesos_tests_SOURCES =						\
   tests/scheduler_tests.cpp					\
   tests/script.cpp						\
   tests/slave_authorization_tests.cpp				\
+  tests/slave_compatibility_tests.cpp				\
   tests/slave_recovery_tests.cpp				\
   tests/slave_validation_tests.cpp				\
   tests/slave_tests.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/compatibility.cpp
----------------------------------------------------------------------
diff --git a/src/slave/compatibility.cpp b/src/slave/compatibility.cpp
new file mode 100644
index 0000000..4ead4a5
--- /dev/null
+++ b/src/slave/compatibility.cpp
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "slave/compatibility.hpp"
+
+#include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
+
+#include <mesos/values.hpp>
+
+#include "mesos/type_utils.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace compatibility {
+
+// TODO(bevers): Compare the SlaveInfo fields individually, in order to be
+// able to generate better error messages.
+Try<Nothing> equal(
+    const SlaveInfo& previous,
+    const SlaveInfo& current)
+{
+  if (previous == current) {
+    return Nothing();
+  }
+
+  return Error(strings::join(
+      "\n",
+      "Incompatible agent info detected. ",
+      "\n------------------------------------------------------------",
+      "Old agent info:\n" + stringify(previous),
+      "\n------------------------------------------------------------",
+      "New agent info:\n" + stringify(current),
+      "\n------------------------------------------------------------"));
+}
+
+
+// T is instantiated below as either `Resource` or `Attribute`.
+template<typename T>
+Try<T> getMatchingValue(
+  const T& previous,
+  const google::protobuf::RepeatedPtrField<T>& values)
+{
+  auto match = std::find_if(
+      values.begin(),
+      values.end(),
+      [&previous](const T& value) {
+        return previous.name() == value.name();
+      });
+
+  if (match == values.end()) {
+    return Error("Couldn't find '" + previous.name() + "'");
+  }
+
+  if (match->type() != previous.type()) {
+    return Error(
+        "Type of '" + previous.name() + "' changed from " +
+        stringify(previous.type()) + " to " + stringify(match->type()));
+  }
+
+  return *match;
+}
+
+
+Try<Nothing> additive(
+    const SlaveInfo& previous,
+    const SlaveInfo& current)
+{
+  if (previous.hostname() != current.hostname()) {
+    return Error(
+        "Configuration change not permitted under `additive` policy: "
+        "Hostname changed from " +
+        previous.hostname() + " to " + current.hostname());
+  }
+
+  if (previous.port() != current.port()) {
+    return Error(
+        "Configuration change not permitted under `additive` policy: "
+        "Port changed from " + stringify(previous.port()) + " to " +
+        stringify(current.port()));
+  }
+
+  if (previous.has_domain() && !(previous.domain() == current.domain())) {
+    return Error(
+        "Configuration change not permitted under `additive` policy: "
+        "Domain changed from " + stringify(previous.domain()) + " to " +
+        stringify(current.domain()));
+  }
+
+  // TODO(bennoe): We should probably check `resources.size()` and switch to a
+  // smarter algorithm for the matching when it's bigger than, say, 20.
+  for (const Resource& resource : previous.resources()) {
+    Try<Resource> match =
+      getMatchingValue(resource, current.resources());
+
+    if (match.isError()) {
+      return Error(
+          "Configuration change not permitted under 'additive' policy: " +
+          match.error());
+    }
+
+    switch (resource.type()) {
+      case Value::SCALAR: {
+        if (!(resource.scalar() <= match->scalar())) {
+          return Error(
+              "Configuration change not permitted under 'additive' policy: "
+              "Value of scalar resource '" + resource.name() + "' decreased "
+              "from " + stringify(resource.scalar()) + " to " +
+              stringify(match->scalar()));
+        }
+        continue;
+      }
+      case Value::RANGES: {
+        if (!(resource.ranges() <= match->ranges())) {
+          return Error(
+              "Configuration change not permitted under 'additive' policy: "
+              "Previous value of range resource '" + resource.name() + "' (" +
+              stringify(resource.ranges()) + ") not included in current " +
+              stringify(match->ranges()));
+        }
+        continue;
+      }
+      case Value::SET: {
+        if (!(resource.set() <= match->set())) {
+          return Error(
+              "Configuration change not permitted under 'additive' policy: "
+              "Previous value of set resource '" + resource.name() + "' (" +
+              stringify(resource.set()) + ") not included in current " +
+              stringify(match->set()));
+        }
+        continue;
+      }
+      case Value::TEXT: {
+        // Text resources are not supported.
+        UNREACHABLE();
+      }
+    }
+  }
+
+  for (const Attribute& attribute : previous.attributes()) {
+    Try<Attribute> match =
+      getMatchingValue(attribute, current.attributes());
+
+    if (match.isError()) {
+      return Error(
+          "Configuration change not permitted under 'additive' policy: " +
+          match.error());
+    }
+
+    switch (attribute.type()) {
+      case Value::SCALAR: {
+        if (!(attribute.scalar() == match->scalar())) {
+          return Error(
+              "Configuration change not permitted under 'additive' policy: "
+              "Value of scalar attribute '" + attribute.name() + "' changed "
+              "from " + stringify(attribute.scalar()) + " to " +
+              stringify(match->scalar()));
+        }
+        continue;
+      }
+      case Value::RANGES: {
+        if (!(attribute.ranges() <= match->ranges())) {
+          return Error(
+              "Previous value of ranges resource '" + attribute.name() + "' (" +
+              stringify(attribute.ranges()) + ") not included in current " +
+              stringify(match->ranges()));
+        }
+        continue;
+      }
+      case Value::TEXT: {
+        if (!(attribute.text() == match->text())) {
+          return Error(
+              "Configuration change not permitted under 'additive' policy: "
+              "Value of text attribute '" + attribute.name() + "' changed "
+              "from " + stringify(attribute.text()) +
+              " to " + stringify(match->text()));
+        }
+        continue;
+      }
+      case Value::SET: {
+        // Set attributes are not supported.
+        UNREACHABLE();
+      }
+    }
+  }
+
+  return Nothing();
+}
+
+} // namespace compatibility {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/compatibility.hpp
----------------------------------------------------------------------
diff --git a/src/slave/compatibility.hpp b/src/slave/compatibility.hpp
new file mode 100644
index 0000000..78b421a
--- /dev/null
+++ b/src/slave/compatibility.hpp
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_COMPATIBILITY_HPP__
+#define __SLAVE_COMPATIBILITY_HPP__
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include <mesos/mesos.pb.h>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace compatibility {
+
+// This function checks whether `previous` and `current` are considered to be
+// "equal", i.e., all fields in `SlaveInfo` are the same.
+Try<Nothing> equal(
+    const SlaveInfo& previous,
+    const SlaveInfo& current);
+
+
+// This function checks whether the changes between `previous` and `current`
+// are considered to be "additive", according to the rules in the following
+// table:
+//
+// Field      | Constraint
+// -----------------------------------------------------------------------------
+// hostname   | Must match exactly.
+// port       | Must match exactly.
+// domain     | Must either match exactly or change from not configured to
+//            | configured.
+// resources  | All previous resources must be present with the same type.
+//            | For type SCALAR: The new value must be not smaller than the old.
+//            | For type RANGES: The new value must include the old ranges.
+//            | For type SET:    The new value must be a superset of the old.
+//            | New resources are permitted.
+// attributes | All previous attributes must be present with the same type.
+//            | For type SCALAR: The new value must exactly match the previous.
+//            | For type RANGES: The new value must include the old ranges.
+//            | For type TEXT:   The new value must exactly match the previous.
+//            | New attributes are permitted.
+Try<Nothing> additive(
+    const SlaveInfo& previous,
+    const SlaveInfo& current);
+
+} // namespace compatibility {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_COMPATIBILITY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 34edf4c..d876474 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -459,6 +459,20 @@ mesos::internal::slave::Flags::Flags()
       "sucessfully reconnect to the framework.",
       RECOVERY_TIMEOUT);
 
+  add(&Flags::reconfiguration_policy,
+      "reconfiguration_policy",
+      "This flag controls which agent configuration changes are considered\n"
+      "acceptable when recovering the previous agent state. Possible values:\n"
+      "equal:    Require that the old and the new state match exactly.\n"
+      "additive: Require that the new state is a superset of the old state:\n"
+      "          it is permitted to add additional resources, attributes\n"
+      "          and domains but not to remove existing ones.\n"
+      "Note that this only affects the checking done on the agent itself,\n"
+      "the master may still reject the slave if it detects a change that it\n"
+      "considers unacceptable, which currently happens when port or hostname\n"
+      "are changed.",
+      "equal");
+
   add(&Flags::strict,
       "strict",
       "If `strict=true`, any and all recovery errors are considered fatal.\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index f25d8af..f84ba5a 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -90,6 +90,7 @@ public:
 
   Option<std::string> container_logger;
 
+  std::string reconfiguration_policy;
   std::string recover;
   Duration recovery_timeout;
   bool strict;

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c07e25f..4927001 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -90,8 +90,11 @@
 
 #include "logging/logging.hpp"
 
+#include "master/detector/standalone.hpp"
+
 #include "module/manager.hpp"
 
+#include "slave/compatibility.hpp"
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/paths.hpp"
@@ -842,6 +845,15 @@ void Slave::initialize()
     }
   }
 
+  // Check that the reconfiguration_policy flag is valid.
+  if (flags.reconfiguration_policy != "equal" &&
+      flags.reconfiguration_policy != "additive") {
+    EXIT(EXIT_FAILURE)
+      << "Unknown option for 'reconfiguration_policy' flag "
+      << flags.reconfiguration_policy << "."
+      << " Please run the agent with '--help' to see the valid options.";
+  }
+
   // Check that the recover flag is valid.
   if (flags.recover != "reconnect" && flags.recover != "cleanup") {
     EXIT(EXIT_FAILURE)
@@ -1010,6 +1022,26 @@ void Slave::detected(const Future<Option<MasterInfo>>& _master)
       return;
     }
 
+    if (requiredMasterCapabilities.agentUpdate) {
+      protobuf::master::Capabilities masterCapabilities(
+          latest->capabilities());
+
+      if (!masterCapabilities.agentUpdate) {
+        EXIT(EXIT_FAILURE) <<
+          "Agent state changed on restart, but the detected master lacks the "
+          "AGENT_UPDATE capability. Refusing to connect.";
+        return;
+      }
+
+      if (dynamic_cast<mesos::master::detector::StandaloneMasterDetector*>(
+          detector)) {
+        LOG(WARNING) <<
+          "The AGENT_UPDATE master capability is required, "
+          "but the StandaloneMasterDetector does not have the ability to read "
+          "master capabilities.";
+      }
+    }
+
     // Wait for a random amount of time before authentication or
     // registration.
     Duration duration =
@@ -6143,6 +6175,25 @@ void Slave::_checkDiskUsage(const Future<double>& usage)
 }
 
 
+Try<Nothing> Slave::compatible(
+  const SlaveInfo& previous,
+  const SlaveInfo& current) const
+{
+  // TODO(vinod): Also check for version compatibility.
+
+  if (flags.reconfiguration_policy == "equal") {
+    return compatibility::equal(previous, current);
+  }
+
+  if (flags.reconfiguration_policy == "additive") {
+    return compatibility::additive(previous, current);
+  }
+
+  // Any other flag value is rejected by the check in `initialize()`.
+  UNREACHABLE();
+}
+
+
 Future<Nothing> Slave::recover(const Try<state::State>& state)
 {
   if (state.isError()) {
@@ -6303,43 +6354,53 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
       metrics.recovery_errors += slaveState->errors;
     }
 
+    // Save the previous id into the current `SlaveInfo`, so we can compare
+    // both of them for equality. This is safe because if it turned out that
+    // we can not reuse the id, we will either crash or erase it again.
+    info.mutable_id()->CopyFrom(slaveState->info->id());
+
     // Check for SlaveInfo compatibility.
-    // TODO(vinod): Also check for version compatibility.
-
-    SlaveInfo _info(info);
-    _info.mutable_id()->CopyFrom(slaveState->id);
-    if (flags.recover == "reconnect" &&
-        !(_info == slaveState->info.get())) {
-      string message = strings::join(
-          "\n",
-          "Incompatible agent info detected.",
-          "------------------------------------------------------------",
-          "Old agent info:\n" + stringify(slaveState->info.get()),
-          "------------------------------------------------------------",
-          "New agent info:\n" + stringify(info),
-          "------------------------------------------------------------");
-
-      // Fail the recovery unless the agent is recovering for the first
-      // time after host reboot.
-      //
+    Try<Nothing> _compatible =
+      compatible(slaveState->info.get(), info);
+
+    if (_compatible.isSome()) {
+      // Permitted change, so we reuse the recovered agent id and reconnect
+      // to running executors.
+
+      // Prior to Mesos 1.5, the master expected that an agent would never
+      // change its `SlaveInfo` and keep the same slave id, and therefore would
+      // not update its internal data structures on agent re-registration.
+      if (!(slaveState->info.get() == info)) {
+        requiredMasterCapabilities.agentUpdate = true;
+      }
+
+      // Start the local resource providers daemon once we have the slave id.
+      localResourceProviderDaemon->start(info.id());
+
+      // Recover the frameworks.
+      foreachvalue (const FrameworkState& frameworkState,
+                    slaveState->frameworks) {
+        recoverFramework(frameworkState, injectedExecutors, injectedTasks);
+      }
+    } else if (state->rebooted) {
       // Prior to Mesos 1.4 we directly bypass the state recovery and
       // start as a new agent upon reboot (introduced in MESOS-844).
       // This unncessarily discards the existing agent ID (MESOS-6223).
       // Starting in Mesos 1.4 we'll attempt to recover the slave state
-      // even after reboot but in case of slave info mismatch we'll fall
-      // back to recovering as a new agent (existing behavior). This
-      // prevents the agent from flapping if the slave info (resources,
+      // even after reboot but in case of an incompatible slave info change
+      // we'll fall back to recovering as a new agent (existing behavior).
+      // Prior to Mesos 1.5, an incompatible change would be any slave info
+      // mismatch.
+      // This prevents the agent from flapping if the slave info (resources,
       // attributes, etc.) change is due to host maintenance associated
       // with the reboot.
-      if (!state->rebooted) {
-        return Failure(message);
-      }
 
       LOG(WARNING) << "Falling back to recover as a new agent due to error: "
-                   << message;
+                   << _compatible.error();
 
       // Cleaning up the slave state to avoid any state recovery for the
       // old agent.
+      info.clear_id();
       slaveState = None();
 
       // Remove the "latest" symlink if it exists to "checkpoint" the
@@ -6350,13 +6411,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
           << "Failed to remove latest symlink '" << latest << "'";
       }
     } else {
-      info = slaveState->info.get(); // Recover the slave info.
-
-      // Recover the frameworks.
-      foreachvalue (const FrameworkState& frameworkState,
-                    slaveState->frameworks) {
-        recoverFramework(frameworkState, injectedExecutors, injectedTasks);
-      }
+      return Failure(_compatible.error());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 643d855..06afd52 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -588,6 +588,15 @@ private:
   double _resources_revocable_used(const std::string& name);
   double _resources_revocable_percent(const std::string& name);
 
+  // Checks whether the two `SlaveInfo` objects are considered
+  // compatible based on the value of the `--reconfiguration_policy`
+  // flag.
+  Try<Nothing> compatible(
+      const SlaveInfo& previous,
+      const SlaveInfo& current) const;
+
+  protobuf::master::Capabilities requiredMasterCapabilities;
+
   const Flags flags;
 
   const Http http;

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 5e52020..b74fbb9 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -115,6 +115,7 @@ set(MESOS_TESTS_SRC
   scheduler_http_api_tests.cpp
   scheduler_tests.cpp
   slave_authorization_tests.cpp
+  slave_compatibility_tests.cpp
   slave_tests.cpp
   slave_validation_tests.cpp
   sorter_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 64ac2d5..0dfa8c7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2655,6 +2655,261 @@ TEST_F(MasterTest, SlavesEndpointQuerySlave)
 }
 
 
+// Tests that the master correctly updates the slave info on
+// slave re-registration.
+TEST_F(MasterTest, RegistryUpdateAfterReconfiguration)
+{
+  // Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry = "replicated_log";
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Start a slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  // Reuse slaveFlags so both StartSlave() use the same work_dir.
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.resources = "cpus:100";
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Restart slave with changed resources.
+  slave.get()->terminate();
+  slave->reset();
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get()->pid, _);
+
+  slaveFlags.reconfiguration_policy = "additive";
+  slaveFlags.resources = "cpus:200";
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Verify master has correctly updated the slave state.
+  {
+    string slaveId = slaveReregisteredMessage->slave_id().value();
+
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "slaves?slave_id=" + slaveId,
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    const Try<JSON::Value> value = JSON::parse<JSON::Value>(response->body);
+
+    ASSERT_SOME(value);
+
+    Try<JSON::Object> object = value->as<JSON::Object>();
+
+    Result<JSON::Array> array = object->find<JSON::Array>("slaves");
+    ASSERT_SOME(array);
+    EXPECT_EQ(1u, array->values.size());
+
+    Try<JSON::Value> expected = JSON::parse(
+        "{"
+          "\"slaves\":"
+            "[{"
+                "\"id\":\"" + slaveId + "\","
+                "\"resources\": {\"cpus\": 200}"
+            "}]"
+        "}");
+
+    ASSERT_SOME(expected);
+
+    EXPECT_TRUE(value->contains(expected.get()));
+  }
+
+  // Verify master has correctly updated the registry.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->registrar.get()->pid(),
+        "registry",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    const Try<JSON::Value> value = JSON::parse<JSON::Value>(response->body);
+
+    ASSERT_SOME(value);
+
+    Result<JSON::Array> array = value->as<JSON::Object>()
+      .find<JSON::Object>("slaves")->find<JSON::Array>("slaves");
+
+    ASSERT_SOME(array);
+    ASSERT_EQ(1u, array->values.size());
+
+    Result<JSON::Object> info =
+      array->values.at(0).as<JSON::Object>().find<JSON::Object>("info");
+    ASSERT_SOME(info);
+
+    string slaveId = slaveReregisteredMessage->slave_id().value();
+    Result<JSON::String> id =
+      info->find<JSON::Object>("id")->at<JSON::String>("value");
+    ASSERT_SOME(id);
+    ASSERT_EQ(id->value, slaveId);
+
+    Result<JSON::Array> resources = info->find<JSON::Array>("resources");
+    ASSERT_SOME(resources);
+
+    JSON::Value expectedCpu = JSON::parse(
+      "{\"name\": \"cpus\","
+      " \"scalar\": {\"value\": 200.0},"
+      " \"type\":\"SCALAR\"}").get();
+
+    bool found = std::any_of(resources->values.begin(), resources->values.end(),
+        [&](const JSON::Value& value) { return value.contains(expectedCpu); });
+
+    EXPECT_TRUE(found);
+  }
+}
+
+
+// Tests that the master correctly updates the slave state on
+// slave re-registration after master failover. This is almost an exact
+// duplicate of the `MasterTest.RegistryUpdateAfterReconfiguration` above,
+// except that we shutdown the master prior to reconfiguring the slave.
+TEST_F(MasterTest, RegistryUpdateAfterMasterFailover)
+{
+  // Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry = "replicated_log";
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Start a slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  // Reuse `slaveFlags` so both `StartSlave()` calls use the same `work_dir`.
+  slave::Flags slaveFlags = this->CreateSlaveFlags();
+  slaveFlags.resources = "cpus:100";
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Shutdown the master.
+  master->reset();
+
+  // Shutdown the slave.
+  slave.get()->terminate();
+  slave->reset();
+
+  // Restart the master.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  detector = master.get()->createDetector();
+
+  // Restart the slave with changed resources.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get()->pid, _);
+
+  slaveFlags.reconfiguration_policy = "additive";
+  slaveFlags.resources = "cpus:200";
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Verify master has correctly updated the slave info.
+  {
+    string slaveId = slaveReregisteredMessage->slave_id().value();
+
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "slaves?slave_id=" + slaveId,
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    const Try<JSON::Value> value = JSON::parse<JSON::Value>(response->body);
+
+    ASSERT_SOME(value);
+
+    Try<JSON::Object> object = value->as<JSON::Object>();
+
+    Result<JSON::Array> array = object->find<JSON::Array>("slaves");
+    ASSERT_SOME(array);
+    EXPECT_EQ(1u, array->values.size());
+
+    Try<JSON::Value> expected = JSON::parse(
+        "{"
+          "\"slaves\":"
+            "[{"
+                "\"id\":\"" + slaveId + "\","
+                "\"resources\": {\"cpus\": 200}"
+            "}]"
+        "}");
+
+    ASSERT_SOME(expected);
+
+    EXPECT_TRUE(value->contains(expected.get()));
+  }
+
+  // Verify master has correctly updated the registry.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->registrar.get()->pid(),
+        "registry",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    const Try<JSON::Value> value = JSON::parse<JSON::Value>(response->body);
+
+    ASSERT_SOME(value);
+
+    Result<JSON::Array> array = value->as<JSON::Object>()
+      .find<JSON::Object>("slaves")->find<JSON::Array>("slaves");
+
+    ASSERT_SOME(array);
+    ASSERT_EQ(1u, array->values.size());
+
+    Result<JSON::Object> info =
+      array->values.at(0).as<JSON::Object>().find<JSON::Object>("info");
+    ASSERT_SOME(info);
+
+    string slaveId = slaveReregisteredMessage->slave_id().value();
+    Result<JSON::String> id =
+      info->find<JSON::Object>("id")->at<JSON::String>("value");
+    ASSERT_SOME(id);
+    ASSERT_EQ(id->value, slaveId);
+
+    Result<JSON::Array> resources = info->find<JSON::Array>("resources");
+    ASSERT_SOME(resources);
+
+    JSON::Value expectedCpu = JSON::parse(
+      "{\"name\": \"cpus\","
+      " \"scalar\": {\"value\": 200.0},"
+      " \"type\":\"SCALAR\"}").get();
+
+    bool found = std::any_of(resources->values.begin(), resources->values.end(),
+        [&](const JSON::Value& value) { return value.contains(expectedCpu); });
+
+    EXPECT_TRUE(found);
+  }
+}
+
+
 // This test ensures that when a slave is recovered from the registry
 // but does not re-register with the master, it is marked unreachable
 // in the registry, the framework is informed that the slave is lost,
@@ -2998,7 +3253,7 @@ TEST_F(MasterTest, CancelRecoveredSlaveRemoval)
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
 
-  // Reuse slaveFlags so both StartSlave() use the same work_dir.
+  // Reuse `slaveFlags` so both `StartSlave()` calls use the same `work_dir`.
   slave::Flags slaveFlags = CreateSlaveFlags();
 
   Owned<MasterDetector> detector = master.get()->createDetector();

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/tests/slave_compatibility_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_compatibility_tests.cpp b/src/tests/slave_compatibility_tests.cpp
new file mode 100644
index 0000000..ab5ed29
--- /dev/null
+++ b/src/tests/slave_compatibility_tests.cpp
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+// This set of tests checks whether the various settings of the
+// --reconfiguration_policy flag behave as expected.
+
+#include "slave/compatibility.hpp"
+
+#include <mesos/attributes.hpp>
+#include <mesos/resources.hpp>
+
+#include "tests/mesos.hpp"
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class SlaveCompatibilityTest : public MesosTest {};
+
+
+// Builds a `SlaveInfo` whose id and hostname are both "agent" and whose
+// resources and attributes are parsed from the given strings.
+//
+// Declared `static` because it is only used by the tests in this file;
+// this keeps the symbol from clashing with helpers of the same name in
+// other test files that share the `mesos::internal::tests` namespace.
+static SlaveInfo createSlaveInfo(
+    const std::string& resources,
+    const std::string& attributes)
+{
+  SlaveID id;
+  id.set_value("agent");
+
+  Attributes agentAttributes = Attributes::parse(attributes);
+  Resources agentResources = Resources::parse(resources).get();
+
+  SlaveInfo slave;
+  *(slave.mutable_attributes()) = agentAttributes;
+  *(slave.mutable_resources()) = agentResources;
+  *(slave.mutable_id()) = id;
+  slave.set_hostname(id.value());
+
+  return slave;
+}
+
+
+TEST_F(SlaveCompatibilityTest, Equal)
+{
+  SlaveInfo original = createSlaveInfo("cpus:500", "foo:bar");
+
+  SlaveInfo changedAttributes(original);
+  SlaveInfo changedResources(original);
+  ASSERT_SOME(slave::compatibility::equal(original, changedAttributes));
+  ASSERT_SOME(slave::compatibility::equal(original, changedResources));
+
+  *(changedAttributes.mutable_attributes()) = Attributes::parse("foo:baz");
+  ASSERT_ERROR(slave::compatibility::equal(original, changedAttributes));
+
+  *(changedResources.mutable_resources()) = Resources::parse("cpus:600").get();
+  ASSERT_ERROR(slave::compatibility::equal(original, changedResources));
+}
+
+
+TEST_F(SlaveCompatibilityTest, Additive)
+{
+  // Changing the hostname is not permitted.
+  SlaveInfo originalHostname;
+  originalHostname.set_hostname("host");
+  SlaveInfo changedHostname(originalHostname);
+  ASSERT_SOME(slave::compatibility::additive(
+      originalHostname, changedHostname));
+
+  changedHostname.set_hostname("another_host");
+  ASSERT_ERROR(slave::compatibility::additive(
+      originalHostname, changedHostname));
+
+  // Changing the port is not permitted.
+  SlaveInfo originalPort;
+  originalPort.set_port(1234);
+  SlaveInfo changedPort(originalPort);
+  ASSERT_SOME(slave::compatibility::additive(originalPort, changedPort));
+
+  changedPort.set_port(4321);
+  ASSERT_ERROR(slave::compatibility::additive(originalPort, changedPort));
+
+  // Resources.
+
+  // Adding new resources is permitted.
+  SlaveInfo originalResource = createSlaveInfo("cpus:50", "");
+  SlaveInfo extendedResource = createSlaveInfo("cpus:50;mem:100", "");
+  SlaveInfo modifiedResource = createSlaveInfo("cpus:[100-200]", "");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalResource, extendedResource));
+
+  // Removing existing resources is not permitted.
+  ASSERT_ERROR(slave::compatibility::additive(
+      extendedResource, originalResource));
+
+  // Changing the type of a resource is not permitted.
+  ASSERT_ERROR(slave::compatibility::additive(
+      originalResource, modifiedResource));
+
+  // Scalar resources can be increased but not decreased.
+  SlaveInfo originalScalarResource = createSlaveInfo("cpus:50", "");
+  SlaveInfo changedScalarResource = createSlaveInfo("cpus:100", "");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalScalarResource, changedScalarResource));
+  ASSERT_ERROR(slave::compatibility::additive(
+      changedScalarResource, originalScalarResource));
+
+  // Range resources can be extended but not shrunk.
+  SlaveInfo originalRangeResource = createSlaveInfo("range:[100-200]", "");
+  SlaveInfo changedRangeResource = createSlaveInfo("range:[100-300]", "");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalRangeResource, changedRangeResource));
+  ASSERT_ERROR(slave::compatibility::additive(
+      changedRangeResource, originalRangeResource));
+
+  // Set resources can be extended but not shrunk.
+  SlaveInfo originalSetResource = createSlaveInfo("set:{}", "");
+  SlaveInfo changedSetResource = createSlaveInfo("set:{a,b}", "");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalSetResource, changedSetResource));
+  ASSERT_ERROR(slave::compatibility::additive(
+      changedSetResource, originalSetResource));
+
+  // Attributes.
+
+  // Adding new attributes is permitted.
+  SlaveInfo originalAttribute = createSlaveInfo("", "os:lucid");
+  SlaveInfo extendedAttribute = createSlaveInfo("", "os:lucid;dc:amsterdam");
+  SlaveInfo modifiedAttribute = createSlaveInfo("", "os:4");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalAttribute, extendedAttribute));
+
+  // Removing existing attributes is not permitted.
+  ASSERT_ERROR(slave::compatibility::additive(
+      extendedAttribute, originalAttribute));
+
+  // Changing the type of an attribute is not permitted.
+  ASSERT_ERROR(slave::compatibility::additive(
+      originalAttribute, modifiedAttribute));
+
+  // Changing value of a text attribute is not permitted.
+  SlaveInfo originalTextAttribute = createSlaveInfo("", "os:lucid");
+  SlaveInfo changedTextAttribute = createSlaveInfo("", "os:trusty");
+  ASSERT_ERROR(slave::compatibility::additive(
+      originalTextAttribute, changedTextAttribute));
+
+  // Changing the value of a scalar attribute is not permitted.
+  SlaveInfo originalScalarAttribute = createSlaveInfo("", "rack:1");
+  SlaveInfo changedScalarAttribute = createSlaveInfo("", "rack:2");
+  ASSERT_ERROR(slave::compatibility::additive(
+      originalScalarAttribute, changedScalarAttribute));
+
+  // Range attributes can be extended but not shrunk.
+  SlaveInfo originalRangeAttribute = createSlaveInfo("", "range:[100-200]");
+  SlaveInfo changedRangeAttribute = createSlaveInfo("", "range:[100-300]");
+  ASSERT_SOME(slave::compatibility::additive(
+      originalRangeAttribute, changedRangeAttribute));
+  ASSERT_ERROR(slave::compatibility::additive(
+      changedRangeAttribute, originalRangeAttribute));
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 7674e60..253b0fc 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -4559,6 +4559,7 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch)
 
   Try<Owned<cluster::Slave>> slave =
     this->StartSlave(detector.get(), &containerizer1, flags);
+
   ASSERT_SOME(slave);
 
   // Enable checkpointing for the framework.
@@ -4637,6 +4638,140 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch)
 }
 
 
+// This test starts a task, restarts the slave with increased
+// resources while the task is still running, and then stops
+// the task, verifying that all resources are seen in subsequent
+// offers.
+TYPED_TEST(SlaveRecoveryTest, AgentReconfigurationWithRunningTask)
+{
+  // Start a master.
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  // Start a framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Start a slave.
+  slave::Flags flags = this->CreateSlaveFlags();
+  flags.resources = "cpus:5;mem:0;disk:0;ports:0";
+
+  Fetcher fetcher(flags);
+
+  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  // Start a long-running task on the slave.
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->empty());
+
+  EXPECT_EQ(
+      offers1.get()[0].resources(),
+      allocatedResources(Resources::parse("cpus:5").get(), "*"));
+
+  SlaveID slaveId = offers1.get()[0].slave_id();
+  TaskInfo task = createTask(
+      slaveId, Resources::parse("cpus:3").get(), "sleep 1000");
+
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  driver.launchTasks(offers1.get()[0].id(), {task});
+
+  AWAIT_READY(statusStarting);
+  AWAIT_READY(statusRunning);
+
+  // Grab one of the offers while the task is running.
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->empty());
+
+  EXPECT_EQ(
+      offers2.get()[0].resources(),
+      allocatedResources(Resources::parse("cpus:2").get(), "*"));
+
+  driver.declineOffer(offers2.get()[0].id());
+
+  // Restart the slave with increased resources.
+  slave.get()->terminate();
+  flags.reconfiguration_policy = "additive";
+  flags.resources = "cpus:10;mem:512;disk:0;ports:0";
+
+  // Restart the slave with a new containerizer.
+  _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  Future<SlaveReregisteredMessage> slaveReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Grab one of the offers after the slave was restarted.
+  Future<vector<Offer>> offers3;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers3))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+  AWAIT_READY(slaveReregistered);
+
+  AWAIT_READY(offers3);
+  ASSERT_FALSE(offers3->empty());
+
+  EXPECT_EQ(
+      offers3.get()[0].resources(),
+      allocatedResources(Resources::parse("cpus:7;mem:512").get(), "*"));
+
+  // Decline so we get the resources offered again with the next offer.
+  driver.declineOffer(offers3.get()[0].id());
+
+  // Kill the task.
+  driver.killTask(task.task_id());
+  AWAIT_READY(statusKilled);
+  ASSERT_EQ(TASK_KILLED, statusKilled->state());
+
+  // Grab one of the offers after the task was killed.
+  Future<vector<Offer>> offers4;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers4))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Wait for the offer to arrive before inspecting it, as is done for
+  // the earlier offers.
+  AWAIT_READY(offers4);
+  ASSERT_FALSE(offers4->empty());
+
+  EXPECT_EQ(
+      offers4.get()[0].resources(),
+      allocatedResources(Resources::parse("cpus:10;mem:512").get(), "*"));
+
+  driver.stop();
+  driver.join();
+}
+
+
 // We explicitly instantiate a SlaveRecoveryTest for test cases where
 // we assume we'll only have the MesosContainerizer.
 class MesosContainerizerSlaveRecoveryTest

http://git-wip-us.apache.org/repos/asf/mesos/blob/63e3336b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index a7f6658..eee6aa4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -90,6 +90,7 @@
 #include "tests/limiter.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_slave.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 using namespace mesos::internal::slave;
@@ -8830,6 +8831,103 @@ TEST_F(SlaveTest, ResourceVersions)
 }
 
 
+// Test that it is possible to add additional resources, attributes,
+// and a domain when the reconfiguration policy is set to `additive`.
+TEST_F(SlaveTest, ReconfigurationPolicy)
+{
+  DomainInfo domain = flags::parse<DomainInfo>(
+      "{"
+      "    \"fault_domain\": {"
+      "        \"region\": {\"name\": \"europe\"},"
+      "        \"zone\": {\"name\": \"europe-b2\"}"
+      "    }"
+      "}").get();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  // Need to set a master domain, otherwise it will reject a slave with
+  // a configured domain.
+  masterFlags.domain = domain;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.attributes = "distro:debian";
+  slaveFlags.resources = "cpus:4;mem:32;disk:512";
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start a slave.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Wait until the slave registers to ensure that it has successfully
+  // checkpointed its state.
+  AWAIT_READY(slaveRegisteredMessage);
+
+  slave.get()->terminate();
+  slave->reset();
+
+  // Do a valid reconfiguration.
+  slaveFlags.reconfiguration_policy = "additive";
+  slaveFlags.resources = "cpus:8;mem:128;disk:512";
+  slaveFlags.attributes = "distro:debian;version:8";
+  slaveFlags.domain = domain;
+
+  // Restart slave.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get()->pid, _);
+
+  slave = StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // If we get here without the slave exiting, things are working as expected.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Start scheduler and check that it gets offered the updated resources.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+  AWAIT_READY(offers);
+  // Assert rather than expect: the checks below index into `offers`.
+  ASSERT_EQ(1u, offers->size());
+
+  // Verify that the offer contains the new domain, attributes and resources.
+  EXPECT_TRUE(offers.get()[0].has_domain());
+  EXPECT_EQ(
+      Attributes(offers.get()[0].attributes()),
+      Attributes::parse(slaveFlags.attributes.get()));
+
+  // The resources are slightly transformed by both master and slave
+  // before they end up in an offer (in particular, ports are implicitly
+  // added and they're assigned to role '*'), so we cannot simply compare
+  // for equality.
+  Resources offeredResources = Resources(offers.get()[0].resources());
+  Resources reconfiguredResources = allocatedResources(
+      Resources::parse(slaveFlags.resources.get()).get(), "*");
+
+  EXPECT_TRUE(offeredResources.contains(reconfiguredResources));
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test checks that a resource provider triggers an
 // `UpdateSlaveMessage` to be sent to the master if an non-speculated
 // offer operation fails in the resource provider.


[8/8] mesos git commit: Skip registry update when nothing changed.

Posted by vi...@apache.org.
Skip registry update when nothing changed.

Most of the time, agents will not change their configuration
when they restart, so we can skip updating the registry in this
case.

Review: https://reviews.apache.org/r/64101/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7b0812e9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7b0812e9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7b0812e9

Branch: refs/heads/master
Commit: 7b0812e9bf851b8baadadf1c1a95f3790a43d788
Parents: b612e9d
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:56:20 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:58:52 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 54 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 45 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7b0812e9/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f9740e0..16cdde7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6582,9 +6582,11 @@ void Master::_reregisterSlave(
 
     // TODO(bevers): Verify that the checkpointed resources sent by the
     // slave match the ones stored in `slave`.
-    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
-      .onAny(defer(self(),
-          &Self::___reregisterSlave,
+
+    // Skip updating the registry if `slaveInfo` did not change from its
+    // previously known state.
+    if (slaveInfo == slave->info) {
+      ___reregisterSlave(
           slaveInfo,
           pid,
           executorInfos,
@@ -6593,17 +6595,35 @@ void Master::_reregisterSlave(
           version,
           agentCapabilities,
           resourceVersions,
-          lambda::_1));
-
+          true);
+    } else {
+      registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+        .onAny(defer(self(),
+            &Self::___reregisterSlave,
+            slaveInfo,
+            pid,
+            executorInfos,
+            tasks,
+            frameworks,
+            version,
+            agentCapabilities,
+            resourceVersions,
+            lambda::_1));
+    }
   } else if (slaves.recovered.contains(slaveInfo.id())) {
     // The agent likely is re-registering after a master failover as it
     // is in the list recovered from the registry.
     VLOG(1) << "Re-admitting recovered agent " << slaveInfo.id()
             << " at " << pid << "(" << slaveInfo.hostname() << ")";
 
-    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
-      .onAny(defer(self(),
-          &Self::__reregisterSlave,
+    SlaveInfo recoveredInfo = slaves.recovered.at(slaveInfo.id());
+    convertResourceFormat(
+        recoveredInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
+
+    // Skip updating the registry if `slaveInfo` did not change from its
+    // previously known state (see also MESOS-7711).
+    if (slaveInfo == recoveredInfo) {
+      __reregisterSlave(
           slaveInfo,
           pid,
           checkpointedResources,
@@ -6614,7 +6634,23 @@ void Master::_reregisterSlave(
           version,
           agentCapabilities,
           resourceVersions,
-          lambda::_1));
+          true);
+    } else {
+      registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+        .onAny(defer(self(),
+            &Self::__reregisterSlave,
+            slaveInfo,
+            pid,
+            checkpointedResources,
+            executorInfos,
+            tasks,
+            frameworks,
+            completedFrameworks,
+            version,
+            agentCapabilities,
+            resourceVersions,
+            lambda::_1));
+    }
   } else {
     // In the common case, the slave has been marked unreachable
     // by the master, so we move the slave to the reachable list and


[3/8] mesos git commit: Added `SlaveInfo` parameter to Allocator::updateSlave().

Posted by vi...@apache.org.
Added `SlaveInfo` parameter to Allocator::updateSlave().

This is done mainly in preparation for the upcoming agent
reconfiguration patches, where it will become possible that
`updateSlave()` is passed a different SlaveInfo than `addSlave()`,
and schedulers who rely on some fields in it for their scheduling
decisions need to be able to check and compare them.

Additionally, the HierarchicalDRFAllocator was changed to store
the full SlaveInfo for every slave instead of storing domain and
hostname separately.

Review: https://reviews.apache.org/r/64010/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9015cd31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9015cd31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9015cd31

Branch: refs/heads/master
Commit: 9015cd316bf6d185363cd0caf1705e2fb118ed63
Parents: 054bd5a
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:34 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:34 2017 -0800

----------------------------------------------------------------------
 include/mesos/allocator/allocator.hpp       |  4 ++++
 src/master/allocator/mesos/allocator.hpp    |  5 +++-
 src/master/allocator/mesos/hierarchical.cpp | 29 ++++++++++++++----------
 src/master/allocator/mesos/hierarchical.hpp |  9 +++++---
 src/master/master.cpp                       |  4 ++--
 src/tests/allocator.hpp                     |  9 ++++----
 src/tests/hierarchical_allocator_tests.cpp  | 22 ++++++++++--------
 7 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/include/mesos/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp
index acb9e4f..e5a5355 100644
--- a/include/mesos/allocator/allocator.hpp
+++ b/include/mesos/allocator/allocator.hpp
@@ -205,11 +205,15 @@ public:
   /**
    * Updates an agent.
    *
+   * TODO(bevers): Make `total` and `capabilities` non-optional.
+   *
+   * @param slaveInfo The current slave info of the agent.
    * @param total The new total resources on the agent.
    * @param capabilities The new capabilities of the agent.
    */
   virtual void updateSlave(
       const SlaveID& slave,
+      const SlaveInfo& slaveInfo,
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>&
           capabilities = None()) = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 48254b6..c453c01 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -99,6 +99,7 @@ public:
 
   void updateSlave(
       const SlaveID& slave,
+      const SlaveInfo& slaveInfo,
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
@@ -244,6 +245,7 @@ public:
 
   virtual void updateSlave(
       const SlaveID& slave,
+      const SlaveInfo& slaveInfo,
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>&
           capabilities = None()) = 0;
@@ -487,6 +489,7 @@ inline void MesosAllocator<AllocatorProcess>::removeSlave(
 template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::updateSlave(
     const SlaveID& slaveId,
+    const SlaveInfo& slaveInfo,
     const Option<Resources>& total,
     const Option<std::vector<SlaveInfo::Capability>>& capabilities)
 {
@@ -494,11 +497,11 @@ inline void MesosAllocator<AllocatorProcess>::updateSlave(
       process,
       &MesosAllocatorProcess::updateSlave,
       slaveId,
+      slaveInfo,
       total,
       capabilities);
 }
 
-
 template <typename AllocatorProcess>
 void MesosAllocator<AllocatorProcess>::addResourceProvider(
     const SlaveID& slave,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 4bc9fb6..b22829b 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -507,6 +507,7 @@ void HierarchicalAllocatorProcess::addSlave(
 {
   CHECK(initialized);
   CHECK(!slaves.contains(slaveId));
+  CHECK_EQ(slaveId, slaveInfo.id());
   CHECK(!paused || expectedAgentCount.isSome());
 
   slaves[slaveId] = Slave();
@@ -516,13 +517,9 @@ void HierarchicalAllocatorProcess::addSlave(
   slave.total = total;
   slave.allocated = Resources::sum(used);
   slave.activated = true;
-  slave.hostname = slaveInfo.hostname();
+  slave.info = slaveInfo;
   slave.capabilities = protobuf::slave::Capabilities(capabilities);
 
-  if (slaveInfo.has_domain()) {
-    slave.domain = slaveInfo.domain();
-  }
-
   // NOTE: We currently implement maintenance in the allocator to be able to
   // leverage state and features such as the FrameworkSorter and OfferFilter.
   if (unavailability.isSome()) {
@@ -576,7 +573,7 @@ void HierarchicalAllocatorProcess::addSlave(
     resume();
   }
 
-  LOG(INFO) << "Added agent " << slaveId << " (" << slave.hostname << ")"
+  LOG(INFO) << "Added agent " << slaveId << " (" << slave.info.hostname() << ")"
             << " with " << slave.total
             << " (allocated: " << slave.allocated << ")";
 
@@ -615,14 +612,22 @@ void HierarchicalAllocatorProcess::removeSlave(
 
 void HierarchicalAllocatorProcess::updateSlave(
     const SlaveID& slaveId,
+    const SlaveInfo& info,
     const Option<Resources>& total,
     const Option<vector<SlaveInfo::Capability>>& capabilities)
 {
   CHECK(initialized);
   CHECK(slaves.contains(slaveId));
+  CHECK_EQ(slaveId, info.id());
 
   Slave& slave = slaves.at(slaveId);
 
+  // We unconditionally overwrite the old domain and hostname: Even though
+  // the master places some restrictions on this (i.e. agents are not allowed
+  // to re-register with a different hostname) inside the allocator it
+  // doesn't matter, as the algorithm will work correctly either way.
+  slave.info = info;
+
   bool updated = false;
 
   // Update agent capabilities.
@@ -635,7 +640,7 @@ void HierarchicalAllocatorProcess::updateSlave(
     if (newCapabilities != oldCapabilities) {
       updated = true;
 
-      LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
+      LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")"
                 << " updated with capabilities " << slave.capabilities;
     }
   }
@@ -643,7 +648,7 @@ void HierarchicalAllocatorProcess::updateSlave(
   if (total.isSome()) {
     updated = updateSlaveTotal(slaveId, total.get()) || updated;
 
-    LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
+    LOG(INFO) << "Agent " << slaveId << " (" << slave.info.hostname() << ")"
               << " updated with total resources " << total.get();
   }
 
@@ -2119,7 +2124,7 @@ bool HierarchicalAllocatorProcess::isWhitelisted(
 
   const Slave& slave = slaves.at(slaveId);
 
-  return whitelist.isNone() || whitelist->contains(slave.hostname);
+  return whitelist.isNone() || whitelist->contains(slave.info.hostname());
 }
 
 
@@ -2393,7 +2398,7 @@ bool HierarchicalAllocatorProcess::updateSlaveTotal(
 bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const
 {
   // If the slave does not have a configured domain, assume it is not remote.
-  if (slave.domain.isNone()) {
+  if (!slave.info.has_domain()) {
     return false;
   }
 
@@ -2402,7 +2407,7 @@ bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const
   // might change in the future, if more types of domains are added.
   // For forward compatibility, we treat agents with a configured
   // domain but no fault domain as having no configured domain.
-  if (!slave.domain->has_fault_domain()) {
+  if (!slave.info.domain().has_fault_domain()) {
     return false;
   }
 
@@ -2418,7 +2423,7 @@ bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const
   const DomainInfo::FaultDomain::RegionInfo& masterRegion =
     domain->fault_domain().region();
   const DomainInfo::FaultDomain::RegionInfo& slaveRegion =
-    slave.domain->fault_domain().region();
+    slave.info.domain().fault_domain().region();
 
   return masterRegion != slaveRegion;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 41ffe17..14dcf3d 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -144,6 +144,7 @@ public:
 
   void updateSlave(
       const SlaveID& slave,
+      const SlaveInfo& slaveInfo,
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
@@ -381,12 +382,14 @@ protected:
 
     bool activated;  // Whether to offer resources.
 
-    std::string hostname;
+    // The `SlaveInfo` that was passed to the allocator when the slave was added
+    // or updated. Currently only two fields are used: `hostname` for host
+    // whitelisting and in log messages, and `domain` for region-aware
+    // scheduling.
+    SlaveInfo info;
 
     protobuf::slave::Capabilities capabilities;
 
-    Option<DomainInfo> domain;
-
     // Represents a scheduled unavailability due to maintenance for a specific
     // slave, and the responses from frameworks as to whether they will be able
     // to gracefully handle this unavailability.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f77a1ed..6679e25 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6573,7 +6573,7 @@ void Master::_reregisterSlave(
     slave->resourceVersions = protobuf::parseResourceVersions(
         {resourceVersions.begin(), resourceVersions.end()});
 
-    allocator->updateSlave(slave->id, None(), agentCapabilities);
+    allocator->updateSlave(slave->id, slave->info, None(), agentCapabilities);
 
     // Reconcile tasks between master and slave, and send the
     // `SlaveReregisteredMessage`.
@@ -7424,7 +7424,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
   }
 
   // Now update the agent's total resources in the allocator.
-  allocator->updateSlave(slaveId, slave->totalResources);
+  allocator->updateSlave(slaveId, slave->info, slave->totalResources);
 
   // Then rescind outstanding offers affected by the update.
   // NOTE: Need a copy of offers because the offers are removed inside the loop.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index fc5d9ef..341efa6 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -99,7 +99,7 @@ ACTION_P(InvokeRemoveSlave, allocator)
 
 ACTION_P(InvokeUpdateSlave, allocator)
 {
-  allocator->real->updateSlave(arg0, arg1, arg2);
+  allocator->real->updateSlave(arg0, arg1, arg2, arg3);
 }
 
 
@@ -280,9 +280,9 @@ public:
     EXPECT_CALL(*this, removeSlave(_))
       .WillRepeatedly(DoDefault());
 
-    ON_CALL(*this, updateSlave(_, _, _))
+    ON_CALL(*this, updateSlave(_, _, _, _))
       .WillByDefault(InvokeUpdateSlave(this));
-    EXPECT_CALL(*this, updateSlave(_, _, _))
+    EXPECT_CALL(*this, updateSlave(_, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, addResourceProvider(_, _, _))
@@ -416,8 +416,9 @@ public:
   MOCK_METHOD1(removeSlave, void(
       const SlaveID&));
 
-  MOCK_METHOD3(updateSlave, void(
+  MOCK_METHOD4(updateSlave, void(
       const SlaveID&,
+      const SlaveInfo&,
       const Option<Resources>&,
       const Option<std::vector<SlaveInfo::Capability>>&));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9015cd31/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 154049c..2fc5f5e 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -2019,7 +2019,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
+  allocator->updateSlave(slave.id(), slave, slave.resources() + oversubscribed);
 
   // The next allocation should be for 10 oversubscribed resources.
   expected = Allocation(
@@ -2030,7 +2030,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave again with 12 oversubscribed cpus.
   Resources oversubscribed2 = createRevocableResources("cpus", "12");
-  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed2);
+  allocator->updateSlave(
+      slave.id(), slave, slave.resources() + oversubscribed2);
 
   // The next allocation should be for 2 oversubscribed cpus.
   expected = Allocation(
@@ -2041,7 +2042,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave again with 5 oversubscribed cpus.
   Resources oversubscribed3 = createRevocableResources("cpus", "5");
-  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed3);
+  allocator->updateSlave(
+      slave.id(), slave, slave.resources() + oversubscribed3);
 
   // Since there are no more available oversubscribed resources there
   // shouldn't be an allocation.
@@ -2090,6 +2092,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveTotalResources)
 
   allocator->updateSlave(
       agent.id(),
+      agent,
       agent.resources() + addedResources);
 
   const Allocation expected2 = Allocation(
@@ -2104,7 +2107,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveTotalResources)
   const Resources agentResources2 =
     Resources::parse("cpus:50;mem:50;disk:50").get();
 
-  allocator->updateSlave(agent.id(), agentResources2);
+  allocator->updateSlave(agent.id(), agent, agentResources2);
 
   // Recover all agent resources allocated to the framework in the last two
   // allocations. We will subsequently be offered the complete agent which has
@@ -2129,7 +2132,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveTotalResources)
   // Set the agent's total resources to its original value. This will trigger
   // allocation of the newly added `agentResources2` resources now available on
   // the agent.
-  allocator->updateSlave(agent.id(), agent.resources());
+  allocator->updateSlave(agent.id(), agent, agent.resources());
 
   const Allocation expected4 = Allocation(
       framework.id(),
@@ -2174,7 +2177,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveCapabilities)
 
   // Update the agent to be MULTI_ROLE capable.
 
-  allocator->updateSlave(agent.id(), None(), AGENT_CAPABILITIES());
+  allocator->updateSlave(agent.id(), agent, None(), AGENT_CAPABILITIES());
 
   Clock::settle();
 
@@ -2218,7 +2221,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
+  allocator->updateSlave(slave.id(), slave, slave.resources() + oversubscribed);
 
   // No allocation should be made for oversubscribed resources because
   // the framework has not opted in for them.
@@ -2262,7 +2265,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
+  allocator->updateSlave(slave.id(), slave, slave.resources() + oversubscribed);
 
   // The next allocation should be for 10 oversubscribed cpus.
   expected = Allocation(
@@ -5527,7 +5530,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
   watch.start(); // Reset.
 
   foreach (const SlaveInfo& slave, slaves) {
-    allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
+    allocator->updateSlave(
+        slave.id(), slave, slave.resources() + oversubscribed);
   }
 
   // Wait for all the `updateSlave` operations to be processed.


[5/8] mesos git commit: Activated AGENT_UPDATE master capability.

Posted by vi...@apache.org.
Activated AGENT_UPDATE master capability.

Review: https://reviews.apache.org/r/64008/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f4d2c8f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f4d2c8f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f4d2c8f5

Branch: refs/heads/master
Commit: f4d2c8f547e42b65dd0040ddbe35157335577f6a
Parents: 2e8c766
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:52 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:52 2017 -0800

----------------------------------------------------------------------
 src/master/constants.cpp   | 13 ++++++++++++-
 src/tests/master_tests.cpp | 10 +++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f4d2c8f5/src/master/constants.cpp
----------------------------------------------------------------------
diff --git a/src/master/constants.cpp b/src/master/constants.cpp
index 55eecfb..1109b48 100644
--- a/src/master/constants.cpp
+++ b/src/master/constants.cpp
@@ -22,7 +22,18 @@ namespace master {
 
 std::vector<MasterInfo::Capability> MASTER_CAPABILITIES()
 {
-  return {}; // Empty for now.
+  // Capability types this master advertises; each one is wrapped in a
+  // `MasterInfo::Capability` message below.
+  const MasterInfo::Capability::Type advertised[] = {
+    MasterInfo::Capability::AGENT_UPDATE,
+  };
+
+  std::vector<MasterInfo::Capability> capabilities;
+  foreach (MasterInfo::Capability::Type type, advertised) {
+    capabilities.emplace_back();
+    capabilities.back().set_type(type);
+  }
+  return capabilities;
 }
 
 } // namespace master {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f4d2c8f5/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 08742ec..64ac2d5 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -4515,7 +4515,15 @@ TEST_F(MasterTest, StateEndpoint)
       state.values["unregistered_frameworks"].as<JSON::Array>().values.empty());
 
   ASSERT_TRUE(state.values["capabilities"].is<JSON::Array>());
-  EXPECT_TRUE(state.values["capabilities"].as<JSON::Array>().values.empty());
+  EXPECT_FALSE(state.values["capabilities"].as<JSON::Array>().values.empty());
+
+  JSON::Value masterCapabilities = state.values.at("capabilities");
+
+  // Master should always have the AGENT_UPDATE capability
+  Try<JSON::Value> expectedCapabilities = JSON::parse("[\"AGENT_UPDATE\"]");
+
+  ASSERT_SOME(expectedCapabilities);
+  EXPECT_TRUE(masterCapabilities.contains(expectedCapabilities.get()));
 }
 
 


[4/8] mesos git commit: Updated master behaviour to update agent state on reregistration.

Posted by vi...@apache.org.
Updated master behaviour to update agent state on reregistration.

When an agent reregisters, the master will now always update
the agent information it holds in memory, and will write any
changes back to the registry if necessary.

Note that most tests for this are added in a later review, since they
require the capability to actually restart the agent with
a changed state to effectively test the master's response to that.

Review: https://reviews.apache.org/r/64011/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2e8c7663
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2e8c7663
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2e8c7663

Branch: refs/heads/master
Commit: 2e8c76631a8cd0f3a46c9865fa3eb83e2f425de9
Parents: 9015cd3
Author: Benno Evers <be...@mesosphere.com>
Authored: Tue Dec 5 13:55:42 2017 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Dec 5 13:55:42 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp |  58 +++-
 src/master/allocator/mesos/hierarchical.hpp |   4 +
 src/master/master.cpp                       | 383 +++++++++++++++--------
 src/master/master.hpp                       |  19 +-
 src/tests/hierarchical_allocator_tests.cpp  |  70 +++++
 5 files changed, 389 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index b22829b..d348516 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 
+#include <mesos/attributes.hpp>
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
@@ -622,14 +623,32 @@ void HierarchicalAllocatorProcess::updateSlave(
 
   Slave& slave = slaves.at(slaveId);
 
-  // We unconditionally overwrite the old domain and hostname: Even though
-  // the master places some restrictions on this (i.e. agents are not allowed
-  // to re-register with a different hostname) inside the allocator it
-  // doesn't matter, as the algorithm will work correctly either way.
-  slave.info = info;
-
   bool updated = false;
 
+  // Remove all offer filters for this slave if it was restarted with changed
+  // attributes. We do this because schedulers might have decided that they're
+  // not interested in offers from this slave based on the non-presence of some
+  // required attributes, and right now they have no other way of learning
+  // about this change.
+  // TODO(bennoe): Once the agent lifecycle design is implemented, there is a
+  // better way to notify frameworks about such changes and let them make this
+  // decision. We should think about ways to safely remove this check at that
+  // point in time.
+  if (!(Attributes(info.attributes()) == Attributes(slave.info.attributes()))) {
+    updated = true;
+    removeFilters(slaveId);
+  }
+
+  if (!(slave.info == info)) {
+    updated = true;
+
+    // We unconditionally overwrite the old domain and hostname: Even though
+    // the master places some restrictions on this (i.e. agents are not allowed
+    // to re-register with a different hostname) inside the allocator it
+    // doesn't matter, as the algorithm will work correctly either way.
+    slave.info = info;
+  }
+
   // Update agent capabilities.
   if (capabilities.isSome()) {
     protobuf::slave::Capabilities newCapabilities(capabilities.get());
@@ -698,6 +717,33 @@ void HierarchicalAllocatorProcess::addResourceProvider(
 }
 
 
+void HierarchicalAllocatorProcess::removeFilters(const SlaveID& slaveId)
+{
+  CHECK(initialized);
+
+  foreachpair (const FrameworkID& id,
+               Framework& framework,
+               frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+
+    // Need a typedef here, otherwise the preprocessor gets confused
+    // by the comma in the template argument list.
+    typedef hashmap<SlaveID, hashset<OfferFilter*>> Filters;
+    foreachpair(const string& role,
+                Filters& filters,
+                framework.offerFilters) {
+      size_t erased = filters.erase(slaveId);
+      if (erased) {
+        frameworkSorters.at(role)->activate(id.value());
+        framework.suppressedRoles.erase(role);
+      }
+    }
+  }
+
+  LOG(INFO) << "Removed all filters for agent " << slaveId;
+}
+
+
 void HierarchicalAllocatorProcess::activateSlave(
     const SlaveID& slaveId)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 14dcf3d..0622026 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -555,6 +555,10 @@ private:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Resources& allocated);
+
+  // Helper that removes all existing offer filters and inverse offer
+  // filters for the given slave id.
+  void removeFilters(const SlaveID& slaveId);
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6679e25..6a52aad 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6346,6 +6346,11 @@ void Master::reregisterSlave(
     return;
   }
 
+  // TODO(bevers): Technically this behaviour seems to be incorrect, since we
+  // discard the newer re-registration attempt, which might have additional
+  // capabilities or a higher version (or a changed SlaveInfo, after Mesos 1.5).
+  // However, this should very rarely happen in practice, and nobody seems to
+  // have complained about it so far.
   if (slaves.reregistering.contains(slaveInfo.id())) {
     LOG(INFO)
       << "Ignoring re-register agent message from agent "
@@ -6532,7 +6537,8 @@ void Master::_reregisterSlave(
     // For now, we assume this slave is not nefarious (eventually
     // this will be handled by orthogonal security measures like key
     // based authentication).
-    LOG(INFO) << "Re-registering agent " << *slave;
+    VLOG(1) << "Agent is already marked as registered: " << slaveInfo.id()
+            << " at " << pid << " (" << slaveInfo.hostname() << ")";
 
     // We don't allow re-registering this way with a different IP or
     // hostname. This is because maintenance is scheduled at the
@@ -6556,142 +6562,64 @@ void Master::_reregisterSlave(
       return;
     }
 
-    // Update the slave pid and relink to it.
-    // NOTE: Re-linking the slave here always rather than only when
-    // the slave is disconnected can lead to multiple exited events
-    // in succession for a disconnected slave. As a result, we
-    // ignore duplicate exited events for disconnected slaves.
-    // See: https://issues.apache.org/jira/browse/MESOS-675
-    slave->pid = pid;
-    link(slave->pid);
-
-    // Update slave's version, re-registration timestamp and
-    // agent capabilities after re-registering successfully.
-    slave->version = version;
-    slave->reregisteredTime = Clock::now();
-    slave->capabilities = agentCapabilities;
-    slave->resourceVersions = protobuf::parseResourceVersions(
-        {resourceVersions.begin(), resourceVersions.end()});
-
-    allocator->updateSlave(slave->id, slave->info, None(), agentCapabilities);
-
-    // Reconcile tasks between master and slave, and send the
-    // `SlaveReregisteredMessage`.
-    reconcileKnownSlave(slave, executorInfos, tasks);
-
-    // If this is a disconnected slave, add it back to the allocator.
-    // This is done after reconciliation to ensure the allocator's
-    // offers include the recovered resources initially on this
-    // slave.
-    if (!slave->connected) {
-      CHECK(slave->reregistrationTimer.isSome());
-      Clock::cancel(slave->reregistrationTimer.get());
-
-      slave->connected = true;
-      dispatch(slave->observer, &SlaveObserver::reconnect);
-
-      slave->active = true;
-      allocator->activateSlave(slave->id);
-    }
-
-    CHECK(slave->active)
-      << "Unexpected connected but deactivated agent " << *slave;
-
-    // Inform the agent of the new framework pids for its tasks.
-    ___reregisterSlave(slave, frameworks);
-
-    slaves.reregistering.erase(slaveInfo.id());
-
-    // If the agent is not resource provider capable (legacy agent),
-    // send checkpointed resources to the agent. This is important for
-    // the cases where the master didn't fail over. In that case, the
-    // master might have already applied an operation that the agent
-    // didn't see (e.g., due to a breaking connection). This message
-    // will sync the state between the master and the agent about
-    // checkpointed resources.
-    //
-    // New agents that are resource provider capable will always
-    // update the master with total resources during re-registration.
-    // Therefore, no need to send checkpointed resources to the new
-    // agent in this case.
-    if (!slave->capabilities.resourceProvider) {
-      CheckpointResourcesMessage message;
-
-      message.mutable_resources()->CopyFrom(slave->checkpointedResources);
-
-      if (!slave->capabilities.reservationRefinement) {
-        // If the agent is not refinement-capable, don't send it
-        // checkpointed resources that contain refined reservations. This
-        // might occur if a reservation refinement is created but never
-        // reaches the agent (e.g., due to network partition), and then
-        // the agent is downgraded before the partition heals.
-        //
-        // TODO(neilc): It would probably be better to prevent the agent
-        // from re-registering in this scenario.
-        Try<Nothing> result = downgradeResources(message.mutable_resources());
-        if (result.isError()) {
-          LOG(WARNING) << "Not sending updated checkpointed resouces "
-                       << slave->checkpointedResources
-                       << " with refined reservations, since agent " << *slave
-                       << " is not RESERVATION_REFINEMENT-capable.";
-
-          return;
-        }
-      }
-
-      LOG(INFO) << "Sending updated checkpointed resources "
-                << slave->checkpointedResources
-                << " to agent " << *slave;
-
-      send(slave->pid, message);
-    }
-
-    return;
-  }
-
-  LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << pid
-            << " (" << slaveInfo.hostname() << ")";
-
-  if (slaves.recovered.contains(slaveInfo.id())) {
+    // TODO(bevers): Verify that the checkpointed resources sent by the
+    // slave match the ones stored in `slave`.
+    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+      .onAny(defer(self(),
+          &Self::___reregisterSlave,
+          slaveInfo,
+          pid,
+          executorInfos,
+          tasks,
+          frameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
+
+  } else if (slaves.recovered.contains(slaveInfo.id())) {
     // The agent likely is re-registering after a master failover as it
-    // is in the list recovered from the registry. No need to consult the
-    // registry in this case and we can directly re-admit it.
-    VLOG(1) << "Re-admitting recovered agent " << slaveInfo.id() << " at "
-            << pid << " (" << slaveInfo.hostname() << ")";
-
-    __reregisterSlave(
-        slaveInfo,
-        pid,
-        checkpointedResources,
-        executorInfos,
-        tasks,
-        frameworks,
-        completedFrameworks,
-        version,
-        agentCapabilities,
-        resourceVersions,
-        true);
+    // is in the list recovered from the registry.
+    VLOG(1) << "Re-admitting recovered agent " << slaveInfo.id()
+            << " at " << pid << "(" << slaveInfo.hostname() << ")";
+
+    registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo)))
+      .onAny(defer(self(),
+          &Self::__reregisterSlave,
+          slaveInfo,
+          pid,
+          checkpointedResources,
+          executorInfos,
+          tasks,
+          frameworks,
+          completedFrameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
   } else {
-    // Consult the registry to determine whether to readmit the
-    // slave. In the common case, the slave has been marked unreachable
+    // In the common case, the slave has been marked unreachable
     // by the master, so we move the slave to the reachable list and
     // readmit it. If the slave isn't in the unreachable list (which
     // might occur if the slave's entry in the unreachable list is
     // GC'd), we admit the slave anyway.
+    VLOG(1) << "Consulting registry about agent " << slaveInfo.id()
+            << " at " << pid << "(" << slaveInfo.hostname() << ")";
+
     registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo)))
       .onAny(defer(self(),
-                   &Self::__reregisterSlave,
-                   slaveInfo,
-                   pid,
-                   checkpointedResources,
-                   executorInfos,
-                   tasks,
-                   frameworks,
-                   completedFrameworks,
-                   version,
-                   agentCapabilities,
-                   resourceVersions,
-                   lambda::_1));
+          &Self::__reregisterSlave,
+          slaveInfo,
+          pid,
+          checkpointedResources,
+          executorInfos,
+          tasks,
+          frameworks,
+          completedFrameworks,
+          version,
+          agentCapabilities,
+          resourceVersions,
+          lambda::_1));
   }
 }
 
@@ -6707,19 +6635,21 @@ void Master::__reregisterSlave(
     const string& version,
     const vector<SlaveInfo::Capability>& agentCapabilities,
     const vector<ResourceVersionUUID>& resourceVersions,
-    const Future<bool>& readmit)
+    const Future<bool>& future)
 {
   CHECK(slaves.reregistering.contains(slaveInfo.id()));
 
-  if (readmit.isFailed()) {
-    LOG(FATAL) << "Failed to readmit agent " << slaveInfo.id() << " at " << pid
-               << " (" << slaveInfo.hostname() << "): " << readmit.failure();
+  if (future.isFailed()) {
+    LOG(FATAL) << "Failed to update registry for agent " << slaveInfo.id()
+               << " at " << pid << " (" << slaveInfo.hostname() << "): "
+               << future.failure();
   }
 
-  CHECK(!readmit.isDiscarded());
+  CHECK(!future.isDiscarded());
 
-  // `MarkSlaveReachable` registry operation should never fail.
-  CHECK(readmit.get());
+  // Neither the `UpdateSlave` nor `MarkSlaveReachable` registry operations
+  // should ever fail.
+  CHECK(future.get());
 
   VLOG(1) << "Re-admitted agent " << slaveInfo.id() << " at " << pid
           << " (" << slaveInfo.hostname() << ")";
@@ -6896,13 +6826,158 @@ void Master::__reregisterSlave(
     }
   }
 
-  ___reregisterSlave(slave, frameworks);
+  updateSlaveFrameworks(slave, frameworks);
 
   slaves.reregistering.erase(slaveInfo.id());
 }
 
 
 void Master::___reregisterSlave(
+    const SlaveInfo& slaveInfo,
+    const process::UPID& pid,
+    const std::vector<ExecutorInfo>& executorInfos,
+    const std::vector<Task>& tasks,
+    const std::vector<FrameworkInfo>& frameworks,
+    const std::string& version,
+    const std::vector<SlaveInfo::Capability>& agentCapabilities,
+    const vector<ResourceVersionUUID>& resourceVersions,
+    const process::Future<bool>& updated)
+{
+  CHECK(slaves.reregistering.contains(slaveInfo.id()));
+
+  CHECK_READY(updated);
+  CHECK(updated.get());
+
+  VLOG(1) << "Registry updated for slave " << slaveInfo.id() << " at " << pid
+          << "(" << slaveInfo.hostname() << ")";
+
+  if (!slaves.registered.contains(slaveInfo.id())) {
+    LOG(WARNING)
+      << "Dropping ongoing re-registration attempt of slave " << slaveInfo.id()
+      << " at " << pid << "(" << slaveInfo.hostname() << ") "
+      << "because the re-registration timeout was reached.";
+
+    slaves.reregistering.erase(slaveInfo.id());
+    // Don't send a ShutdownMessage here because tasks from partition-aware
+    // frameworks running on this host might still be recovered when the slave
+    // retries the re-registration.
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveInfo.id());
+
+  // Update the slave pid and relink to it.
+  // NOTE: Re-linking the slave here always rather than only when
+  // the slave is disconnected can lead to multiple exited events
+  // in succession for a disconnected slave. As a result, we
+  // ignore duplicate exited events for disconnected slaves.
+  // See: https://issues.apache.org/jira/browse/MESOS-675
+  slave->pid = pid;
+  link(slave->pid);
+
+  Try<Nothing> stateUpdated =
+    slave->update(slaveInfo, version, agentCapabilities, resourceVersions);
+
+  // As of now, the only way `slave->update()` can fail is if the agent sent
+  // different checkpointed resources than it had before. A well-behaving
+  // agent shouldn't do this, so this one is either malicious or buggy. Either
+  // way, we refuse the re-registration attempt.
+  if (stateUpdated.isError()) {
+    LOG(WARNING) << "Refusing re-registration of agent " << slaveInfo.id()
+                 << " at " << pid << " (" << slaveInfo.hostname() << ")"
+                 << " because state update failed: " << stateUpdated.error();
+
+    ShutdownMessage message;
+    message.set_message(stateUpdated.error());
+    send(pid, message);
+
+    slaves.reregistering.erase(slaveInfo.id());
+    return;
+  }
+
+  slave->reregisteredTime = Clock::now();
+
+  allocator->updateSlave(
+    slave->id,
+    slave->info,
+    slave->totalResources,
+    agentCapabilities);
+
+  // Reconcile tasks between master and slave, and send the
+  // `SlaveReregisteredMessage`.
+  reconcileKnownSlave(slave, executorInfos, tasks);
+
+  // If this is a disconnected slave, add it back to the allocator.
+  // This is done after reconciliation to ensure the allocator's
+  // offers include the recovered resources initially on this
+  // slave.
+  if (!slave->connected) {
+    CHECK(slave->reregistrationTimer.isSome());
+    Clock::cancel(slave->reregistrationTimer.get());
+
+    slave->connected = true;
+    dispatch(slave->observer, &SlaveObserver::reconnect);
+
+    slave->active = true;
+    allocator->activateSlave(slave->id);
+  }
+
+  CHECK(slave->active)
+    << "Unexpected connected but deactivated agent " << *slave;
+
+  // Inform the agent of the new framework pids for its tasks, and
+  // recover any unknown frameworks from the slave info.
+  updateSlaveFrameworks(slave, frameworks);
+
+  slaves.reregistering.erase(slaveInfo.id());
+
+  // If the agent is not resource provider capable (legacy agent),
+  // send checkpointed resources to the agent. This is important for
+  // the cases where the master didn't fail over. In that case, the
+  // master might have already applied an operation that the agent
+  // didn't see (e.g., due to a breaking connection). This message
+  // will sync the state between the master and the agent about
+  // checkpointed resources.
+  //
+  // New agents that are resource provider capable will always
+  // update the master with total resources during re-registration.
+  // Therefore, no need to send checkpointed resources to the new
+  // agent in this case.
+  if (!slave->capabilities.resourceProvider) {
+    CheckpointResourcesMessage message;
+
+    message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+    if (!slave->capabilities.reservationRefinement) {
+      // If the agent is not refinement-capable, don't send it
+      // checkpointed resources that contain refined reservations. This
+      // might occur if a reservation refinement is created but never
+      // reaches the agent (e.g., due to network partition), and then
+      // the agent is downgraded before the partition heals.
+      //
+      // TODO(neilc): It would probably be better to prevent the agent
+      // from re-registering in this scenario.
+      Try<Nothing> result = downgradeResources(message.mutable_resources());
+      if (result.isError()) {
+        LOG(WARNING) << "Not sending updated checkpointed resouces "
+                     << slave->checkpointedResources
+                     << " with refined reservations, since agent " << *slave
+                     << " is not RESERVATION_REFINEMENT-capable.";
+
+        return;
+      }
+    }
+
+    LOG(INFO) << "Sending updated checkpointed resources "
+              << slave->checkpointedResources
+              << " to agent " << *slave;
+
+    send(slave->pid, message);
+  }
+}
+
+
+void Master::updateSlaveFrameworks(
     Slave* slave,
     const vector<FrameworkInfo>& frameworks)
 {
@@ -7423,7 +7498,7 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     }
   }
 
-  // Now update the agent's total resources in the allocator.
+  // Now update the agent's state and total resources in the allocator.
   allocator->updateSlave(slaveId, slave->info, slave->totalResources);
 
   // Then rescind outstanding offers affected by the update.
@@ -11349,6 +11424,38 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
   checkpointedResources = totalResources.filter(needCheckpointing);
 }
 
+
+Try<Nothing> Slave::update(
+  const SlaveInfo& _info,
+  const string& _version,
+  const vector<SlaveInfo::Capability>& _capabilities,
+  const vector<ResourceVersionUUID>& _resourceVersions)
+{
+  Try<Resources> resources = applyCheckpointedResources(
+      _info.resources(),
+      checkpointedResources);
+
+  // This should be validated during slave recovery.
+  if (resources.isError()) {
+    return Error(resources.error());
+  }
+
+  version = _version;
+  capabilities = _capabilities;
+  info = _info;
+
+  // There is a short window here where `totalResources` can have an old value,
+  // but it should be relatively short because the agent will send
+  // an `UpdateSlaveMessage` with the new total resources immediately after
+  // re-registering in this case.
+  totalResources = resources.get();
+
+  resourceVersions = protobuf::parseResourceVersions(
+      {_resourceVersions.begin(), _resourceVersions.end()});
+
+  return Nothing();
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a721131..d42acae 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -178,9 +178,15 @@ struct Slave
 
   void apply(const std::vector<ResourceConversion>& conversions);
 
+  Try<Nothing> update(
+    const SlaveInfo& info,
+    const std::string& _version,
+    const std::vector<SlaveInfo::Capability>& _capabilites,
+    const std::vector<ResourceVersionUUID>& resourceVersions);
+
   Master* const master;
   const SlaveID id;
-  const SlaveInfo info;
+  SlaveInfo info;
 
   const MachineID machineId;
 
@@ -637,6 +643,17 @@ protected:
       const process::Future<bool>& readmit);
 
   void ___reregisterSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks,
+      const std::string& version,
+      const std::vector<SlaveInfo::Capability>& agentCapabilities,
+      const std::vector<ResourceVersionUUID>& resourceVersions,
+      const process::Future<bool>& updated);
+
+  void updateSlaveFrameworks(
       Slave* slave,
       const std::vector<FrameworkInfo>& frameworks);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2e8c7663/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 2fc5f5e..862f468 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -6269,6 +6269,76 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, Metrics)
 }
 
 
+// Tests that the `updateSlave()` function correctly removes all filters
+// for the specified slave when slave attributes are changed on restart.
+TEST_F(HierarchicalAllocatorTest, RemoveFilters)
+{
+  // We put both frameworks into the same role, but we could also
+  // have had separate roles; this should not influence the test.
+  const string ROLE{"role"};
+
+  // Pause the clock because we want to manually drive the allocations.
+  Clock::pause();
+
+  initialize();
+
+  FrameworkInfo framework = createFrameworkInfo({ROLE});
+  allocator->addFramework(framework.id(), framework, {}, true, {});
+
+  SlaveInfo agent = createSlaveInfo("cpus:1;mem:512;disk:0");
+  allocator->addSlave(
+      agent.id(),
+      agent,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent.resources(),
+      {});
+
+  // `framework` will be offered all of `agent` resources
+  // because it is the only framework in the cluster.
+  Allocation expected = Allocation(
+      framework.id(),
+      {{ROLE, {{agent.id(), agent.resources()}}}});
+
+  Future<Allocation> allocation = allocations.get();
+  AWAIT_EXPECT_EQ(expected, allocation);
+
+  // Now `framework` declines the offer and sets a filter.
+  Duration filterTimeout = flags.allocation_interval*2;
+  Filters offerFilter;
+  offerFilter.set_refuse_seconds(filterTimeout.secs());
+
+  allocator->recoverResources(
+      framework.id(),
+      agent.id(),
+      allocation->resources.at(ROLE).at(agent.id()),
+      offerFilter);
+
+  // There should be no allocation due to the offer filter.
+  Clock::advance(flags.allocation_interval);
+  Clock::settle();
+
+  allocation = allocations.get();
+  EXPECT_TRUE(allocation.isPending());
+
+  // Update the slave with new attributes.
+  Attributes attributes = Attributes::parse("foo:bar;baz:quux");
+  *agent.mutable_attributes() = attributes;
+  allocator->updateSlave(
+      agent.id(),
+      agent,
+      agent.resources(),
+      AGENT_CAPABILITIES());
+
+  // The previously declined resources should be offered to `framework` again.
+  expected = Allocation(
+      framework.id(),
+      {{ROLE, {{agent.id(), agent.resources()}}}});
+
+  AWAIT_EXPECT_EQ(expected, allocation);
+}
+
+
 // This test uses `reviveOffers` to add allocation-triggering events
 // to the allocator queue in order to measure the impact of allocation
 // batching (MESOS-6904).