You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/25 01:18:44 UTC
mesos git commit: Updated the filter abstraction for Resources.
Repository: mesos
Updated Branches:
refs/heads/master b913ddf17 -> c64b58894
Updated the filter abstraction for Resources.
Review: https://reviews.apache.org/r/30911
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c64b5889
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c64b5889
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c64b5889
Branch: refs/heads/master
Commit: c64b5889460025eab3d282ff679fdad7dd927e43
Parents: b913ddf
Author: Michael Park <mc...@gmail.com>
Authored: Tue Feb 24 15:45:53 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 24 16:14:51 2015 -0800
----------------------------------------------------------------------
include/mesos/resources.hpp | 95 ++++--------------------
src/common/resources.cpp | 71 ++++++++++--------
src/master/allocator/mesos/hierarchical.hpp | 12 +--
src/master/master.hpp | 11 ++-
src/master/validation.cpp | 10 +--
src/slave/slave.cpp | 35 +++++----
src/tests/resources_tests.cpp | 2 +-
7 files changed, 91 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index c242bcc..da6d488 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -30,6 +30,7 @@
#include <stout/check.hpp>
#include <stout/error.hpp>
#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
@@ -94,11 +95,11 @@ public:
// Tests if the given Resource object is a persistent volume.
static bool isPersistentVolume(const Resource& resource);
- // Tests if the given Resource object is reserved.
- static bool isReserved(const Resource& resource);
-
- // Tests if the given Resource object is reserved for the given role.
- static bool isReserved(const Resource& resource, const std::string& role);
+ // Tests if the given Resource object is reserved. If the role is
+ // specified, tests that it's reserved for the given role.
+ static bool isReserved(
+ const Resource& resource,
+ const Option<std::string>& role = None());
// Tests if the given Resource object is unreserved.
static bool isUnreserved(const Resource& resource);
@@ -132,6 +133,10 @@ public:
// Checks if this Resources contains the given Resource.
bool contains(const Resource& that) const;
+ // Returns the resources for which the given predicate returns true.
+ Resources filter(
+ const lambda::function<bool(const Resource&)>& predicate) const;
+
// Returns the reserved resources, by role.
hashmap<std::string, Resources> reserved() const;
@@ -142,6 +147,9 @@ public:
// Returns the unreserved resources.
Resources unreserved() const;
+ // Returns the persistent volumes.
+ Resources persistentVolumes() const;
+
// Returns a Resources object with the same amount of each resource
// type as these Resources, but with all Resource objects marked as
// the specified role.
@@ -235,83 +243,6 @@ public:
Resources& operator -= (const Resource& that);
Resources& operator -= (const Resources& that);
- // The base class for all resources filters.
- // TODO(jieyu): Pull resources filters out of Resources class and
- // possibly put them inside a resources::filter namespace.
- class Filter
- {
- public:
- // Apply this filter to the given resources and return the
- // filtered resources.
- virtual Resources apply(const Resources& resources) const = 0;
- };
-
- class RoleFilter : public Filter
- {
- public:
- static RoleFilter any() { return RoleFilter(); }
-
- RoleFilter() : type(ANY) {}
-
- explicit RoleFilter(const std::string& _role)
- : type(SOME), role(_role) {}
-
- virtual Resources apply(const Resources& resources) const
- {
- if (type == ANY) {
- return resources;
- }
-
- CHECK_SOME(role);
-
- return role.get() == "*" ?
- resources.unreserved() :
- resources.reserved(role.get());
- }
-
- private:
- enum { ANY, SOME } type;
- Option<std::string> role;
- };
-
- class PersistentVolumeFilter : public Filter
- {
- public:
- PersistentVolumeFilter() {}
-
- virtual Resources apply(const Resources& resources) const
- {
- Resources result;
- foreach (const Resource& resource, resources) {
- if (isPersistentVolume(resource)) {
- result += resource;
- }
- }
- return result;
- }
- };
-
- // Resources that need checkpointing on the slave.
- // TODO(jieyu): This filter is only used by master and slave.
- // Consider pulling this out of this header.
- class CheckpointFilter : public Filter
- {
- public:
- CheckpointFilter() {}
-
- virtual Resources apply(const Resources& resources) const
- {
- Resources result;
- foreach (const Resource& resource, resources) {
- // TODO(jieyu): Consider dynamic reservation as well.
- if (isPersistentVolume(resource)) {
- result += resource;
- }
- }
- return result;
- }
- };
-
private:
// Similar to 'contains(const Resource&)' but skips the validity
// check. This can be used to avoid the performance overhead of
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index a45bbaf..2c99b68 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -26,6 +26,7 @@
#include <mesos/values.hpp>
#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
#include <stout/strings.hpp>
using std::ostream;
@@ -406,15 +407,15 @@ bool Resources::isPersistentVolume(const Resource& resource)
}
-bool Resources::isReserved(const Resource& resource)
+bool Resources::isReserved(
+ const Resource& resource,
+ const Option<std::string>& role)
{
- return !isUnreserved(resource);
-}
-
-
-bool Resources::isReserved(const Resource& resource, const std::string& role)
-{
- return isReserved(resource) && resource.role() == role;
+ if (role.isSome()) {
+ return resource.role() != "*" && role.get() == resource.role();
+ } else {
+ return resource.role() != "*";
+ }
}
@@ -483,6 +484,19 @@ bool Resources::contains(const Resource& that) const
}
+Resources Resources::filter(
+ const lambda::function<bool(const Resource&)>& predicate) const
+{
+ Resources result;
+ foreach (const Resource& resource, resources) {
+ if (predicate(resource)) {
+ result += resource;
+ }
+ }
+ return result;
+}
+
+
hashmap<string, Resources> Resources::reserved() const
{
hashmap<string, Resources> result;
@@ -499,29 +513,19 @@ hashmap<string, Resources> Resources::reserved() const
Resources Resources::reserved(const string& role) const
{
- Resources result;
-
- foreach (const Resource& resource, resources) {
- if (isReserved(resource, role)) {
- result += resource;
- }
- }
-
- return result;
+ return filter(lambda::bind(isReserved, lambda::_1, role));
}
Resources Resources::unreserved() const
{
- Resources result;
+ return filter(isUnreserved);
+}
- foreach (const Resource& resource, resources) {
- if (isUnreserved(resource)) {
- result += resource;
- }
- }
- return result;
+Resources Resources::persistentVolumes() const
+{
+ return filter(isPersistentVolume);
}
@@ -538,21 +542,26 @@ Resources Resources::flatten(const string& role) const
}
+// A predicate that returns true for any resource.
+static bool any(const Resource&) { return true; }
+
+
Option<Resources> Resources::find(const Resource& target) const
{
Resources found;
Resources total = *this;
Resources remaining = Resources(target).flatten();
- // First look in the target role, then "*", then any remaining role.
- vector<RoleFilter> filters = {
- RoleFilter(target.role()),
- RoleFilter("*"),
- RoleFilter::any()
+ // First look in the target role, then unreserved, then any remaining role.
+ // TODO(mpark): Use a lambda for 'any' instead once we get full C++11.
+ vector<lambda::function<bool(const Resource&)>> predicates = {
+ lambda::bind(isReserved, lambda::_1, target.role()),
+ isUnreserved,
+ any
};
- foreach (const RoleFilter& filter, filters) {
- foreach (const Resource& resource, filter.apply(total)) {
+ foreach (const auto& predicate, predicates) {
+ foreach (const Resource& resource, total.filter(predicate)) {
// Need to flatten to ignore the roles in contains().
Resources flattened = Resources(resource).flatten();
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 9b7ded3..c0b1da7 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -795,11 +795,11 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
FrameworkID frameworkId;
frameworkId.set_value(frameworkId_);
- Resources unreserved = slaves[slaveId].available.unreserved();
- Resources resources = unreserved;
- if (role != "*") {
- resources += slaves[slaveId].available.reserved(role);
- }
+ // NOTE: Currently, frameworks are allowed to have the '*' role, in which
+ // case reserved(role) is empty and only unreserved() contributes below.
+ Resources resources =
+ slaves[slaveId].available.unreserved() +
+ slaves[slaveId].available.reserved(role);
// If the resources are not allocatable, ignore.
if (!allocatable(resources)) {
@@ -825,7 +825,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
// roles.
frameworkSorters[role]->add(resources);
frameworkSorters[role]->allocated(frameworkId_, resources);
- roleSorter->allocated(role, unreserved);
+ roleSorter->allocated(role, resources.unreserved());
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8c44d6e..d414061 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -923,7 +923,8 @@ struct Slave
CHECK_SOME(resources);
totalResources = resources.get();
- checkpointedResources = Resources::CheckpointFilter().apply(totalResources);
+
+ checkpointedResources = totalResources.filter(needCheckpointing);
}
const SlaveID id;
@@ -982,6 +983,14 @@ struct Slave
private:
Slave(const Slave&); // No copying.
Slave& operator = (const Slave&); // No assigning.
+
+ // Returns true iff the resource needs to be checkpointed on the slave.
+ // TODO(mpark): Consider framework reservations.
+ // TODO(mpark): Factor this out to somewhere the slave can use it.
+ static bool needCheckpointing(const Resource& resource)
+ {
+ return Resources::isPersistentVolume(resource);
+ }
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index e093651..2f2e4ea 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -102,13 +102,11 @@ Option<Error> validateUniquePersistenceID(
hashmap<string, hashset<string>> persistenceIds;
// Check duplicated persistence ID within the given resources.
- foreach (const Resource& resource, resources) {
- if (!Resources::isPersistentVolume(resource)) {
- continue;
- }
+ Resources volumes = Resources(resources).persistentVolumes();
- const string& role = resource.role();
- const string& id = resource.disk().persistence().id();
+ foreach (const Resource& volume, volumes) {
+ const string& role = volume.role();
+ const string& id = volume.disk().persistence().id();
if (persistenceIds.contains(role) &&
persistenceIds[role].contains(id)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d449108..e52ff5a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1299,23 +1299,24 @@ void Slave::_runTask(
// should already know about it. In case the slave doesn't know
// about them (e.g., CheckpointResourcesMessage was dropped or came
// out of order), we simply fail the slave to be safe.
- foreach (const Resource& resource, task.resources()) {
- if (Resources::isPersistentVolume(resource)) {
- CHECK(checkpointedResources.contains(resource))
- << "Unknown persistent volume " << resource
- << " for task " << task.task_id()
- << " of framework " << frameworkId;
- }
+ Resources volumes = Resources(task.resources()).persistentVolumes();
+
+ foreach (const Resource& volume, volumes) {
+ CHECK(checkpointedResources.contains(volume))
+ << "Unknown persistent volume " << volume
+ << " for task " << task.task_id()
+ << " of framework " << frameworkId;
}
if (task.has_executor()) {
- foreach (const Resource& resource, task.executor().resources()) {
- if (Resources::isPersistentVolume(resource)) {
- CHECK(checkpointedResources.contains(resource))
- << "Unknown persistent volume " << resource
- << " for executor " << task.executor().executor_id()
- << " of framework " << frameworkId;
- }
+ Resources volumes =
+ Resources(task.executor().resources()).persistentVolumes();
+
+ foreach (const Resource& volume, volumes) {
+ CHECK(checkpointedResources.contains(volume))
+ << "Unknown persistent volume " << volume
+ << " for executor " << task.executor().executor_id()
+ << " of framework " << frameworkId;
}
}
@@ -1983,11 +1984,9 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
// to support multiple disks, or raw disks. Depending on the
// DiskInfo, we may want to create either directories under a root
// directory, or LVM volumes from a given device.
- foreach (const Resource& volume, newCheckpointedResources) {
- if (!Resources::isPersistentVolume(volume)) {
- continue;
- }
+ Resources volumes = newCheckpointedResources.persistentVolumes();
+ foreach (const Resource& volume, volumes) {
// This is validated in master.
CHECK_NE(volume.role(), "*");
http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/tests/resources_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_tests.cpp b/src/tests/resources_tests.cpp
index 4298d10..7e0ad98 100644
--- a/src/tests/resources_tests.cpp
+++ b/src/tests/resources_tests.cpp
@@ -951,7 +951,7 @@ TEST(DiskResourcesTest, FilterPersistentVolumes)
resources += r1;
resources += r2;
- EXPECT_EQ(r1, Resources::PersistentVolumeFilter().apply(resources));
+ EXPECT_EQ(r1, resources.persistentVolumes());
}