You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/14 19:58:51 UTC
[07/16] mesos git commit: Maintenance Primitives: Added
updateUnavailability to master.
Maintenance Primitives: Added updateUnavailability to master.
Review: https://reviews.apache.org/r/37175
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f87f733d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f87f733d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f87f733d
Branch: refs/heads/master
Commit: f87f733dbd34e39c91125fabe541269aea806266
Parents: ea48105
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:40:10 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 8 +++
src/master/allocator/mesos/allocator.hpp | 21 +++++++
src/master/allocator/mesos/hierarchical.hpp | 29 +++++++++
src/master/http.cpp | 15 ++---
src/master/master.cpp | 77 ++++++++++++++++++++++++
src/master/master.hpp | 4 ++
src/tests/master_maintenance_tests.cpp | 21 ++++---
src/tests/mesos.hpp | 15 +++++
8 files changed, 174 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 257d2f6..b5bfc28 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -135,6 +135,14 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations) = 0;
+ // We currently support storing the next unavailability, if there is one, per
+ // slave. If `unavailability` is not set then there is no known upcoming
+ // unavailability. This might require the implementation of the function to
+ // remove any inverse offers that are outstanding.
+ virtual void updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability) = 0;
+
// Informs the Allocator to recover resources that are considered
// used by the framework.
virtual void recoverResources(
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index c845723..ee6ec58 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -108,6 +108,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations);
+ void updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability);
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -199,6 +203,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations) = 0;
+ virtual void updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability) = 0;
+
virtual void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -433,6 +441,19 @@ MesosAllocator<AllocatorProcess>::updateAvailable(
template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability)
+{
+ return process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateUnavailability,
+ slaveId,
+ unavailability);
+}
+
+
+template <typename AllocatorProcess>
inline void MesosAllocator<AllocatorProcess>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index f86a701..77a5b4c 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -146,6 +146,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations);
+ void updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability);
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -816,6 +820,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
template <class RoleSorter, class FrameworkSorter>
void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
+ const SlaveID& slaveId,
+ const Option<Unavailability>& unavailability)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+
+ // NOTE: We currently implement maintenance in the allocator to be able to
+ // leverage state and features such as the FrameworkSorter and Filters.
+
+ // Remove any old unavailability.
+ slaves[slaveId].maintenance = None();
+
+ // If we have a new unavailability.
+ if (unavailability.isSome()) {
+ slaves[slaveId].maintenance =
+ typename Slave::Maintenance(unavailability.get());
+ }
+
+ allocate(slaveId);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index a814930..05b590e 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -1494,19 +1494,19 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
}
}
- // NOTE: Copies are needed because this loop modifies the container.
+ // NOTE: Copies are needed because `updateUnavailability()` in this loop
+ // modifies the container.
foreachkey (const MachineID& id, utils::copy(master->machines)) {
// Update the entry for each updated machine.
if (updated.contains(id)) {
- master->machines[id]
- .info.mutable_unavailability()->CopyFrom(updated[id]);
-
+ master->updateUnavailability(id, updated[id]);
continue;
}
- // Remove the unavailability for each removed machine.
- master->machines[id].info.clear_unavailability();
+ // Transition each removed machine back to the `UP` mode and remove the
+ // unavailability.
master->machines[id].info.set_mode(MachineInfo::UP);
+ master->updateUnavailability(id, None());
}
// Save each new machine, with the unavailability
@@ -1516,9 +1516,10 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const
MachineInfo info;
info.mutable_id()->CopyFrom(id);
info.set_mode(MachineInfo::DRAINING);
- info.mutable_unavailability()->CopyFrom(window.unavailability());
master->machines[id].info.CopyFrom(info);
+
+ master->updateUnavailability(id, window.unavailability());
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 1bed6a6..0b3ba56 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -50,6 +50,7 @@
#include <process/metrics/metrics.hpp>
#include <stout/check.hpp>
+#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/ip.hpp>
#include <stout/lambda.hpp>
@@ -3846,6 +3847,26 @@ void Master::reregisterSlave(
// based authentication).
LOG(INFO) << "Re-registering slave " << *slave;
+ // We don't allow re-registering this way with a different IP or
+ // hostname. This is because maintenance is scheduled at the
+ // machine level; so we would need to re-validate the slave's
+ // unavailability if the machine it is running on changed.
+ if (slave->pid.address.ip != from.address.ip ||
+ slave->info.hostname() != slaveInfo.hostname()) {
+ LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from
+ << " (" << slaveInfo.hostname() << ") attempted to "
+ << "re-register with different IP / hostname; expected "
+ << slave->pid.address.ip << " (" << slave->info.hostname()
+ << ") shutting it down";
+
+ ShutdownMessage message;
+ message.set_message(
+ "Slave attempted to re-register with different IP / hostname");
+
+ send(from, message);
+ return;
+ }
+
// Update the slave pid and relink to it.
// NOTE: Re-linking the slave here always rather than only when
// the slave is disconnected can lead to multiple exited events
@@ -4102,6 +4123,62 @@ void Master::updateSlave(
}
+void Master::updateUnavailability(
+ const MachineID& machineId,
+ const Option<Unavailability>& unavailability)
+{
+ if (unavailability.isSome()) {
+ machines[machineId].info.mutable_unavailability()->CopyFrom(
+ unavailability.get());
+ } else {
+ machines[machineId].info.clear_unavailability();
+ }
+
+ // TODO(jmlvanre): Only update allocator and rescind offers if the
+ // unavailability has actually changed.
+ if (machines.contains(machineId)) {
+ // For every slave on this machine, update the allocator.
+ foreach (const SlaveID& slaveId, machines[machineId].slaves) {
+ // The slave should not be in the machines mapping if it is removed.
+ CHECK(slaves.removed.get(slaveId).isNone());
+
+ // The slave should be registered if it is in the machines mapping.
+ CHECK(slaves.registered.contains(slaveId));
+
+ Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+ if (unavailability.isSome()) {
+ // TODO(jmlvanre): Add stream operator for unavailability.
+ LOG(INFO) << "Updating unavailability of slave " << *slave
+ << ", starting at "
+ << Nanoseconds(unavailability.get().start().nanoseconds());
+ } else {
+ LOG(INFO) << "Removing unavailability of slave " << *slave;
+ }
+
+ // Remove and rescind offers since we want to inform frameworks of the
+ // unavailability change as soon as possible.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ allocator->recoverResources(
+ offer->framework_id(), slave->id, offer->resources(), None());
+
+ removeOffer(offer, true); // Rescind!
+ }
+
+ // We remove / resind all the offers first so that any calls to the
+ // allocator to modify its internal state are queued before the update of
+ // the unavailability in the allocator. We do this so that the allocator's
+ // state can start from a "clean slate" for the new unavailability.
+ // NOTE: Any calls from the Allocator back into the master, for example
+ // `offer()`, are guaranteed to happen after this function exits due to
+ // the Actor pattern.
+
+ allocator->updateUnavailability(slaveId, unavailability);
+ }
+ }
+}
+
+
// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
// because the status updates will be sent by the slave.
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d7d27bd..cd71a25 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -468,6 +468,10 @@ public:
const SlaveID& slaveId,
const Resources& oversubscribedResources);
+ void updateUnavailability(
+ const MachineID& machineId,
+ const Option<Unavailability>& unavailability);
+
void shutdownSlave(
const SlaveID& slaveId,
const std::string& message);
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 5811446..a857ab9 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -327,6 +327,12 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
.WillOnce(FutureArg<1>(&unavailabilityOffers))
.WillRepeatedly(Return()); // Ignore subsequent offers.
+ // The original offers should be rescinded when the unavailability
+ // is changed.
+ Future<Nothing> offerRescinded;
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .WillOnce(FutureSatisfy(&offerRescinded));
+
// Start the test.
driver.start();
@@ -337,10 +343,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
// Check that unavailability is not set.
foreach (const Offer& offer, normalOffers.get()) {
EXPECT_FALSE(offer.has_unavailability());
-
- // We have a few seconds between allocations (by default). That should
- // be enough time to post a schedule before the next allocation.
- driver.declineOffer(offer.id());
}
// Schedule this slave for maintenance.
@@ -355,9 +357,13 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
const Unavailability unavailability = createUnavailability(start, duration);
// Post a valid schedule with one machine.
- maintenance::Schedule schedule = createSchedule({
- createWindow({machine}, unavailability)});
+ maintenance::Schedule schedule = createSchedule(
+ {createWindow({machine}, unavailability)});
+ // We have a few seconds between the first set of offers and the
+ // next allocation of offers. This should be enough time to perform
+ // a maintenance schedule update. This update will also trigger the
+ // rescinding of offers from the scheduled slave.
Future<Response> response = process::http::post(
master.get(),
"maintenance/schedule",
@@ -366,9 +372,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
- // Speed up the test by not waiting until the next allocation.
- driver.reviveOffers();
-
// Wait for some offers.
AWAIT_READY(unavailabilityOffers);
EXPECT_NE(0u, unavailabilityOffers.get().size());
http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 4b65440..477b7e4 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1353,6 +1353,12 @@ ACTION_P(InvokeUpdateResources, allocator)
}
+ACTION_P(InvokeUpdateUnavailability, allocator)
+{
+ return allocator->real->updateUnavailability(arg0, arg1);
+}
+
+
ACTION_P(InvokeRecoverResources, allocator)
{
allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1476,6 +1482,11 @@ public:
EXPECT_CALL(*this, updateAvailable(_, _))
.WillRepeatedly(DoDefault());
+ ON_CALL(*this, updateUnavailability(_, _))
+ .WillByDefault(InvokeUpdateUnavailability(this));
+ EXPECT_CALL(*this, updateUnavailability(_, _))
+ .WillRepeatedly(DoDefault());
+
ON_CALL(*this, recoverResources(_, _, _, _))
.WillByDefault(InvokeRecoverResources(this));
EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1550,6 +1561,10 @@ public:
const SlaveID&,
const std::vector<Offer::Operation>&));
+ MOCK_METHOD2(updateUnavailability, void(
+ const SlaveID&,
+ const Option<Unavailability>&));
+
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,
const SlaveID&,