You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/14 19:58:53 UTC
[09/16] mesos git commit: Maintenance Primitives: Added inverse offers.
Maintenance Primitives: Added inverse offers.
Review: https://reviews.apache.org/r/37177
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1de99f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1de99f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1de99f4
Branch: refs/heads/master
Commit: a1de99f42323d8eb1396fcd10884eaac32a93eab
Parents: 8e04258
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Tue Aug 25 18:41:21 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Mon Sep 14 13:58:37 2015 -0400
----------------------------------------------------------------------
include/mesos/maintenance/maintenance.hpp | 15 +++
include/mesos/master/allocator.hpp | 6 +
src/master/allocator/mesos/allocator.hpp | 13 ++
src/master/allocator/mesos/hierarchical.hpp | 99 ++++++++++++++
src/master/master.cpp | 9 ++
src/master/master.hpp | 4 +
src/tests/hierarchical_allocator_tests.cpp | 157 +++++++++++++----------
src/tests/master_allocator_tests.cpp | 32 ++---
src/tests/mesos.hpp | 11 +-
src/tests/reservation_endpoints_tests.cpp | 20 +--
src/tests/reservation_tests.cpp | 4 +-
src/tests/resource_offers_tests.cpp | 2 +-
src/tests/slave_recovery_tests.cpp | 2 +-
13 files changed, 270 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/maintenance/maintenance.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/maintenance/maintenance.hpp b/include/mesos/maintenance/maintenance.hpp
index 7fec3ff..f676d01 100644
--- a/include/mesos/maintenance/maintenance.hpp
+++ b/include/mesos/maintenance/maintenance.hpp
@@ -22,4 +22,19 @@
// ONLY USEFUL AFTER RUNNING PROTOC.
#include <mesos/maintenance/maintenance.pb.h>
+#include <mesos/resources.hpp>
+
+namespace mesos {
+
+// A wrapper for resources and unavailability used to communicate between the
+// Allocator and Master in order to let the Master create InverseOffers from the
+// Allocator.
+struct UnavailableResources
+{
+ Resources resources;
+ Unavailability unavailability;
+};
+
+} // namespace mesos {
+
#endif // __MAINTENANCE_PROTO_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index b5bfc28..18d31ef 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -25,6 +25,8 @@
// ONLY USEFUL AFTER RUNNING PROTOC.
#include <mesos/master/allocator.pb.h>
+#include <mesos/maintenance/maintenance.hpp>
+
#include <mesos/resources.hpp>
#include <process/future.hpp>
@@ -67,6 +69,10 @@ public:
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ inverseOfferCallback,
const hashmap<std::string, RoleInfo>& roles) = 0;
virtual void addFramework(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index ee6ec58..124dd3d 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -52,6 +52,10 @@ public:
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ inverseOfferCallback,
const hashmap<std::string, mesos::master::RoleInfo>& roles);
void addFramework(
@@ -147,6 +151,10 @@ public:
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ inverseOfferCallback,
const hashmap<std::string, mesos::master::RoleInfo>& roles) = 0;
virtual void addFramework(
@@ -250,6 +258,10 @@ inline void MesosAllocator<AllocatorProcess>::initialize(
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ inverseOfferCallback,
const hashmap<std::string, mesos::master::RoleInfo>& roles)
{
process::dispatch(
@@ -257,6 +269,7 @@ inline void MesosAllocator<AllocatorProcess>::initialize(
&MesosAllocatorProcess::initialize,
allocationInterval,
offerCallback,
+ inverseOfferCallback,
roles);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 77a5b4c..8ae7475 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -90,6 +90,10 @@ public:
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ inverseOfferCallback,
const hashmap<std::string, mesos::master::RoleInfo>& roles);
void addFramework(
@@ -176,6 +180,9 @@ protected:
// Allocate resources from the specified slaves.
void allocate(const hashset<SlaveID>& slaveIds);
+ // Send inverse offers from the specified slaves.
+ void deallocate(const hashset<SlaveID>& slaveIds);
+
// Remove a filter for the specified framework.
void expire(
const FrameworkID& frameworkId,
@@ -202,6 +209,10 @@ protected:
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)> offerCallback;
+ lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)> inverseOfferCallback;
+
struct Metrics
{
explicit Metrics(const Self& process)
@@ -366,10 +377,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>& _offerCallback,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&
+ _inverseOfferCallback,
const hashmap<std::string, mesos::master::RoleInfo>& _roles)
{
allocationInterval = _allocationInterval;
offerCallback = _offerCallback;
+ inverseOfferCallback = _inverseOfferCallback;
roles = _roles;
initialized = true;
@@ -1086,6 +1102,89 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
offerCallback(frameworkId, offerable[frameworkId]);
}
}
+
+ // NOTE: For now, we implement maintenance inverse offers within the
+ // allocator. We leverage the existing timer/cycle of offers to also do any
+ // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
+ deallocate(slaveIds_);
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
+ const hashset<SlaveID>& slaveIds_)
+{
+ if (frameworkSorters.empty()) {
+ LOG(ERROR) << "No frameworks specified, cannot send inverse offers!";
+ return;
+ }
+
+ // In this case, `offerable` is actually the slaves and/or resources that we
+ // want the master to create `InverseOffer`s from.
+ hashmap<FrameworkID, hashmap<SlaveID, UnavailableResources>> offerable;
+
+ // For maintenance, we use the framework sorters to determine which frameworks
+ // have (1) reserved and / or (2) unreserved resources on the specified
+ // slaveIds. This way we only send inverse offers to frameworks that have the
+ // potential to lose something. We keep track of which frameworks already have
+ // an outstanding inverse offer for the given slave in the
+ // UnavailabilityStatus of the specific slave using the `offerOutstanding`
+ // flag. This is equivalent to the accounting we do for resources when we send
+ // regular offers. If we didn't keep track of outstanding offers then we would
+ // keep generating new inverse offers even though the framework had not
+ // responded yet.
+
+ foreachvalue (FrameworkSorter* frameworkSorter, frameworkSorters) {
+ foreach (const SlaveID& slaveId, slaveIds_) {
+ CHECK(slaves.contains(slaveId));
+
+ if (slaves[slaveId].maintenance.isSome()) {
+ // We bind a reference here because we intend to modify `maintenance`
+ // in place, and because the alias improves readability.
+ typename Slave::Maintenance& maintenance =
+ slaves[slaveId].maintenance.get();
+
+ hashmap<std::string, Resources> allocation =
+ frameworkSorter->allocation(slaveId);
+
+ foreachkey (const std::string& frameworkId_, allocation) {
+ FrameworkID frameworkId;
+ frameworkId.set_value(frameworkId_);
+
+ // If this framework doesn't already have inverse offers for the
+ // specified slave.
+ if (!offerable[frameworkId].contains(slaveId)) {
+ // If there isn't already an outstanding inverse offer to this
+ // framework for the specified slave.
+ if (!maintenance.offersOutstanding.contains(frameworkId)) {
+ // For now we send inverse offers with empty resources when the
+ // inverse offer represents maintenance on the machine. In the
+ // future we could be more specific about the resources on the
+ // host, as we have the information available.
+ offerable[frameworkId][slaveId] =
+ UnavailableResources{
+ Resources(),
+ maintenance.unavailability};
+
+ // Mark this framework as having an offer outstanding for the
+ // specified slave.
+ maintenance.offersOutstanding.insert(frameworkId);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (offerable.empty()) {
+ VLOG(1) << "No inverse offers to send out!";
+ } else {
+ // Now send inverse offers to each framework.
+ foreachkey (const FrameworkID& frameworkId, offerable) {
+ inverseOfferCallback(frameworkId, offerable[frameworkId]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0b3ba56..8471735 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -621,6 +621,7 @@ void Master::initialize()
allocator->initialize(
flags.allocation_interval,
defer(self(), &Master::offer, lambda::_1, lambda::_2),
+ defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2),
roleInfos);
// Parse the whitelist. Passing Allocator::updateWhitelist()
@@ -4781,6 +4782,14 @@ void Master::offer(const FrameworkID& frameworkId,
}
+void Master::inverseOffer(
+ const FrameworkID& frameworkId,
+ const hashmap<SlaveID, UnavailableResources>& resources)
+{
+ // TODO(jmlvanre): Implement this function.
+}
+
+
// TODO(vinod): If due to network partition there are two instances
// of the framework that think they are leaders and try to
// authenticate with master they would be stepping on each other's
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index cd71a25..1ba0837 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -491,6 +491,10 @@ public:
const FrameworkID& framework,
const hashmap<SlaveID, Resources>& resources);
+ void inverseOffer(
+ const FrameworkID& framework,
+ const hashmap<SlaveID, UnavailableResources>& resources);
+
// Invoked when there is a newly elected leading master.
// Made public for testing purposes.
void detected(const process::Future<Option<MasterInfo>>& pid);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 0a24b6b..2f37c98 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -79,6 +79,13 @@ struct Allocation
};
+struct Deallocation
+{
+ FrameworkID frameworkId;
+ hashmap<SlaveID, UnavailableResources> resources;
+};
+
+
class HierarchicalAllocatorTestBase : public ::testing::Test
{
protected:
@@ -95,9 +102,13 @@ protected:
void initialize(
const vector<string>& _roles,
const master::Flags& _flags = master::Flags(),
- const Option<lambda::function<
+ Option<lambda::function<
void(const FrameworkID&,
- const hashmap<SlaveID, Resources>&)>>& offerCallback = None())
+ const hashmap<SlaveID, Resources>&)>> offerCallback = None(),
+ Option<lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>>
+ inverseOfferCallback = None())
{
flags = _flags;
@@ -111,17 +122,35 @@ protected:
roles[role] = info;
}
- if (offerCallback.isSome()) {
- allocator->initialize(
- flags.allocation_interval,
- offerCallback.get(),
- roles);
- } else {
- allocator->initialize(
- flags.allocation_interval,
- lambda::bind(&put, &queue, lambda::_1, lambda::_2),
- roles);
+ if (offerCallback.isNone()) {
+ offerCallback =
+ [this](const FrameworkID& frameworkId,
+ const hashmap<SlaveID, Resources>& resources) {
+ Allocation allocation;
+ allocation.frameworkId = frameworkId;
+ allocation.resources = resources;
+
+ allocations.put(allocation);
+ };
}
+
+ if (inverseOfferCallback.isNone()) {
+ inverseOfferCallback =
+ [this](const FrameworkID& frameworkId,
+ const hashmap<SlaveID, UnavailableResources>& resources) {
+ Deallocation deallocation;
+ deallocation.frameworkId = frameworkId;
+ deallocation.resources = resources;
+
+ deallocations.put(deallocation);
+ };
+ }
+
+ allocator->initialize(
+ flags.allocation_interval,
+ offerCallback.get(),
+ inverseOfferCallback.get(),
+ roles);
}
SlaveInfo createSlaveInfo(const string& resources)
@@ -158,25 +187,13 @@ protected:
return resource;
}
-private:
- static void put(
- process::Queue<Allocation>* queue,
- const FrameworkID& frameworkId,
- const hashmap<SlaveID, Resources>& resources)
- {
- Allocation allocation;
- allocation.frameworkId = frameworkId;
- allocation.resources = resources;
-
- queue->put(allocation);
- }
-
protected:
master::Flags flags;
Allocator* allocator;
- process::Queue<Allocation> queue;
+ process::Queue<Allocation> allocations;
+ process::Queue<Deallocation> deallocations;
hashmap<string, RoleInfo> roles;
@@ -224,7 +241,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
allocator->addFramework(
framework1.id(), framework1, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources));
@@ -246,7 +263,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework2 will be offered all of slave2's resources since role2
// has the lowest user share, and framework2 is its only framework.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources));
@@ -266,7 +283,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework2 will be offered all of slave3's resources since role2
// has the lowest share.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources));
@@ -292,7 +309,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// framework3 will be offered all of slave4's resources since role1
// has the lowest user share, and framework3 has the lowest share of
// role1's frameworks.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework3.id(), allocation.get().frameworkId);
EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources));
@@ -319,7 +336,7 @@ TEST_F(HierarchicalAllocatorTest, UnreservedDRF)
// Even though framework4 doesn't have any resources, role2 has a
// lower share than role1, so framework2 receives slave5's resources.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
EXPECT_EQ(slave5.resources(), Resources::sum(allocation.get().resources));
@@ -350,7 +367,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
allocator->addFramework(
framework1.id(), framework1, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(slave1.resources(), Resources::sum(allocation.get().resources));
@@ -363,7 +380,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
SlaveInfo slave2 = createSlaveInfo("cpus:2;mem:512;disk:0");
allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
EXPECT_EQ(slave2.resources(), Resources::sum(allocation.get().resources));
@@ -375,7 +392,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
SlaveInfo slave3 = createSlaveInfo("cpus:2;mem:512;disk:0");
allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(slave3.resources(), Resources::sum(allocation.get().resources));
@@ -392,7 +409,7 @@ TEST_F(HierarchicalAllocatorTest, ReservedDRF)
"cpus(role1):2;mem(role1):1024;disk(role1):0");
allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework3.id(), allocation.get().frameworkId);
EXPECT_EQ(slave4.resources(), Resources::sum(allocation.get().resources));
@@ -425,7 +442,7 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
allocator->addFramework(
framework1.id(), framework1, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(slave1.resources() + slave2.resources(),
@@ -448,28 +465,28 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained)
allocator->addFramework(
framework2.id(), framework2, hashmap<SlaveID, Resources>());
- hashmap<FrameworkID, Allocation> allocations;
+ hashmap<FrameworkID, Allocation> frameworkAllocations;
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
- allocations[allocation.get().frameworkId] = allocation.get();
+ frameworkAllocations[allocation.get().frameworkId] = allocation.get();
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
- allocations[allocation.get().frameworkId] = allocation.get();
+ frameworkAllocations[allocation.get().frameworkId] = allocation.get();
// Note that slave1 and slave2 have the same resources, we don't
// care which framework received which slave.. only that they each
// received one.
- ASSERT_TRUE(allocations.contains(framework1.id()));
- ASSERT_EQ(1u, allocations[framework1.id()].resources.size());
+ ASSERT_TRUE(frameworkAllocations.contains(framework1.id()));
+ ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
EXPECT_EQ(slave1.resources(),
- Resources::sum(allocations[framework1.id()].resources));
+ Resources::sum(frameworkAllocations[framework1.id()].resources));
- ASSERT_TRUE(allocations.contains(framework2.id()));
- ASSERT_EQ(1u, allocations[framework1.id()].resources.size());
+ ASSERT_TRUE(frameworkAllocations.contains(framework2.id()));
+ ASSERT_EQ(1u, frameworkAllocations[framework1.id()].resources.size());
EXPECT_EQ(slave2.resources(),
- Resources::sum(allocations[framework1.id()].resources));
+ Resources::sum(frameworkAllocations[framework1.id()].resources));
}
@@ -501,7 +518,7 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness)
hashmap<FrameworkID, size_t> counts;
for (int i = 0; i < 10; i++) {
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
counts[allocation.get().frameworkId]++;
@@ -552,7 +569,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
allocator->addFramework(
framework1.id(), framework1, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(2u, allocation.get().resources.size());
@@ -566,7 +583,7 @@ TEST_F(HierarchicalAllocatorTest, Reservations)
allocator->addFramework(
framework2.id(), framework2, hashmap<SlaveID, Resources>());
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework2.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -595,7 +612,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
allocator->addFramework(
framework1.id(), framework1, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -613,7 +630,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
Clock::advance(flags.allocation_interval);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -631,7 +648,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
Clock::advance(flags.allocation_interval);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework1.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -669,7 +686,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave2.id(), slave2, None(), slave2.resources(), EMPTY);
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -683,7 +700,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave3.id(), slave3, None(), slave3.resources(), EMPTY);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -700,7 +717,7 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
"disk:128");
allocator->addSlave(slave4.id(), slave4, None(), slave4.resources(), EMPTY);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -726,7 +743,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
allocator->addFramework(
framework.id(), framework, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -764,7 +781,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
Clock::advance(flags.allocation_interval);
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -809,7 +826,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
allocator->addFramework(
framework.id(), framework, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -843,7 +860,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
allocator->addFramework(
framework.id(), framework, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(framework.id(), allocation.get().frameworkId);
EXPECT_EQ(1u, allocation.get().resources.size());
@@ -884,7 +901,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
framework.id(), framework, hashmap<SlaveID, Resources>());
// Initially, all the resources are allocated.
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
@@ -893,7 +910,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
allocator->updateSlave(slave.id(), oversubscribed);
// The next allocation should be for 10 oversubscribed resources.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
@@ -902,7 +919,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
allocator->updateSlave(slave.id(), oversubscribed2);
// The next allocation should be for 2 oversubscribed cpus.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(oversubscribed2 - oversubscribed,
Resources::sum(allocation.get().resources));
@@ -914,7 +931,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlave)
// Since there are no more available oversubscribed resources there
// shouldn't be an allocation.
Clock::settle();
- allocation = queue.get();
+ allocation = allocations.get();
ASSERT_TRUE(allocation.isPending());
}
@@ -938,7 +955,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
framework.id(), framework, hashmap<SlaveID, Resources>());
// Initially, all the resources are allocated.
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
@@ -949,7 +966,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
// No allocation should be made for oversubscribed resources because
// the framework has not opted in for them.
Clock::settle();
- allocation = queue.get();
+ allocation = allocations.get();
ASSERT_TRUE(allocation.isPending());
}
@@ -976,7 +993,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
framework.id(), framework, hashmap<SlaveID, Resources>());
// Initially, all the resources are allocated.
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
@@ -985,7 +1002,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
allocator->updateSlave(slave.id(), oversubscribed);
// The next allocation should be for 10 oversubscribed cpus.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(oversubscribed, Resources::sum(allocation.get().resources));
@@ -999,7 +1016,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
// The next allocation should be for 6 oversubscribed and 2 regular
// cpus.
- allocation = queue.get();
+ allocation = allocations.get();
AWAIT_READY(allocation);
EXPECT_EQ(recovered, Resources::sum(allocation.get().resources));
}
@@ -1028,7 +1045,7 @@ TEST_F(HierarchicalAllocatorTest, Whitelist)
allocator->addFramework(
framework.id(), framework, hashmap<SlaveID, Resources>());
- Future<Allocation> allocation = queue.get();
+ Future<Allocation> allocation = allocations.get();
// Ensure a batch allocation is triggered.
Clock::advance(flags.allocation_interval);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index c6a419b..1fe3757 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -95,7 +95,7 @@ TYPED_TEST(MasterAllocatorTest, SingleFramework)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -141,7 +141,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -246,7 +246,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -374,7 +374,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -496,7 +496,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
@@ -642,7 +642,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -757,7 +757,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
@@ -851,7 +851,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
@@ -952,7 +952,7 @@ TYPED_TEST(MasterAllocatorTest, CpusOnlyOfferedAndTaskLaunched)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
@@ -1030,7 +1030,7 @@ TYPED_TEST(MasterAllocatorTest, MemoryOnlyOfferedAndTaskLaunched)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.allocation_interval = Milliseconds(50);
@@ -1121,7 +1121,7 @@ TYPED_TEST(MasterAllocatorTest, Whitelist)
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Future<Nothing> updateWhitelist1;
EXPECT_CALL(allocator, updateWhitelist(Option<hashset<string>>(hosts)))
@@ -1161,7 +1161,7 @@ TYPED_TEST(MasterAllocatorTest, RoleTest)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = this->CreateMasterFlags();
masterFlags.roles = Some("role2");
@@ -1253,7 +1253,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -1311,7 +1311,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst)
{
TestAllocator<TypeParam> allocator2;
- EXPECT_CALL(allocator2, initialize(_, _, _));
+ EXPECT_CALL(allocator2, initialize(_, _, _, _));
Future<Nothing> addFramework;
EXPECT_CALL(allocator2, addFramework(_, _, _))
@@ -1378,7 +1378,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
{
TestAllocator<TypeParam> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = this->StartMaster(&allocator);
ASSERT_SOME(master);
@@ -1435,7 +1435,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst)
{
TestAllocator<TypeParam> allocator2;
- EXPECT_CALL(allocator2, initialize(_, _, _));
+ EXPECT_CALL(allocator2, initialize(_, _, _, _));
Future<Nothing> addSlave;
EXPECT_CALL(allocator2, addSlave(_, _, _, _, _))
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 477b7e4..858618f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1265,7 +1265,7 @@ public:
ACTION_P(InvokeInitialize, allocator)
{
- allocator->real->initialize(arg0, arg1, arg2);
+ allocator->real->initialize(arg0, arg1, arg2, arg3);
}
@@ -1407,9 +1407,9 @@ public:
// to get the best of both worlds: the ability to use 'DoDefault'
// and no warnings when expectations are not explicit.
- ON_CALL(*this, initialize(_, _, _))
+ ON_CALL(*this, initialize(_, _, _, _))
.WillByDefault(InvokeInitialize(this));
- EXPECT_CALL(*this, initialize(_, _, _))
+ EXPECT_CALL(*this, initialize(_, _, _, _))
.WillRepeatedly(DoDefault());
ON_CALL(*this, addFramework(_, _, _))
@@ -1500,11 +1500,14 @@ public:
virtual ~TestAllocator() {}
- MOCK_METHOD3(initialize, void(
+ MOCK_METHOD4(initialize, void(
const Duration&,
const lambda::function<
void(const FrameworkID&,
const hashmap<SlaveID, Resources>&)>&,
+ const lambda::function<
+ void(const FrameworkID&,
+ const hashmap<SlaveID, UnavailableResources>&)>&,
const hashmap<std::string, mesos::master::RoleInfo>&));
MOCK_METHOD3(addFramework, void(
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index 572a8d6..398a2e1 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -132,7 +132,7 @@ TEST_F(ReservationEndpointsTest, AvailableResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -225,7 +225,7 @@ TEST_F(ReservationEndpointsTest, ReserveOfferedResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -299,7 +299,7 @@ TEST_F(ReservationEndpointsTest, UnreserveOfferedResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -381,7 +381,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
master::Flags masterFlags = CreateMasterFlags();
// Turn off allocation. We're doing it manually.
@@ -527,7 +527,7 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
// Turn off allocation. We're doing it manually.
masterFlags.allocation_interval = Seconds(1000);
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
@@ -678,7 +678,7 @@ TEST_F(ReservationEndpointsTest, InsufficientResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -720,7 +720,7 @@ TEST_F(ReservationEndpointsTest, NoHeader)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -770,7 +770,7 @@ TEST_F(ReservationEndpointsTest, BadCredentials)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -847,7 +847,7 @@ TEST_F(ReservationEndpointsTest, NoResources)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
@@ -882,7 +882,7 @@ TEST_F(ReservationEndpointsTest, NonMatchingPrincipal)
{
TestAllocator<> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator);
ASSERT_SOME(master);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index 91fcf0d..6b7c43c 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -410,7 +410,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
masterFlags.allocation_interval = Milliseconds(50);
masterFlags.roles = frameworkInfo.role();
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
@@ -501,7 +501,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
masterFlags.allocation_interval = Milliseconds(50);
masterFlags.roles = frameworkInfo.role();
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master>> master = StartMaster(&allocator, masterFlags);
ASSERT_SOME(master);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 882a9ff..af40a07 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -283,7 +283,7 @@ TEST_F(ResourceOffersTest, Request)
{
TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _))
+ EXPECT_CALL(allocator, initialize(_, _, _, _))
.Times(1);
Try<PID<Master>> master = StartMaster(&allocator);
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1de99f4/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index b636986..dd8f823 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -2143,7 +2143,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
{
TestAllocator<master::allocator::HierarchicalDRFAllocator> allocator;
- EXPECT_CALL(allocator, initialize(_, _, _));
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
Try<PID<Master> > master = this->StartMaster(&allocator);
ASSERT_SOME(master);