You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/25 01:18:44 UTC

mesos git commit: Updated the filter abstraction for Resources.

Repository: mesos
Updated Branches:
  refs/heads/master b913ddf17 -> c64b58894


Updated the filter abstraction for Resources.

Review: https://reviews.apache.org/r/30911


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c64b5889
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c64b5889
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c64b5889

Branch: refs/heads/master
Commit: c64b5889460025eab3d282ff679fdad7dd927e43
Parents: b913ddf
Author: Michael Park <mc...@gmail.com>
Authored: Tue Feb 24 15:45:53 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 24 16:14:51 2015 -0800

----------------------------------------------------------------------
 include/mesos/resources.hpp                 | 95 ++++--------------------
 src/common/resources.cpp                    | 71 ++++++++++--------
 src/master/allocator/mesos/hierarchical.hpp | 12 +--
 src/master/master.hpp                       | 11 ++-
 src/master/validation.cpp                   | 10 +--
 src/slave/slave.cpp                         | 35 +++++----
 src/tests/resources_tests.cpp               |  2 +-
 7 files changed, 91 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index c242bcc..da6d488 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -30,6 +30,7 @@
 #include <stout/check.hpp>
 #include <stout/error.hpp>
 #include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
@@ -94,11 +95,11 @@ public:
   // Tests if the given Resource object is a persistent volume.
   static bool isPersistentVolume(const Resource& resource);
 
-  // Tests if the given Resource object is reserved.
-  static bool isReserved(const Resource& resource);
-
-  // Tests if the given Resource object is reserved for the given role.
-  static bool isReserved(const Resource& resource, const std::string& role);
+  // Tests if the given Resource object is reserved. If the role is
+  // specified, tests that it's reserved for the given role.
+  static bool isReserved(
+      const Resource& resource,
+      const Option<std::string>& role = None());
 
   // Tests if the given Resource object is unreserved.
   static bool isUnreserved(const Resource& resource);
@@ -132,6 +133,10 @@ public:
   // Checks if this Resources contains the given Resource.
   bool contains(const Resource& that) const;
 
+  // Filter resources based on the given predicate.
+  Resources filter(
+      const lambda::function<bool(const Resource&)>& predicate) const;
+
   // Returns the reserved resources, by role.
   hashmap<std::string, Resources> reserved() const;
 
@@ -142,6 +147,9 @@ public:
   // Returns the unreserved resources.
   Resources unreserved() const;
 
+  // Returns the persistent volumes.
+  Resources persistentVolumes() const;
+
   // Returns a Resources object with the same amount of each resource
   // type as these Resources, but with all Resource objects marked as
   // the specified role.
@@ -235,83 +243,6 @@ public:
   Resources& operator -= (const Resource& that);
   Resources& operator -= (const Resources& that);
 
-  // The base class for all resources filters.
-  // TODO(jieyu): Pull resources filters out of Resources class and
-  // possibly put them inside a resources::filter namespace.
-  class Filter
-  {
-  public:
-    // Apply this filter to the given resources and return the
-    // filtered resources.
-    virtual Resources apply(const Resources& resources) const = 0;
-  };
-
-  class RoleFilter : public Filter
-  {
-  public:
-    static RoleFilter any() { return RoleFilter(); }
-
-    RoleFilter() : type(ANY) {}
-
-    explicit RoleFilter(const std::string& _role)
-      : type(SOME), role(_role) {}
-
-    virtual Resources apply(const Resources& resources) const
-    {
-      if (type == ANY) {
-        return resources;
-      }
-
-      CHECK_SOME(role);
-
-      return role.get() == "*" ?
-        resources.unreserved() :
-        resources.reserved(role.get());
-    }
-
-  private:
-    enum { ANY, SOME } type;
-    Option<std::string> role;
-  };
-
-  class PersistentVolumeFilter : public Filter
-  {
-  public:
-    PersistentVolumeFilter() {}
-
-    virtual Resources apply(const Resources& resources) const
-    {
-      Resources result;
-      foreach (const Resource& resource, resources) {
-        if (isPersistentVolume(resource)) {
-          result += resource;
-        }
-      }
-      return result;
-    }
-  };
-
-  // Resources that need checkpointing on the slave.
-  // TODO(jieyu): This filter is only used by master and slave.
-  // Consider pulling this out of this header.
-  class CheckpointFilter : public Filter
-  {
-  public:
-    CheckpointFilter() {}
-
-    virtual Resources apply(const Resources& resources) const
-    {
-      Resources result;
-      foreach (const Resource& resource, resources) {
-        // TODO(jieyu): Consider dynamic reservation as well.
-        if (isPersistentVolume(resource)) {
-          result += resource;
-        }
-      }
-      return result;
-    }
-  };
-
 private:
   // Similar to 'contains(const Resource&)' but skips the validity
   // check. This can be used to avoid the performance overhead of

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index a45bbaf..2c99b68 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -26,6 +26,7 @@
 #include <mesos/values.hpp>
 
 #include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
 #include <stout/strings.hpp>
 
 using std::ostream;
@@ -406,15 +407,15 @@ bool Resources::isPersistentVolume(const Resource& resource)
 }
 
 
-bool Resources::isReserved(const Resource& resource)
+bool Resources::isReserved(
+    const Resource& resource,
+    const Option<std::string>& role)
 {
-  return !isUnreserved(resource);
-}
-
-
-bool Resources::isReserved(const Resource& resource, const std::string& role)
-{
-  return isReserved(resource) && resource.role() == role;
+  if (role.isSome()) {
+    return resource.role() != "*" && role.get() == resource.role();
+  } else {
+    return resource.role() != "*";
+  }
 }
 
 
@@ -483,6 +484,19 @@ bool Resources::contains(const Resource& that) const
 }
 
 
+Resources Resources::filter(
+    const lambda::function<bool(const Resource&)>& predicate) const
+{
+  Resources result;
+  foreach (const Resource& resource, resources) {
+    if (predicate(resource)) {
+      result += resource;
+    }
+  }
+  return result;
+}
+
+
 hashmap<string, Resources> Resources::reserved() const
 {
   hashmap<string, Resources> result;
@@ -499,29 +513,19 @@ hashmap<string, Resources> Resources::reserved() const
 
 Resources Resources::reserved(const string& role) const
 {
-  Resources result;
-
-  foreach (const Resource& resource, resources) {
-    if (isReserved(resource, role)) {
-      result += resource;
-    }
-  }
-
-  return result;
+  return filter(lambda::bind(isReserved, lambda::_1, role));
 }
 
 
 Resources Resources::unreserved() const
 {
-  Resources result;
+  return filter(isUnreserved);
+}
 
-  foreach (const Resource& resource, resources) {
-    if (isUnreserved(resource)) {
-      result += resource;
-    }
-  }
 
-  return result;
+Resources Resources::persistentVolumes() const
+{
+  return filter(isPersistentVolume);
 }
 
 
@@ -538,21 +542,26 @@ Resources Resources::flatten(const string& role) const
 }
 
 
+// A predicate that returns true for any resource.
+static bool any(const Resource&) { return true; }
+
+
 Option<Resources> Resources::find(const Resource& target) const
 {
   Resources found;
   Resources total = *this;
   Resources remaining = Resources(target).flatten();
 
-  // First look in the target role, then "*", then any remaining role.
-  vector<RoleFilter> filters = {
-    RoleFilter(target.role()),
-    RoleFilter("*"),
-    RoleFilter::any()
+  // First look in the target role, then unreserved, then any remaining role.
+  // TODO(mpark): Use a lambda for 'any' instead once we get full C++11.
+  vector<lambda::function<bool(const Resource&)>> predicates = {
+    lambda::bind(isReserved, lambda::_1, target.role()),
+    isUnreserved,
+    any
   };
 
-  foreach (const RoleFilter& filter, filters) {
-    foreach (const Resource& resource, filter.apply(total)) {
+  foreach (const auto& predicate, predicates) {
+    foreach (const Resource& resource, total.filter(predicate)) {
       // Need to flatten to ignore the roles in contains().
       Resources flattened = Resources(resource).flatten();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 9b7ded3..c0b1da7 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -795,11 +795,11 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
         FrameworkID frameworkId;
         frameworkId.set_value(frameworkId_);
 
-        Resources unreserved = slaves[slaveId].available.unreserved();
-        Resources resources = unreserved;
-        if (role != "*") {
-          resources += slaves[slaveId].available.reserved(role);
-        }
+        // NOTE: Currently, frameworks are allowed to have '*' role.
+        // Calling reserved('*') returns an empty Resources object.
+        Resources resources =
+          slaves[slaveId].available.unreserved() +
+          slaves[slaveId].available.reserved(role);
 
         // If the resources are not allocatable, ignore.
         if (!allocatable(resources)) {
@@ -825,7 +825,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
         // roles.
         frameworkSorters[role]->add(resources);
         frameworkSorters[role]->allocated(frameworkId_, resources);
-        roleSorter->allocated(role, unreserved);
+        roleSorter->allocated(role, resources.unreserved());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8c44d6e..d414061 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -923,7 +923,8 @@ struct Slave
     CHECK_SOME(resources);
 
     totalResources = resources.get();
-    checkpointedResources = Resources::CheckpointFilter().apply(totalResources);
+
+    checkpointedResources = totalResources.filter(needCheckpointing);
   }
 
   const SlaveID id;
@@ -982,6 +983,14 @@ struct Slave
 private:
   Slave(const Slave&);              // No copying.
   Slave& operator = (const Slave&); // No assigning.
+
+  // Returns true iff the resource needs to be checkpointed on the slave.
+  // TODO(mpark): Consider framework reservations.
+  // TODO(mpark): Factor this out to somewhere the slave can use it.
+  static bool needCheckpointing(const Resource& resource)
+  {
+    return Resources::isPersistentVolume(resource);
+  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index e093651..2f2e4ea 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -102,13 +102,11 @@ Option<Error> validateUniquePersistenceID(
   hashmap<string, hashset<string>> persistenceIds;
 
   // Check duplicated persistence ID within the given resources.
-  foreach (const Resource& resource, resources) {
-    if (!Resources::isPersistentVolume(resource)) {
-      continue;
-    }
+  Resources volumes = Resources(resources).persistentVolumes();
 
-    const string& role = resource.role();
-    const string& id = resource.disk().persistence().id();
+  foreach (const Resource& volume, volumes) {
+    const string& role = volume.role();
+    const string& id = volume.disk().persistence().id();
 
     if (persistenceIds.contains(role) &&
         persistenceIds[role].contains(id)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d449108..e52ff5a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1299,23 +1299,24 @@ void Slave::_runTask(
   // should already know about it. In case the slave doesn't know
   // about them (e.g., CheckpointResourcesMessage was dropped or came
   // out of order), we simply fail the slave to be safe.
-  foreach (const Resource& resource, task.resources()) {
-    if (Resources::isPersistentVolume(resource)) {
-      CHECK(checkpointedResources.contains(resource))
-        << "Unknown persistent volume " << resource
-        << " for task " << task.task_id()
-        << " of framework " << frameworkId;
-    }
+  Resources volumes = Resources(task.resources()).persistentVolumes();
+
+  foreach (const Resource& volume, volumes) {
+    CHECK(checkpointedResources.contains(volume))
+      << "Unknown persistent volume " << volume
+      << " for task " << task.task_id()
+      << " of framework " << frameworkId;
   }
 
   if (task.has_executor()) {
-    foreach (const Resource& resource, task.executor().resources()) {
-      if (Resources::isPersistentVolume(resource)) {
-        CHECK(checkpointedResources.contains(resource))
-          << "Unknown persistent volume " << resource
-          << " for executor " << task.executor().executor_id()
-          << " of framework " << frameworkId;
-      }
+    Resources volumes =
+      Resources(task.executor().resources()).persistentVolumes();
+
+    foreach (const Resource& volume, volumes) {
+      CHECK(checkpointedResources.contains(volume))
+        << "Unknown persistent volume " << volume
+        << " for executor " << task.executor().executor_id()
+        << " of framework " << frameworkId;
     }
   }
 
@@ -1983,11 +1984,9 @@ void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
   // to support multiple disks, or raw disks. Depending on the
   // DiskInfo, we may want to create either directories under a root
   // directory, or LVM volumes from a given device.
-  foreach (const Resource& volume, newCheckpointedResources) {
-    if (!Resources::isPersistentVolume(volume)) {
-      continue;
-    }
+  Resources volumes = newCheckpointedResources.persistentVolumes();
 
+  foreach (const Resource& volume, volumes) {
     // This is validated in master.
     CHECK_NE(volume.role(), "*");
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c64b5889/src/tests/resources_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_tests.cpp b/src/tests/resources_tests.cpp
index 4298d10..7e0ad98 100644
--- a/src/tests/resources_tests.cpp
+++ b/src/tests/resources_tests.cpp
@@ -951,7 +951,7 @@ TEST(DiskResourcesTest, FilterPersistentVolumes)
   resources += r1;
   resources += r2;
 
-  EXPECT_EQ(r1, Resources::PersistentVolumeFilter().apply(resources));
+  EXPECT_EQ(r1, resources.persistentVolumes());
 }