Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/14 19:58:45 UTC

[01/16] mesos git commit: Maintenance Primitives: Prevent Slave registration from DOWN machine.

Repository: mesos
Updated Branches:
  refs/heads/master 57385ec45 -> ce9c75d3e


Maintenance Primitives: Prevent Slave registration from DOWN machine.

Review: https://reviews.apache.org/r/37623


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ce9c75d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ce9c75d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ce9c75d3

Branch: refs/heads/master
Commit: ce9c75d3eefe370e0ca87a294e96c6d2ae6cb566
Parents: 147420e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:32:46 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/type_utils.hpp           | 17 ++++++++++++
 include/mesos/v1/mesos.hpp             | 17 ++++++++++++
 src/master/master.cpp                  | 36 ++++++++++++++++++++++++++
 src/tests/master_maintenance_tests.cpp | 40 ++++++++++++++++++++++++++++-
 4 files changed, 109 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ce9c75d3/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 64c2a86..6cedf07 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -319,6 +319,23 @@ inline std::ostream& operator<<(std::ostream& stream, const TaskID& taskId)
 }
 
 
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const MachineID& machineId)
+{
+  if (machineId.has_hostname() && machineId.has_ip()) {
+    return stream << machineId.hostname() << " (" << machineId.ip() << ")";
+  }
+
+  // If only a hostname is present.
+  if (machineId.has_hostname()) {
+    return stream << machineId.hostname();
+  } else { // If there is no hostname, then there is an IP.
+    return stream << "(" << machineId.ip() << ")";
+  }
+}
+
+
 inline std::ostream& operator<<(std::ostream& stream, const TaskInfo& task)
 {
   return stream << task.DebugString();
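
For reference, a minimal sketch of what the new MachineID overload above prints for the three possible shapes of the message (the literal values are illustrative only; assumes <iostream> and <mesos/type_utils.hpp> are included):

  MachineID machineId;
  machineId.set_hostname("node1.example.com");
  machineId.set_ip("10.0.0.1");
  std::cout << machineId << std::endl;  // "node1.example.com (10.0.0.1)"

  machineId.clear_ip();
  std::cout << machineId << std::endl;  // "node1.example.com"

  machineId.clear_hostname();
  machineId.set_ip("10.0.0.1");
  std::cout << machineId << std::endl;  // "(10.0.0.1)"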

http://git-wip-us.apache.org/repos/asf/mesos/blob/ce9c75d3/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index f8f9617..260e112 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -255,6 +255,23 @@ inline std::ostream& operator<<(
 }
 
 
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const MachineID& machineId)
+{
+  if (machineId.has_hostname() && machineId.has_ip()) {
+    return stream << machineId.hostname() << " (" << machineId.ip() << ")";
+  }
+
+  // If only a hostname is present.
+  if (machineId.has_hostname()) {
+    return stream << machineId.hostname();
+  } else { // If there is no hostname, then there is an IP.
+    return stream << "(" << machineId.ip() << ")";
+  }
+}
+
+
 inline std::ostream& operator<<(std::ostream& stream, const MasterInfo& master)
 {
   return stream << master.DebugString();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ce9c75d3/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 61236b3..f26271c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3746,6 +3746,24 @@ void Master::registerSlave(
     return;
   }
 
+  MachineID machineId;
+  machineId.set_hostname(slaveInfo.hostname());
+  machineId.set_ip(stringify(from.address.ip));
+
+  // Slaves are not allowed to register while the machine they are on is in
+  // `DOWN` mode.
+  if (machines.contains(machineId) &&
+      machines[machineId].info.mode() == MachineInfo::DOWN) {
+    LOG(WARNING) << "Refusing registration of slave at " << from
+                 << " because the machine '" << machineId << "' that it is "
+                 << "running on is `DOWN`";
+
+    ShutdownMessage message;
+    message.set_message("Machine is `DOWN`");
+    send(from, message);
+    return;
+  }
+
   // Check if this slave is already registered (because it retries).
   if (slaves.registered.contains(from)) {
     Slave* slave = slaves.registered.get(from);
@@ -3909,6 +3927,24 @@ void Master::reregisterSlave(
     return;
   }
 
+  MachineID machineId;
+  machineId.set_hostname(slaveInfo.hostname());
+  machineId.set_ip(stringify(from.address.ip));
+
+  // Slaves are not allowed to re-register while the machine they are on is
+  // in `DOWN` mode.
+  if (machines.contains(machineId) &&
+      machines[machineId].info.mode() == MachineInfo::DOWN) {
+    LOG(WARNING) << "Refusing re-registration of slave at " << from
+                 << " because the machine '" << machineId << "' that it is "
+                 << "running on is `DOWN`";
+
+    ShutdownMessage message;
+    message.set_message("Machine is `DOWN`");
+    send(from, message);
+    return;
+  }
+
   if (slaves.removed.get(slaveInfo.id()).isSome()) {
     // To compensate for the case where a non-strict registrar is
     // being used, we explicitly deny removed slaves from

http://git-wip-us.apache.org/repos/asf/mesos/blob/ce9c75d3/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 6ae502d..4478505 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -661,7 +661,7 @@ TEST_F(MasterMaintenanceTest, EnterMaintenanceMode)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  // Wait for the slave to be shut down.
+  // Wait for the slave to be told to shut down.
   AWAIT_READY(shutdownMessage);
 
   // Verify that we received a TASK_LOST.
@@ -671,6 +671,44 @@ TEST_F(MasterMaintenanceTest, EnterMaintenanceMode)
   // Verify that the framework received the slave lost message.
   AWAIT_READY(slaveLost);
 
+  // Wait for the agent to terminate so that it wipes out its latest symlink.
+  // This way, when we launch a new agent, it will register with a new agent id.
+  wait(slave.get());
+
+  // Ensure that the slave gets shut down immediately if it tries to register
+  // from a machine that is in `DOWN` mode.
+  shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), master.get(), _);
+  EXPECT_TRUE(shutdownMessage.isPending());
+
+  slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(shutdownMessage);
+
+  // Wait for the agent to terminate so that it wipes out its latest symlink.
+  // This way, when we launch a new agent, it will register with a new agent id.
+  wait(slave.get());
+
+  // Stop maintenance.
+  response =
+    process::http::post(
+        master.get(),
+        "machine/up",
+        headers,
+        stringify(JSON::Protobuf(createMachineList({machine}))));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Capture the registration message.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Start the agent again.
+  slave = StartSlave();
+
+  // Wait for agent registration.
+  AWAIT_READY(slaveRegisteredMessage);
+
   driver.stop();
   driver.join();
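
Operationally, the new check only matters once a machine has been transitioned to DOWN. A rough sketch of that step, mirroring the "machine/up" call above and assuming the companion "machine/down" endpoint added by this series (the exact JSON body is whatever createMachineList/JSON::Protobuf produce for the machine list):

  // Hypothetical: take the machine down before the slave (re-)registers.
  Future<process::http::Response> response = process::http::post(
      master.get(),
      "machine/down",
      headers,
      stringify(JSON::Protobuf(createMachineList({machine}))));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);

  // Any slave that now tries to register from that machine is refused and
  // sent a ShutdownMessage ("Machine is `DOWN`") instead of being admitted.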
 


[12/16] mesos git commit: Maintenance Primitives: Added URL field to InverseOffer protobuf.

Posted by jo...@apache.org.
Maintenance Primitives: Added URL field to InverseOffer protobuf.

Review: https://reviews.apache.org/r/37234


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/388eaa5b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/388eaa5b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/388eaa5b

Branch: refs/heads/master
Commit: 388eaa5b133c4e1b4757a26c5e4afb84ad7bf08d
Parents: e6375f3
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:24:16 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/master.cpp | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/388eaa5b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8ab5a03..52d5763 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4874,6 +4874,14 @@ void Master::inverseOffer(
       continue;
     }
 
+    // TODO(bmahler): Set "https" if only "https" is supported.
+    mesos::URL url;
+    url.set_scheme("http");
+    url.mutable_address()->set_hostname(slave->info.hostname());
+    url.mutable_address()->set_ip(stringify(slave->pid.address.ip));
+    url.mutable_address()->set_port(slave->pid.address.port);
+    url.set_path("/" + slave->pid.id);
+
     InverseOffer* inverseOffer = new InverseOffer();
 
     // We use the same id generator as regular offers so that we can have unique
@@ -4882,6 +4890,7 @@ void Master::inverseOffer(
     inverseOffer->mutable_id()->CopyFrom(newOfferId());
     inverseOffer->mutable_framework_id()->CopyFrom(framework->id());
     inverseOffer->mutable_slave_id()->CopyFrom(slave->id);
+    inverseOffer->mutable_url()->CopyFrom(url);
     inverseOffer->mutable_unavailability()->CopyFrom(
         unavailableResources.unavailability);
 


[09/16] mesos git commit: Maintenance Primitives: Added inverse offers.

Posted by jo...@apache.org.
Maintenance Primitives: Added inverse offers.

Review: https://reviews.apache.org/r/37177


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1de99f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1de99f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1de99f4

Branch: refs/heads/master
Commit: a1de99f42323d8eb1396fcd10884eaac32a93eab
Parents: 8e04258
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:41:21 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/maintenance/maintenance.hpp   |  15 +++
 include/mesos/master/allocator.hpp          |   6 +
 src/master/allocator/mesos/allocator.hpp    |  13 ++
 src/master/allocator/mesos/hierarchical.hpp |  99 ++++++++++++++
 src/master/master.cpp                       |   9 ++
 src/master/master.hpp                       |   4 +
 src/tests/hierarchical_allocator_tests.cpp  | 157 +++++++++++++----------
 src/tests/master_allocator_tests.cpp        |  32 ++---
 src/tests/mesos.hpp                         |  11 +-
 src/tests/reservation_endpoints_tests.cpp   |  20 +--
 src/tests/reservation_tests.cpp             |   4 +-
 src/tests/resource_offers_tests.cpp         |   2 +-
 src/tests/slave_recovery_tests.cpp          |   2 +-
 13 files changed, 270 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/maintenance/maintenance.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/maintenance/maintenance.hpp b/include/mesos/maintenance/maintenance.hpp
index 7fec3ff..f676d01 100644
--- a/include/mesos/maintenance/maintenance.hpp
+++ b/include/mesos/maintenance/maintenance.hpp
@@ -22,4 +22,19 @@
 // ONLY USEFUL AFTER RUNNING PROTOC.
 #include <mesos/maintenance/maintenance.pb.h>
 
+#include <mesos/resources.hpp>
+
+namespace mesos {
+
+// A wrapper for resources and unavailability, used to communicate between the
+// Allocator and Master so that the Master can create InverseOffers on the
+// Allocator's behalf.
+struct UnavailableResources
+{
+  Resources resources;
+  Unavailability unavailability;
+};
+
+} // namespace mesos {
+
 #endif // __MAINTENANCE_PROTO_HPP__
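
A minimal sketch of how this wrapper gets filled in, matching the deallocate() path added later in this commit: empty Resources for now, plus the machine's Unavailability window (the nanosecond values are illustrative):

  mesos::Unavailability unavailability;
  unavailability.mutable_start()->set_nanoseconds(0);
  unavailability.mutable_duration()->set_nanoseconds(
      3600LL * 1000000000LL);  // A one hour window.

  mesos::UnavailableResources unavailableResources{
      mesos::Resources(),  // Empty for now, per the allocator comment.
      unavailability};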

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index b5bfc28..18d31ef 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -25,6 +25,8 @@
 // ONLY USEFUL AFTER RUNNING PROTOC.
 #include <mesos/master/allocator.pb.h>
 
+#include <mesos/maintenance/maintenance.hpp>
+
 #include <mesos/resources.hpp>
 
 #include <process/future.hpp>
@@ -67,6 +69,10 @@ public:
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, Resources>&)>& offerCallback,
+      const lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>&
+        inverseOfferCallback,
       const hashmap<std::string, RoleInfo>& roles) = 0;
 
   virtual void addFramework(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index ee6ec58..124dd3d 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -52,6 +52,10 @@ public:
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, Resources>&)>& offerCallback,
+      const lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>&
+        inverseOfferCallback,
       const hashmap<std::string, mesos::master::RoleInfo>& roles);
 
   void addFramework(
@@ -147,6 +151,10 @@ public:
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, Resources>&)>& offerCallback,
+      const lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>&
+        inverseOfferCallback,
       const hashmap<std::string, mesos::master::RoleInfo>& roles) = 0;
 
   virtual void addFramework(
@@ -250,6 +258,10 @@ inline void MesosAllocator<AllocatorProcess>::initialize(
     const lambda::function<
         void(const FrameworkID&,
              const hashmap<SlaveID, Resources>&)>& offerCallback,
+    const lambda::function<
+        void(const FrameworkID&,
+              const hashmap<SlaveID, UnavailableResources>&)>&
+      inverseOfferCallback,
     const hashmap<std::string, mesos::master::RoleInfo>& roles)
 {
   process::dispatch(
@@ -257,6 +269,7 @@ inline void MesosAllocator<AllocatorProcess>::initialize(
       &MesosAllocatorProcess::initialize,
       allocationInterval,
       offerCallback,
+      inverseOfferCallback,
       roles);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 77a5b4c..8ae7475 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -90,6 +90,10 @@ public:
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, Resources>&)>& offerCallback,
+      const lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>&
+        inverseOfferCallback,
       const hashmap<std::string, mesos::master::RoleInfo>& roles);
 
   void addFramework(
@@ -176,6 +180,9 @@ protected:
   // Allocate resources from the specified slaves.
   void allocate(const hashset<SlaveID>& slaveIds);
 
+  // Send inverse offers from the specified slaves.
+  void deallocate(const hashset<SlaveID>& slaveIds);
+
   // Remove a filter for the specified framework.
   void expire(
       const FrameworkID& frameworkId,
@@ -202,6 +209,10 @@ protected:
       void(const FrameworkID&,
            const hashmap<SlaveID, Resources>&)> offerCallback;
 
+  lambda::function<
+      void(const FrameworkID&,
+           const hashmap<SlaveID, UnavailableResources>&)> inverseOfferCallback;
+
   struct Metrics
   {
     explicit Metrics(const Self& process)
@@ -366,10 +377,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
     const lambda::function<
         void(const FrameworkID&,
              const hashmap<SlaveID, Resources>&)>& _offerCallback,
+    const lambda::function<
+        void(const FrameworkID&,
+             const hashmap<SlaveID, UnavailableResources>&)>&
+      _inverseOfferCallback,
     const hashmap<std::string, mesos::master::RoleInfo>& _roles)
 {
   allocationInterval = _allocationInterval;
   offerCallback = _offerCallback;
+  inverseOfferCallback = _inverseOfferCallback;
   roles = _roles;
   initialized = true;
 
@@ -1086,6 +1102,89 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
       offerCallback(frameworkId, offerable[frameworkId]);
     }
   }
+
+  // NOTE: For now, we implement maintenance inverse offers within the
+  // allocator. We leverage the existing timer/cycle of offers to also do any
+  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
+  deallocate(slaveIds_);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
+    const hashset<SlaveID>& slaveIds_)
+{
+  if (frameworkSorters.empty()) {
+    LOG(ERROR) << "No frameworks specified, cannot send inverse offers!";
+    return;
+  }
+
+  // In this case, `offerable` is actually the slaves and/or resources that we
+  // want the master to create `InverseOffer`s from.
+  hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
+
+  // For maintenance, we use the framework sorters to determine which frameworks
+  // have (1) reserved and/or (2) unreserved resources on the specified
+  // slaveIds. This way we only send inverse offers to frameworks that have the
+  // potential to lose something. We keep track of which frameworks already have
+  // an outstanding inverse offer for the given slave in the
+  // UnavailabilityStatus of the specific slave using the `offerOutstanding`
+  // flag. This is equivalent to the accounting we do for resources when we send
+  // regular offers. If we didn't keep track of outstanding offers then we would
+  // keep generating new inverse offers even though the framework had not
+  // responded yet.
+
+  foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) {
+    foreach (const SlaveID& slaveId, slaveIds_) {
+      CHECK(slaves.contains(slaveId));
+
+      if (slaves[slaveId].maintenance.isSome()) {
+        // We use a reference by alias because we intend to modify
+        // `maintenance`, and to improve readability.
+        typename Slave::Maintenance& maintenance =
+          slaves[slaveId].maintenance.get();
+
+        hashmap<std::string, Resources> allocation =
+          frameworkSorter->allocation(slaveId);
+
+        foreachkey (const std::string& frameworkId_, allocation) {
+          FrameworkID frameworkId;
+          frameworkId.set_value(frameworkId_);
+
+          // If this framework doesn't already have inverse offers for the
+          // specified slave.
+          if (!offerable[frameworkId].contains(slaveId)) {
+            // If there isn't already an outstanding inverse offer to this
+            // framework for the specified slave.
+            if (!maintenance.offersOutstanding.contains(frameworkId)) {
+              // For now we send inverse offers with empty resources when the
+              // inverse offer represents maintenance on the machine. In the
+              // future we could be more specific about the resources on the
+              // host, as we have the information available.
+              offerable[frameworkId][slaveId] =
+                UnavailableResources{
+                    Resources(),
+                    maintenance.unavailability};
+
+              // Mark this framework as having an offer outstanding for the
+              // specified slave.
+              maintenance.offersOutstanding.insert(frameworkId);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  if (offerable.empty()) {
+    VLOG(1) << "No inverse offers to send out!";
+  } else {
+    // Now send inverse offers to each framework.
+    foreachkey (const FrameworkID& frameworkId, offerable) {
+      inverseOfferCallback(frameworkId, offerable[frameworkId]);
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0b3ba56..8471735 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -621,6 +621,7 @@ void Master::initialize()
   allocator->initialize(
       flags.allocation_interval,
       defer(self(), &Master::offer, lambda::_1, lambda::_2),
+      defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2),
       roleInfos);
 
   // Parse the whitelist. Passing Allocator::updateWhitelist()
@@ -4781,6 +4782,14 @@ void Master::offer(const FrameworkID& frameworkId,
 }
 
 
+void Master::inverseOffer(
+    const FrameworkID& frameworkId,
+    const hashmap<SlaveID, UnavailableResources>& resources)
+{
+  // TODO(jmlvanre): Implement this function.
+}
+
+
 // TODO(vinod): If due to network partition there are two instances
 // of the framework that think they are leaders and try to
 // authenticate with master they would be stepping on each other's
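
The TODO above is filled in by commit 03/16 later in this series; the 12/16 hunk for the same function shows it ends up constructing one InverseOffer per slave from the UnavailableResources handed over by the allocator. A condensed sketch of that shape only (validation, bookkeeping and the actual send to the scheduler are omitted, and not necessarily what the real implementation does):

  void Master::inverseOffer(
      const FrameworkID& frameworkId,
      const hashmap<SlaveID, UnavailableResources>& resources)
  {
    foreachpair (const SlaveID& slaveId,
                 const UnavailableResources& unavailable,
                 resources) {
      InverseOffer* inverseOffer = new InverseOffer();
      inverseOffer->mutable_id()->CopyFrom(newOfferId());
      inverseOffer->mutable_framework_id()->CopyFrom(frameworkId);
      inverseOffer->mutable_slave_id()->CopyFrom(slaveId);
      inverseOffer->mutable_unavailability()->CopyFrom(
          unavailable.unavailability);

      // ... store the inverse offer and forward it to the framework.
    }
  }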

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index cd71a25..1ba0837 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -491,6 +491,10 @@ public:
       const FrameworkID& framework,
       const hashmap<SlaveID, Resources>& resources);
 
+  void inverseOffer(
+      const FrameworkID& framework,
+      const hashmap<SlaveID, UnavailableResources>& resources);
+
   // Invoked when there is a newly elected leading master.
   // Made public for testing purposes.
   void detected(const process::Future<Option<MasterInfo>>& pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 0a24b6b..2f37c98 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -79,6 +79,13 @@ struct Allocation
 };
 
 
+struct Deallocation
+{
+  FrameworkID frameworkId;
+  hashmap<SlaveID, UnavailableResources> resources;
+};
+
+
 class HierarchicalAllocatorTestBase : public ::testing::Test
 {
 protected:
@@ -95,9 +102,13 @@ protected:
   void initialize(
       const vector<string>& _roles,
       const master::Flags& _flags = master::Flags(),
-      const Option<lambda::function<
+      Option<lambda::function<
           void(const FrameworkID&,
-               const hashmap<SlaveID, Resources>&)>>& offerCallback = None())
+               const hashmap<SlaveID, Resources>&)>> offerCallback = None(),
+      Option<lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>>
+                 inverseOfferCallback = None())
   {
     flags = _flags;
 
@@ -111,17 +122,35 @@ protected:
       roles[role] = info;
     }
 
-    if (offerCallback.isSome()) {
-      allocator->initialize(
-          flags.allocation_interval,
-          offerCallback.get(),
-          roles);
-    } else {
-      allocator->initialize(
-          flags.allocation_interval,
-          lambda::bind(&put, &queue, lambda::_1, lambda::_2),
-          roles);
+    if (offerCallback.isNone()) {
+      offerCallback =
+        [this](const FrameworkID& frameworkId,
+               const hashmap<SlaveID, Resources>& resources) {
+          Allocation allocation;
+          allocation.frameworkId = frameworkId;
+          allocation.resources = resources;
+
+          allocations.put(allocation);
+        };
     }
+
+    if (inverseOfferCallback.isNone()) {
+      inverseOfferCallback =
+        [this](const FrameworkID& frameworkId,
+               const hashmap<SlaveID, UnavailableResources>& resources) {
+          Deallocation deallocation;
+          deallocation.frameworkId = frameworkId;
+          deallocation.resources = resources;
+
+          deallocations.put(deallocation);
+        };
+    }
+
+    allocator->initialize(
+        flags.allocation_interval,
+        offerCallback.get(),
+        inverseOfferCallback.get(),
+        roles);
   }
 
   SlaveInfo createSlaveInfo(const string& resources)
@@ -158,25 +187,13 @@ protected:
     return resource;
   }
 
-private:
-  static void put(
-      process::Queue<Allocation>* queue,
-      const FrameworkID& frameworkId,
-      const hashmap<SlaveID, Resources>& resources)
-  {
-    Allocation allocation;
-    allocation.frameworkId = frameworkId;
-    allocation.resources = resources;
-
-    queue->put(allocation);
-  }
-
 protected:
   master::Flags flags;
 
   Allocator* allocator;
 
-  process::Queue<Allocation> queue;
+  process::Queue<Allocation> allocations;
+  process::Queue<Deallocation> deallocations;
 
   hashmap<string, RoleInfo> roles;
 
@@ -224,7 +241,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   allocator->addFramework(
       framework1.id(), framework1, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources));
@@ -246,7 +263,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
 
   // framework2 will be offered all of slave2's resources since role2
   // has the lowest user share, and framework2 is its only framework.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources));
@@ -266,7 +283,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
 
   // framework2 will be offered all of slave3's resources since role2
   // has the lowest share.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources));
@@ -292,7 +309,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // framework3 will be offered all of slave4's resources since role1
   // has the lowest user share, and framework3 has the lowest share of
   // role1's frameworks.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework3.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources));
@@ -319,7 +336,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
 
   // Even though framework4 doesn't have any resources, role2 has a
   // lower share than role1, so framework2 receives slave5's resources.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave5.resources(), Resources::sum(allocation.get().resources));
@@ -350,7 +367,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
   allocator->addFramework(
       framework1.id(), framework1, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources));
@@ -363,7 +380,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
   SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0");
   allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources));
@@ -375,7 +392,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
   SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0");
   allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources));
@@ -392,7 +409,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
       "cpus(role1):2;mem(role1):1024;disk(role1):0");
   allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework3.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources));
@@ -425,7 +442,7 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
   allocator->addFramework(
       framework1.id(), framework1, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(slave1.resources() + slave2.resources(),
@@ -448,28 +465,28 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
   allocator->addFramework(
       framework2.id(), framework2, hashmap<SlaveID, Resources>());
 
-  hashmap<FrameworkID, Allocation> allocations;
+  hashmap<FrameworkID, Allocation> frameworkAllocations;
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
-  allocations[allocation.get().frameworkId] = allocation.get();
+  frameworkAllocations[allocation.get().frameworkId] = allocation.get();
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
-  allocations[allocation.get().frameworkId] = allocation.get();
+  frameworkAllocations[allocation.get().frameworkId] = allocation.get();
 
   // Note that slave1 and slave2 have the same resources, we don't
   // care which framework received which slave.. only that they each
   // received one.
-  ASSERT_TRUE(allocations.contains(framework1.id()));
-  ASSERT_EQ(1u, allocations[framework1.id()].resources.size());
+  ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
+  ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
   EXPECT_EQ(slave1.resources(),
-            Resources::sum(allocations[framework1.id()].resources));
+            Resources::sum(frameworkAllocations[framework1.id()].resources));
 
-  ASSERT_TRUE(allocations.contains(framework2.id()));
-  ASSERT_EQ(1u, allocations[framework1.id()].resources.size());
+  ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
+  ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
   EXPECT_EQ(slave2.resources(),
-            Resources::sum(allocations[framework1.id()].resources));
+            Resources::sum(frameworkAllocations[framework1.id()].resources));
 }
 
 
@@ -501,7 +518,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
   hashmap<FrameworkID, size_t> counts;
 
   for (int i = 0; i < 10; i++) {
-    Future<Allocation> allocation = queue.get();
+    Future<Allocation> allocation = allocations.get();
     AWAIT_READY(allocation);
     counts[allocation.get().frameworkId]++;
 
@@ -552,7 +569,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
   allocator->addFramework(
       framework1.id(), framework1, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(2u, allocation.get().resources.size());
@@ -566,7 +583,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
   allocator->addFramework(
       framework2.id(), framework2, hashmap<SlaveID, Resources>());
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -595,7 +612,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
   allocator->addFramework(
       framework1.id(), framework1, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -613,7 +630,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 
   Clock::advance(flags.allocation_interval);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -631,7 +648,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 
   Clock::advance(flags.allocation_interval);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -669,7 +686,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "disk:128");
   allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -683,7 +700,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "disk:128");
   allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -700,7 +717,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "disk:128");
   allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -726,7 +743,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
   allocator->addFramework(
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -764,7 +781,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 
   Clock::advance(flags.allocation_interval);
 
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -809,7 +826,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
   allocator->addFramework(
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -843,7 +860,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
   allocator->addFramework(
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(framework.id(), allocation.get().frameworkId);
   EXPECT_EQ(1u, allocation.get().resources.size());
@@ -884,7 +901,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
   // Initially, all the resources are allocated.
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
 
@@ -893,7 +910,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   allocator->updateSlave(slave.id(), oversubscribed);
 
   // The next allocation should be for 10 oversubscribed resources.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
 
@@ -902,7 +919,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   allocator->updateSlave(slave.id(), oversubscribed2);
 
   // The next allocation should be for 2 oversubscribed cpus.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(oversubscribed2 - oversubscribed,
             Resources::sum(allocation.get().resources));
@@ -914,7 +931,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   // Since there are no more available oversubscribed resources there
   // shouldn't be an allocation.
   Clock::settle();
-  allocation = queue.get();
+  allocation = allocations.get();
   ASSERT_TRUE(allocation.isPending());
 }
 
@@ -938,7 +955,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
   // Initially, all the resources are allocated.
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
 
@@ -949,7 +966,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
   // No allocation should be made for oversubscribed resources because
   // the framework has not opted in for them.
   Clock::settle();
-  allocation = queue.get();
+  allocation = allocations.get();
   ASSERT_TRUE(allocation.isPending());
 }
 
@@ -976,7 +993,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
   // Initially, all the resources are allocated.
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
 
@@ -985,7 +1002,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
   allocator->updateSlave(slave.id(), oversubscribed);
 
   // The next allocation should be for 10 oversubscribed cpus.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
 
@@ -999,7 +1016,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
 
   // The next allocation should be for 6 oversubscribed and 2 regular
   // cpus.
-  allocation = queue.get();
+  allocation = allocations.get();
   AWAIT_READY(allocation);
   EXPECT_EQ(recovered, Resources::sum(allocation.get().resources));
 }
@@ -1028,7 +1045,7 @@ TEST_F(HierarchicalAllocatorTest, Whitelist)
   allocator->addFramework(
       framework.id(), framework, hashmap<SlaveID, Resources>());
 
-  Future<Allocation> allocation = queue.get();
+  Future<Allocation> allocation = allocations.get();
 
   // Ensure a batch allocation is triggered.
   Clock::advance(flags.allocation_interval);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index c6a419b..1fe3757 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -95,7 +95,7 @@ TYPED_TEST(MasterAllocatorTest, SingleFramework)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = this->StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -141,7 +141,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = this->StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -246,7 +246,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = this->StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -374,7 +374,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = this->StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -496,7 +496,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.allocation_interval = Milliseconds(50);
@@ -642,7 +642,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = this->StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -757,7 +757,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.allocation_interval = Milliseconds(50);
@@ -851,7 +851,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.allocation_interval = Milliseconds(50);
@@ -952,7 +952,7 @@ TYPED_TEST(MasterAllocatorTest, CpusOnlyOfferedAndTaskLaunched)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.allocation_interval = Milliseconds(50);
@@ -1030,7 +1030,7 @@ TYPED_TEST(MasterAllocatorTest, MemoryOnlyOfferedAndTaskLaunched)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.allocation_interval = Milliseconds(50);
@@ -1121,7 +1121,7 @@ TYPED_TEST(MasterAllocatorTest, Whitelist)
 
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Future<Nothing> updateWhitelist1;
   EXPECT_CALL(allocator, updateWhitelist(Option<hashset<string>>(hosts)))
@@ -1161,7 +1161,7 @@ TYPED_TEST(MasterAllocatorTest, RoleTest)
 {
   TestAllocator<TypeParam> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = this->CreateMasterFlags();
   masterFlags.roles = Some("role2");
@@ -1253,7 +1253,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
   {
     TestAllocator<TypeParam> allocator;
 
-    EXPECT_CALL(allocator, initialize(_, _, _));
+    EXPECT_CALL(allocator, initialize(_, _, _, _));
 
     Try<PID<Master>> master = this->StartMaster(&allocator);
     ASSERT_SOME(master);
@@ -1311,7 +1311,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
   {
     TestAllocator<TypeParam> allocator2;
 
-    EXPECT_CALL(allocator2, initialize(_, _, _));
+    EXPECT_CALL(allocator2, initialize(_, _, _, _));
 
     Future<Nothing> addFramework;
     EXPECT_CALL(allocator2, addFramework(_, _, _))
@@ -1378,7 +1378,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
   {
     TestAllocator<TypeParam> allocator;
 
-    EXPECT_CALL(allocator, initialize(_, _, _));
+    EXPECT_CALL(allocator, initialize(_, _, _, _));
 
     Try<PID<Master>> master = this->StartMaster(&allocator);
     ASSERT_SOME(master);
@@ -1435,7 +1435,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
   {
     TestAllocator<TypeParam> allocator2;
 
-    EXPECT_CALL(allocator2, initialize(_, _, _));
+    EXPECT_CALL(allocator2, initialize(_, _, _, _));
 
     Future<Nothing> addSlave;
     EXPECT_CALL(allocator2, addSlave(_, _, _, _, _))

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 477b7e4..858618f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1265,7 +1265,7 @@ public:
 
 ACTION_P(InvokeInitialize, allocator)
 {
-  allocator->real->initialize(arg0, arg1, arg2);
+  allocator->real->initialize(arg0, arg1, arg2, arg3);
 }
 
 
@@ -1407,9 +1407,9 @@ public:
     // to get the best of both worlds: the ability to use 'DoDefault'
     // and no warnings when expectations are not explicit.
 
-    ON_CALL(*this, initialize(_, _, _))
+    ON_CALL(*this, initialize(_, _, _, _))
       .WillByDefault(InvokeInitialize(this));
-    EXPECT_CALL(*this, initialize(_, _, _))
+    EXPECT_CALL(*this, initialize(_, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, addFramework(_, _, _))
@@ -1500,11 +1500,14 @@ public:
 
   virtual ~TestAllocator() {}
 
-  MOCK_METHOD3(initialize, void(
+  MOCK_METHOD4(initialize, void(
       const Duration&,
       const lambda::function<
           void(const FrameworkID&,
                const hashmap<SlaveID, Resources>&)>&,
+      const lambda::function<
+          void(const FrameworkID&,
+               const hashmap<SlaveID, UnavailableResources>&)>&,
       const hashmap<std::string, mesos::master::RoleInfo>&));
 
   MOCK_METHOD3(addFramework, void(

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index 572a8d6..398a2e1 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -132,7 +132,7 @@ TEST_F(ReservationEndpointsTest, AvailableResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -225,7 +225,7 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -299,7 +299,7 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -381,7 +381,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   master::Flags masterFlags = CreateMasterFlags();
   // Turn off allocation. We're doing it manually.
@@ -527,7 +527,7 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   // Turn off allocation. We're doing it manually.
   masterFlags.allocation_interval = Seconds(1000);
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
   ASSERT_SOME(master);
@@ -678,7 +678,7 @@ TEST_F(ReservationEndpointsTest, InsufficientResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -720,7 +720,7 @@ TEST_F(ReservationEndpointsTest, NoHeader)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -770,7 +770,7 @@ TEST_F(ReservationEndpointsTest, BadCredentials)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -847,7 +847,7 @@ TEST_F(ReservationEndpointsTest, NoResources)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);
@@ -882,7 +882,7 @@ TEST_F(ReservationEndpointsTest, NonMatchingPrincipal)
 {
   TestAllocator<> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator);
   ASSERT_SOME(master);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index 91fcf0d..6b7c43c 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -410,7 +410,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   masterFlags.allocation_interval = Milliseconds(50);
   masterFlags.roles = frameworkInfo.role();
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
   ASSERT_SOME(master);
@@ -501,7 +501,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   masterFlags.allocation_interval = Milliseconds(50);
   masterFlags.roles = frameworkInfo.role();
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
   ASSERT_SOME(master);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 882a9ff..af40a07 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -283,7 +283,7 @@ TEST_F(ResourceOffersTest, Request)
 {
   TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _))
+  EXPECT_CALL(allocator, initialize(_, _, _, _))
     .Times(1);
 
   Try<PID<Master>> master = StartMaster(&allocator);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index b636986..dd8f823 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2143,7 +2143,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
 {
   TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
 
-  EXPECT_CALL(allocator, initialize(_, _, _));
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
 
   Try<PID<Master> > master = this->StartMaster(&allocator);
   ASSERT_SOME(master);


[11/16] mesos git commit: Maintenance Primitives: Added InverseOffer to V1 API.

Posted by jo...@apache.org.
Maintenance Primitives: Added InverseOffer to V1 API.

Review: https://reviews.apache.org/r/37282


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6c568bac
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6c568bac
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6c568bac

Branch: refs/heads/master
Commit: 6c568bacea42f251bc68526a642533fe95e7bcf3
Parents: c702a2c
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:23:37 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/internal/devolve.cpp    | 6 ++++++
 src/internal/devolve.hpp    | 1 +
 src/internal/evolve.cpp     | 8 ++++++++
 src/internal/evolve.hpp     | 1 +
 src/messages/messages.proto | 8 ++++++++
 5 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6c568bac/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 0a069e5..6cace66 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -99,6 +99,12 @@ Offer devolve(const v1::Offer& offer)
 }
 
 
+InverseOffer devolve(const v1::InverseOffer& inverseOffer)
+{
+  return devolve<InverseOffer>(inverseOffer);
+}
+
+
 Credential devolve(const v1::Credential& credential)
 {
   return devolve<Credential>(credential);

http://git-wip-us.apache.org/repos/asf/mesos/blob/6c568bac/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 6e4306d..f03cac5 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -41,6 +41,7 @@ FrameworkID devolve(const v1::FrameworkID& frameworkId);
 FrameworkInfo devolve(const v1::FrameworkInfo& frameworkInfo);
 ExecutorID devolve(const v1::ExecutorID& executorId);
 Offer devolve(const v1::Offer& offer);
+InverseOffer devolve(const v1::InverseOffer& inverseOffer);
 Credential devolve(const v1::Credential& credential);
 
 scheduler::Call devolve(const v1::scheduler::Call& call);

http://git-wip-us.apache.org/repos/asf/mesos/blob/6c568bac/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 11ce9e7..625706e 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -92,6 +92,12 @@ v1::Offer evolve(const Offer& offer)
 }
 
 
+v1::InverseOffer evolve(const InverseOffer& inverseOffer)
+{
+  return evolve<v1::InverseOffer>(inverseOffer);
+}
+
+
 v1::OfferID evolve(const OfferID& offerId)
 {
   return evolve<v1::OfferID>(offerId);
@@ -153,6 +159,8 @@ v1::scheduler::Event evolve(const ResourceOffersMessage& message)
 
   v1::scheduler::Event::Offers* offers = event.mutable_offers();
   offers->mutable_offers()->CopyFrom(evolve<v1::Offer>(message.offers()));
+  offers->mutable_inverse_offers()->CopyFrom(evolve<v1::InverseOffer>(
+      message.inverse_offers()));
 
   return event;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6c568bac/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 13e9f52..9babac3 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -42,6 +42,7 @@ v1::AgentInfo evolve(const SlaveInfo& slaveInfo);
 v1::FrameworkID evolve(const FrameworkID& frameworkId);
 v1::ExecutorID evolve(const ExecutorID& executorId);
 v1::Offer evolve(const Offer& offer);
+v1::InverseOffer evolve(const InverseOffer& inverseOffer);
 v1::OfferID evolve(const OfferID& offerId);
 v1::TaskInfo evolve(const TaskInfo& taskInfo);
 v1::TaskStatus evolve(const TaskStatus& status);

http://git-wip-us.apache.org/repos/asf/mesos/blob/6c568bac/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 8977d8e..ea9a67e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -168,6 +168,14 @@ message ResourceRequestMessage {
 message ResourceOffersMessage {
   repeated Offer offers = 1;
   repeated string pids = 2;
+  
+  // The `inverse_offers` field is added here because we currently use it in
+  // `master.cpp` when constructing the message to send to schedulers. We use
+  // the original version of the proto API until we do a full refactor of all
+  // the messages being sent.
+  // It is not fully implemented in the old scheduler; only the V1 scheduler
+  // currently implements inverse offers.
+  repeated InverseOffer inverse_offers = 3;
 }
 
 


[03/16] mesos git commit: Maintenance Primitives: Implemented Master::inverseOffer.

Posted by jo...@apache.org.
Maintenance Primitives: Implemented Master::inverseOffer.

Review: https://reviews.apache.org/r/37180


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e6375f31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e6375f31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e6375f31

Branch: refs/heads/master
Commit: e6375f319914741c652bca7c9b97049e81828f5e
Parents: 42f9ce5
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:24:03 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/master.cpp | 225 ++++++++++++++++++++++++++++++++++++++++++++-
 src/master/master.hpp |  46 +++++++++
 2 files changed, 268 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e6375f31/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8471735..8ab5a03 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -953,6 +953,13 @@ void Master::finalize()
       removeOffer(offer);
     }
 
+    // Remove inverse offers.
+    foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+      // We don't need to update the allocator because the slave has already
+      // been removed.
+      removeInverseOffer(inverseOffer);
+    }
+
     // Terminate the slave observer.
     terminate(slave->observer);
     wait(slave->observer);
@@ -978,12 +985,14 @@ void Master::finalize()
     CHECK(framework->tasks.empty());
     CHECK(framework->executors.empty());
     CHECK(framework->offers.empty());
+    CHECK(framework->inverseOffers.empty());
 
     delete framework;
   }
   frameworks.registered.clear();
 
   CHECK(offers.empty());
+  CHECK(inverseOffers.empty());
 
   foreachvalue (Future<Option<string>> future, authenticating) {
     // NOTE: This is necessary during tests because a copy of
@@ -2323,6 +2332,17 @@ void Master::_subscribe(
         removeOffer(offer, true); // Rescind.
       }
 
+      // Also remove inverse offers.
+      foreach (InverseOffer* inverseOffer,
+               utils::copy(framework->inverseOffers)) {
+        allocator->updateInverseOffer(
+            inverseOffer->slave_id(),
+            inverseOffer->framework_id(),
+            None());
+
+        removeInverseOffer(inverseOffer, true); // Rescind.
+      }
+
       // TODO(bmahler): Shouldn't this re-link with the scheduler?
       framework->connected = true;
 
@@ -2474,8 +2494,19 @@ void Master::deactivate(Framework* framework)
   foreach (Offer* offer, utils::copy(framework->offers)) {
     allocator->recoverResources(
         offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
     removeOffer(offer, true); // Rescind.
   }
+
+  // Remove the framework's inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true); // Rescind.
+  }
 }
 
 
@@ -2515,6 +2546,16 @@ void Master::deactivate(Slave* slave)
 
     removeOffer(offer, true); // Rescind!
   }
+
+  // Remove and rescind inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    allocator->updateInverseOffer(
+        slave->id,
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
 }
 
 
@@ -4100,8 +4141,7 @@ void Master::updateSlave(
             << " oversubscribed resources " <<  oversubscribedResources;
 
   // First, rescind any outstanding offers with revocable resources.
-  // NOTE: Need a copy of offers because the offers are removed inside
-  // the loop.
+  // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {
     const Resources offered = offer->resources();
     if (!offered.revocable().empty()) {
@@ -4116,6 +4156,9 @@ void Master::updateSlave(
     }
   }
 
+  // NOTE: We don't need to rescind inverse offers here as they are unrelated to
+  // oversubscription.
+
   slave->totalResources -= slave->totalResources.revocable();
   slave->totalResources += oversubscribedResources.revocable();
 
@@ -4166,6 +4209,17 @@ void Master::updateUnavailability(
         removeOffer(offer, true); // Rescind!
       }
 
+      // Remove and rescind inverse offers since the allocator will send new
+      // inverse offers for the updated unavailability.
+      foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+        allocator->updateInverseOffer(
+            slave->id,
+            inverseOffer->framework_id(),
+            None());
+
+        removeInverseOffer(inverseOffer, true); // Rescind!
+      }
+
       // We remove / resind all the offers first so that any calls to the
       // allocator to modify its internal state are queued before the update of
       // the unavailability in the allocator. We do this so that the allocator's
@@ -4786,7 +4840,80 @@ void Master::inverseOffer(
     const FrameworkID& frameworkId,
     const hashmap<SlaveID, UnavailableResources>& resources)
 {
-  // TODO(jmlvanre): Implement this function.
+  if (!frameworks.registered.contains(frameworkId) ||
+      !frameworks.registered[frameworkId]->active) {
+    LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId
+              << " because the framework has terminated or is inactive";
+    return;
+  }
+
+  // Create an inverse offer for each slave and add it to the message.
+  ResourceOffersMessage message;
+
+  Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
+  foreachpair (const SlaveID& slaveId,
+               const UnavailableResources& unavailableResources,
+               resources) {
+    if (!slaves.registered.contains(slaveId)) {
+      LOG(INFO)
+        << "Master ignoring inverse offers to framework " << *framework
+        << " because slave " << slaveId << " is not valid";
+      continue;
+    }
+
+    Slave* slave = slaves.registered.get(slaveId);
+    CHECK_NOTNULL(slave);
+
+    // This could happen if the allocator dispatched 'Master::inverseOffer'
+    // before the slave was deactivated in the allocator.
+    if (!slave->active) {
+      LOG(INFO)
+        << "Master ignoring inverse offers because slave " << *slave
+        << " is " << (slave->connected ? "deactivated" : "disconnected");
+
+      continue;
+    }
+
+    InverseOffer* inverseOffer = new InverseOffer();
+
+    // We use the same id generator as regular offers so that we can have unique
+    // ids accross both. This way we can re-use some of the `OfferID` only
+    // messages.
+    inverseOffer->mutable_id()->CopyFrom(newOfferId());
+    inverseOffer->mutable_framework_id()->CopyFrom(framework->id());
+    inverseOffer->mutable_slave_id()->CopyFrom(slave->id);
+    inverseOffer->mutable_unavailability()->CopyFrom(
+        unavailableResources.unavailability);
+
+    inverseOffers[inverseOffer->id()] = inverseOffer;
+
+    framework->addInverseOffer(inverseOffer);
+    slave->addInverseOffer(inverseOffer);
+
+    // TODO(jmlvanre): Do we want a separate flag for inverse offer
+    // timeout?
+    if (flags.offer_timeout.isSome()) {
+      // Rescind the inverse offer after the timeout elapses.
+      inverseOfferTimers[inverseOffer->id()] =
+        delay(flags.offer_timeout.get(),
+              self(),
+              &Self::inverseOfferTimeout,
+              inverseOffer->id());
+    }
+
+    // Add the inverse offer *AND* the corresponding slave's PID.
+    message.add_inverse_offers()->CopyFrom(*inverseOffer);
+    message.add_pids(slave->pid);
+  }
+
+  if (message.inverse_offers().size() == 0) {
+    return;
+  }
+
+  LOG(INFO) << "Sending " << message.inverse_offers().size()
+            << " inverse offers to framework " << *framework;
+
+  framework->send(message);
 }
 
 
@@ -5244,12 +5371,24 @@ void Master::_failoverFramework(Framework* framework)
   foreach (Offer* offer, utils::copy(framework->offers)) {
     allocator->recoverResources(
         offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
     removeOffer(offer);
   }
 
+  // Also remove the inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer);
+  }
+
   // Reconnect and reactivate the framework.
   framework->connected = true;
 
+  // Reactivate the framework.
   // NOTE: We do this after recovering resources (above) so that
   // the allocator has the correct view of the framework's share.
   if (!framework->active) {
@@ -5344,9 +5483,20 @@ void Master::removeFramework(Framework* framework)
         offer->slave_id(),
         offer->resources(),
         None());
+
     removeOffer(offer);
   }
 
+  // Also remove the inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer);
+  }
+
   // Remove the framework's executors for correct resource accounting.
   foreachkey (const SlaveID& slaveId, utils::copy(framework->executors)) {
     Slave* slave = slaves.registered.get(slaveId);
@@ -5610,6 +5760,15 @@ void Master::removeSlave(
     removeOffer(offer, true); // Rescind!
   }
 
+  // Remove inverse offers because sending them for a slave that is
+  // gone doesn't make sense.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    // We don't need to update the allocator because we've already called
+    // `removeSlave()`.
+    // Remove and rescind inverse offers.
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
+
   // Mark the slave as being removed.
   slaves.removing.insert(slave->id);
   slaves.registered.remove(slave);
@@ -5967,6 +6126,58 @@ void Master::removeOffer(Offer* offer, bool rescind)
 }
 
 
+void Master::inverseOfferTimeout(const OfferID& inverseOfferId)
+{
+  InverseOffer* inverseOffer = getInverseOffer(inverseOfferId);
+  if (inverseOffer != NULL) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true);
+  }
+}
+
+
+void Master::removeInverseOffer(InverseOffer* inverseOffer, bool rescind)
+{
+  // Remove from framework.
+  Framework* framework = getFramework(inverseOffer->framework_id());
+  CHECK(framework != NULL)
+    << "Unknown framework " << inverseOffer->framework_id()
+    << " in the inverse offer " << inverseOffer->id();
+
+  framework->removeInverseOffer(inverseOffer);
+
+  // Remove from slave.
+  Slave* slave = slaves.registered.get(inverseOffer->slave_id());
+
+  CHECK(slave != NULL)
+    << "Unknown slave " << inverseOffer->slave_id()
+    << " in the inverse offer " << inverseOffer->id();
+
+  slave->removeInverseOffer(inverseOffer);
+
+  if (rescind) {
+    RescindResourceOfferMessage message;
+    message.mutable_offer_id()->CopyFrom(inverseOffer->id());
+    framework->send(message);
+  }
+
+  // Remove and cancel inverse offer removal timers. Canceling the Timers is
+  // only done to avoid having too many active Timers in libprocess.
+  if (inverseOfferTimers.contains(inverseOffer->id())) {
+    Clock::cancel(inverseOfferTimers[inverseOffer->id()]);
+    inverseOfferTimers.erase(inverseOffer->id());
+  }
+
+  // Delete it.
+  inverseOffers.erase(inverseOffer->id());
+  delete inverseOffer;
+}
+
+
 // TODO(bmahler): Consider killing this.
 Framework* Master::getFramework(const FrameworkID& frameworkId)
 {
@@ -5983,6 +6194,14 @@ Offer* Master::getOffer(const OfferID& offerId)
 }
 
 
+// TODO(bmahler): Consider killing this.
+InverseOffer* Master::getInverseOffer(const OfferID& inverseOfferId)
+{
+  return inverseOffers.contains(inverseOfferId) ?
+    inverseOffers[inverseOfferId] : NULL;
+}
+
+
 // Create a new framework ID. We format the ID as MASTERID-FWID, where
 // MASTERID is the ID of the master (launch date plus fault tolerant ID)
 // and FWID is an increasing integer.

http://git-wip-us.apache.org/repos/asf/mesos/blob/e6375f31/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1ba0837..d48ef7c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -236,6 +236,22 @@ struct Slave
     offers.erase(offer);
   }
 
+  void addInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(!inverseOffers.contains(inverseOffer))
+      << "Duplicate inverse offer " << inverseOffer->id();
+
+    inverseOffers.insert(inverseOffer);
+  }
+
+  void removeInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(inverseOffers.contains(inverseOffer))
+      << "Unknown inverse offer " << inverseOffer->id();
+
+    inverseOffers.erase(inverseOffer);
+  }
+
   bool hasExecutor(const FrameworkID& frameworkId,
                    const ExecutorID& executorId) const
   {
@@ -319,6 +335,9 @@ struct Slave
   // Active offers on this slave.
   hashset<Offer*> offers;
 
+  // Active inverse offers on this slave.
+  hashset<InverseOffer*> inverseOffers;
+
   hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
   Resources offeredResources; // Offers.
 
@@ -715,8 +734,15 @@ protected:
   // Remove an offer and optionally rescind the offer as well.
   void removeOffer(Offer* offer, bool rescind = false);
 
+  // Remove an inverse offer after the specified timeout.
+  void inverseOfferTimeout(const OfferID& inverseOfferId);
+
+  // Remove an inverse offer and optionally rescind it as well.
+  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
+
   Framework* getFramework(const FrameworkID& frameworkId);
   Offer* getOffer(const OfferID& offerId);
+  InverseOffer* getInverseOffer(const OfferID& inverseOfferId);
 
   FrameworkID newFrameworkId();
   OfferID newOfferId();
@@ -1134,6 +1160,9 @@ private:
   hashmap<OfferID, Offer*> offers;
   hashmap<OfferID, process::Timer> offerTimers;
 
+  hashmap<OfferID, InverseOffer*> inverseOffers;
+  hashmap<OfferID, process::Timer> inverseOfferTimers;
+
   hashmap<std::string, Role*> roles;
 
   // Authenticator names as supplied via flags.
@@ -1558,6 +1587,21 @@ struct Framework
     offers.erase(offer);
   }
 
+  void addInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(!inverseOffers.contains(inverseOffer))
+      << "Duplicate inverse offer " << inverseOffer->id();
+    inverseOffers.insert(inverseOffer);
+  }
+
+  void removeInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(inverseOffers.contains(inverseOffer))
+      << "Unknown inverse offer " << inverseOffer->id();
+
+    inverseOffers.erase(inverseOffer);
+  }
+
   bool hasExecutor(const SlaveID& slaveId,
                    const ExecutorID& executorId)
   {
@@ -1767,6 +1811,8 @@ struct Framework
 
   hashset<Offer*> offers; // Active offers for framework.
 
+  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
+
   hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
   // NOTE: For the used and offered resources below, we keep the


[02/16] mesos git commit: Maintenance Primitives: Handle inverse offers in pre-V1 scheduler.

Posted by jo...@apache.org.
Maintenance Primitives: Handle inverse offers in pre-V1 scheduler.

Review: https://reviews.apache.org/r/37621


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea961908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea961908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea961908

Branch: refs/heads/master
Commit: ea961908dadcf71234f95b2465e118c89cfca60c
Parents: a127671
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:50:41 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/sched/sched.cpp                    | 10 +++-
 src/tests/master_maintenance_tests.cpp | 90 +++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea961908/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 1fc9e73..a1723f3 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -505,6 +505,7 @@ protected:
         }
 
         resourceOffers(from, offers, pids);
+
         break;
       }
 
@@ -776,9 +777,16 @@ protected:
       return;
     }
 
+    // We exit early if `offers` is empty since we don't implement inverse
+    // offers in the old scheduler API. It could be empty when there are only
+    // inverse offers as part of the `ResourceOffersMessage`.
+    if (offers.empty()) {
+      return;
+    }
+
     VLOG(2) << "Received " << offers.size() << " offers";
 
-    CHECK(offers.size() == pids.size());
+    CHECK_EQ(offers.size(), pids.size());
 
     // Save the pid associated with each slave (one per offer) so
     // later we can send framework messages directly.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea961908/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 8f39ac3..4a59389 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -474,6 +474,96 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 }
 
 
+// Test ensures that old schedulers gracefully handle inverse offers, even if
+// they aren't passed up to the top level API yet.
+TEST_F(MasterMaintenanceTest, PreV1SchedulerSupport)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  // Intercept offers sent to the scheduler.
+  Future<vector<Offer>> normalOffers;
+  Future<vector<Offer>> unavailabilityOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&normalOffers))
+    .WillOnce(FutureArg<1>(&unavailabilityOffers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // The original offers should be rescinded when the unavailability is changed.
+  Future<Nothing> offerRescinded;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureSatisfy(&offerRescinded));
+
+  // Start the test.
+  driver.start();
+
+  // Wait for some normal offers.
+  AWAIT_READY(normalOffers);
+  EXPECT_NE(0u, normalOffers.get().size());
+
+  // Check that unavailability is not set.
+  foreach (const Offer& offer, normalOffers.get()) {
+    EXPECT_FALSE(offer.has_unavailability());
+  }
+
+  // Schedule this slave for maintenance.
+  MachineID machine;
+  machine.set_hostname(maintenanceHostname);
+  machine.set_ip(stringify(slave.get().address.ip));
+
+  // TODO(jmlvanre): Replace Time(0.0) with `Clock::now()` once JSON double
+  // conversion is fixed. For now using a rounded time avoids the issue.
+  const Time start = Time::create(0.0).get() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  // Post a valid schedule with one machine.
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine}, unavailability)});
+
+  // We have a few seconds between the first set of offers and the next
+  // allocation of offers. This should be enough time to perform a maintenance
+  // schedule update. This update will also trigger the rescinding of offers
+  // from the scheduled slave.
+  Future<Response> response =
+    process::http::post(
+        master.get(),
+        "maintenance/schedule",
+        headers,
+        stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Wait for some offers.
+  AWAIT_READY(unavailabilityOffers);
+  EXPECT_NE(0u, unavailabilityOffers.get().size());
+
+  // Check that each offer has an unavailability.
+  foreach (const Offer& offer, unavailabilityOffers.get()) {
+    EXPECT_TRUE(offer.has_unavailability());
+    EXPECT_EQ(unavailability.start(), offer.unavailability().start());
+    EXPECT_EQ(unavailability.duration(), offer.unavailability().duration());
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // Posts valid and invalid machines to the maintenance start endpoint.
 TEST_F(MasterMaintenanceTest, BringDownMachines)
 {


[04/16] mesos git commit: Maintenance Primitives: Set offer `unavailability` for maintenance.

Posted by jo...@apache.org.
Maintenance Primitives: Set offer `unavailability` for maintenance.
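
With this change an offer from a slave that is scheduled for maintenance
carries an `Unavailability`: an absolute start time plus an optional
duration. The sketch below shows how a framework might consume the new
field when placing work; it is illustrative only. `fitsBeforeMaintenance`
and `estimatedRuntime` are hypothetical names, while `Offer`, `Duration`,
`Nanoseconds` and `Clock` are the usual Mesos / stout / libprocess types
used in the tests in this series.

#include <mesos/mesos.hpp>     // For Offer.

#include <process/clock.hpp>   // For process::Clock.

#include <stout/duration.hpp>  // For Duration, Nanoseconds.

// Returns whether work with the given estimated runtime is expected to
// finish before the offered slave becomes unavailable. Hypothetical
// helper, not part of this commit.
bool fitsBeforeMaintenance(
    const mesos::Offer& offer,
    const Duration& estimatedRuntime)
{
  if (!offer.has_unavailability()) {
    return true; // No maintenance is scheduled on this slave.
  }

  // The start of the window is an absolute point in time (nanoseconds
  // since the epoch), copied by the master from the maintenance schedule.
  const Duration start =
    Nanoseconds(offer.unavailability().start().nanoseconds());

  return process::Clock::now().duration() + estimatedRuntime < start;
}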

Review: https://reviews.apache.org/r/37172


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ee1eb2ba
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ee1eb2ba
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ee1eb2ba

Branch: refs/heads/master
Commit: ee1eb2ba6b17cba66ad99f4e6344416c2d2709d2
Parents: 9e7ee6b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:39:35 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/type_utils.hpp           | 24 ++++++++
 include/mesos/v1/mesos.hpp             | 24 ++++++++
 src/master/master.cpp                  |  8 +++
 src/tests/master_maintenance_tests.cpp | 96 +++++++++++++++++++++++++++++
 4 files changed, 152 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ee1eb2ba/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index 4fb0037..64c2a86 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -108,6 +108,18 @@ inline bool operator==(const TaskID& left, const TaskID& right)
 }
 
 
+inline bool operator==(const TimeInfo& left, const TimeInfo& right)
+{
+  return left.nanoseconds() == right.nanoseconds();
+}
+
+
+inline bool operator==(const DurationInfo& left, const DurationInfo& right)
+{
+  return left.nanoseconds() == right.nanoseconds();
+}
+
+
 inline bool operator==(const ContainerID& left, const std::string& right)
 {
   return left.value() == right;
@@ -183,6 +195,18 @@ inline bool operator!=(const SlaveID& left, const SlaveID& right)
 }
 
 
+inline bool operator!=(const TimeInfo& left, const TimeInfo& right)
+{
+  return !(left == right);
+}
+
+
+inline bool operator!=(const DurationInfo& left, const DurationInfo& right)
+{
+  return !(left == right);
+}
+
+
 inline bool operator<(const ContainerID& left, const ContainerID& right)
 {
   return left.value() < right.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ee1eb2ba/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 0d695f3..f8f9617 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -89,6 +89,18 @@ inline bool operator==(const TaskID& left, const TaskID& right)
 }
 
 
+inline bool operator==(const TimeInfo& left, const TimeInfo& right)
+{
+  return left.nanoseconds() == right.nanoseconds();
+}
+
+
+inline bool operator==(const DurationInfo& left, const DurationInfo& right)
+{
+  return left.nanoseconds() == right.nanoseconds();
+}
+
+
 inline bool operator==(const ContainerID& left, const std::string& right)
 {
   return left.value() == right;
@@ -125,6 +137,18 @@ inline bool operator==(const TaskID& left, const std::string& right)
 }
 
 
+inline bool operator!=(const TimeInfo& left, const TimeInfo& right)
+{
+  return !(left == right);
+}
+
+
+inline bool operator!=(const DurationInfo& left, const DurationInfo& right)
+{
+  return !(left == right);
+}
+
+
 inline bool operator!=(const ContainerID& left, const ContainerID& right)
 {
   return left.value() != right.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/ee1eb2ba/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 31fc83d..5c2f032 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4654,6 +4654,14 @@ void Master::offer(const FrameworkID& frameworkId,
       }
     }
 
+    // If the slave in this offer is planned to be unavailable due to
+    // maintenance in the future, then set the Unavailability.
+    CHECK(machines.contains(slave->machineId));
+    if (machines[slave->machineId].info.has_unavailability()) {
+      offer->mutable_unavailability()->CopyFrom(
+          machines[slave->machineId].info.unavailability());
+    }
+
     offers[offer->id()] = offer;
 
     framework->addOffer(offer);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ee1eb2ba/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index fb8dca3..5811446 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -63,6 +63,7 @@ using mesos::internal::protobuf::maintenance::createUnavailability;
 using mesos::internal::protobuf::maintenance::createWindow;
 
 using std::string;
+using std::vector;
 
 using testing::DoAll;
 
@@ -90,10 +91,18 @@ public:
     unavailability = createUnavailability(Clock::now());
   }
 
+  virtual slave::Flags CreateSlaveFlags()
+  {
+    slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
+    slaveFlags.hostname = maintenanceHostname;
+    return slaveFlags;
+  }
 
   // Default headers for all POST's to maintenance endpoints.
   hashmap<string, string> headers;
 
+  const string maintenanceHostname = "maintenance-host";
+
   // Some generic `MachineID`s that can be used in this test.
   MachineID machine1;
   MachineID machine2;
@@ -291,6 +300,93 @@ TEST_F(MasterMaintenanceTest, FailToUnscheduleDeactivatedMachines)
 }
 
 
+// Test ensures that an offer will have an `unavailability` set if the
+// slave is scheduled to go down for maintenance.
+TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  // Intercept offers sent to the scheduler.
+  Future<vector<Offer>> normalOffers;
+  Future<vector<Offer>> unavailabilityOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&normalOffers))
+    .WillOnce(FutureArg<1>(&unavailabilityOffers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Start the test.
+  driver.start();
+
+  // Wait for some normal offers.
+  AWAIT_READY(normalOffers);
+  EXPECT_NE(0u, normalOffers.get().size());
+
+  // Check that unavailability is not set.
+  foreach (const Offer& offer, normalOffers.get()) {
+    EXPECT_FALSE(offer.has_unavailability());
+
+    // We have a few seconds between allocations (by default).  That should
+    // be enough time to post a schedule before the next allocation.
+    driver.declineOffer(offer.id());
+  }
+
+  // Schedule this slave for maintenance.
+  MachineID machine;
+  machine.set_hostname("maintenance-host");
+  machine.set_ip(stringify(slave.get().address.ip));
+
+  // TODO(jmlvanre): Replace Time(0.0) with `Clock::now()` once JSON double
+  // conversion is fixed. For now using a rounded time avoids the issue.
+  const Time start = Time::create(0.0).get() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  // Post a valid schedule with one machine.
+  maintenance::Schedule schedule = createSchedule({
+      createWindow({machine}, unavailability)});
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "maintenance/schedule",
+      headers,
+      stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Speed up the test by not waiting until the next allocation.
+  driver.reviveOffers();
+
+  // Wait for some offers.
+  AWAIT_READY(unavailabilityOffers);
+  EXPECT_NE(0u, unavailabilityOffers.get().size());
+
+  // Check that each offer has an unavailability.
+  foreach (const Offer& offer, unavailabilityOffers.get()) {
+    EXPECT_TRUE(offer.has_unavailability());
+    EXPECT_EQ(unavailability.start(), offer.unavailability().start());
+    EXPECT_EQ(unavailability.duration(), offer.unavailability().duration());
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // Posts valid and invalid machines to the maintenance start endpoint.
 TEST_F(MasterMaintenanceTest, BringDownMachines)
 {


[15/16] mesos git commit: Maintenance Primitives: Shutdown slave when maintenance is started.

Posted by jo...@apache.org.
Maintenance Primitives: Shutdown slave when maintenance is started.

Review: https://reviews.apache.org/r/37622


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/147420e3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/147420e3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/147420e3

Branch: refs/heads/master
Commit: 147420e3e591c4b2674d3f84252066bc5d4b660c
Parents: ea96190
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:55:25 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/http.cpp                    |  31 ++++++++
 src/tests/master_maintenance_tests.cpp | 114 ++++++++++++++++++++++++++++
 2 files changed, 145 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/147420e3/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 05b590e..f7ce9aa 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1593,6 +1593,37 @@ Future<Response> Master::Http::machineDown(const Request& request) const
       // is here, and is appropriate.
       CHECK(result);
 
+      // We currently send a `ShutdownMessage` to each slave. This terminates
+      // all the executors for all the frameworks running on that slave.
+      // We also manually remove the slave to force sending TASK_LOST updates
+      // for all the tasks that were running on the slave and `LostSlaveMessage`
+      // messages to the framework. This guards against the slave having dropped
+      // the `ShutdownMessage`.
+      foreach (const MachineID& machineId, ids.values()) {
+        // The machine may not be in machines. This means no slaves are
+        // currently registered on that machine so this is a no-op.
+        if (master->machines.contains(machineId)) {
+          // NOTE: Copies are needed because removeSlave modifies
+          // master->machines.
+          foreach (
+              const SlaveID& slaveId,
+              utils::copy(master->machines[machineId].slaves)) {
+            Slave* slave = master->slaves.registered.get(slaveId);
+            CHECK_NOTNULL(slave);
+
+            // Tell the slave to shut down.
+            ShutdownMessage shutdownMessage;
+            shutdownMessage.set_message("Operator initiated 'Machine DOWN'");
+            master->send(slave->pid, shutdownMessage);
+
+            // Immediately remove the slave to force sending `TASK_LOST` status
+            // updates as well as `LostSlaveMessage` messages to the frameworks.
+            // See comment above.
+            master->removeSlave(slave, "Operator initiated 'Machine DOWN'");
+          }
+        }
+      }
+
       // Update the master's local state with the downed machines.
       foreach (const MachineID& id, ids.values()) {
         master->machines[id].info.set_mode(MachineInfo::DOWN);

http://git-wip-us.apache.org/repos/asf/mesos/blob/147420e3/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 4a59389..6ae502d 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -564,6 +564,120 @@ TEST_F(MasterMaintenanceTest, PreV1SchedulerSupport)
 }
 
 
+// Test ensures that slaves receive a shutdown message from the master when
+// maintenance is started, and frameworks receive a task lost message.
+TEST_F(MasterMaintenanceTest, EnterMaintenanceMode)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  // Launch a task.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return()); // Ignore rescinds.
+
+  // Collect the status updates to verify the task is running and then lost.
+  Future<TaskStatus> startStatus, lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startStatus))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  // Start the test.
+  driver.start();
+
+  // Wait till the task is running to schedule the maintenance.
+  AWAIT_READY(startStatus);
+  EXPECT_EQ(TASK_RUNNING, startStatus.get().state());
+
+  // Schedule this slave for maintenance.
+  MachineID machine;
+  machine.set_hostname(maintenanceHostname);
+  machine.set_ip(stringify(slave.get().address.ip));
+
+  // TODO(jmlvanre): Replace Time(0.0) with `Clock::now()` once JSON double
+  // conversion is fixed. For now using a rounded time avoids the issue.
+  const Time start = Time::create(0.0).get() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  // Post a valid schedule with one machine.
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine}, unavailability)});
+
+  // We have a few seconds between the first set of offers and the next
+  // allocation of offers.  This should be enough time to perform a maintenance
+  // schedule update.  This update will also trigger the rescinding of offers
+  // from the scheduled slave.
+  Future<Response> response =
+    process::http::post(
+        master.get(),
+        "maintenance/schedule",
+        headers,
+        stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Verify that the master forces the slave to be shut down after the
+  // maintenance is started.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), master.get(), slave.get());
+
+  // Verify that the framework will be informed that the slave is lost.
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Start the maintenance.
+  response =
+    process::http::post(
+        master.get(),
+        "machine/down",
+        headers,
+        stringify(JSON::Protobuf(createMachineList({machine}))));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Wait for the slave to be shut down.
+  AWAIT_READY(shutdownMessage);
+
+  // Verify that we received a TASK_LOST.
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+  // Verify that the framework received the slave lost message.
+  AWAIT_READY(slaveLost);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // Posts valid and invalid machines to the maintenance start endpoint.
 TEST_F(MasterMaintenanceTest, BringDownMachines)
 {


[10/16] mesos git commit: Maintenance Primitives: Added InverseOffers to Scheduler Event Offers.

Posted by jo...@apache.org.
Maintenance Primitives: Added InverseOffers to Scheduler Event Offers.
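
Because inverse offers share the `OfferID` space with regular offers, a V1
scheduler can answer them with the ordinary ACCEPT / DECLINE calls; the
master-side handling for that arrives later in this series. The sketch
below is illustrative and not part of this commit. It assumes the V1
scheduler library header `<mesos/v1/scheduler.hpp>` and its `Mesos::send()`
method, and it simply accepts every inverse offer for brevity.

#include <mesos/v1/mesos.hpp>
#include <mesos/v1/scheduler.hpp>

using mesos::v1::FrameworkID;
using mesos::v1::InverseOffer;
using mesos::v1::scheduler::Call;
using mesos::v1::scheduler::Event;
using mesos::v1::scheduler::Mesos;

// Respond to the inverse offers carried in an OFFERS event. A real
// scheduler would decide per inverse offer whether draining the slave
// would violate its SLA; here we accept everything.
void handleInverseOffers(
    Mesos* mesos,
    const FrameworkID& frameworkId,
    const Event::Offers& offers)
{
  for (const InverseOffer& inverseOffer : offers.inverse_offers()) {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::ACCEPT);
    call.mutable_accept()->add_offer_ids()->CopyFrom(inverseOffer.id());

    mesos->send(call);
  }
}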

Review: https://reviews.apache.org/r/37178


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c702a2c5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c702a2c5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c702a2c5

Branch: refs/heads/master
Commit: c702a2c55e3e6d893b959e22f29bb18cb34ffdbb
Parents: a1de99f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:22:03 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto    | 10 +++++++---
 include/mesos/v1/scheduler/scheduler.proto | 10 +++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c702a2c5/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto
index 89daf8a..19f548d 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -65,11 +65,15 @@ message Event {
   }
 
   // Received whenever there are new resources that are offered to the
-  // scheduler. Each offer corresponds to a set of resources on a
-  // slave. Until the scheduler accepts or declines an offer the
-  // resources are considered allocated to the scheduler.
+  // scheduler or resources requested back from the scheduler. Each
+  // offer corresponds to a set of resources on a slave. Until the
+  // scheduler accepts or declines an offer the resources are
+  // considered allocated to the scheduler. Accepting or Declining an
+  // inverse offer informs the allocator of the scheduler's ability to
+  // release the resources without violating an SLA.
   message Offers {
     repeated Offer offers = 1;
+    repeated InverseOffer inverse_offers = 2;
   }
 
   // Received when a particular offer is no longer valid (e.g., the

http://git-wip-us.apache.org/repos/asf/mesos/blob/c702a2c5/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto
index bd5e82a..0118b46 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -65,11 +65,15 @@ message Event {
   }
 
   // Received whenever there are new resources that are offered to the
-  // scheduler. Each offer corresponds to a set of resources on a
-  // agent. Until the scheduler accepts or declines an offer the
-  // resources are considered allocated to the scheduler.
+  // scheduler or resources requested back from the scheduler. Each
+  // offer corresponds to a set of resources on an agent. Until the
+  // scheduler accepts or declines an offer the resources are
+  // considered allocated to the scheduler. Accepting or Declining an
+  // inverse offer informs the allocator of the scheduler's ability to
+  // release the resources without violating an SLA.
   message Offers {
     repeated Offer offers = 1;
+    repeated InverseOffer inverse_offers = 2;
   }
 
   // Received when a particular offer is no longer valid (e.g., the


[16/16] mesos git commit: Maintenance Primitives: Added Accept / Decline for InverseOffers.

Posted by jo...@apache.org.
Maintenance Primitives: Added Accept / Decline for InverseOffers.

Review: https://reviews.apache.org/r/37284


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a127671a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a127671a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a127671a

Branch: refs/heads/master
Commit: a127671a726542e21cc7bc8838aa882b6bec4b49
Parents: bf82689
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:28:53 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/master.cpp | 56 +++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 53 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a127671a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 52d5763..61236b3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2789,6 +2789,11 @@ void Master::accept(
     // validation failed, return resources to the allocator.
     foreach (const OfferID& offerId, accept.offer_ids()) {
       Offer* offer = getOffer(offerId);
+
+      // Since we re-use `OfferID`s, it is possible to arrive here with either
+      // a resource offer, or an inverse offer. We first try as a resource offer
+      // and if that fails, then we assume it is an inverse offer. This is
+      // correct as those are currently the only 2 ways to get an `OfferID`.
       if (offer != NULL) {
         slaveId = offer->slave_id();
         offeredResources += offer->resources();
@@ -2801,7 +2806,29 @@ void Master::accept(
               None());
         }
         removeOffer(offer);
+        continue;
+      }
+
+      // Try it as an inverse offer. If this fails then the offer is no longer
+      // valid.
+      InverseOffer* inverseOffer = getInverseOffer(offerId);
+      if (inverseOffer != NULL) {
+        mesos::master::InverseOfferStatus status;
+        status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
+
+        allocator->updateInverseOffer(
+            inverseOffer->slave_id(),
+            inverseOffer->framework_id(),
+            status);
+
+        removeInverseOffer(inverseOffer);
+        continue;
       }
+
+      // If the offer was neither in our offer nor inverse offer sets, then this
+      // offer is no longer valid.
+      LOG(WARNING) << "Ignoring accept of offer " << offerId
+                   << " since it is no longer valid";
     }
   }
 
@@ -3237,6 +3264,10 @@ void Master::decline(
 
   //  Return resources to the allocator.
   foreach (const OfferID& offerId, decline.offer_ids()) {
+    // Since we re-use `OfferID`s, it is possible to arrive here with either a
+    // resource offer, or an inverse offer. We first try as a resource offer and
+    // if that fails, then we assume it is an inverse offer. This is correct as
+    // those are currently the only 2 ways to get an `OfferID`.
     Offer* offer = getOffer(offerId);
     if (offer != NULL) {
       allocator->recoverResources(
@@ -3246,10 +3277,29 @@ void Master::decline(
           decline.filters());
 
       removeOffer(offer);
-    } else {
-      LOG(WARNING) << "Ignoring decline of offer " << offerId
-                   << " since it is no longer valid";
+      continue;
+    }
+
+    // Try it as an inverse offer. If this fails then the offer is no longer
+    // valid.
+    InverseOffer* inverseOffer = getInverseOffer(offerId);
+    if (inverseOffer != NULL) { // If this is an inverse offer.
+      mesos::master::InverseOfferStatus status;
+      status.set_status(mesos::master::InverseOfferStatus::DECLINE);
+
+      allocator->updateInverseOffer(
+          inverseOffer->slave_id(),
+          inverseOffer->framework_id(),
+          status);
+
+      removeInverseOffer(inverseOffer);
+      continue;
     }
+
+    // If the offer was neither in our offer nor inverse offer sets, then this
+    // offer is no longer valid.
+    LOG(WARNING) << "Ignoring decline of offer " << offerId
+                 << " since it is no longer valid";
   }
 }
 


[08/16] mesos git commit: Maintenance Primitives: Added a new allocation overload to sorter.

Posted by jo...@apache.org.
Maintenance Primitives: Added a new allocation overload to sorter.

This provides the ability to compute the frameworks that currently have
resources allocated or reserved. This information is used by the
maintenance feature to send out inverse offers.
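
As an illustration of how the new overload can be consumed (a sketch, not
code from this series; the function name `allocationsOnSlave` is ours, and
the `frameworkSorters` map of role name to `Sorter*` is assumed to look
like what the hierarchical allocator keeps internally):

#include <string>

#include <mesos/mesos.hpp>       // For FrameworkID, SlaveID.
#include <mesos/resources.hpp>   // For Resources.
#include <mesos/type_utils.hpp>  // For hashing protobuf ids.

#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>

#include "master/allocator/sorter/sorter.hpp"

using mesos::FrameworkID;
using mesos::Resources;
using mesos::SlaveID;

using mesos::internal::master::allocator::Sorter;

// Returns, per framework, the resources currently allocated on `slaveId`.
// These are the frameworks that need to receive inverse offers when the
// slave is scheduled for maintenance.
hashmap<FrameworkID, Resources> allocationsOnSlave(
    const hashmap<std::string, Sorter*>& frameworkSorters,
    const SlaveID& slaveId)
{
  hashmap<FrameworkID, Resources> holders;

  foreachvalue (Sorter* sorter, frameworkSorters) {
    foreachpair (const std::string& name,
                 const Resources& allocated,
                 sorter->allocation(slaveId)) {
      // Framework sorter clients are keyed by the stringified framework id.
      FrameworkID frameworkId;
      frameworkId.set_value(name);

      holders[frameworkId] += allocated;
    }
  }

  return holders;
}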

Review: https://reviews.apache.org/r/37176


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8e042581
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8e042581
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8e042581

Branch: refs/heads/master
Commit: 8e042581671fba360c92378ba47dee5a7d2b0f34
Parents: f87f733
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:19:40 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/allocator/sorter/drf/sorter.cpp | 20 ++++++++++++++++++++
 src/master/allocator/sorter/drf/sorter.hpp |  2 ++
 src/master/allocator/sorter/sorter.hpp     |  4 ++++
 3 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8e042581/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index bfc2734..33c47e7 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -162,6 +162,26 @@ hashmap<SlaveID, Resources> DRFSorter::allocation(const string& name)
 }
 
 
+hashmap<std::string, Resources> DRFSorter::allocation(const SlaveID& slaveId)
+{
+  // TODO(jmlvanre): We can index the allocation by slaveId to make this faster.
+  // It is a tradeoff between speed vs. memory. For now we use existing data
+  // structures.
+
+  hashmap<std::string, Resources> result;
+
+  foreachpair (const string& name, const Allocation& allocation, allocations) {
+    if (allocation.resources.contains(slaveId)) {
+      // It is safe to use `at()` here because we've just checked the existence
+      // of the key. This avoids unnecessary copies.
+      result.emplace(name, allocation.resources.at(slaveId));
+    }
+  }
+
+  return result;
+}
+
+
 Resources DRFSorter::allocation(const string& name, const SlaveID& slaveId)
 {
   CHECK(contains(name));

http://git-wip-us.apache.org/repos/asf/mesos/blob/8e042581/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 217c7c4..9c64d7a 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -91,6 +91,8 @@ public:
 
   virtual hashmap<SlaveID, Resources> allocation(const std::string& name);
 
+  virtual hashmap<std::string, Resources> allocation(const SlaveID& slaveId);
+
   virtual Resources allocation(const std::string& name, const SlaveID& slaveId);
 
   virtual void add(const SlaveID& slaveId, const Resources& resources);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8e042581/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index 536a7ad..faebeb3 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -80,6 +80,10 @@ public:
   // Returns the resources that have been allocated to this client.
   virtual hashmap<SlaveID, Resources> allocation(const std::string& client) = 0;
 
+  // Returns the clients that have allocations on this slave.
+  virtual hashmap<std::string, Resources> allocation(
+      const SlaveID& slaveId) = 0;
+
   // Returns the given slave's resources that have been allocated to
   // this client.
   virtual Resources allocation(


[05/16] mesos git commit: Maintenance Primitives: Added unavailability to Allocator Slave struct.

Posted by jo...@apache.org.
Maintenance Primitives: Added unavailability to Allocator Slave struct.
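
The diffstat below also lists `src/master/master.cpp`; there the machine's
scheduled unavailability is threaded through the new `addSlave` parameter.
Roughly, that call site can be sketched as follows (a reconstruction based
on the surrounding commits in this series, not copied from the diff):

// When adding a (re-)registered slave to the allocator, look up its
// machine's maintenance schedule and pass the unavailability along so the
// allocator can later send inverse offers for it.
Option<Unavailability> unavailability = None();

if (machines.contains(slave->machineId) &&
    machines[slave->machineId].info.has_unavailability()) {
  unavailability = machines[slave->machineId].info.unavailability();
}

allocator->addSlave(
    slave->id,
    slave->info,
    unavailability,
    slave->totalResources,
    slave->usedResources);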

Review: https://reviews.apache.org/r/37173


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea48105a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea48105a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea48105a

Branch: refs/heads/master
Commit: ea48105aa68f249dd409c60ed1dd4998f3498e1e
Parents: ee1eb2b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:04:47 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  1 +
 include/mesos/master/allocator.proto        | 29 +++++++++++
 src/master/allocator/mesos/allocator.hpp    |  4 ++
 src/master/allocator/mesos/hierarchical.hpp | 44 +++++++++++++++++
 src/master/master.cpp                       |  9 ++++
 src/tests/hierarchical_allocator_tests.cpp  | 61 +++++++++++++-----------
 src/tests/master_allocator_tests.cpp        | 32 ++++++-------
 src/tests/mesos.hpp                         |  9 ++--
 src/tests/reservation_endpoints_tests.cpp   | 20 ++++----
 src/tests/reservation_tests.cpp             |  4 +-
 src/tests/slave_recovery_tests.cpp          |  2 +-
 11 files changed, 154 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 659f37b..257d2f6 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -96,6 +96,7 @@ public:
   virtual void addSlave(
       const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
+      const Option<Unavailability>& unavailability,
       const Resources& total,
       const hashmap<FrameworkID, Resources>& used) = 0;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/include/mesos/master/allocator.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.proto b/include/mesos/master/allocator.proto
index 10fd9a2..b42f19d 100644
--- a/include/mesos/master/allocator.proto
+++ b/include/mesos/master/allocator.proto
@@ -28,3 +28,32 @@ message RoleInfo {
   required string name = 1;
   optional double weight = 2 [default = 1];
 }
+
+
+/**
+ * Describes the status of an inverse offer.
+ *
+ * This is a protobuf so as to be able to share the status of inverse offers
+ * through endpoints such as the maintenance status endpoint.
+ */
+// TODO(jmlvanre): Copy this when V1 Allocator API is introduced.
+message InverseOfferStatus {
+  enum Status {
+    // We have not received a response yet. This is the default state before
+    // receiving a response.
+    UNKNOWN = 1;
+    // The framework is ok with the inverse offer. This means it will not
+    // violate any SLAs and will attempt to evacuate any tasks running on the
+    // agent. If the tasks are not evacuated by the framework, the operator can
+    // manually shut down the slave knowing that the framework will not have
+    // violated its SLAs.
+    ACCEPT = 2;
+    // The framework wants to block the maintenance operation from happening. An
+    // example would be that it cannot meet its SLA if it loses the resources.
+    DECLINE = 3;
+  }
+
+  required Status status = 1;
+
+  // TODO(jmlvanre): Capture decline message.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index aa55755..c845723 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -75,6 +75,7 @@ public:
   void addSlave(
       const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
+      const Option<Unavailability>& unavailability,
       const Resources& total,
       const hashmap<FrameworkID, Resources>& used);
 
@@ -165,6 +166,7 @@ public:
   virtual void addSlave(
       const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
+      const Option<Unavailability>& unavailability,
       const Resources& total,
       const hashmap<FrameworkID, Resources>& used) = 0;
 
@@ -316,6 +318,7 @@ template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::addSlave(
     const SlaveID& slaveId,
     const SlaveInfo& slaveInfo,
+    const Option<Unavailability>& unavailability,
     const Resources& total,
     const hashmap<FrameworkID, Resources>& used)
 {
@@ -324,6 +327,7 @@ inline void MesosAllocator<AllocatorProcess>::addSlave(
       &MesosAllocatorProcess::addSlave,
       slaveId,
       slaveInfo,
+      unavailability,
       total,
       used);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index fbf353d..f86a701 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -36,6 +36,7 @@
 #include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
 #include <stout/stopwatch.hpp>
 #include <stout/stringify.hpp>
 
@@ -112,6 +113,7 @@ public:
   void addSlave(
       const SlaveID& slaveId,
       const SlaveInfo& slaveInfo,
+      const Option<Unavailability>& unavailability,
       const Resources& total,
       const hashmap<FrameworkID, Resources>& used);
 
@@ -258,6 +260,40 @@ protected:
     bool checkpoint; // Whether slave supports checkpointing.
 
     std::string hostname;
+
+    // Represents a scheduled unavailability due to maintenance for a specific
+    // slave, and the responses from frameworks as to whether they will be able
+    // to gracefully handle this unavailability.
+    // NOTE: We currently implement maintenance in the allocator to be able to
+    // leverage state and features such as the FrameworkSorter and Filters.
+    struct Maintenance
+    {
+      Maintenance(const Unavailability& _unavailability)
+        : unavailability(_unavailability) {}
+
+      // The start time and optional duration of the event.
+      Unavailability unavailability;
+
+      // A mapping of frameworks to the inverse offer status associated with
+      // this unavailability.
+      // NOTE: We currently lose this information during a master failover
+      // since it is not persisted or replicated. This is OK, as the new
+      // master's allocator will send out new inverse offers and re-collect
+      // the information. This is similar to all the outstanding offers from
+      // an old master being invalidated and new offers being sent out.
+      hashmap<FrameworkID, mesos::master::InverseOfferStatus> statuses;
+
+      // Represents the "unit of accounting" for maintenance. When a
+      // `FrameworkID` is present in the hashset, an inverse offer has been
+      // sent out. When it is not present, no offer is currently
+      // outstanding.
+      hashset<FrameworkID> offersOutstanding;
+    };
+
+    // When `maintenance` is set, the slave is scheduled to be unavailable at
+    // a given point in time, for an optional duration. This information is
+    // used to send out `InverseOffers`.
+    Option<Maintenance> maintenance;
   };
 
   hashmap<SlaveID, Slave> slaves;
@@ -509,6 +545,7 @@ void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
     const SlaveID& slaveId,
     const SlaveInfo& slaveInfo,
+    const Option<Unavailability>& unavailability,
     const Resources& total,
     const hashmap<FrameworkID, Resources>& used)
 {
@@ -540,6 +577,13 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
   slaves[slaveId].checkpoint = slaveInfo.checkpoint();
   slaves[slaveId].hostname = slaveInfo.hostname();
 
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and Filters.
+  if (unavailability.isSome()) {
+    slaves[slaveId].maintenance =
+      typename Slave::Maintenance(unavailability.get());
+  }
+
   LOG(INFO) << "Added slave " << slaveId << " (" << slaves[slaveId].hostname
             << ") with " << slaves[slaveId].total
             << " (allocated: " << slaves[slaveId].allocated << ")";

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5c2f032..1bed6a6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5445,9 +5445,18 @@ void Master::addSlave(
     }
   }
 
+  CHECK(machines.contains(slave->machineId));
+
+  // Only set unavailability if the protobuf has one set.
+  Option<Unavailability> unavailability = None();
+  if (machines[slave->machineId].info.has_unavailability()) {
+    unavailability = machines[slave->machineId].info.unavailability();
+  }
+
   allocator->addSlave(
       slave->id,
       slave->info,
+      unavailability,
       slave->totalResources,
       slave->usedResources);
 }
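
For illustration only (not part of this commit): the same extraction pattern
as a hypothetical standalone helper, using the generated `MachineInfo`
accessors seen above.

    Option<Unavailability> unavailabilityOf(const MachineInfo& info)
    {
      if (info.has_unavailability()) {
        return info.unavailability();
      }

      return None();
    }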

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 9748ca0..0a24b6b 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -216,7 +216,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
 
   // Total cluster resources will become cpus=2, mem=1024.
   SlaveInfo slave1 = createSlaveInfo("cpus:2;mem:1024;disk:0");
-  allocator->addSlave(slave1.id(), slave1, slave1.resources(), EMPTY);
+  allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), EMPTY);
 
   // framework1 will be offered all of slave1's resources since it is
   // the only framework running so far.
@@ -242,7 +242,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // role2 share = 0
   //   framework2 share = 0
   SlaveInfo slave2 = createSlaveInfo("cpus:1;mem:512;disk:0");
-  allocator->addSlave(slave2.id(), slave2, slave2.resources(), EMPTY);
+  allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
   // framework2 will be offered all of slave2's resources since role2
   // has the lowest user share, and framework2 is its only framework.
@@ -262,7 +262,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // role2 share = 0.16 (cpus=1, mem=512)
   //   framework2 share = 1
   SlaveInfo slave3 = createSlaveInfo("cpus:3;mem:2048;disk:0");
-  allocator->addSlave(slave3.id(), slave3, slave3.resources(), EMPTY);
+  allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
   // framework2 will be offered all of slave3's resources since role2
   // has the lowest share.
@@ -287,7 +287,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // role2 share = 0.4 (cpus=4, mem=2560)
   //   framework2 share = 1
   SlaveInfo slave4 = createSlaveInfo("cpus:4;mem:4096;disk:0");
-  allocator->addSlave(slave4.id(), slave4, slave4.resources(), EMPTY);
+  allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
 
   // framework3 will be offered all of slave4's resources since role1
   // has the lowest user share, and framework3 has the lowest share of
@@ -315,7 +315,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
   // role2 share = 0.36 (cpus=4, mem=2560)
   //   framework2 share = 1
   SlaveInfo slave5 = createSlaveInfo("cpus:1;mem:512;disk:0");
-  allocator->addSlave(slave5.id(), slave5, slave5.resources(), EMPTY);
+  allocator->addSlave(slave5.id(), slave5, None(), slave5.resources(), EMPTY);
 
   // Even though framework4 doesn't have any resources, role2 has a
   // lower share than role1, so framework2 receives slave5's resources.
@@ -343,7 +343,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
   SlaveInfo slave1 = createSlaveInfo(
       "cpus:1;mem:512;disk:0;"
       "cpus(role1):100;mem(role1):1024;disk(role1):0");
-  allocator->addSlave(slave1.id(), slave1, slave1.resources(), EMPTY);
+  allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), EMPTY);
 
   // framework1 will be offered all of the resources.
   FrameworkInfo framework1 = createFrameworkInfo("role1");
@@ -361,7 +361,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
  // framework2 will be allocated the new resources.
   SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0");
-  allocator->addSlave(slave2.id(), slave2, slave2.resources(), EMPTY);
+  allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
   allocation = queue.get();
   AWAIT_READY(allocation);
@@ -373,7 +373,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
   // fairness across roles! We expect framework1 to receive this
   // slave's resources, since it has fewer unreserved resources.
   SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0");
-  allocator->addSlave(slave3.id(), slave3, slave3.resources(), EMPTY);
+  allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
   allocation = queue.get();
   AWAIT_READY(allocation);
@@ -390,7 +390,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
 
   SlaveInfo slave4 = createSlaveInfo(
       "cpus(role1):2;mem(role1):1024;disk(role1):0");
-  allocator->addSlave(slave4.id(), slave4, slave4.resources(), EMPTY);
+  allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
 
   allocation = queue.get();
   AWAIT_READY(allocation);
@@ -413,10 +413,10 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave1 = createSlaveInfo("cpus:2;mem:1024;disk:0");
-  allocator->addSlave(slave1.id(), slave1, slave1.resources(), EMPTY);
+  allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), EMPTY);
 
   SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:1024;disk:0");
-  allocator->addSlave(slave2.id(), slave2, slave2.resources(), EMPTY);
+  allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
   // Once framework1 is added, an allocation will occur. Return the
   // resources so that we can test what happens when there are 2
@@ -494,7 +494,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
       framework2.id(), framework2, hashmap<SlaveID, Resources>());
 
   SlaveInfo slave = createSlaveInfo("cpus:2;mem:1024;disk:0");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Ensure that the slave's resources are alternated between both
   // frameworks.
@@ -534,17 +534,17 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
 
   SlaveInfo slave1 = createSlaveInfo(
       "cpus(role1):2;mem(role1):1024;disk(role1):0");
-  allocator->addSlave(slave1.id(), slave1, slave1.resources(), EMPTY);
+  allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), EMPTY);
 
   SlaveInfo slave2 = createSlaveInfo(
       "cpus(role2):2;mem(role2):1024;cpus:1;mem:1024;disk:0");
-  allocator->addSlave(slave2.id(), slave2, slave2.resources(), EMPTY);
+  allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
   // This slave's resources should never be allocated, since there
   // is no framework for role3.
   SlaveInfo slave3 = createSlaveInfo(
       "cpus(role3):1;mem(role3):1024;disk(role3):0");
-  allocator->addSlave(slave3.id(), slave3, slave3.resources(), EMPTY);
+  allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
   // framework1 should get all the resources from slave1, and the
   // unreserved resources from slave2.
@@ -588,7 +588,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
   SlaveInfo slave = createSlaveInfo(
       "cpus(role1):1;mem(role1):200;"
       "cpus:1;mem:200;disk:0");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Initially, all the resources are allocated.
   FrameworkInfo framework1 = createFrameworkInfo("role1");
@@ -660,14 +660,14 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "cpus:" + stringify(MIN_CPUS / 2) + ";"
       "mem:" + stringify((MIN_MEM / 2).megabytes()) + ";"
       "disk:128");
-  allocator->addSlave(slave1.id(), slave1, slave1.resources(), EMPTY);
+  allocator->addSlave(slave1.id(), slave1, None(), slave1.resources(), EMPTY);
 
   // Enough cpus to be considered allocatable.
   SlaveInfo slave2 = createSlaveInfo(
       "cpus:" + stringify(MIN_CPUS) + ";"
       "mem:" + stringify((MIN_MEM / 2).megabytes()) + ";"
       "disk:128");
-  allocator->addSlave(slave2.id(), slave2, slave2.resources(), EMPTY);
+  allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
 
   Future<Allocation> allocation = queue.get();
   AWAIT_READY(allocation);
@@ -681,7 +681,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "cpus:" + stringify(MIN_CPUS / 2) + ";"
       "mem:" + stringify((MIN_MEM).megabytes()) + ";"
       "disk:128");
-  allocator->addSlave(slave3.id(), slave3, slave3.resources(), EMPTY);
+  allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
 
   allocation = queue.get();
   AWAIT_READY(allocation);
@@ -698,7 +698,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
       "cpus(role1):" + stringify(MIN_CPUS / 1.5) + ";"
       "mem(role1):" + stringify((MIN_MEM / 2).megabytes()) + ";"
       "disk:128");
-  allocator->addSlave(slave4.id(), slave4, slave4.resources(), EMPTY);
+  allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
 
   allocation = queue.get();
   AWAIT_READY(allocation);
@@ -719,7 +719,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Initially, all the resources are allocated.
   FrameworkInfo framework = createFrameworkInfo("role1");
@@ -791,7 +791,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Construct an offer operation for the framework's allocation.
   Resources unreserved = Resources::parse("cpus:25;mem:50").get();
@@ -836,7 +836,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
  // Expect to receive all of the available resources.
   FrameworkInfo framework = createFrameworkInfo("role1");
@@ -873,7 +873,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Add a framework that can accept revocable resources.
   FrameworkInfo framework = createFrameworkInfo("role1");
@@ -930,7 +930,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Add a framework that does *not* accept revocable resources.
   FrameworkInfo framework = createFrameworkInfo("role1");
@@ -965,7 +965,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   // Add a framework that can accept revocable resources.
   FrameworkInfo framework = createFrameworkInfo("role1");
@@ -1022,7 +1022,7 @@ TEST_F(HierarchicalAllocatorTest, Whitelist)
   hashmap<FrameworkID, Resources> EMPTY;
 
   SlaveInfo slave = createSlaveInfo("cpus:2;mem:1024");
-  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+  allocator->addSlave(slave.id(), slave, None(), slave.resources(), EMPTY);
 
   FrameworkInfo framework = createFrameworkInfo("*");
   allocator->addFramework(
@@ -1128,7 +1128,12 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
         "cpus:1;mem:128;disk:1024;"
         "ports:[31126-31510,31512-31623,31810-31852,31854-31964]").get();
 
-    allocator->addSlave(slaves[i].id(), slaves[i], slaves[i].resources(), used);
+    allocator->addSlave(
+        slaves[i].id(),
+        slaves[i],
+        None(),
+        slaves[i].resources(),
+        used);
   }
 
   // Wait for all the 'addSlave' operations to be processed.
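
For illustration only (not part of this commit): a hypothetical test fragment
passing a concrete `Unavailability` instead of `None()`, assuming the helpers
used elsewhere in these tests (`createSlaveInfo`, `EMPTY`, `Clock`) and the
`start().nanoseconds()` layout that the master logs.

    SlaveInfo slave = createSlaveInfo("cpus:2;mem:1024;disk:0");

    // An unavailability starting 60 seconds from now, with no duration.
    Unavailability unavailability;
    unavailability.mutable_start()->set_nanoseconds(
        (Clock::now() + Seconds(60)).duration().ns());

    allocator->addSlave(
        slave.id(), slave, unavailability, slave.resources(), EMPTY);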

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 8933196..c6a419b 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -103,7 +103,7 @@ TYPED_TEST(MasterAllocatorTest, SingleFramework)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:2;mem:1024;disk:0");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave>> slave = this->StartSlave(flags);
   ASSERT_SOME(slave);
@@ -151,7 +151,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused)
   slave::Flags flags1 = this->CreateSlaveFlags();
   flags1.resources = Some("cpus:2;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1);
   ASSERT_SOME(slave1);
@@ -254,7 +254,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch)
   slave::Flags flags1 = this->CreateSlaveFlags();
   flags1.resources = Some("cpus:2;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave1 = this->StartSlave(flags1);
   ASSERT_SOME(slave1);
@@ -384,7 +384,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:3;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave = this->StartSlave(&exec, flags);
   ASSERT_SOME(slave);
@@ -518,7 +518,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:3;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave = this->StartSlave(&containerizer, flags);
   ASSERT_SOME(slave);
@@ -652,7 +652,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost)
   slave::Flags flags1 = this->CreateSlaveFlags();
   flags1.resources = Some("cpus:2;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1);
   ASSERT_SOME(slave1);
@@ -719,7 +719,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost)
   slave::Flags flags2 = this->CreateSlaveFlags();
   flags2.resources = string("cpus:3;mem:256;disk:1024;ports:[31000-32000]");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   // Eventually after slave2 is launched, we should get
   // an offer that contains all of slave2's resources
@@ -769,7 +769,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded)
   slave::Flags flags1 = this->CreateSlaveFlags();
   flags1.resources = Some("cpus:3;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave1 = this->StartSlave(&exec, flags1);
   ASSERT_SOME(slave1);
@@ -818,7 +818,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded)
   slave::Flags flags2 = this->CreateSlaveFlags();
   flags2.resources = Some("cpus:4;mem:2048");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   // After slave2 launches, all of its resources are combined with the
   // resources on slave1 that the task isn't using.
@@ -863,7 +863,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:3;mem:1024");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave = this->StartSlave(&exec, flags);
   ASSERT_SOME(slave);
@@ -965,7 +965,7 @@ TYPED_TEST(MasterAllocatorTest, CpusOnlyOfferedAndTaskLaunched)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:2;mem:0");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave = this->StartSlave(&exec, flags);
   ASSERT_SOME(slave);
@@ -1043,7 +1043,7 @@ TYPED_TEST(MasterAllocatorTest, MemoryOnlyOfferedAndTaskLaunched)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.resources = Some("cpus:0;mem:200");
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave> > slave = this->StartSlave(&exec, flags);
   ASSERT_SOME(slave);
@@ -1260,7 +1260,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
     slaveDetector.appoint(master.get());
     schedulerDetector.appoint(master.get());
 
-    EXPECT_CALL(allocator, addSlave(_, _, _, _));
+    EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
     slave::Flags flags = this->CreateSlaveFlags();
     flags.resources = Some("cpus:2;mem:1024");
@@ -1330,7 +1330,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
 
     AWAIT_READY(addFramework);
 
-    EXPECT_CALL(allocator2, addSlave(_, _, _, _));
+    EXPECT_CALL(allocator2, addSlave(_, _, _, _, _));
 
     Future<vector<Offer>> resourceOffers2;
     EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1385,7 +1385,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
     slaveDetector.appoint(master.get());
     schedulerDetector.appoint(master.get());
 
-    EXPECT_CALL(allocator, addSlave(_, _, _, _));
+    EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
     slave::Flags flags = this->CreateSlaveFlags();
     flags.resources = Some("cpus:2;mem:1024");
@@ -1438,7 +1438,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
     EXPECT_CALL(allocator2, initialize(_, _, _));
 
     Future<Nothing> addSlave;
-    EXPECT_CALL(allocator2, addSlave(_, _, _, _))
+    EXPECT_CALL(allocator2, addSlave(_, _, _, _, _))
       .WillOnce(DoAll(InvokeAddSlave(&allocator2),
                       FutureSatisfy(&addSlave)));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 6b50311..4b65440 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1301,7 +1301,7 @@ ACTION_P(InvokeUpdateFramework, allocator)
 
 ACTION_P(InvokeAddSlave, allocator)
 {
-  allocator->real->addSlave(arg0, arg1, arg2, arg3);
+  allocator->real->addSlave(arg0, arg1, arg2, arg3, arg4);
 }
 
 
@@ -1431,9 +1431,9 @@ public:
     EXPECT_CALL(*this, updateFramework(_, _))
       .WillRepeatedly(DoDefault());
 
-    ON_CALL(*this, addSlave(_, _, _, _))
+    ON_CALL(*this, addSlave(_, _, _, _, _))
       .WillByDefault(InvokeAddSlave(this));
-    EXPECT_CALL(*this, addSlave(_, _, _, _))
+    EXPECT_CALL(*this, addSlave(_, _, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, removeSlave(_))
@@ -1514,9 +1514,10 @@ public:
       const FrameworkID&,
       const FrameworkInfo&));
 
-  MOCK_METHOD4(addSlave, void(
+  MOCK_METHOD5(addSlave, void(
       const SlaveID&,
       const SlaveInfo&,
+      const Option<Unavailability>&,
       const Resources&,
       const hashmap<FrameworkID, Resources>&));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index dfab497..572a8d6 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -138,7 +138,7 @@ TEST_F(ReservationEndpointsTest, AvailableResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -231,7 +231,7 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -305,7 +305,7 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -391,7 +391,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -533,7 +533,7 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -684,7 +684,7 @@ TEST_F(ReservationEndpointsTest, InsufficientResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -726,7 +726,7 @@ TEST_F(ReservationEndpointsTest, NoHeader)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -776,7 +776,7 @@ TEST_F(ReservationEndpointsTest, BadCredentials)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -853,7 +853,7 @@ TEST_F(ReservationEndpointsTest, NoResources)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 
@@ -888,7 +888,7 @@ TEST_F(ReservationEndpointsTest, NonMatchingPrincipal)
   ASSERT_SOME(master);
 
   Future<SlaveID> slaveId;
-  EXPECT_CALL(allocator, addSlave(_, _, _, _))
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
     .WillOnce(DoAll(InvokeAddSlave(&allocator),
                     FutureArg<0>(&slaveId)));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index aeee367..91fcf0d 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -418,7 +418,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   slave::Flags slaveFlags = CreateSlaveFlags();
   slaveFlags.resources = "cpus:1;mem:512";
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave>> slave = StartSlave(slaveFlags);
   ASSERT_SOME(slave);
@@ -509,7 +509,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   slave::Flags slaveFlags = CreateSlaveFlags();
   slaveFlags.resources = "cpus(role):1;mem(role):512";
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Try<PID<Slave>> slave = StartSlave(slaveFlags);
   ASSERT_SOME(slave);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea48105a/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6aae14a..b636986 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2150,7 +2150,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
 
   slave::Flags flags = this->CreateSlaveFlags();
 
-  EXPECT_CALL(allocator, addSlave(_, _, _, _));
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _));
 
   Fetcher fetcher;
 


[13/16] mesos git commit: Maintenance Primitives: Added updateInverseOffer to Allocator.

Posted by jo...@apache.org.
Maintenance Primitives: Added updateInverseOffer to Allocator.

Review: https://reviews.apache.org/r/37280


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42f9ce5d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42f9ce5d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42f9ce5d

Branch: refs/heads/master
Commit: 42f9ce5d61bf3e2c48d6a3de86d2e3e5cd3f6b57
Parents: 6c568ba
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 14:23:51 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          | 10 +++++
 src/master/allocator/mesos/allocator.hpp    | 25 ++++++++++++
 src/master/allocator/mesos/hierarchical.hpp | 50 ++++++++++++++++++++++++
 src/tests/mesos.hpp                         | 16 ++++++++
 4 files changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42f9ce5d/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 18d31ef..fb09e2a 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -149,6 +149,16 @@ public:
       const SlaveID& slaveId,
       const Option<Unavailability>& unavailability) = 0;
 
+  // Informs the allocator that the inverse offer has been responded to or
+  // revoked. If `status` is not set, then the inverse offer was not
+  // responded to, possibly because the offer timed out or was rescinded.
+  // In that case the implementation may need to remove any outstanding
+  // inverse offers.
+  virtual void updateInverseOffer(
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const Option<InverseOfferStatus>& status) = 0;
+
   // Informs the Allocator to recover resources that are considered
   // used by the framework.
   virtual void recoverResources(

http://git-wip-us.apache.org/repos/asf/mesos/blob/42f9ce5d/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 124dd3d..171548b 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -116,6 +116,11 @@ public:
       const SlaveID& slaveId,
       const Option<Unavailability>& unavailability);
 
+  void updateInverseOffer(
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const Option<mesos::master::InverseOfferStatus>& status);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -215,6 +220,11 @@ public:
       const SlaveID& slaveId,
       const Option<Unavailability>& unavailability) = 0;
 
+  virtual void updateInverseOffer(
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const Option<mesos::master::InverseOfferStatus>& status) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -467,6 +477,21 @@ inline void MesosAllocator<AllocatorProcess>::updateUnavailability(
 
 
 template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const Option<mesos::master::InverseOfferStatus>& status)
+{
+  return process::dispatch(
+      process,
+      &MesosAllocatorProcess::updateInverseOffer,
+      slaveId,
+      frameworkId,
+      status);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/42f9ce5d/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 8ae7475..8e3ec9c 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -154,6 +154,11 @@ public:
       const SlaveID& slaveId,
       const Option<Unavailability>& unavailability);
 
+  void updateInverseOffer(
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const Option<mesos::master::InverseOfferStatus>& status);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -861,6 +866,51 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
 
 template <class RoleSorter, class FrameworkSorter>
 void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const Option<mesos::master::InverseOfferStatus>& status)
+{
+  CHECK(initialized);
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+  CHECK(slaves[slaveId].maintenance.isSome());
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and Filters.
+
+  // We take a reference to `maintenance` both because we intend to modify
+  // it and to improve readability.
+  typename Slave::Maintenance& maintenance = slaves[slaveId].maintenance.get();
+
+  // Only handle inverse offers that are currently outstanding. If an inverse
+  // offer is not currently outstanding, it is old and can be safely ignored.
+  if (maintenance.offersOutstanding.contains(frameworkId)) {
+
+    // We always remove the outstanding offer so that we will send a new offer
+    // out the next time we schedule inverse offers.
+    maintenance.offersOutstanding.erase(frameworkId);
+
+    // If the response is `Some`, this means the framework responded. Otherwise
+    // if it is `None` the inverse offer timed out or was rescinded.
+    if (status.isSome()) {
+      // For now we don't allow frameworks to respond with `UNKNOWN`. The caller
+      // should guard against this. This goes against the pattern of not
+      // checking external invariants; however, the allocator and master are
+      // currently so tightly coupled that this check is valuable.
+      CHECK_NE(
+          status.get().status(),
+          mesos::master::InverseOfferStatus::UNKNOWN);
+
+      // If the framework responded, we update our state to match.
+      maintenance.statuses[frameworkId].CopyFrom(status.get());
+    }
+  }
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
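
For illustration only (not part of this commit): a hypothetical call site for
the new hook, with `slaveId` and `frameworkId` assumed to be in scope.

    mesos::master::InverseOfferStatus accept;
    accept.set_status(mesos::master::InverseOfferStatus::ACCEPT);

    // Record that the framework accepted the inverse offer. Passing None()
    // instead would signal that the inverse offer timed out or was rescinded
    // without a response.
    allocator->updateInverseOffer(slaveId, frameworkId, accept);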

http://git-wip-us.apache.org/repos/asf/mesos/blob/42f9ce5d/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 858618f..3db97ac 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1359,6 +1359,12 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
 }
 
 
+ACTION_P(InvokeUpdateInverseOffer, allocator)
+{
+  return allocator->real->updateInverseOffer(arg0, arg1, arg2);
+}
+
+
 ACTION_P(InvokeRecoverResources, allocator)
 {
   allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1487,6 +1493,11 @@ public:
     EXPECT_CALL(*this, updateUnavailability(_, _))
       .WillRepeatedly(DoDefault());
 
+    ON_CALL(*this, updateInverseOffer(_, _, _))
+      .WillByDefault(InvokeUpdateInverseOffer(this));
+    EXPECT_CALL(*this, updateInverseOffer(_, _, _))
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeRecoverResources(this));
     EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1568,6 +1579,11 @@ public:
       const SlaveID&,
       const Option<Unavailability>&));
 
+  MOCK_METHOD3(updateInverseOffer, void(
+      const SlaveID&,
+      const FrameworkID&,
+      const Option<mesos::master::InverseOfferStatus>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,


[07/16] mesos git commit: Maintenance Primitives: Added updateUnavailability to master.

Posted by jo...@apache.org.
Maintenance Primitives: Added updateUnavailability to master.

Review: https://reviews.apache.org/r/37175


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f87f733d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f87f733d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f87f733d

Branch: refs/heads/master
Commit: f87f733dbd34e39c91125fabe541269aea806266
Parents: ea48105
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:40:10 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  8 +++
 src/master/allocator/mesos/allocator.hpp    | 21 +++++++
 src/master/allocator/mesos/hierarchical.hpp | 29 +++++++++
 src/master/http.cpp                         | 15 ++---
 src/master/master.cpp                       | 77 ++++++++++++++++++++++++
 src/master/master.hpp                       |  4 ++
 src/tests/master_maintenance_tests.cpp      | 21 ++++---
 src/tests/mesos.hpp                         | 15 +++++
 8 files changed, 174 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 257d2f6..b5bfc28 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -135,6 +135,14 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  // We currently support storing the next unavailability, if there is one,
+  // per slave. If `unavailability` is not set, then there is no known
+  // upcoming unavailability. In that case the implementation may need to
+  // remove any outstanding inverse offers.
+  virtual void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability) = 0;
+
   // Informs the Allocator to recover resources that are considered
   // used by the framework.
   virtual void recoverResources(
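
For illustration only (not part of this commit): the two ways this hook is
meant to be driven, per the comment above; `slaveId` and `unavailability` are
assumed to be in scope.

    // A maintenance window was scheduled for the slave's machine.
    allocator->updateUnavailability(slaveId, unavailability);

    // The window was removed from the schedule; clear any known
    // unavailability (and, implicitly, any outstanding inverse offers).
    allocator->updateUnavailability(slaveId, None());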

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index c845723..ee6ec58 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -108,6 +108,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -199,6 +203,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  virtual void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -433,6 +441,19 @@ MesosAllocator<AllocatorProcess>::updateAvailable(
 
 
 template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  return process::dispatch(
+      process,
+      &MesosAllocatorProcess::updateUnavailability,
+      slaveId,
+      unavailability);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index f86a701..77a5b4c 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -146,6 +146,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -816,6 +820,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
 
 template <class RoleSorter, class FrameworkSorter>
 void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and Filters.
+
+  // Remove any old unavailability.
+  slaves[slaveId].maintenance = None();
+
+  // If we have a new unavailability.
+  if (unavailability.isSome()) {
+    slaves[slaveId].maintenance =
+      typename Slave::Maintenance(unavailability.get());
+  }
+
+  allocate(slaveId);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a814930..05b590e 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1494,19 +1494,19 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
         }
       }
 
-      // NOTE: Copies are needed because this loop modifies the container.
+      // NOTE: Copies are needed because `updateUnavailability()` in this loop
+      // modifies the container.
       foreachkey (const MachineID& id, utils::copy(master->machines)) {
         // Update the entry for each updated machine.
         if (updated.contains(id)) {
-          master->machines[id]
-            .info.mutable_unavailability()->CopyFrom(updated[id]);
-
+          master->updateUnavailability(id, updated[id]);
           continue;
         }
 
-        // Remove the unavailability for each removed machine.
-        master->machines[id].info.clear_unavailability();
+        // Transition each removed machine back to the `UP` mode and remove the
+        // unavailability.
         master->machines[id].info.set_mode(MachineInfo::UP);
+        master->updateUnavailability(id, None());
       }
 
       // Save each new machine, with the unavailability
@@ -1516,9 +1516,10 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
           MachineInfo info;
           info.mutable_id()->CopyFrom(id);
           info.set_mode(MachineInfo::DRAINING);
-          info.mutable_unavailability()->CopyFrom(window.unavailability());
 
           master->machines[id].info.CopyFrom(info);
+
+          master->updateUnavailability(id, window.unavailability());
         }
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1bed6a6..0b3ba56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -50,6 +50,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/ip.hpp>
 #include <stout/lambda.hpp>
@@ -3846,6 +3847,26 @@ void Master::reregisterSlave(
     // based authentication).
     LOG(INFO) << "Re-registering slave " << *slave;
 
+    // We don't allow re-registering this way with a different IP or
+    // hostname. This is because maintenance is scheduled at the
+    // machine level, so we would need to re-validate the slave's
+    // unavailability if the machine it is running on changed.
+    if (slave->pid.address.ip != from.address.ip ||
+        slave->info.hostname() != slaveInfo.hostname()) {
+      LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from
+                   << " (" << slaveInfo.hostname() << ") attempted to "
+                   << "re-register with different IP / hostname; expected "
+                   << slave->pid.address.ip << " (" << slave->info.hostname()
+                   << ") shutting it down";
+
+      ShutdownMessage message;
+      message.set_message(
+          "Slave attempted to re-register with different IP / hostname");
+
+      send(from, message);
+      return;
+    }
+
     // Update the slave pid and relink to it.
     // NOTE: Re-linking the slave here always rather than only when
     // the slave is disconnected can lead to multiple exited events
@@ -4102,6 +4123,62 @@ void Master::updateSlave(
 }
 
 
+void Master::updateUnavailability(
+    const MachineID& machineId,
+    const Option<Unavailability>& unavailability)
+{
+  if (unavailability.isSome()) {
+    machines[machineId].info.mutable_unavailability()->CopyFrom(
+        unavailability.get());
+  } else {
+    machines[machineId].info.clear_unavailability();
+  }
+
+  // TODO(jmlvanre): Only update allocator and rescind offers if the
+  // unavailability has actually changed.
+  if (machines.contains(machineId)) {
+    // For every slave on this machine, update the allocator.
+    foreach (const SlaveID& slaveId, machines[machineId].slaves) {
+      // The slave should not be in the machines mapping if it is removed.
+      CHECK(slaves.removed.get(slaveId).isNone());
+
+      // The slave should be registered if it is in the machines mapping.
+      CHECK(slaves.registered.contains(slaveId));
+
+      Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+      if (unavailability.isSome()) {
+        // TODO(jmlvanre): Add stream operator for unavailability.
+        LOG(INFO) << "Updating unavailability of slave " << *slave
+                  << ", starting at "
+                  << Nanoseconds(unavailability.get().start().nanoseconds());
+      } else {
+        LOG(INFO) << "Removing unavailability of slave " << *slave;
+      }
+
+      // Remove and rescind offers since we want to inform frameworks of the
+      // unavailability change as soon as possible.
+      foreach (Offer* offer, utils::copy(slave->offers)) {
+        allocator->recoverResources(
+            offer->framework_id(), slave->id, offer->resources(), None());
+
+        removeOffer(offer, true); // Rescind!
+      }
+
+      // We remove / rescind all the offers first so that any calls to the
+      // allocator to modify its internal state are queued before the update of
+      // the unavailability in the allocator. We do this so that the allocator's
+      // state can start from a "clean slate" for the new unavailability.
+      // NOTE: Any calls from the Allocator back into the master, for example
+      // `offer()`, are guaranteed to happen after this function exits due to
+      // the Actor pattern.
+
+      allocator->updateUnavailability(slaveId, unavailability);
+    }
+  }
+}
+
+
 // TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
 // because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d7d27bd..cd71a25 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -468,6 +468,10 @@ public:
       const SlaveID& slaveId,
       const Resources& oversubscribedResources);
 
+  void updateUnavailability(
+      const MachineID& machineId,
+      const Option<Unavailability>& unavailability);
+
   void shutdownSlave(
       const SlaveID& slaveId,
       const std::string& message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 5811446..a857ab9 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -327,6 +327,12 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
     .WillOnce(FutureArg<1>(&unavailabilityOffers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
+  // The original offers should be rescinded when the unavailability
+  // is changed.
+  Future<Nothing> offerRescinded;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureSatisfy(&offerRescinded));
+
   // Start the test.
   driver.start();
 
@@ -337,10 +343,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
   // Check that unavailability is not set.
   foreach (const Offer& offer, normalOffers.get()) {
     EXPECT_FALSE(offer.has_unavailability());
-
-    // We have a few seconds between allocations (by default).  That should
-    // be enough time to post a schedule before the next allocation.
-    driver.declineOffer(offer.id());
   }
 
   // Schedule this slave for maintenance.
@@ -355,9 +357,13 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
   const Unavailability unavailability = createUnavailability(start, duration);
 
   // Post a valid schedule with one machine.
-  maintenance::Schedule schedule = createSchedule({
-      createWindow({machine}, unavailability)});
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine}, unavailability)});
 
+  // We have a few seconds between the first set of offers and the
+  // next allocation of offers.  This should be enough time to perform
+  // a maintenance schedule update.  This update will also trigger the
+  // rescinding of offers from the scheduled slave.
   Future<Response> response = process::http::post(
       master.get(),
       "maintenance/schedule",
@@ -366,9 +372,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  // Speed up the test by not waiting until the next allocation.
-  driver.reviveOffers();
-
   // Wait for some offers.
   AWAIT_READY(unavailabilityOffers);
   EXPECT_NE(0u, unavailabilityOffers.get().size());

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 4b65440..477b7e4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1353,6 +1353,12 @@ ACTION_P(InvokeUpdateResources, allocator)
 }
 
 
+ACTION_P(InvokeUpdateUnavailability, allocator)
+{
+  return allocator->real->updateUnavailability(arg0, arg1);
+}
+
+
 ACTION_P(InvokeRecoverResources, allocator)
 {
   allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1476,6 +1482,11 @@ public:
     EXPECT_CALL(*this, updateAvailable(_, _))
       .WillRepeatedly(DoDefault());
 
+    ON_CALL(*this, updateUnavailability(_, _))
+      .WillByDefault(InvokeUpdateUnavailability(this));
+    EXPECT_CALL(*this, updateUnavailability(_, _))
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeRecoverResources(this));
     EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1550,6 +1561,10 @@ public:
       const SlaveID&,
       const std::vector<Offer::Operation>&));
 
+  MOCK_METHOD2(updateUnavailability, void(
+      const SlaveID&,
+      const Option<Unavailability>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,
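
For illustration only (not part of this commit): a hypothetical test fragment
wiring the new mock into the DoAll/FutureSatisfy pattern used elsewhere in
these tests.

    Future<Nothing> unavailabilityUpdated;
    EXPECT_CALL(allocator, updateUnavailability(_, _))
      .WillOnce(DoAll(InvokeUpdateUnavailability(&allocator),
                      FutureSatisfy(&unavailabilityUpdated)));

    // ... post a maintenance schedule for the slave's machine ...

    AWAIT_READY(unavailabilityUpdated);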


[14/16] mesos git commit: Maintenance Primitives: Used V1 API for Master maintenance test.

Posted by jo...@apache.org.
Maintenance Primitives: Used V1 API for Master maintenance test.

Review: https://reviews.apache.org/r/37283


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf82689f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf82689f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf82689f

Branch: refs/heads/master
Commit: bf82689f69a21286177d52d7d7e5d2f713c1e5b1
Parents: 388eaa5
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:50:21 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/tests/master_maintenance_tests.cpp | 152 +++++++++++++++++++++-------
 1 file changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf82689f/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index a857ab9..8f39ac3 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -20,6 +20,12 @@
 
 #include <mesos/maintenance/maintenance.hpp>
 
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
@@ -37,6 +43,9 @@
 
 #include "common/protobuf_utils.hpp"
 
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
 #include "master/master.hpp"
 
 #include "slave/flags.hpp"
@@ -48,9 +57,14 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
 
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
 using process::Clock;
 using process::Future;
 using process::PID;
+using process::Queue;
 using process::Time;
 
 using process::http::BadRequest;
@@ -65,6 +79,7 @@ using mesos::internal::protobuf::maintenance::createWindow;
 using std::string;
 using std::vector;
 
+using testing::AtMost;
 using testing::DoAll;
 
 namespace mesos {
@@ -111,9 +126,40 @@ public:
   // Default unavailability.  Used when the test does not care
   // about the value of the unavailability.
   Unavailability unavailability;
+
+protected:
+  // Helper class for using EXPECT_CALL since the Mesos scheduler API
+  // is callback based.
+  class Callbacks
+  {
+  public:
+    MOCK_METHOD0(connected, void(void));
+    MOCK_METHOD0(disconnected, void(void));
+    MOCK_METHOD1(received, void(const std::queue<Event>&));
+  };
 };
 
 
+// Enqueues all received events into a libprocess queue.
+// TODO(jmlvanre): Factor this common code out of tests into a V1
+// helper.
+ACTION_P(Enqueue, queue)
+{
+  std::queue<Event> events = arg0;
+  while (!events.empty()) {
+    // Note that we currently drop HEARTBEATs because most of these tests
+    // are not designed to deal with heartbeats.
+    // TODO(vinod): Implement DROP_HTTP_CALLS that can filter heartbeats.
+    if (events.front().type() == Event::HEARTBEAT) {
+      VLOG(1) << "Ignoring HEARTBEAT event";
+    } else {
+      queue->put(events.front());
+    }
+    events.pop();
+  }
+}
+
+
 // Posts valid and invalid schedules to the maintenance schedule endpoint.
 TEST_F(MasterMaintenanceTest, UpdateSchedule)
 {
@@ -304,7 +350,10 @@ TEST_F(MasterMaintenanceTest, FailToUnscheduleDeactivatedMachines)
 // slave is scheduled to go down for maintenance.
 TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 {
-  Try<PID<Master>> master = StartMaster();
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
@@ -312,36 +361,49 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
   Try<PID<Slave>> slave = StartSlave(&exec);
   ASSERT_SOME(slave);
 
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
 
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
+  Mesos mesos(
+      master.get(),
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
 
-  // Intercept offers sent to the scheduler.
-  Future<vector<Offer>> normalOffers;
-  Future<vector<Offer>> unavailabilityOffers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&normalOffers))
-    .WillOnce(FutureArg<1>(&unavailabilityOffers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  AWAIT_READY(connected);
 
-  // The original offers should be rescinded when the unavailability
-  // is changed.
-  Future<Nothing> offerRescinded;
-  EXPECT_CALL(sched, offerRescinded(&driver, _))
-    .WillOnce(FutureSatisfy(&offerRescinded));
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
 
-  // Start the test.
-  driver.start();
+  v1::FrameworkID id(event.get().subscribed().framework_id());
 
-  // Wait for some normal offers.
-  AWAIT_READY(normalOffers);
-  EXPECT_NE(0u, normalOffers.get().size());
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+  const size_t numberOfOffers = event.get().offers().offers().size();
 
-  // Check that unavailability is not set.
-  foreach (const Offer& offer, normalOffers.get()) {
+  // Regular offers shouldn't have unavailability.
+  foreach (const v1::Offer& offer, event.get().offers().offers()) {
     EXPECT_FALSE(offer.has_unavailability());
   }
 
@@ -372,19 +434,41 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  // Wait for some offers.
-  AWAIT_READY(unavailabilityOffers);
-  EXPECT_NE(0u, unavailabilityOffers.get().size());
+  // The original offers should be rescinded when the unavailability
+  // is changed. We expect as many rescind events as we received
+  // original offers.
+  for (size_t offerNumber = 0; offerNumber < numberOfOffers; ++offerNumber) {
+    event = events.get();
+    AWAIT_READY(event);
+    EXPECT_EQ(Event::RESCIND, event.get().type());
+  }
 
-  // Check that each offer has an unavailability.
-  foreach (const Offer& offer, unavailabilityOffers.get()) {
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  // Make sure the new offers have the unavailability set.
+  foreach (const v1::Offer& offer, event.get().offers().offers()) {
     EXPECT_TRUE(offer.has_unavailability());
-    EXPECT_EQ(unavailability.start(), offer.unavailability().start());
-    EXPECT_EQ(unavailability.duration(), offer.unavailability().duration());
+    EXPECT_EQ(
+        unavailability.start().nanoseconds(),
+        offer.unavailability().start().nanoseconds());
+
+    EXPECT_EQ(
+        unavailability.duration().nanoseconds(),
+        offer.unavailability().duration().nanoseconds());
   }
 
-  driver.stop();
-  driver.join();
+  // We also expect an inverse offer for the slave to go under
+  // maintenance.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().inverse_offers().size());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
 
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }


[06/16] mesos git commit: Maintenance Primitives: Added `MachineID` to Slave struct in Master.

Posted by jo...@apache.org.
Maintenance Primitives: Added `MachineID` to Slave struct in Master.

Review: https://reviews.apache.org/r/37170


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e7ee6b2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e7ee6b2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e7ee6b2

Branch: refs/heads/master
Commit: 9e7ee6b26f8afe419c7758327fc9ce9f580e0b54
Parents: 57385ec
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Aug 30 13:56:56 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/Makefile.am            |  1 +
 src/master/http.cpp        | 32 ++++++++++++++-------------
 src/master/machine.hpp     | 49 +++++++++++++++++++++++++++++++++++++++++
 src/master/maintenance.cpp |  6 ++---
 src/master/maintenance.hpp |  3 ++-
 src/master/master.cpp      | 21 +++++++++++++++++-
 src/master/master.hpp      | 12 +++++++---
 7 files changed, 101 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 8963cea..bb77c2d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -732,6 +732,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	master/constants.hpp						\
 	master/detector.hpp						\
 	master/flags.hpp						\
+	master/machine.hpp						\
 	master/maintenance.hpp						\
 	master/master.hpp						\
 	master/metrics.hpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 73e8857..a814930 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1464,7 +1464,7 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
   mesos::maintenance::Schedule schedule = protoSchedule.get();
   Try<Nothing> isValid = maintenance::validation::schedule(
       schedule,
-      master->machineInfos);
+      master->machines);
 
   if (isValid.isError()) {
     return BadRequest(isValid.error());
@@ -1495,17 +1495,18 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
       }
 
       // NOTE: Copies are needed because this loop modifies the container.
-      foreachkey (const MachineID& id, utils::copy(master->machineInfos)) {
+      foreachkey (const MachineID& id, utils::copy(master->machines)) {
         // Update the entry for each updated machine.
         if (updated.contains(id)) {
-          master->machineInfos[id]
-            .mutable_unavailability()->CopyFrom(updated[id]);
+          master->machines[id]
+            .info.mutable_unavailability()->CopyFrom(updated[id]);
 
           continue;
         }
 
-        // Delete the entry for each removed machine.
-        master->machineInfos.erase(id);
+        // Remove the unavailability for each removed machine.
+        master->machines[id].info.clear_unavailability();
+        master->machines[id].info.set_mode(MachineInfo::UP);
       }
 
       // Save each new machine, with the unavailability
@@ -1517,7 +1518,7 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
           info.set_mode(MachineInfo::DRAINING);
           info.mutable_unavailability()->CopyFrom(window.unavailability());
 
-          master->machineInfos[id] = info;
+          master->machines[id].info.CopyFrom(info);
         }
       }
 
@@ -1571,13 +1572,13 @@ Future<Response> Master::Http::machineDown(const Request& request) const
   // Check that all machines are part of a maintenance schedule.
   // TODO(josephw): Allow a transition from `UP` to `DOWN`.
   foreach (const MachineID& id, ids.values()) {
-    if (!master->machineInfos.contains(id)) {
+    if (!master->machines.contains(id)) {
       return BadRequest(
           "Machine '" + id.DebugString() +
             "' is not part of a maintenance schedule");
     }
 
-    if (master->machineInfos[id].mode() != MachineInfo::DRAINING) {
+    if (master->machines[id].info.mode() != MachineInfo::DRAINING) {
       return BadRequest(
           "Machine '" + id.DebugString() +
             "' is not in DRAINING mode and cannot be brought down");
@@ -1593,7 +1594,7 @@ Future<Response> Master::Http::machineDown(const Request& request) const
 
       // Update the master's local state with the downed machines.
       foreach (const MachineID& id, ids.values()) {
-        master->machineInfos[id].set_mode(MachineInfo::DOWN);
+        master->machines[id].info.set_mode(MachineInfo::DOWN);
       }
 
       return OK();
@@ -1641,13 +1642,13 @@ Future<Response> Master::Http::machineUp(const Request& request) const
 
   // Check that all machines are part of a maintenance schedule.
   foreach (const MachineID& id, ids.values()) {
-    if (!master->machineInfos.contains(id)) {
+    if (!master->machines.contains(id)) {
       return BadRequest(
           "Machine '" + id.DebugString() +
             "' is not part of a maintenance schedule");
     }
 
-    if (master->machineInfos[id].mode() != MachineInfo::DOWN) {
+    if (master->machines[id].info.mode() != MachineInfo::DOWN) {
       return BadRequest(
           "Machine '" + id.DebugString() +
             "' is not in DOWN mode and cannot be brought up");
@@ -1664,7 +1665,8 @@ Future<Response> Master::Http::machineUp(const Request& request) const
       // Update the master's local state with the reactivated machines.
       hashset<MachineID> updated;
       foreach (const MachineID& id, ids.values()) {
-        master->machineInfos.erase(id);
+        master->machines[id].info.set_mode(MachineInfo::UP);
+        master->machines[id].info.clear_unavailability();
         updated.insert(id);
       }
 
@@ -1718,8 +1720,8 @@ Future<Response> Master::Http::maintenanceStatus(const Request& request) const
 
   // Unwrap the master's machine information into two arrays of machines.
   mesos::maintenance::ClusterStatus status;
-  foreachkey (const MachineID& id, master->machineInfos) {
-    switch (master->machineInfos[id].mode()) {
+  foreachkey (const MachineID& id, master->machines) {
+    switch (master->machines[id].info.mode()) {
       case MachineInfo::DRAINING: {
         status.add_draining_machines()->CopyFrom(id);
         break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/machine.hpp
----------------------------------------------------------------------
diff --git a/src/master/machine.hpp b/src/master/machine.hpp
new file mode 100644
index 0000000..c0d4afc
--- /dev/null
+++ b/src/master/machine.hpp
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESOS_MASTER_MACHINE_HPP__
+#define __MESOS_MASTER_MACHINE_HPP__
+
+#include <mesos/mesos.hpp>
+
+#include <stout/hashset.hpp>
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// A C++ wrapper object for MachineInfo that also stores some extra information.
+struct Machine
+{
+  // A default constructor to allow use of the `[]` operator.
+  Machine() {}
+
+  Machine(const MachineInfo& _info) : info(_info) {}
+
+  // The state of the machine represented as a protobuf.
+  MachineInfo info;
+
+  // The set of slaves currently registered from this machine.
+  hashset<SlaveID> slaves;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESOS_MASTER_MACHINE_HPP__
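
A rough illustration (not from the commit) of how code holding the master's
machines map can combine the two members, for example to act on every slave
registered from a machine that has been deactivated ('machines' and 'id' are
assumed to be the master's hashmap<MachineID, Machine> and a key known to be
present):

  const Machine& machine = machines[id];

  if (machine.info.mode() == MachineInfo::DOWN) {
    foreach (const SlaveID& slaveId, machine.slaves) {
      // E.g. refuse re-registration from, or remove, this slave.
    }
  }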

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/maintenance.cpp
----------------------------------------------------------------------
diff --git a/src/master/maintenance.cpp b/src/master/maintenance.cpp
index 277dd82..87308a6 100644
--- a/src/master/maintenance.cpp
+++ b/src/master/maintenance.cpp
@@ -201,7 +201,7 @@ namespace validation {
 
 Try<Nothing> schedule(
     const maintenance::Schedule& schedule,
-    const hashmap<MachineID, MachineInfo>& infos)
+    const hashmap<MachineID, Machine>& machines)
 {
   hashset<MachineID> updated;
   foreach (const maintenance::Window& window, schedule.windows()) {
@@ -238,8 +238,8 @@ Try<Nothing> schedule(
   }
 
   // Ensure that no `DOWN` machine is removed from the schedule.
-  foreachpair (const MachineID& id, const MachineInfo& info, infos) {
-    if (info.mode() == MachineInfo::DOWN && !updated.contains(id)) {
+  foreachpair (const MachineID& id, const Machine& machine, machines) {
+    if (machine.info.mode() == MachineInfo::DOWN && !updated.contains(id)) {
       return Error(
           "Machine '" + id.DebugString() +
             "' is deactivated and cannot be removed from the schedule");

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/maintenance.hpp
----------------------------------------------------------------------
diff --git a/src/master/maintenance.hpp b/src/master/maintenance.hpp
index bebaeb2..8d134aa 100644
--- a/src/master/maintenance.hpp
+++ b/src/master/maintenance.hpp
@@ -27,6 +27,7 @@
 #include <stout/nothing.hpp>
 #include <stout/try.hpp>
 
+#include "master/machine.hpp"
 #include "master/registrar.hpp"
 #include "master/registry.hpp"
 
@@ -124,7 +125,7 @@ namespace validation {
  */
 Try<Nothing> schedule(
     const mesos::maintenance::Schedule& schedule,
-    const hashmap<MachineID, MachineInfo>& infos);
+    const hashmap<MachineID, Machine>& machines);
 
 
 // Checks that the `duration` of the unavailability is non-negative.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c90311f..31fc83d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1397,7 +1397,7 @@ Future<Nothing> Master::_recover(const Registry& registry)
 
   // Save the machine info for each machine.
   foreach (const Registry::Machine& machine, registry.machines().machines()) {
-    machineInfos[machine.info().id()] = machine.info();
+    machines[machine.info().id()] = Machine(machine.info());
   }
 
   // Recovery is now complete!
@@ -3744,9 +3744,14 @@ void Master::_registerSlave(
         stringify(slaveInfo.id()));
     send(pid, message);
   } else {
+    MachineID machineId;
+    machineId.set_hostname(slaveInfo.hostname());
+    machineId.set_ip(stringify(pid.address.ip));
+
     Slave* slave = new Slave(
         slaveInfo,
         pid,
+        machineId,
         version.empty() ? Option<string>::none() : version,
         Clock::now(),
         checkpointedResources);
@@ -3942,9 +3947,14 @@ void Master::_reregisterSlave(
     send(pid, message);
   } else {
     // Re-admission succeeded.
+    MachineID machineId;
+    machineId.set_hostname(slaveInfo.hostname());
+    machineId.set_ip(stringify(pid.address.ip));
+
     Slave* slave = new Slave(
         slaveInfo,
         pid,
+        machineId,
         version.empty() ? Option<string>::none() : version,
         Clock::now(),
         checkpointedResources,
@@ -5355,6 +5365,10 @@ void Master::addSlave(
 
   link(slave->pid);
 
+  // Map the slave to the machine it is running on.
+  CHECK(!machines[slave->machineId].slaves.contains(slave->id));
+  machines[slave->machineId].slaves.insert(slave->id);
+
   // Set up an observer for the slave.
   slave->observer = new SlaveObserver(
       slave->pid,
@@ -5499,6 +5513,11 @@ void Master::removeSlave(
   slaves.removed.put(slave->id, Nothing());
   authenticated.erase(slave->pid);
 
+  // Remove the slave from the `machines` mapping.
+  CHECK(machines.contains(slave->machineId));
+  CHECK(machines[slave->machineId].slaves.contains(slave->id));
+  machines[slave->machineId].slaves.erase(slave->id);
+
   // Kill the slave observer.
   terminate(slave->observer);
   wait(slave->observer);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e7ee6b2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 12cc1ad..d7d27bd 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -70,6 +70,7 @@
 #include "master/contender.hpp"
 #include "master/detector.hpp"
 #include "master/flags.hpp"
+#include "master/machine.hpp"
 #include "master/metrics.hpp"
 #include "master/registrar.hpp"
 #include "master/validation.hpp"
@@ -109,6 +110,7 @@ struct Slave
 {
   Slave(const SlaveInfo& _info,
         const process::UPID& _pid,
+        const MachineID& _machineId,
         const Option<std::string> _version,
         const process::Time& _registeredTime,
         const Resources& _checkpointedResources,
@@ -118,6 +120,7 @@ struct Slave
           std::vector<Task>())
     : id(_info.id()),
       info(_info),
+      machineId(_machineId),
       pid(_pid),
       version(_version),
       registeredTime(_registeredTime),
@@ -280,6 +283,8 @@ struct Slave
   const SlaveID id;
   const SlaveInfo info;
 
+  const MachineID machineId;
+
   process::UPID pid;
 
   // The Mesos version of the slave. If set, the slave is >= 0.21.0.
@@ -954,9 +959,10 @@ private:
 
   MasterInfo info_;
 
-  // Holds some info which affects how a machine behaves.
-  // See the `MachineInfo` protobuf for more information.
-  hashmap<MachineID, MachineInfo> machineInfos;
+  // Holds some info which affects how a machine behaves, as well as state that
+  // represents the master's view of this machine. See the `MachineInfo` protobuf
+  // and `Machine` struct for more information.
+  hashmap<MachineID, Machine> machines;
 
   struct Maintenance
   {