You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/14 19:58:51 UTC

[07/16] mesos git commit: Maintenance Primitives: Added updateUnavailability to master.

Maintenance Primitives: Added updateUnavailability to master.

Review: https://reviews.apache.org/r/37175


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f87f733d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f87f733d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f87f733d

Branch: refs/heads/master
Commit: f87f733dbd34e39c91125fabe541269aea806266
Parents: ea48105
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:40:10 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  8 +++
 src/master/allocator/mesos/allocator.hpp    | 21 +++++++
 src/master/allocator/mesos/hierarchical.hpp | 29 +++++++++
 src/master/http.cpp                         | 15 ++---
 src/master/master.cpp                       | 77 ++++++++++++++++++++++++
 src/master/master.hpp                       |  4 ++
 src/tests/master_maintenance_tests.cpp      | 21 ++++---
 src/tests/mesos.hpp                         | 15 +++++
 8 files changed, 174 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 257d2f6..b5bfc28 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -135,6 +135,14 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  // We currently support storing the next unavailability, if there is one, per
+  // slave. If `unavailability` is not set then there is no known upcoming
+  // unavailability. This might require the implementation of the function to
+  // remove any inverse offers that are outstanding.
+  virtual void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability) = 0;
+
   // Informs the Allocator to recover resources that are considered
   // used by the framework.
   virtual void recoverResources(

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index c845723..ee6ec58 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -108,6 +108,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -199,6 +203,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  virtual void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -433,6 +441,19 @@ MesosAllocator<AllocatorProcess>::updateAvailable(
 
 
 template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  return process::dispatch(
+      process,
+      &MesosAllocatorProcess::updateUnavailability,
+      slaveId,
+      unavailability);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index f86a701..77a5b4c 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -146,6 +146,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  void updateUnavailability(
+      const SlaveID& slaveId,
+      const Option<Unavailability>& unavailability);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -816,6 +820,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
 
 template <class RoleSorter, class FrameworkSorter>
 void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
+    const SlaveID& slaveId,
+    const Option<Unavailability>& unavailability)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  // NOTE: We currently implement maintenance in the allocator to be able to
+  // leverage state and features such as the FrameworkSorter and Filters.
+
+  // Remove any old unavailability.
+  slaves[slaveId].maintenance = None();
+
+  // If we have a new unavailability.
+  if (unavailability.isSome()) {
+    slaves[slaveId].maintenance =
+      typename Slave::Maintenance(unavailability.get());
+  }
+
+  allocate(slaveId);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a814930..05b590e 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1494,19 +1494,19 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
         }
       }
 
-      // NOTE: Copies are needed because this loop modifies the container.
+      // NOTE: Copies are needed because `updateUnavailability()` in this loop
+      // modifies the container.
       foreachkey (const MachineID& id, utils::copy(master->machines)) {
         // Update the entry for each updated machine.
         if (updated.contains(id)) {
-          master->machines[id]
-            .info.mutable_unavailability()->CopyFrom(updated[id]);
-
+          master->updateUnavailability(id, updated[id]);
           continue;
         }
 
-        // Remove the unavailability for each removed machine.
-        master->machines[id].info.clear_unavailability();
+        // Transition each removed machine back to the `UP` mode and remove the
+        // unavailability.
         master->machines[id].info.set_mode(MachineInfo::UP);
+        master->updateUnavailability(id, None());
       }
 
       // Save each new machine, with the unavailability
@@ -1516,9 +1516,10 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
           MachineInfo info;
           info.mutable_id()->CopyFrom(id);
           info.set_mode(MachineInfo::DRAINING);
-          info.mutable_unavailability()->CopyFrom(window.unavailability());
 
           master->machines[id].info.CopyFrom(info);
+
+          master->updateUnavailability(id, window.unavailability());
         }
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1bed6a6..0b3ba56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -50,6 +50,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/ip.hpp>
 #include <stout/lambda.hpp>
@@ -3846,6 +3847,26 @@ void Master::reregisterSlave(
     // based authentication).
     LOG(INFO) << "Re-registering slave " << *slave;
 
+    // We don't allow re-registering this way with a different IP or
+    // hostname. This is because maintenance is scheduled at the
+    // machine level; so we would need to re-validate the slave's
+    // unavailability if the machine it is running on changed.
+    if (slave->pid.address.ip != from.address.ip ||
+        slave->info.hostname() != slaveInfo.hostname()) {
+      LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from
+                   << " (" << slaveInfo.hostname() << ") attempted to "
+                   << "re-register with different IP / hostname; expected "
+                   << slave->pid.address.ip << " (" << slave->info.hostname()
+                   << ") shutting it down";
+
+      ShutdownMessage message;
+      message.set_message(
+          "Slave attempted to re-register with different IP / hostname");
+
+      send(from, message);
+      return;
+    }
+
     // Update the slave pid and relink to it.
     // NOTE: Re-linking the slave here always rather than only when
     // the slave is disconnected can lead to multiple exited events
@@ -4102,6 +4123,62 @@ void Master::updateSlave(
 }
 
 
+void Master::updateUnavailability(
+    const MachineID& machineId,
+    const Option<Unavailability>& unavailability)
+{
+  if (unavailability.isSome()) {
+    machines[machineId].info.mutable_unavailability()->CopyFrom(
+        unavailability.get());
+  } else {
+    machines[machineId].info.clear_unavailability();
+  }
+
+  // TODO(jmlvanre): Only update allocator and rescind offers if the
+  // unavailability has actually changed.
+  if (machines.contains(machineId)) {
+    // For every slave on this machine, update the allocator.
+    foreach (const SlaveID& slaveId, machines[machineId].slaves) {
+      // The slave should not be in the machines mapping if it is removed.
+      CHECK(slaves.removed.get(slaveId).isNone());
+
+      // The slave should be registered if it is in the machines mapping.
+      CHECK(slaves.registered.contains(slaveId));
+
+      Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+      if (unavailability.isSome()) {
+        // TODO(jmlvanre): Add stream operator for unavailability.
+        LOG(INFO) << "Updating unavailability of slave " << *slave
+                  << ", starting at "
+                  << Nanoseconds(unavailability.get().start().nanoseconds());
+      } else {
+        LOG(INFO) << "Removing unavailability of slave " << *slave;
+      }
+
+      // Remove and rescind offers since we want to inform frameworks of the
+      // unavailability change as soon as possible.
+      foreach (Offer* offer, utils::copy(slave->offers)) {
+        allocator->recoverResources(
+            offer->framework_id(), slave->id, offer->resources(), None());
+
+        removeOffer(offer, true); // Rescind!
+      }
+
+      // We remove / rescind all the offers first so that any calls to the
+      // allocator to modify its internal state are queued before the update of
+      // the unavailability in the allocator. We do this so that the allocator's
+      // state can start from a "clean slate" for the new unavailability.
+      // NOTE: Any calls from the Allocator back into the master, for example
+      // `offer()`, are guaranteed to happen after this function exits due to
+      // the Actor pattern.
+
+      allocator->updateUnavailability(slaveId, unavailability);
+    }
+  }
+}
+
+
 // TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
 // because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d7d27bd..cd71a25 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -468,6 +468,10 @@ public:
       const SlaveID& slaveId,
       const Resources& oversubscribedResources);
 
+  void updateUnavailability(
+      const MachineID& machineId,
+      const Option<Unavailability>& unavailability);
+
   void shutdownSlave(
       const SlaveID& slaveId,
       const std::string& message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 5811446..a857ab9 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -327,6 +327,12 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
     .WillOnce(FutureArg<1>(&unavailabilityOffers))
     .WillRepeatedly(Return()); // Ignore subsequent offers.
 
+  // The original offers should be rescinded when the unavailability
+  // is changed.
+  Future<Nothing> offerRescinded;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureSatisfy(&offerRescinded));
+
   // Start the test.
   driver.start();
 
@@ -337,10 +343,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
   // Check that unavailability is not set.
   foreach (const Offer& offer, normalOffers.get()) {
     EXPECT_FALSE(offer.has_unavailability());
-
-    // We have a few seconds between allocations (by default).  That should
-    // be enough time to post a schedule before the next allocation.
-    driver.declineOffer(offer.id());
   }
 
   // Schedule this slave for maintenance.
@@ -355,9 +357,13 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
   const Unavailability unavailability = createUnavailability(start, duration);
 
   // Post a valid schedule with one machine.
-  maintenance::Schedule schedule = createSchedule({
-      createWindow({machine}, unavailability)});
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine}, unavailability)});
 
+  // We have a few seconds between the first set of offers and the
+  // next allocation of offers.  This should be enough time to perform
+  // a maintenance schedule update.  This update will also trigger the
+  // rescinding of offers from the scheduled slave.
   Future<Response> response = process::http::post(
       master.get(),
       "maintenance/schedule",
@@ -366,9 +372,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  // Speed up the test by not waiting until the next allocation.
-  driver.reviveOffers();
-
   // Wait for some offers.
   AWAIT_READY(unavailabilityOffers);
   EXPECT_NE(0u, unavailabilityOffers.get().size());

http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 4b65440..477b7e4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1353,6 +1353,12 @@ ACTION_P(InvokeUpdateResources, allocator)
 }
 
 
+ACTION_P(InvokeUpdateUnavailability, allocator)
+{
+  return allocator->real->updateUnavailability(arg0, arg1);
+}
+
+
 ACTION_P(InvokeRecoverResources, allocator)
 {
   allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1476,6 +1482,11 @@ public:
     EXPECT_CALL(*this, updateAvailable(_, _))
       .WillRepeatedly(DoDefault());
 
+    ON_CALL(*this, updateUnavailability(_, _))
+      .WillByDefault(InvokeUpdateUnavailability(this));
+    EXPECT_CALL(*this, updateUnavailability(_, _))
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeRecoverResources(this));
     EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1550,6 +1561,10 @@ public:
       const SlaveID&,
       const std::vector<Offer::Operation>&));
 
+  MOCK_METHOD2(updateUnavailability, void(
+      const SlaveID&,
+      const Option<Unavailability>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,