You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/20 20:37:35 UTC
[1/5] mesos git commit: Renamed Filter to OfferFilter.
Repository: mesos
Updated Branches:
refs/heads/master e91e45540 -> a92ff3cd7
Renamed Filter to OfferFilter.
Review: https://reviews.apache.org/r/38244
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f3883949
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f3883949
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f3883949
Branch: refs/heads/master
Commit: f38839494edf62b2a16196506390f7d0add0a8cb
Parents: e91e455
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:18:33 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:03 2015 -0400
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.hpp | 83 +++++++++++++-----------
1 file changed, 46 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f3883949/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 8f2232a..e944120 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -51,7 +51,7 @@ namespace master {
namespace allocator {
// Forward declarations.
-class Filter;
+class OfferFilter;
// We forward declare the hierarchical allocator process so that we
@@ -195,12 +195,12 @@ protected:
void expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- Filter* filter);
+ OfferFilter* offerFilter);
// Checks whether the slave is whitelisted.
bool isWhitelisted(const SlaveID& slaveId);
- // Returns true if there is a filter for this framework
+ // Returns true if there is a resource offer filter for this framework
// on this slave.
bool isFiltered(
const FrameworkID& frameworkId,
@@ -250,8 +250,8 @@ protected:
// Whether the framework desires revocable resources.
bool revocable;
- // Active filters for the framework.
- hashmap<SlaveID, hashset<Filter*>> filters;
+ // Active filters on offers for the framework.
+ hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
};
double _event_queue_dispatches()
@@ -291,7 +291,7 @@ protected:
// slave, and the responses from frameworks as to whether they will be able
// to gracefully handle this unavailability.
// NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and Filters.
+ // leverage state and features such as the FrameworkSorter and OfferFilter.
struct Maintenance
{
Maintenance(const Unavailability& _unavailability)
@@ -348,19 +348,19 @@ protected:
// Used to represent "filters" for resources unused in offers.
-class Filter
+class OfferFilter
{
public:
- virtual ~Filter() {}
+ virtual ~OfferFilter() {}
virtual bool filter(const Resources& resources) = 0;
};
-class RefusedFilter: public Filter
+class RefusedOfferFilter: public OfferFilter
{
public:
- RefusedFilter(
+ RefusedOfferFilter(
const Resources& _resources,
const process::Timeout& _timeout)
: resources(_resources), timeout(_timeout) {}
@@ -490,7 +490,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
}
// Do not delete the filters contained in this
- // framework's 'filters' hashset yet, see comments in
+ // framework's `offerFilters` hashset yet, see comments in
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
frameworks.erase(frameworkId);
@@ -536,10 +536,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
// the added/removed and activated/deactivated in the future.
// Do not delete the filters contained in this
- // framework's 'filters' hashset yet, see comments in
+ // framework's `offerFilters` hashset yet, see comments in
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
- frameworks[frameworkId].filters.clear();
+ frameworks[frameworkId].offerFilters.clear();
LOG(INFO) << "Deactivated framework " << frameworkId;
}
@@ -611,7 +611,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
slaves[slaveId].hostname = slaveInfo.hostname();
// NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and Filters.
+ // leverage state and features such as the FrameworkSorter and OfferFilter.
if (unavailability.isSome()) {
slaves[slaveId].maintenance =
typename Slave::Maintenance(unavailability.get());
@@ -857,7 +857,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
CHECK(slaves.contains(slaveId));
// NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and Filters.
+ // leverage state and features such as the FrameworkSorter and OfferFilter.
// Remove any old unavailability.
slaves[slaveId].maintenance = None();
@@ -885,7 +885,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
CHECK(slaves[slaveId].maintenance.isSome());
// NOTE: We currently implement maintenance in the allocator to be able to
- // leverage state and features such as the FrameworkSorter and Filters.
+ // leverage state and features such as the FrameworkSorter and OfferFilter.
// We use a reference by alias because we intend to modify the
// `maintenance` and to improve readability.
@@ -1001,13 +1001,19 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
<< " for " << seconds.get();
// Create a new filter and delay its expiration.
- Filter* filter = new RefusedFilter(
+ OfferFilter* offerFilter = new RefusedOfferFilter(
resources,
process::Timeout::in(seconds.get()));
- frameworks[frameworkId].filters[slaveId].insert(filter);
+ frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
- delay(seconds.get(), self(), &Self::expire, frameworkId, slaveId, filter);
+ delay(
+ seconds.get(),
+ self(),
+ &Self::expire,
+ frameworkId,
+ slaveId,
+ offerFilter);
}
}
@@ -1029,17 +1035,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
{
CHECK(initialized);
- frameworks[frameworkId].filters.clear();
+ frameworks[frameworkId].offerFilters.clear();
frameworks[frameworkId].quiesced = false;
- // We delete each actual Filter when
- // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
- // Filter here it's possible that the same Filter (i.e., same
- // address) could get reused and HierarchicalAllocatorProcess::expire
+ // We delete each actual `OfferFilter` when
+ // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
+ // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
+ // address) could get reused and `HierarchicalAllocatorProcess::expire`
// would expire that filter too soon. Note that this only works
// right now because ALL Filter types "expire".
- LOG(INFO) << "Removed filters for framework " << frameworkId;
+ LOG(INFO) << "Removed offer filters for framework " << frameworkId;
allocate();
}
@@ -1266,7 +1272,7 @@ void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- Filter* filter)
+ OfferFilter* offerFilter)
{
// The filter might have already been removed (e.g., if the
// framework no longer exists or in
@@ -1274,15 +1280,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
// keep the address from getting reused possibly causing premature
// expiration).
if (frameworks.contains(frameworkId) &&
- frameworks[frameworkId].filters.contains(slaveId) &&
- frameworks[frameworkId].filters[slaveId].contains(filter)) {
- frameworks[frameworkId].filters[slaveId].erase(filter);
- if (frameworks[frameworkId].filters[slaveId].empty()) {
- frameworks[frameworkId].filters.erase(slaveId);
+ frameworks[frameworkId].offerFilters.contains(slaveId) &&
+ frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
+ frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
+ if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
+ frameworks[frameworkId].offerFilters.erase(slaveId);
}
}
- delete filter;
+ delete offerFilter;
}
@@ -1312,18 +1318,21 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
// framework. This is a short term fix until the following is resolved:
// https://issues.apache.org/jira/browse/MESOS-444.
if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
- VLOG(1) << "Filtered " << resources
+ VLOG(1) << "Filtered offer with " << resources
<< " on non-checkpointing slave " << slaveId
<< " for checkpointing framework " << frameworkId;
+
return true;
}
- if (frameworks[frameworkId].filters.contains(slaveId)) {
- foreach (Filter* filter, frameworks[frameworkId].filters[slaveId]) {
- if (filter->filter(resources)) {
- VLOG(1) << "Filtered " << resources
+ if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
+ foreach (
+ OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) {
+ if (offerFilter->filter(resources)) {
+ VLOG(1) << "Filtered offer with " << resources
<< " on slave " << slaveId
<< " for framework " << frameworkId;
+
return true;
}
}
[4/5] mesos git commit: Maintenance Primitives: Added test for
inverse offer filters.
Posted by jo...@apache.org.
Maintenance Primitives: Added test for inverse offer filters.
Checks that filters change which inverse offer is sent when.
Review: https://reviews.apache.org/r/38475
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31defd41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31defd41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31defd41
Branch: refs/heads/master
Commit: 31defd41bae4670247aa5b92f799234bcad9200b
Parents: 9d03297
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Sat Sep 19 14:24:44 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:22 2015 -0400
----------------------------------------------------------------------
src/tests/master_maintenance_tests.cpp | 325 +++++++++++++++++++++++++++-
1 file changed, 320 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/31defd41/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 9892bc3..c5277a1 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -51,6 +51,7 @@
#include "slave/flags.hpp"
+#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
@@ -83,6 +84,7 @@ using std::vector;
using testing::AtMost;
using testing::DoAll;
+using testing::Not;
namespace mesos {
namespace internal {
@@ -1167,11 +1169,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
mesos.send(call);
}
- // TODO(hartem): The filters in this test do not actually
- // do anything, because inverse offer filters have not been
- // implemented yet. Instead, the accept/decline calls use
- // the default time, which results in a slow test.
-
{
// Decline an inverse offer, with a filter.
Call call;
@@ -1227,6 +1224,324 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
}
+
+// Test ensures that inverse offers support filters.
+TEST_F(MasterMaintenanceTest, InverseOffersFilters)
+{
+ // Set up a master.
+ // NOTE: We don't use `StartMaster()` because we need to access these flags.
+ master::Flags flags = CreateMasterFlags();
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ ExecutorInfo executor1 = CREATE_EXECUTOR_INFO("executor-1", "exit 1");
+ ExecutorInfo executor2 = CREATE_EXECUTOR_INFO("executor-2", "exit 2");
+
+ MockExecutor exec1(executor1.executor_id());
+ MockExecutor exec2(executor2.executor_id());
+
+ hashmap<ExecutorID, Executor*> execs;
+ execs[executor1.executor_id()] = &exec1;
+ execs[executor2.executor_id()] = &exec2;
+
+ TestContainerizer containerizer(execs);
+
+ EXPECT_CALL(exec1, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec1, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ EXPECT_CALL(exec2, registered(_, _, _, _))
+ .Times(1);
+
+ EXPECT_CALL(exec2, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Capture the registration message for the first slave.
+ Future<SlaveRegisteredMessage> slave1RegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+ // We need two agents for this test.
+ Try<PID<Slave>> slave1 = StartSlave(&containerizer);
+ ASSERT_SOME(slave1);
+
+ // We need to make sure the first slave registers before we schedule the
+ // machine it is running on for maintenance.
+ AWAIT_READY(slave1RegisteredMessage);
+
+ // Capture the registration message for the second slave.
+ Future<SlaveRegisteredMessage> slave2RegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), Not(slave1.get()));
+
+ slave::Flags slaveFlags2 = MesosTest::CreateSlaveFlags();
+ slaveFlags2.hostname = maintenanceHostname + "-2";
+
+ Try<PID<Slave>> slave2 = StartSlave(&containerizer, slaveFlags2);
+ ASSERT_SOME(slave2);
+
+ // We need to make sure the second slave registers before we schedule the
+ // machine it is running on for maintenance.
+ AWAIT_READY(slave2RegisteredMessage);
+
+ // Before starting any frameworks, put the first machine into `DRAINING` mode.
+ MachineID machine1;
+ machine1.set_hostname(maintenanceHostname);
+ machine1.set_ip(stringify(slave1.get().address.ip));
+
+ MachineID machine2;
+ machine2.set_hostname(slaveFlags2.hostname.get());
+ machine2.set_ip(stringify(slave2.get().address.ip));
+
+ const Time start = Clock::now() + Seconds(60);
+ const Duration duration = Seconds(120);
+ const Unavailability unavailability = createUnavailability(start, duration);
+
+ maintenance::Schedule schedule = createSchedule(
+ {createWindow({machine1, machine2}, unavailability)});
+
+ Future<Response> response = process::http::post(
+ master.get(),
+ "maintenance/schedule",
+ headers,
+ stringify(JSON::Protobuf(schedule)));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ // Pause the clock before starting a framework.
+ // This ensures deterministic offer-ing behavior during the test.
+ Clock::pause();
+
+ // Now start a framework.
+ Callbacks callbacks;
+
+ Future<Nothing> connected;
+ EXPECT_CALL(callbacks, connected())
+ .WillOnce(FutureSatisfy(&connected));
+
+ Mesos mesos(
+ master.get(),
+ lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+ lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+ AWAIT_READY(connected);
+
+ Queue<Event> events;
+
+ EXPECT_CALL(callbacks, received(_))
+ .WillRepeatedly(Enqueue(&events));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ mesos.send(call);
+ }
+
+ Future<Event> event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+ v1::FrameworkID id(event.get().subscribed().framework_id());
+
+ // Trigger a batch allocation.
+ Clock::advance(flags.allocation_interval);
+ Clock::settle();
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_EQ(2, event.get().offers().offers().size());
+ EXPECT_EQ(0, event.get().offers().inverse_offers().size());
+
+ // All the offers should have unavailability.
+ foreach (const v1::Offer& offer, event.get().offers().offers()) {
+ EXPECT_TRUE(offer.has_unavailability());
+ }
+
+ // Save both offers.
+ v1::Offer offer1 = event.get().offers().offers(0);
+ v1::Offer offer2 = event.get().offers().offers(1);
+
+ // Spawn dummy tasks using both offers.
+ v1::TaskInfo taskInfo1 =
+ evolve(createTask(devolve(offer1), "exit 1", executor1.executor_id()));
+
+ v1::TaskInfo taskInfo2 =
+ evolve(createTask(devolve(offer2), "exit 2", executor2.executor_id()));
+
+ sleep(2);
+
+ {
+ // Accept the first offer.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer1.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo1);
+
+ mesos.send(call);
+ }
+
+ {
+ // Accept the second offer.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer2.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo2);
+
+ mesos.send(call);
+ }
+
+ // The order of events is deterministic from here on.
+ Clock::resume();
+
+ // Expect two inverse offers.
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_EQ(0, event.get().offers().offers().size());
+ EXPECT_EQ(2, event.get().offers().inverse_offers().size());
+
+ // Save these inverse offers.
+ v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0);
+ v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1);
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+ {
+ // Acknowledge TASK_RUNNING update for one task.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
+ acknowledge->set_uuid(event.get().update().status().uuid());
+
+ mesos.send(call);
+ }
+
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::UPDATE, event.get().type());
+ EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+ {
+ // Acknowledge TASK_RUNNING update for the other task.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id());
+ acknowledge->set_uuid(event.get().update().status().uuid());
+
+ mesos.send(call);
+ }
+
+ {
+ // Decline the second inverse offer, with a filter set such that we
+ // should not see this inverse offer in the next allocation.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::DECLINE);
+
+ Call::Decline* decline = call.mutable_decline();
+ decline->add_offer_ids()->CopyFrom(inverseOffer2.id());
+
+ v1::Filters filters;
+ filters.set_refuse_seconds(flags.allocation_interval.secs() + 1);
+ decline->mutable_filters()->CopyFrom(filters);
+
+ mesos.send(call);
+ }
+
+ {
+ // Accept the first inverse offer, with a filter set such that we
+ // should immediately see this inverse offer again.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+ v1::Filters filters;
+ filters.set_refuse_seconds(0);
+ accept->mutable_filters()->CopyFrom(filters);
+
+ mesos.send(call);
+ }
+
+ // Expect one inverse offer.
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_EQ(0, event.get().offers().offers().size());
+ EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+ EXPECT_EQ(
+ inverseOffer1.agent_id(),
+ event.get().offers().inverse_offers(0).agent_id());
+
+ inverseOffer1 = event.get().offers().inverse_offers(0);
+
+ {
+ // Do another immediate filter, but decline it this time.
+ Call call;
+ call.mutable_framework_id()->CopyFrom(id);
+ call.set_type(Call::DECLINE);
+
+ Call::Decline* decline = call.mutable_decline();
+ decline->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+ v1::Filters filters;
+ filters.set_refuse_seconds(0);
+ decline->mutable_filters()->CopyFrom(filters);
+
+ mesos.send(call);
+ }
+
+ // Expect the same inverse offer.
+ event = events.get();
+ AWAIT_READY(event);
+ EXPECT_EQ(Event::OFFERS, event.get().type());
+ EXPECT_EQ(0, event.get().offers().offers().size());
+ EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+ EXPECT_EQ(
+ inverseOffer1.agent_id(),
+ event.get().offers().inverse_offers(0).agent_id());
+
+ EXPECT_CALL(exec1, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(exec2, shutdown(_))
+ .Times(AtMost(1));
+
+ Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[5/5] mesos git commit: Synchronized global environment manipulation
in TestContainerizer.
Posted by jo...@apache.org.
Synchronized global environment manipulation in TestContainerizer.
This fixes non-deterministic behavior in the TestContainerizer. There
is still an open issue mentioned in the comments and documented in
MESOS-3475.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a92ff3cd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a92ff3cd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a92ff3cd
Branch: refs/heads/master
Commit: a92ff3cd7388cfcf948e4ffa3dabcad98a29e3a8
Parents: 31defd4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Sep 20 14:04:42 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:28 2015 -0400
----------------------------------------------------------------------
src/tests/containerizer.cpp | 105 +++++++++++++++++++++++----------------
1 file changed, 62 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a92ff3cd/src/tests/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index 5134e63..1f74315 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -17,6 +17,11 @@
*/
#include "tests/containerizer.hpp"
+
+#include <mutex>
+
+#include "stout/synchronized.hpp"
+
#include "tests/mesos.hpp"
using std::map;
@@ -99,56 +104,70 @@ Future<bool> TestContainerizer::_launch(
containers_[key] = containerId;
Executor* executor = executors[executorInfo.executor_id()];
- Owned<MesosExecutorDriver> driver(new MesosExecutorDriver(executor));
- drivers[containerId] = driver;
-
- // Prepare additional environment variables for the executor.
- // TODO(benh): Need to get flags passed into the TestContainerizer
- // in order to properly use here.
- slave::Flags flags;
- flags.recovery_timeout = Duration::zero();
-
- // We need to save the original set of environment variables so we
- // can reset the environment after calling 'driver->start()' below.
- hashmap<string, string> original = os::environment();
-
- const map<string, string> environment = executorEnvironment(
- executorInfo,
- directory,
- slaveId,
- slavePid,
- checkpoint,
- flags);
- foreachpair (const string& name, const string variable, environment) {
- os::setenv(name, variable);
- }
+ // We need to synchronize all reads and writes to the environment as this is
+ // global state.
+ // TODO(jmlvanre): Even this is not sufficient, as other aspects of the code
+ // may read an environment variable while we are manipulating it. The better
+ // solution is to pass the environment variables into the fork, or to set them
+ // on the command line. See MESOS-3475.
+ static std::mutex mutex;
+
+ synchronized(mutex) {
+ // Since the constructor for `MesosExecutorDriver` reads environment
+ // variables to load flags, even it needs to be within this synchronization
+ // section.
+ Owned<MesosExecutorDriver> driver(new MesosExecutorDriver(executor));
+ drivers[containerId] = driver;
+
+ // Prepare additional environment variables for the executor.
+ // TODO(benh): Need to get flags passed into the TestContainerizer
+ // in order to properly use here.
+ slave::Flags flags;
+ flags.recovery_timeout = Duration::zero();
+
+ // We need to save the original set of environment variables so we
+ // can reset the environment after calling 'driver->start()' below.
+ hashmap<string, string> original = os::environment();
+
+ const map<string, string> environment = executorEnvironment(
+ executorInfo,
+ directory,
+ slaveId,
+ slavePid,
+ checkpoint,
+ flags);
+
+ foreachpair (const string& name, const string variable, environment) {
+ os::setenv(name, variable);
+ }
- // TODO(benh): Can this be removed and done exlusively in the
- // 'executorEnvironment()' function? There are other places in the
- // code where we do this as well and it's likely we can do this once
- // in 'executorEnvironment()'.
- foreach (const Environment::Variable& variable,
- executorInfo.command().environment().variables()) {
- os::setenv(variable.name(), variable.value());
- }
+ // TODO(benh): Can this be removed and done exlusively in the
+ // 'executorEnvironment()' function? There are other places in the
+ // code where we do this as well and it's likely we can do this once
+ // in 'executorEnvironment()'.
+ foreach (const Environment::Variable& variable,
+ executorInfo.command().environment().variables()) {
+ os::setenv(variable.name(), variable.value());
+ }
- os::setenv("MESOS_LOCAL", "1");
+ os::setenv("MESOS_LOCAL", "1");
- driver->start();
+ driver->start();
- os::unsetenv("MESOS_LOCAL");
+ os::unsetenv("MESOS_LOCAL");
- // Unset the environment variables we set by resetting them to their
- // original values and also removing any that were not part of the
- // original environment.
- foreachpair (const string& name, const string& value, original) {
- os::setenv(name, value);
- }
+ // Unset the environment variables we set by resetting them to their
+ // original values and also removing any that were not part of the
+ // original environment.
+ foreachpair (const string& name, const string& value, original) {
+ os::setenv(name, value);
+ }
- foreachkey (const string& name, environment) {
- if (!original.contains(name)) {
- os::unsetenv(name);
+ foreachkey (const string& name, environment) {
+ if (!original.contains(name)) {
+ os::unsetenv(name);
+ }
}
}
[3/5] mesos git commit: Added filter support for Inverse Offers.
Posted by jo...@apache.org.
Added filter support for Inverse Offers.
Review: https://reviews.apache.org/r/38324
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d03297a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d03297a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d03297a
Branch: refs/heads/master
Commit: 9d03297a9064dcde3ec920db4ef66003b4d323da
Parents: eec3fec
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:35 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:15 2015 -0400
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 3 +-
src/master/allocator/mesos/allocator.hpp | 12 +-
src/master/allocator/mesos/hierarchical.hpp | 205 +++++++++++++++++++++--
src/master/master.cpp | 6 +-
src/tests/mesos.hpp | 11 +-
5 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 7301058..3fea47f 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -160,7 +160,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<InverseOfferStatus>& status) = 0;
+ const Option<InverseOfferStatus>& status,
+ const Option<Filters>& filters = None()) = 0;
// Informs the Allocator to recover resources that are considered
// used by the framework.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 4f02dd1..904dc62 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -120,7 +120,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status);
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters);
void recoverResources(
const FrameworkID& frameworkId,
@@ -228,7 +229,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status) = 0;
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters = None()) = 0;
virtual void recoverResources(
const FrameworkID& frameworkId,
@@ -489,7 +491,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status)
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters)
{
return process::dispatch(
process,
@@ -497,7 +500,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
slaveId,
frameworkId,
unavailableResources,
- status);
+ status,
+ filters);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index a4c4107..d3496bc 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -52,7 +52,7 @@ namespace allocator {
// Forward declarations.
class OfferFilter;
-
+class InverseOfferFilter;
// We forward declare the hierarchical allocator process so that we
// can typedef an instantiation of it with DRF sorters.
@@ -158,7 +158,8 @@ public:
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status);
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters);
void recoverResources(
const FrameworkID& frameworkId,
@@ -192,12 +193,18 @@ protected:
// Send inverse offers from the specified slaves.
void deallocate(const hashset<SlaveID>& slaveIds);
- // Remove a filter for the specified framework.
+ // Remove an offer filter for the specified framework.
void expire(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
OfferFilter* offerFilter);
+ // Remove an inverse offer filter for the specified framework.
+ void expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ InverseOfferFilter* inverseOfferFilter);
+
// Checks whether the slave is whitelisted.
bool isWhitelisted(const SlaveID& slaveId);
@@ -208,6 +215,12 @@ protected:
const SlaveID& slaveId,
const Resources& resources);
+ // Returns true if there is an inverse offer filter for this framework
+ // on this slave.
+ bool isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId);
+
bool allocatable(const Resources& resources);
bool initialized;
@@ -251,8 +264,9 @@ protected:
// Whether the framework desires revocable resources.
bool revocable;
- // Active filters on offers for the framework.
+ // Active offer and inverse offer filters for the framework.
hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
+ hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
};
double _event_queue_dispatches()
@@ -382,6 +396,41 @@ public:
};
+// Used to represent "filters" for inverse offers.
+// NOTE: Since this specific allocator implementation only sends inverse offers
+// for maintenance primitives, and those are at the whole slave level, we only
+// need to filter based on the time-out.
+// If this allocator implementation starts sending out more resource specific
+// inverse offers, then we can capture the `unavailableResources` in the filter
+// function.
+class InverseOfferFilter
+{
+public:
+ virtual ~InverseOfferFilter() {}
+
+ virtual bool filter() = 0;
+};
+
+
+// NOTE: See comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator starts sending fine-grained inverse
+// offers.
+class RefusedInverseOfferFilter: public InverseOfferFilter
+{
+public:
+ RefusedInverseOfferFilter(const process::Timeout& _timeout)
+ : timeout(_timeout) {}
+
+ virtual bool filter()
+ {
+ // See the comment above for why we currently don't do more fine-grained
+ // filtering.
+ return timeout.remaining() > Seconds(0);
+ }
+
+ const process::Timeout timeout;
+};
+
+
template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
@@ -541,6 +590,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
// HierarchicalAllocatorProcess::reviveOffers and
// HierarchicalAllocatorProcess::expire.
frameworks[frameworkId].offerFilters.clear();
+ frameworks[frameworkId].inverseOfferFilters.clear();
LOG(INFO) << "Deactivated framework " << frameworkId;
}
@@ -860,6 +910,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
// NOTE: We currently implement maintenance in the allocator to be able to
// leverage state and features such as the FrameworkSorter and OfferFilter.
+ // We explicitly remove all filters for the inverse offers of this slave. We
+ // do this because we want to force frameworks to reassess the calculations
+ // they have made to respond to the inverse offer. Unavailability of a slave
+ // can have a large effect on failure domain calculations and interleaved
+ // unavailability schedules.
+ foreachvalue (Framework& framework, frameworks) {
+ framework.inverseOfferFilters.erase(slaveId);
+ }
+
// Remove any old unavailability.
slaves[slaveId].maintenance = None();
@@ -879,7 +938,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
const Option<UnavailableResources>& unavailableResources,
- const Option<mesos::master::InverseOfferStatus>& status)
+ const Option<mesos::master::InverseOfferStatus>& status,
+ const Option<Filters>& filters)
{
CHECK(initialized);
CHECK(frameworks.contains(frameworkId));
@@ -915,6 +975,58 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
maintenance.statuses[frameworkId].CopyFrom(status.get());
}
}
+
+ // No need to install filters if `filters` is none.
+ if (filters.isNone()) {
+ return;
+ }
+
+ // Create a refused resource filter.
+ Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+ if (seconds.isError()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused inverse offer filter because the input value "
+ << "is invalid: " << seconds.error();
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ } else if (seconds.get() < Duration::zero()) {
+ LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+ << "the refused inverse offer filter because the input value "
+ << "is negative";
+
+ seconds = Duration::create(Filters().refuse_seconds());
+ }
+
+ CHECK_SOME(seconds);
+
+ if (seconds.get() != Duration::zero()) {
+ VLOG(1) << "Framework " << frameworkId
+ << " filtered inverse offers from slave " << slaveId
+ << " for " << seconds.get();
+
+ // Create a new inverse offer filter and delay its expiration.
+ InverseOfferFilter* inverseOfferFilter =
+ new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+ frameworks[frameworkId]
+ .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+
+ // We need to disambiguate the function call to pick the correct
+ // expire() overload.
+ void (Self::*expireInverseOffer)(
+ const FrameworkID&,
+ const SlaveID&,
+ InverseOfferFilter*) = &Self::expire;
+
+ delay(
+ seconds.get(),
+ self(),
+ expireInverseOffer,
+ frameworkId,
+ slaveId,
+ inverseOfferFilter);
+ }
}
@@ -1009,10 +1121,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
+ // We need to disambiguate the function call to pick the correct
+ // expire() overload.
+ void (Self::*expireOffer)(
+ const FrameworkID&,
+ const SlaveID&,
+ OfferFilter*) = &Self::expire;
+
delay(
seconds.get(),
self(),
- &Self::expire,
+ expireOffer,
frameworkId,
slaveId,
offerFilter);
@@ -1038,6 +1157,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
CHECK(initialized);
frameworks[frameworkId].offerFilters.clear();
+ frameworks[frameworkId].inverseOfferFilters.clear();
frameworks[frameworkId].quiesced = false;
// We delete each actual `OfferFilter` when
@@ -1239,14 +1359,26 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
// If there isn't already an outstanding inverse offer to this
// framework for the specified slave.
if (!maintenance.offersOutstanding.contains(frameworkId)) {
+ // Ignore in case the framework filters inverse offers for this
+ // slave.
+ // NOTE: Since this specific allocator implementation only sends
+ // inverse offers for maintenance primitives, and those are at the
+ // whole slave level, we only need to filter based on the
+ // time-out.
+ if (isFiltered(frameworkId, slaveId)) {
+ continue;
+ }
+
+ const UnavailableResources unavailableResources =
+ UnavailableResources{
+ Resources(),
+ maintenance.unavailability};
+
// For now we send inverse offers with empty resources when the
// inverse offer represents maintenance on the machine. In the
// future we could be more specific about the resources on the
// host, as we have the information available.
- offerable[frameworkId][slaveId] =
- UnavailableResources{
- Resources(),
- maintenance.unavailability};
+ offerable[frameworkId][slaveId] = unavailableResources;
// Mark this framework as having an offer outstanding for the
// specified slave.
@@ -1295,6 +1427,34 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ InverseOfferFilter* inverseOfferFilter)
+{
+ // The filter might have already been removed (e.g., if the
+ // framework no longer exists or in
+ // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+ // keep the address from getting reused possibly causing premature
+ // expiration).
+ if (frameworks.contains(frameworkId) &&
+ frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+ frameworks[frameworkId].inverseOfferFilters[slaveId]
+ .contains(inverseOfferFilter)) {
+ frameworks[frameworkId].inverseOfferFilters[slaveId]
+ .erase(inverseOfferFilter);
+
+ if (frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
+ frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+ }
+ }
+
+ delete inverseOfferFilter;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
const SlaveID& slaveId)
@@ -1345,6 +1505,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
template <class RoleSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId)
+{
+ CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
+ foreach (
+ InverseOfferFilter* inverseOfferFilter,
+ frameworks[frameworkId].inverseOfferFilters[slaveId]) {
+ if (inverseOfferFilter->filter()) {
+ VLOG(1) << "Filtered unavailability on slave " << slaveId
+ << " for framework " << frameworkId;
+
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
const Resources& resources)
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5393ee8..6c0db21 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2850,7 +2850,8 @@ void Master::accept(
UnavailableResources{
inverseOffer->resources(),
inverseOffer->unavailability()},
- status);
+ status,
+ accept.filters());
removeInverseOffer(inverseOffer);
continue;
@@ -3324,7 +3325,8 @@ void Master::decline(
UnavailableResources{
inverseOffer->resources(),
inverseOffer->unavailability()},
- status);
+ status,
+ decline.filters());
removeInverseOffer(inverseOffer);
continue;
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dd587bb..e1c0635 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
ACTION_P(InvokeUpdateInverseOffer, allocator)
{
- return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
+ return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3, arg4);
}
@@ -1499,9 +1499,9 @@ public:
EXPECT_CALL(*this, updateUnavailability(_, _))
.WillRepeatedly(DoDefault());
- ON_CALL(*this, updateInverseOffer(_, _, _, _))
+ ON_CALL(*this, updateInverseOffer(_, _, _, _, _))
.WillByDefault(InvokeUpdateInverseOffer(this));
- EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
+ EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
.WillRepeatedly(DoDefault());
ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,11 +1590,12 @@ public:
const SlaveID&,
const Option<Unavailability>&));
- MOCK_METHOD4(updateInverseOffer, void(
+ MOCK_METHOD5(updateInverseOffer, void(
const SlaveID&,
const FrameworkID&,
const Option<UnavailableResources>&,
- const Option<mesos::master::InverseOfferStatus>&));
+ const Option<mesos::master::InverseOfferStatus>&,
+ const Option<Filters>&));
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,
[2/5] mesos git commit: Propagated UnavailableResources from Inverse
Offers to the allocator.
Posted by jo...@apache.org.
Propagated UnavailableResources from Inverse Offers to the allocator.
Review: https://reviews.apache.org/r/38246
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eec3fec0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eec3fec0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eec3fec0
Branch: refs/heads/master
Commit: eec3fec01a28d48fd26a71de7d918df71032d4ef
Parents: f388394
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:23 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:09 2015 -0400
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 5 ++++-
src/master/allocator/mesos/allocator.hpp | 4 ++++
src/master/allocator/mesos/hierarchical.hpp | 2 ++
src/master/master.cpp | 27 ++++++++++++++++++++++++
src/tests/mesos.hpp | 9 ++++----
5 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 2dc6312..7301058 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -153,10 +153,13 @@ public:
// revoked. If `status` is not set then the inverse offer was not responded
// to, possibly because the offer timed out or was rescinded. This might
// require the implementation of the function to remove any inverse offers
- // that are outstanding.
+ // that are outstanding. The `unavailableResources` can be used by the
+ // allocator to distinguish between different inverse offers sent to the same
+ // framework for the same slave.
virtual void updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<InverseOfferStatus>& status) = 0;
// Informs the Allocator to recover resources that are considered
http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 86f6c55..4f02dd1 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -119,6 +119,7 @@ public:
void updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<mesos::master::InverseOfferStatus>& status);
void recoverResources(
@@ -226,6 +227,7 @@ public:
virtual void updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<mesos::master::InverseOfferStatus>& status) = 0;
virtual void recoverResources(
@@ -486,6 +488,7 @@ template <typename AllocatorProcess>
inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<mesos::master::InverseOfferStatus>& status)
{
return process::dispatch(
@@ -493,6 +496,7 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
&MesosAllocatorProcess::updateInverseOffer,
slaveId,
frameworkId,
+ unavailableResources,
status);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index e944120..a4c4107 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -157,6 +157,7 @@ public:
void updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<mesos::master::InverseOfferStatus>& status);
void recoverResources(
@@ -877,6 +878,7 @@ void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
const SlaveID& slaveId,
const FrameworkID& frameworkId,
+ const Option<UnavailableResources>& unavailableResources,
const Option<mesos::master::InverseOfferStatus>& status)
{
CHECK(initialized);
http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5eef29b..5393ee8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2343,6 +2343,9 @@ void Master::_subscribe(
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer, true); // Rescind.
@@ -2508,6 +2511,9 @@ void Master::deactivate(Framework* framework)
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer, true); // Rescind.
@@ -2557,6 +2563,9 @@ void Master::deactivate(Slave* slave)
allocator->updateInverseOffer(
slave->id,
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer, true); // Rescind!
@@ -2838,6 +2847,9 @@ void Master::accept(
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
status);
removeInverseOffer(inverseOffer);
@@ -3309,6 +3321,9 @@ void Master::decline(
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
status);
removeInverseOffer(inverseOffer);
@@ -4320,6 +4335,9 @@ void Master::updateUnavailability(
allocator->updateInverseOffer(
slave->id,
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer, true); // Rescind!
@@ -5494,6 +5512,9 @@ void Master::_failoverFramework(Framework* framework)
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer);
@@ -5606,6 +5627,9 @@ void Master::removeFramework(Framework* framework)
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer);
@@ -6247,6 +6271,9 @@ void Master::inverseOfferTimeout(const OfferID& inverseOfferId)
allocator->updateInverseOffer(
inverseOffer->slave_id(),
inverseOffer->framework_id(),
+ UnavailableResources{
+ inverseOffer->resources(),
+ inverseOffer->unavailability()},
None());
removeInverseOffer(inverseOffer, true);
http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 760dcb7..dd587bb 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
ACTION_P(InvokeUpdateInverseOffer, allocator)
{
- return allocator->real->updateInverseOffer(arg0, arg1, arg2);
+ return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
}
@@ -1499,9 +1499,9 @@ public:
EXPECT_CALL(*this, updateUnavailability(_, _))
.WillRepeatedly(DoDefault());
- ON_CALL(*this, updateInverseOffer(_, _, _))
+ ON_CALL(*this, updateInverseOffer(_, _, _, _))
.WillByDefault(InvokeUpdateInverseOffer(this));
- EXPECT_CALL(*this, updateInverseOffer(_, _, _))
+ EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
.WillRepeatedly(DoDefault());
ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,9 +1590,10 @@ public:
const SlaveID&,
const Option<Unavailability>&));
- MOCK_METHOD3(updateInverseOffer, void(
+ MOCK_METHOD4(updateInverseOffer, void(
const SlaveID&,
const FrameworkID&,
+ const Option<UnavailableResources>&,
const Option<mesos::master::InverseOfferStatus>&));
MOCK_METHOD4(recoverResources, void(