You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2016/08/01 17:00:25 UTC

[1/2] mesos git commit: Support arithmetic operations for shared resources with shared count.

Repository: mesos
Updated Branches:
  refs/heads/master 48a492cd9 -> 31315957e


Support arithmetic operations for shared resources with shared count.

A new class Resource_ is added that allows 'Resources' to group
identical shared resource objects together into a single 'Resource_'
object that is tracked by its shared count. Non-shared resource objects
are not grouped.

Note that v1 changes for shared resources are in the next commit.

Review: https://reviews.apache.org/r/45959/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e61a0a8a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e61a0a8a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e61a0a8a

Branch: refs/heads/master
Commit: e61a0a8abed0ffe9ef4d918c7e873c83438ce80b
Parents: 48a492c
Author: Anindya Sinha <an...@apple.com>
Authored: Sat Jul 30 00:50:26 2016 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Mon Aug 1 09:50:48 2016 -0700

----------------------------------------------------------------------
 include/mesos/resources.hpp   | 120 ++++++++++++--
 src/common/resources.cpp      | 329 +++++++++++++++++++++++++++++--------
 src/master/validation.cpp     |   4 +-
 src/tests/mesos.hpp           |  21 ++-
 src/tests/resources_tests.cpp | 246 +++++++++++++++++++++++----
 5 files changed, 609 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e61a0a8a/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 6638c8f..829f39d 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -61,6 +61,74 @@ namespace mesos {
 // +=, -=, etc.).
 class Resources
 {
+private:
+  // An internal abstraction to facilitate managing shared resources.
+  // It allows 'Resources' to group identical shared resource objects
+  // together into a single 'Resource_' object that is tracked by its
+  // internal counter. Non-shared resource objects are not grouped.
+  //
+  // The rest of the private section is below the public section. We
+  // need to define Resource_ first because the public typedefs below
+  // depend on it.
+  class Resource_
+  {
+  public:
+    /*implicit*/ Resource_(const Resource& _resource)
+      : resource(_resource),
+        sharedCount(None())
+    {
+      // Setting the counter to 1 to denote "one copy" of the shared resource.
+      if (resource.has_shared()) {
+        sharedCount = 1;
+      }
+    }
+
+    // By implicitly converting to Resource we are able to keep Resource_
+    // logic internal and expose only the protobuf object.
+    operator const Resource&() const { return resource; }
+
+    // Check whether this Resource_ object corresponds to a shared resource.
+    bool isShared() const { return sharedCount.isSome(); }
+
+    // Validates this Resource_ object.
+    Option<Error> validate() const;
+
+    // Check whether this Resource_ object is empty.
+    bool isEmpty() const;
+
+    // The `Resource_` arithmetic, comparison operators and `contains()`
+    // method require the wrapped `resource` protobuf to have the same
+    // sharedness.
+    //
+    // For shared resources, the `resource` protobuf needs to be equal,
+    // and only the shared counters are adjusted or compared.
+    // For non-shared resources, the shared counters are none and the
+    // semantics of the Resource_ object's operators/contains() method
+    // are the same as those of the Resource objects.
+
+    // Checks if this Resource_ is a superset of the given Resource_.
+    bool contains(const Resource_& that) const;
+
+    Resource_& operator+=(const Resource_& that);
+    Resource_& operator-=(const Resource_& that);
+    bool operator==(const Resource_& that) const;
+    bool operator!=(const Resource_& that) const;
+
+    // Friend classes and functions for access to private members.
+    friend class Resources;
+    friend std::ostream& operator<<(
+        std::ostream& stream, const Resource_& resource_);
+
+  private:
+    // The protobuf Resource that is being managed.
+    Resource resource;
+
+    // The counter for grouping shared 'resource' objects, None if the
+    // 'resource' is non-shared. This is an int so as to support arithmetic
+    // operations involving subtraction.
+    Option<int> sharedCount;
+  };
+
 public:
   /**
    * Returns a Resource with the given name, value, and role.
@@ -217,6 +285,15 @@ public:
   // Checks if this Resources contains the given Resource.
   bool contains(const Resource& that) const;
 
+  // Count the Resource objects that match the specified value.
+  //
+  // NOTE:
+  // - For a non-shared resource the count can be at most 1 because all
+  //   non-shared Resource objects in Resources are unique.
+  // - For a shared resource the count can be greater than 1.
+  // - If the resource is not in the Resources object, the count is 0.
+  size_t count(const Resource& that) const;
+
   // Filter resources based on the given predicate.
   Resources filter(
       const lambda::function<bool(const Resource&)>& predicate) const;
@@ -338,22 +415,17 @@ public:
   // NOTE: Non-`const` `iterator`, `begin()` and `end()` are __intentionally__
   // defined with `const` semantics in order to prevent mutable access to the
   // `Resource` objects within `resources`.
-  typedef google::protobuf::RepeatedPtrField<Resource>::const_iterator
-  iterator;
-
-  typedef google::protobuf::RepeatedPtrField<Resource>::const_iterator
-  const_iterator;
+  typedef std::vector<Resource_>::const_iterator iterator;
+  typedef std::vector<Resource_>::const_iterator const_iterator;
 
   const_iterator begin()
   {
-    using google::protobuf::RepeatedPtrField;
-    return static_cast<const RepeatedPtrField<Resource>&>(resources).begin();
+    return static_cast<const std::vector<Resource_>&>(resources).begin();
   }
 
   const_iterator end()
   {
-    using google::protobuf::RepeatedPtrField;
-    return static_cast<const RepeatedPtrField<Resource>&>(resources).end();
+    return static_cast<const std::vector<Resource_>&>(resources).end();
   }
 
   const_iterator begin() const { return resources.begin(); }
@@ -361,7 +433,9 @@ public:
 
   // Using this operator makes it easy to copy a resources object into
   // a protocol buffer field.
-  operator const google::protobuf::RepeatedPtrField<Resource>&() const;
+  // Note that the google::protobuf::RepeatedPtrField<Resource> is
+  // constructed on each call and returned by value.
+  operator const google::protobuf::RepeatedPtrField<Resource>() const;
 
   bool operator==(const Resources& that) const;
   bool operator!=(const Resources& that) const;
@@ -386,6 +460,9 @@ public:
   void add(const Resource& r);
   void subtract(const Resource& r);
 
+  friend std::ostream& operator<<(
+      std::ostream& stream, const Resource_& resource_);
+
 private:
   // Similar to 'contains(const Resource&)' but skips the validity
   // check. This can be used to avoid the performance overhead of
@@ -394,17 +471,36 @@ private:
   //
   // TODO(jieyu): Measure performance overhead of validity check to
   // ensure this is warranted.
-  bool _contains(const Resource& that) const;
+  bool _contains(const Resource_& that) const;
 
   // Similar to the public 'find', but only for a single Resource
   // object. The target resource may span multiple roles, so this
   // returns Resources.
   Option<Resources> find(const Resource& target) const;
 
-  google::protobuf::RepeatedPtrField<Resource> resources;
+  // The add and subtract methods and operators on Resource_ are only
+  // allowed from within Resources class so we hide them.
+
+  // Validation-free versions of += and -= `Resource_` operators.
+  // These can be used when `r` is already validated.
+  void add(const Resource_& r);
+  void subtract(const Resource_& r);
+
+  Resources operator+(const Resource_& that) const;
+  Resources& operator+=(const Resource_& that);
+
+  Resources operator-(const Resource_& that) const;
+  Resources& operator-=(const Resource_& that);
+
+  std::vector<Resource_> resources;
 };
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const Resources::Resource_& resource);
+
+
 std::ostream& operator<<(std::ostream& stream, const Resource& resource);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e61a0a8a/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 468581d..309b176 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -207,6 +207,11 @@ bool operator==(const Resource& left, const Resource& right)
     return false;
   }
 
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
   if (left.type() == Value::SCALAR) {
     return left.scalar() == right.scalar();
   } else if (left.type() == Value::RANGES) {
@@ -232,6 +237,17 @@ namespace internal {
 // different name, type or role are not addable.
 static bool addable(const Resource& left, const Resource& right)
 {
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
+  // For shared resources, they can be added only if left == right.
+  if (left.has_shared()) {
+    return left == right;
+  }
+
+  // Now, we verify if the two non-shared resources can be added.
   if (left.name() != right.name() ||
       left.type() != right.type() ||
       left.role() != right.role()) {
@@ -253,18 +269,18 @@ static bool addable(const Resource& left, const Resource& right)
   if (left.has_disk()) {
     if (left.disk() != right.disk()) { return false; }
 
-    // Two Resources that represent exclusive 'MOUNT' disks cannot be
-    // added together; this would defeat the exclusivity.
+    // Two non-shared resources that represent exclusive 'MOUNT' disks
+    // cannot be added together; this would defeat the exclusivity.
     if (left.disk().has_source() &&
         left.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
       return false;
     }
 
     // TODO(jieyu): Even if two Resource objects with DiskInfo have
-    // the same persistence ID, they cannot be added together. In
-    // fact, this shouldn't happen if we do not add resources from
-    // different namespaces (e.g., across slave). Consider adding a
-    // warning.
+    // the same persistence ID, they cannot be added together if they
+    // are non-shared. In fact, this shouldn't happen if we do not
+    // add resources from different namespaces (e.g., across slave).
+    // Consider adding a warning.
     if (left.disk().has_persistence()) {
       return false;
     }
@@ -289,6 +305,17 @@ static bool addable(const Resource& left, const Resource& right)
 // contain "right".
 static bool subtractable(const Resource& left, const Resource& right)
 {
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
+  // For shared resources, they can be subtracted only if left == right.
+  if (left.has_shared()) {
+    return left == right;
+  }
+
+  // Now, we verify if the two non-shared resources can be subtracted.
   if (left.name() != right.name() ||
       left.type() != right.type() ||
       left.role() != right.role()) {
@@ -310,8 +337,8 @@ static bool subtractable(const Resource& left, const Resource& right)
   if (left.has_disk()) {
     if (left.disk() != right.disk()) { return false; }
 
-    // Two Resources that represent exclusive 'MOUNT' disks cannot be
-    // subtracted from eachother if they are not the exact same mount;
+    // Two resources that represent exclusive 'MOUNT' disks cannot be
+    // subtracted from each other if they are not the exact same mount;
     // this would defeat the exclusivity.
     if (left.disk().has_source() &&
         left.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
@@ -319,8 +346,8 @@ static bool subtractable(const Resource& left, const Resource& right)
       return false;
     }
 
-    // NOTE: For Resource objects that have DiskInfo, we can only do
-    // subtraction if they are equal.
+    // NOTE: For Resource objects that have DiskInfo, we can only subtract
+    // if they are equal.
     if (left.disk().has_persistence() && left != right) {
       return false;
     }
@@ -790,6 +817,106 @@ bool Resources::isRevocable(const Resource& resource)
 // Public member functions.
 /////////////////////////////////////////////////
 
+Option<Error> Resources::Resource_::validate() const
+{
+  if (isShared() && sharedCount.get() < 0) {
+    return Error("Invalid shared resource: count < 0");
+  }
+
+  return Resources::validate(resource);
+}
+
+
+bool Resources::Resource_::isEmpty() const
+{
+  if (isShared() && sharedCount.get() == 0) {
+    return true;
+  }
+
+  return Resources::isEmpty(resource);
+}
+
+
+bool Resources::Resource_::contains(const Resource_& that) const
+{
+  // Both Resource_ objects should have the same sharedness.
+  if (isShared() != that.isShared()) {
+    return false;
+  }
+
+  // Assuming the wrapped Resource objects are equal, the 'contains'
+  // relationship is determined by the relationship of the counters
+  // for shared resources.
+  if (isShared()) {
+    return sharedCount.get() >= that.sharedCount.get() &&
+           resource == that.resource;
+  }
+
+  // For non-shared resources just compare the protobufs.
+  return internal::contains(resource, that.resource);
+}
+
+
+Resources::Resource_& Resources::Resource_::operator+=(const Resource_& that)
+{
+  if (internal::addable(resource, that.resource)) {
+    if (!isShared()) {
+      resource += that.resource;
+    } else {
+      // 'addable' makes sure both 'resource' fields are shared and
+      // equal, so we just need to sum up the counters here.
+      CHECK_SOME(sharedCount);
+      CHECK_SOME(that.sharedCount);
+
+      sharedCount = sharedCount.get() + that.sharedCount.get();
+    }
+  }
+
+  return *this;
+}
+
+
+Resources::Resource_& Resources::Resource_::operator-=(const Resource_& that)
+{
+  if (internal::subtractable(resource, that.resource)) {
+    if (!isShared()) {
+      resource -= that.resource;
+    } else {
+      // 'subtractable' makes sure both 'resource' fields are shared and
+      // equal, so we just need to subtract the counters here.
+      CHECK_SOME(sharedCount);
+      CHECK_SOME(that.sharedCount);
+
+      sharedCount = sharedCount.get() - that.sharedCount.get();
+    }
+  }
+
+  return *this;
+}
+
+
+bool Resources::Resource_::operator==(const Resource_& that) const
+{
+  // Both Resource_ objects should have the same sharedness.
+  if (isShared() != that.isShared()) {
+    return false;
+  }
+
+  // For shared resources to be equal, the shared counts need to match.
+  if (isShared() && (sharedCount.get() != that.sharedCount.get())) {
+    return false;
+  }
+
+  return resource == that.resource;
+}
+
+
+bool Resources::Resource_::operator!=(const Resource_& that) const
+{
+  return !(*this == that);
+}
+
+
 Resources::Resources(const Resource& resource)
 {
   // NOTE: Invalid and zero Resource object will be ignored.
@@ -819,15 +946,15 @@ bool Resources::contains(const Resources& that) const
 {
   Resources remaining = *this;
 
-  foreach (const Resource& resource, that.resources) {
+  foreach (const Resource_& resource_, that.resources) {
     // NOTE: We use _contains because Resources only contain valid
     // Resource objects, and we don't want the performance hit of the
     // validity check.
-    if (!remaining._contains(resource)) {
+    if (!remaining._contains(resource_)) {
       return false;
     }
 
-    remaining.subtract(resource);
+    remaining.subtract(resource_);
   }
 
   return true;
@@ -839,7 +966,21 @@ bool Resources::contains(const Resource& that) const
   // NOTE: We must validate 'that' because invalid resources can lead
   // to false positives here (e.g., "cpus:-1" will return true). This
   // is because 'contains' assumes resources are valid.
-  return validate(that).isNone() && _contains(that);
+  return validate(that).isNone() && _contains(Resource_(that));
+}
+
+
+size_t Resources::count(const Resource& that) const
+{
+  foreach (const Resource_& resource_, resources) {
+    if (resource_.resource == that) {
+      // Return 1 for non-shared resources because non-shared
+      // Resource objects in Resources are unique.
+      return resource_.isShared() ? resource_.sharedCount.get() : 1;
+    }
+  }
+
+  return 0;
 }
 
 
@@ -847,9 +988,9 @@ Resources Resources::filter(
     const lambda::function<bool(const Resource&)>& predicate) const
 {
   Resources result;
-  foreach (const Resource& resource, resources) {
-    if (predicate(resource)) {
-      result.add(resource);
+  foreach (const Resource_& resource_, resources) {
+    if (predicate(resource_.resource)) {
+      result.add(resource_);
     }
   }
   return result;
@@ -860,9 +1001,9 @@ hashmap<string, Resources> Resources::reservations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource& resource, resources) {
-    if (isReserved(resource)) {
-      result[resource.role()].add(resource);
+  foreach (const Resource_& resource_, resources) {
+    if (isReserved(resource_.resource)) {
+      result[resource_.resource.role()].add(resource_);
     }
   }
 
@@ -907,14 +1048,14 @@ Resources Resources::flatten(
 {
   Resources flattened;
 
-  foreach (Resource resource, resources) {
-    resource.set_role(role);
+  foreach (Resource_ resource_, resources) {
+    resource_.resource.set_role(role);
     if (reservation.isNone()) {
-      resource.clear_reservation();
+      resource_.resource.clear_reservation();
     } else {
-      resource.mutable_reservation()->CopyFrom(reservation.get());
+      resource_.resource.mutable_reservation()->CopyFrom(reservation.get());
     }
-    flattened += resource;
+    flattened += resource_;
   }
 
   return flattened;
@@ -1284,10 +1425,10 @@ Option<Value::Ranges> Resources::ephemeral_ports() const
 // Private member functions.
 /////////////////////////////////////////////////
 
-bool Resources::_contains(const Resource& that) const
+bool Resources::_contains(const Resource_& that) const
 {
-  foreach (const Resource& resource, resources) {
-    if (internal::contains(resource, that)) {
+  foreach (const Resource_& resource_, resources) {
+    if (resource_.contains(that)) {
       return true;
     }
   }
@@ -1310,21 +1451,21 @@ Option<Resources> Resources::find(const Resource& target) const
   };
 
   foreach (const auto& predicate, predicates) {
-    foreach (const Resource& resource, total.filter(predicate)) {
+    foreach (const Resource_& resource_, total.filter(predicate)) {
       // Need to flatten to ignore the roles in contains().
-      Resources flattened = Resources(resource).flatten();
+      Resources flattened = Resources(resource_.resource).flatten();
 
       if (flattened.contains(remaining)) {
         // The target has been found, return the result.
-        if (!resource.has_reservation()) {
-          return found + remaining.flatten(resource.role());
+        if (!resource_.resource.has_reservation()) {
+          return found + remaining.flatten(resource_.resource.role());
         } else {
-          return found +
-                 remaining.flatten(resource.role(), resource.reservation());
+          return found + remaining.flatten(
+              resource_.resource.role(), resource_.resource.reservation());
         }
       } else if (remaining.contains(flattened)) {
-        found.add(resource);
-        total.subtract(resource);
+        found.add(resource_);
+        total.subtract(resource_);
         remaining -= flattened;
         break;
       }
@@ -1339,9 +1480,14 @@ Option<Resources> Resources::find(const Resource& target) const
 // Overloaded operators.
 /////////////////////////////////////////////////
 
-Resources::operator const RepeatedPtrField<Resource>&() const
+Resources::operator const RepeatedPtrField<Resource>() const
 {
-  return resources;
+  RepeatedPtrField<Resource> all;
+  foreach(const Resource& resource, resources) {
+    all.Add()->CopyFrom(resource);
+  }
+
+  return all;
 }
 
 
@@ -1357,6 +1503,14 @@ bool Resources::operator!=(const Resources& that) const
 }
 
 
+Resources Resources::operator+(const Resource_& that) const
+{
+  Resources result = *this;
+  result += that;
+  return result;
+}
+
+
 Resources Resources::operator+(const Resource& that) const
 {
   Resources result = *this;
@@ -1373,16 +1527,16 @@ Resources Resources::operator+(const Resources& that) const
 }
 
 
-void Resources::add(const Resource& that)
+void Resources::add(const Resource_& that)
 {
-  if (isEmpty(that)) {
+  if (that.isEmpty()) {
     return;
   }
 
   bool found = false;
-  foreach (Resource& resource, resources) {
-    if (internal::addable(resource, that)) {
-      resource += that;
+  foreach (Resource_& resource_, resources) {
+    if (internal::addable(resource_.resource, that)) {
+      resource_ += that;
       found = true;
       break;
     }
@@ -1390,13 +1544,20 @@ void Resources::add(const Resource& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.Add()->CopyFrom(that);
+    resources.push_back(that);
   }
 }
 
-Resources& Resources::operator+=(const Resource& that)
+
+void Resources::add(const Resource& that)
+{
+  add(Resource_(that));
+}
+
+
+Resources& Resources::operator+=(const Resource_& that)
 {
-  if (validate(that).isNone()) {
+  if (that.validate().isNone()) {
     add(that);
   }
 
@@ -1404,16 +1565,32 @@ Resources& Resources::operator+=(const Resource& that)
 }
 
 
+Resources& Resources::operator+=(const Resource& that)
+{
+  *this += Resource_(that);
+
+  return *this;
+}
+
+
 Resources& Resources::operator+=(const Resources& that)
 {
-  foreach (const Resource& resource, that.resources) {
-    add(resource);
+  foreach (const Resource_& resource_, that) {
+    add(resource_);
   }
 
   return *this;
 }
 
 
+Resources Resources::operator-(const Resource_& that) const
+{
+  Resources result = *this;
+  result -= that;
+  return result;
+}
+
+
 Resources Resources::operator-(const Resource& that) const
 {
   Resources result = *this;
@@ -1430,28 +1607,27 @@ Resources Resources::operator-(const Resources& that) const
 }
 
 
-void Resources::subtract(const Resource& that)
+void Resources::subtract(const Resource_& that)
 {
-  if (isEmpty(that)) {
+  if (that.isEmpty()) {
     return;
   }
 
-  for (int i = 0; i < resources.size(); i++) {
-    Resource* resource = resources.Mutable(i);
+  for (size_t i = 0; i < resources.size(); i++) {
+    Resource_& resource_ = resources[i];
 
-    if (internal::subtractable(*resource, that)) {
-      *resource -= that;
+    if (internal::subtractable(resource_.resource, that)) {
+      resource_ -= that;
 
       // Remove the resource if it becomes invalid or zero. We need
       // to do the validation because we want to strip negative
       // scalar Resource object.
-      if (validate(*resource).isSome() || isEmpty(*resource)) {
+      if (resource_.validate().isSome() || resource_.isEmpty()) {
         // As `resources` is not ordered, and erasing an element
-        // from the middle using `DeleteSubrange` is expensive, we
-        // swap with the last element and then shrink the
-        // 'RepeatedPtrField' by one.
-        resources.Mutable(i)->Swap(resources.Mutable(resources.size() - 1));
-        resources.RemoveLast();
+        // from the middle is expensive, we swap with the last element
+        // and then shrink the vector by one.
+        resources[i] = resources.back();
+        resources.pop_back();
       }
 
       break;
@@ -1460,9 +1636,15 @@ void Resources::subtract(const Resource& that)
 }
 
 
-Resources& Resources::operator-=(const Resource& that)
+void Resources::subtract(const Resource& that)
+{
+  subtract(Resource_(that));
+}
+
+
+Resources& Resources::operator-=(const Resource_& that)
 {
-  if (validate(that).isNone()) {
+  if (that.validate().isNone()) {
     subtract(that);
   }
 
@@ -1470,10 +1652,18 @@ Resources& Resources::operator-=(const Resource& that)
 }
 
 
+Resources& Resources::operator-=(const Resource& that)
+{
+  *this -= Resource_(that);
+
+  return *this;
+}
+
+
 Resources& Resources::operator-=(const Resources& that)
 {
-  foreach (const Resource& resource, that.resources) {
-    subtract(resource);
+  foreach (const Resource_& resource_, that) {
+    subtract(resource_);
   }
 
   return *this;
@@ -1587,6 +1777,17 @@ ostream& operator<<(ostream& stream, const Resource& resource)
 }
 
 
+ostream& operator<<(ostream& stream, const Resources::Resource_& resource_)
+{
+  stream << resource_.resource;
+  if (resource_.isShared()) {
+    stream << "<" << resource_.sharedCount.get() << ">";
+  }
+
+  return stream;
+}
+
+
 ostream& operator<<(ostream& stream, const Resources& resources)
 {
   Resources::const_iterator it = resources.begin();

http://git-wip-us.apache.org/repos/asf/mesos/blob/e61a0a8a/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 52002be..f834376 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -445,12 +445,12 @@ Option<Error> validateDiskInfo(const RepeatedPtrField<Resource>& resources)
 // Validates the uniqueness of the persistence IDs used in the given
 // resources. They need to be unique per role on each slave.
 Option<Error> validateUniquePersistenceID(
-    const RepeatedPtrField<Resource>& resources)
+    const Resources& resources)
 {
   hashmap<string, hashset<string>> persistenceIds;
 
   // Check duplicated persistence ID within the given resources.
-  Resources volumes = Resources(resources).persistentVolumes();
+  Resources volumes = resources.persistentVolumes();
 
   foreach (const Resource& volume, volumes) {
     const string& role = volume.role();

http://git-wip-us.apache.org/repos/asf/mesos/blob/e61a0a8a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 51c66f1..9174a38 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -594,13 +594,18 @@ inline Resource createDiskResource(
     const std::string& role,
     const Option<std::string>& persistenceID,
     const Option<std::string>& containerPath,
-    const Option<Resource::DiskInfo::Source>& source = None())
+    const Option<Resource::DiskInfo::Source>& source = None(),
+    bool isShared = false)
 {
   Resource resource = Resources::parse("disk", value, role).get();
 
   if (persistenceID.isSome() || containerPath.isSome() || source.isSome()) {
     resource.mutable_disk()->CopyFrom(
         createDiskInfo(persistenceID, containerPath, None(), None(), source));
+
+    if (isShared) {
+      resource.mutable_shared();
+    }
   }
 
   return resource;
@@ -616,7 +621,8 @@ inline Resource createPersistentVolume(
     const std::string& containerPath,
     const Option<std::string>& reservationPrincipal = None(),
     const Option<Resource::DiskInfo::Source>& source = None(),
-    const Option<std::string>& creatorPrincipal = None())
+    const Option<std::string>& creatorPrincipal = None(),
+    bool isShared = false)
 {
   Resource volume = Resources::parse(
       "disk",
@@ -636,6 +642,10 @@ inline Resource createPersistentVolume(
     volume.mutable_reservation()->set_principal(reservationPrincipal.get());
   }
 
+  if (isShared) {
+    volume.mutable_shared();
+  }
+
   return volume;
 }
 
@@ -647,7 +657,8 @@ inline Resource createPersistentVolume(
     const std::string& persistenceId,
     const std::string& containerPath,
     const Option<std::string>& reservationPrincipal = None(),
-    const Option<std::string>& creatorPrincipal = None())
+    const Option<std::string>& creatorPrincipal = None(),
+    bool isShared = false)
 {
   Option<Resource::DiskInfo::Source> source = None();
   if (volume.has_disk() && volume.disk().has_source()) {
@@ -667,6 +678,10 @@ inline Resource createPersistentVolume(
     volume.mutable_reservation()->set_principal(reservationPrincipal.get());
   }
 
+  if (isShared) {
+    volume.mutable_shared();
+  }
+
   return volume;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e61a0a8a/src/tests/resources_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_tests.cpp b/src/tests/resources_tests.cpp
index 4111e08..06ccc0c 100644
--- a/src/tests/resources_tests.cpp
+++ b/src/tests/resources_tests.cpp
@@ -182,10 +182,10 @@ TEST(ResourcesTest, ParsingFromJSON)
   ASSERT_SOME(resourcesTry);
 
   Resources cpuResources(resourcesTry.get());
-  auto cpus = cpuResources.begin();
+  const Resource& cpus = *(cpuResources.begin());
 
-  ASSERT_EQ(Value::SCALAR, cpus->type());
-  EXPECT_EQ(45.55, cpus->scalar().value());
+  ASSERT_EQ(Value::SCALAR, cpus.type());
+  EXPECT_EQ(45.55, cpus.scalar().value());
 
   EXPECT_EQ(1u, cpuResources.size());
 
@@ -213,17 +213,17 @@ TEST(ResourcesTest, ParsingFromJSON)
   ASSERT_SOME(resourcesTry);
 
   Resources portResources(resourcesTry.get());
-  auto ports = portResources.begin();
+  const Resource& ports = *(portResources.begin());
 
-  EXPECT_EQ(Value::RANGES, ports->type());
-  EXPECT_EQ(2, ports->ranges().range_size());
+  EXPECT_EQ(Value::RANGES, ports.type());
+  EXPECT_EQ(2, ports.ranges().range_size());
 
   // Do not specify the ordering of ranges, only check the values.
-  if (10000 != ports->ranges().range(0).begin()) {
-    EXPECT_EQ(30000u, ports->ranges().range(0).begin());
-    EXPECT_EQ(10000u, ports->ranges().range(1).begin());
+  if (10000 != ports.ranges().range(0).begin()) {
+    EXPECT_EQ(30000u, ports.ranges().range(0).begin());
+    EXPECT_EQ(10000u, ports.ranges().range(1).begin());
   } else {
-    EXPECT_EQ(30000u, ports->ranges().range(1).begin());
+    EXPECT_EQ(30000u, ports.ranges().range(1).begin());
   }
 
   jsonString =
@@ -244,18 +244,18 @@ TEST(ResourcesTest, ParsingFromJSON)
   ASSERT_SOME(resourcesTry);
 
   Resources pandaResources(resourcesTry.get());
-  auto pandas = pandaResources.begin();
+  const Resource& pandas = *(pandaResources.begin());
 
-  EXPECT_EQ(Value::SET, pandas->type());
-  EXPECT_EQ(2, pandas->set().item_size());
-  EXPECT_EQ("pandas", pandas->name());
+  EXPECT_EQ(Value::SET, pandas.type());
+  EXPECT_EQ(2, pandas.set().item_size());
+  EXPECT_EQ("pandas", pandas.name());
 
   // Do not specify the ordering of the set's items, only check the values.
-  if ("lun_lun" != pandas->set().item(0)) {
-    EXPECT_EQ("yang_yang", pandas->set().item(0));
-    EXPECT_EQ("lun_lun", pandas->set().item(1));
+  if ("lun_lun" != pandas.set().item(0)) {
+    EXPECT_EQ("yang_yang", pandas.set().item(0));
+    EXPECT_EQ("lun_lun", pandas.set().item(1));
   } else {
-    EXPECT_EQ("yang_yang", pandas->set().item(1));
+    EXPECT_EQ("yang_yang", pandas.set().item(1));
   }
 
   jsonString =
@@ -301,9 +301,9 @@ TEST(ResourcesTest, ParsingFromJSON)
   Resources r1(resourcesTry.get());
 
   Resources r2;
-  r2 += *cpus;
-  r2 += *ports;
-  r2 += *pandas;
+  r2 += cpus;
+  r2 += ports;
+  r2 += pandas;
 
   EXPECT_EQ(r1, r2);
 }
@@ -327,11 +327,11 @@ TEST(ResourcesTest, ParsingFromJSONWithRoles)
   ASSERT_SOME(resourcesTry);
 
   Resources cpuResources(resourcesTry.get());
-  auto cpus = cpuResources.begin();
+  const Resource& cpus = *(cpuResources.begin());
 
-  ASSERT_EQ(Value::SCALAR, cpus->type());
-  EXPECT_EQ(45.55, cpus->scalar().value());
-  EXPECT_EQ("role1", cpus->role());
+  ASSERT_EQ(Value::SCALAR, cpus.type());
+  EXPECT_EQ(45.55, cpus.scalar().value());
+  EXPECT_EQ("role1", cpus.role());
 
   jsonString =
     "[\n"
@@ -349,14 +349,14 @@ TEST(ResourcesTest, ParsingFromJSONWithRoles)
   ASSERT_SOME(resourcesTry);
 
   Resources cpuResources2(resourcesTry.get());
-  auto cpus2 = cpuResources2.begin();
+  const Resource& cpus2 = *(cpuResources2.begin());
 
   Resources resources;
-  resources += *cpus2;
-  resources += *cpus;
-  resources += *cpus;
+  resources += cpus2;
+  resources += cpus;
+  resources += cpus;
 
-  EXPECT_TRUE(resources.contains(Resources(*cpus)));
+  EXPECT_TRUE(resources.contains(Resources(cpus)));
   EXPECT_EQ(145.54, resources.cpus().get());
 
   foreach (const Resource& resource, resources) {
@@ -2410,6 +2410,192 @@ TEST(RevocableResourceTest, Filter)
 }
 
 
+TEST(ResourcesTest, Count)
+{
+  // The summation of identical shared resources is valid and
+  // the result is reflected in the count.
+  Resource sharedDisk = createDiskResource(
+      "100", "role1", "1", "path1", None(), true);
+  EXPECT_EQ(1, (Resources(sharedDisk)).count(sharedDisk));
+  EXPECT_EQ(2, (Resources(sharedDisk) + sharedDisk).count(sharedDisk));
+
+  // The summation is invalid and a no-op for non-shared disks so the
+  // count remains 1.
+  Resource nonSharedDisk = createDiskResource("100", "role1", "1", "path1");
+  EXPECT_EQ(1, (Resources(nonSharedDisk)).count(nonSharedDisk));
+  EXPECT_EQ(1, (Resources(nonSharedDisk) + nonSharedDisk).count(nonSharedDisk));
+
+  // After the summation the scalar changes so the count is 0.
+  Resource cpus = Resources::parse("cpus", "1", "*").get();
+  EXPECT_EQ(1, Resources(cpus).count(cpus));
+  EXPECT_EQ(0, (Resources(cpus) + cpus).count(cpus));
+}
+
+
+TEST(SharedResourcesTest, ScalarAdditionShared)
+{
+  // Shared persistent volume.
+  Resource disk = createDiskResource(
+      "50", "role1", "1", "path", None(), true);
+
+  Resources r1;
+  r1 += Resources::parse("cpus", "1", "*").get();
+  r1 += Resources::parse("mem", "5", "*").get();
+  r1 += disk;
+
+  EXPECT_EQ(1, r1.count(disk));
+
+  Resources r2 = Resources::parse("cpus:2;mem:10").get() + disk;
+
+  EXPECT_EQ(1, r2.count(disk));
+
+  // Verify addition (operator+) on Resources.
+  Resources sum = r1 + r2;
+
+  EXPECT_FALSE(sum.empty());
+  EXPECT_EQ(3u, sum.size());
+  EXPECT_EQ(3, sum.get<Value::Scalar>("cpus").get().value());
+  EXPECT_EQ(15, sum.get<Value::Scalar>("mem").get().value());
+  EXPECT_EQ(50, sum.get<Value::Scalar>("disk").get().value());
+  EXPECT_EQ(2, sum.count(disk));
+
+  // Verify operator+= on Resources is the same as operator+.
+  Resources r = r1;
+  r += r2;
+  EXPECT_EQ(r, sum);
+}
+
+
+TEST(SharedResourcesTest, ScalarSubtractionShared)
+{
+  // Shared persistent volume.
+  Resource disk = createDiskResource(
+      "8192", "role1", "1", "path", None(), true);
+
+  Resources r1 = Resources::parse("cpus:40;mem:4096").get() + disk + disk;
+  Resources r2 = Resources::parse("cpus:5;mem:512").get() + disk;
+
+  // Verify subtraction (operator-) on Resources.
+  Resources diff = r1 - r2;
+
+  EXPECT_FALSE(diff.empty());
+  EXPECT_EQ(3u, diff.size());
+  EXPECT_EQ(35, diff.get<Value::Scalar>("cpus").get().value());
+  EXPECT_EQ(3584, diff.get<Value::Scalar>("mem").get().value());
+  EXPECT_EQ(8192, diff.get<Value::Scalar>("disk").get().value());
+  EXPECT_EQ(1, diff.count(disk));
+  EXPECT_TRUE(diff.contains(disk));
+
+  // Verify operator-= on Resources is the same as operator-.
+  Resources r = r1;
+  r -= r2;
+  EXPECT_EQ(diff, r);
+
+  // Verify that when all copies of a shared resource are removed, that
+  // specific shared resource is no longer contained in the Resources object.
+  EXPECT_EQ(2, r1.count(disk));
+  EXPECT_TRUE(r1.contains(disk));
+  EXPECT_EQ(1, r2.count(disk));
+  EXPECT_TRUE(r2.contains(disk));
+
+  EXPECT_EQ(0, (r1 - r2 - r2).count(disk));
+  EXPECT_FALSE((r1 - r2 - r2).contains(disk));
+  EXPECT_EQ(0, (r2 - r1).count(disk));
+  EXPECT_FALSE((r2 - r1).contains(disk));
+}
+
+
+TEST(SharedResourcesTest, ScalarSharedCompoundExpressions)
+{
+  // Shared persistent volume.
+  Resource disk = createDiskResource(
+      "50", "role1", "1", "path", None(), true);
+
+  Resources r1 = Resources::parse("cpus:2;mem:10").get() +
+    disk + disk + disk + disk;
+  Resources r2 = Resources::parse("cpus:2;mem:10").get() + disk + disk + disk;
+
+  EXPECT_EQ(4, r1.count(disk));
+  EXPECT_EQ(3, r2.count(disk));
+
+  // Verify multiple arithmetic operations on shared resources.
+  EXPECT_EQ(r1 + r1 - r1, r1);
+  EXPECT_EQ(r1 + r2 - r1, r2);
+  EXPECT_EQ(r2 + r1 - r2, r1);
+  EXPECT_EQ(r2 + r1 - r1, r2);
+  EXPECT_EQ(r2 - r1 + r1, r1);
+  EXPECT_EQ(r1 - r2 + r2, r1);
+
+  // Verify subtraction of Resources when only shared counts vary.
+  EXPECT_TRUE((r2 - r1).empty());
+  EXPECT_FALSE((r1 - r2).empty());
+}
+
+
+// Verify shared counts on addition and subtraction of shared
+// resources which differ in their scalar values.
+TEST(SharedResourcesTest, ScalarNonEqualSharedOperations)
+{
+  // Shared persistent volumes.
+  Resource disk1 = createDiskResource(
+      "50", "role1", "1", "path1", None(), true);
+  Resource disk2 = createDiskResource(
+      "100", "role1", "2", "path2", None(), true);
+
+  Resources r1 = Resources(disk1) + disk2;
+
+  EXPECT_EQ(1, r1.count(disk1));
+  EXPECT_EQ(1, r1.count(disk2));
+
+  Resources r2 = Resources(disk1) + disk2 - disk1;
+
+  EXPECT_EQ(0, r2.count(disk1));
+  EXPECT_EQ(1, r2.count(disk2));
+
+  // Cannot subtract nonequal shared resources.
+  Resources r3 = Resources(disk1) - disk2;
+
+  EXPECT_EQ(1, r3.count(disk1));
+  EXPECT_EQ(0, r3.count(disk2));
+}
+
+
+// Verify addition and subtraction of similar resources which differ in
+// their sharedness only.
+TEST(SharedResourcesTest, ScalarSharedAndNonSharedOperations)
+{
+  Resource sharedDisk = createDiskResource(
+      "100", "role1", "1", "path", None(), true);
+
+  Resource nonSharedDisk = createDiskResource("100", "role1", "1", "path");
+
+  Resources r1 = Resources::parse("cpus:1;mem:5").get() + sharedDisk;
+  Resources r2 = Resources::parse("cpus:1;mem:5").get() + nonSharedDisk;
+
+  // r1 and r2 don't contain each other because of sharedDisk and
+  // nonSharedDisk's different sharedness.
+  EXPECT_FALSE(r2.contains(r1));
+  EXPECT_FALSE(r1.contains(r2));
+
+  // Additions of resources with non-matching sharedness.
+  Resources r3 = sharedDisk;
+  r3 += sharedDisk;
+  r3 += nonSharedDisk;
+
+  EXPECT_FALSE(r3.empty());
+  EXPECT_EQ(2u, r3.size());
+  EXPECT_EQ(200, r3.get<Value::Scalar>("disk").get().value());
+  EXPECT_EQ(2, r3.count(sharedDisk));
+  EXPECT_EQ(1, r3.count(nonSharedDisk));
+
+  // Cannot subtract resources with non-matching sharedness.
+  Resources r4 = nonSharedDisk;
+  r4 -= sharedDisk;
+
+  EXPECT_EQ(r4, nonSharedDisk);
+}
+
+
 struct Parameter
 {
   Resources resources;


[2/2] mesos git commit: Add v1 changes for shared resources.

Posted by ya...@apache.org.
Add v1 changes for shared resources.

Review: https://reviews.apache.org/r/48616/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31315957
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31315957
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31315957

Branch: refs/heads/master
Commit: 31315957e986948d992966229b51e5c42d9b256b
Parents: e61a0a8
Author: Anindya Sinha <an...@apple.com>
Authored: Mon Aug 1 01:21:37 2016 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Mon Aug 1 09:50:50 2016 -0700

----------------------------------------------------------------------
 include/mesos/v1/resources.hpp | 120 +++++++++++--
 src/v1/resources.cpp           | 328 +++++++++++++++++++++++++++++-------
 2 files changed, 372 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/31315957/include/mesos/v1/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resources.hpp b/include/mesos/v1/resources.hpp
index fdbabf5..f3c5f31 100644
--- a/include/mesos/v1/resources.hpp
+++ b/include/mesos/v1/resources.hpp
@@ -61,6 +61,74 @@ namespace v1 {
 // +=, -=, etc.).
 class Resources
 {
+private:
+  // An internal abstraction to facilitate managing shared resources.
+  // It allows 'Resources' to group identical shared resource objects
+  // together into a single 'Resource_' object tracked by its internal
+  // counter. Non-shared resource objects are not grouped.
+  //
+  // The rest of the private section is below the public section. We
+  // need to define Resource_ first because the public typedefs below
+  // depend on it.
+  class Resource_
+  {
+  public:
+    /*implicit*/ Resource_(const Resource& _resource)
+      : resource(_resource),
+        sharedCount(None())
+    {
+      // Setting the counter to 1 to denote "one copy" of the shared resource.
+      if (resource.has_shared()) {
+        sharedCount = 1;
+      }
+    }
+
+    // By implicitly converting to Resource we are able to keep Resource_
+    // logic internal and expose only the protobuf object.
+    operator const Resource&() const { return resource; }
+
+    // Check whether this Resource_ object corresponds to a shared resource.
+    bool isShared() const { return sharedCount.isSome(); }
+
+    // Validates this Resource_ object.
+    Option<Error> validate() const;
+
+    // Check whether this Resource_ object is empty.
+    bool isEmpty() const;
+
+    // The `Resource_` arithmetic, comparison operators and `contains()`
+    // method require the wrapped `resource` protobuf to have the same
+    // sharedness.
+    //
+    // For shared resources, the `resource` protobuf needs to be equal,
+    // and only the shared counters are adjusted or compared.
+    // For non-shared resources, the shared counters are none and the
+    // semantics of the Resource_ object's operators/contains() method
+    // are the same as those of the Resource objects.
+
+    // Checks if this Resource_ is a superset of the given Resource_.
+    bool contains(const Resource_& that) const;
+
+    Resource_& operator+=(const Resource_& that);
+    Resource_& operator-=(const Resource_& that);
+    bool operator==(const Resource_& that) const;
+    bool operator!=(const Resource_& that) const;
+
+    // Friend classes and functions for access to private members.
+    friend class Resources;
+    friend std::ostream& operator<<(
+        std::ostream& stream, const Resource_& resource_);
+
+  private:
+    // The protobuf Resource that is being managed.
+    Resource resource;
+
+    // The counter for grouping shared 'resource' objects, None if the
+    // 'resource' is non-shared. This is an int so as to support arithmetic
+    // operations involving subtraction.
+    Option<int> sharedCount;
+  };
+
 public:
   /**
    * Returns a Resource with the given name, value, and role.
@@ -217,6 +285,15 @@ public:
   // Checks if this Resources contains the given Resource.
   bool contains(const Resource& that) const;
 
+  // Count the Resource objects that match the specified value.
+  //
+  // NOTE:
+  // - For a non-shared resource the count can be at most 1 because all
+  //   non-shared Resource objects in Resources are unique.
+  // - For a shared resource the count can be greater than 1.
+  // - If the resource is not in the Resources object, the count is 0.
+  size_t count(const Resource& that) const;
+
   // Filter resources based on the given predicate.
   Resources filter(
       const lambda::function<bool(const Resource&)>& predicate) const;
@@ -338,22 +415,17 @@ public:
   // NOTE: Non-`const` `iterator`, `begin()` and `end()` are __intentionally__
   // defined with `const` semantics in order to prevent mutable access to the
   // `Resource` objects within `resources`.
-  typedef google::protobuf::RepeatedPtrField<Resource>::const_iterator
-  iterator;
-
-  typedef google::protobuf::RepeatedPtrField<Resource>::const_iterator
-  const_iterator;
+  typedef std::vector<Resource_>::const_iterator iterator;
+  typedef std::vector<Resource_>::const_iterator const_iterator;
 
   const_iterator begin()
   {
-    using google::protobuf::RepeatedPtrField;
-    return static_cast<const RepeatedPtrField<Resource>&>(resources).begin();
+    return static_cast<const std::vector<Resource_>&>(resources).begin();
   }
 
   const_iterator end()
   {
-    using google::protobuf::RepeatedPtrField;
-    return static_cast<const RepeatedPtrField<Resource>&>(resources).end();
+    return static_cast<const std::vector<Resource_>&>(resources).end();
   }
 
   const_iterator begin() const { return resources.begin(); }
@@ -361,7 +433,9 @@ public:
 
   // Using this operator makes it easy to copy a resources object into
   // a protocol buffer field.
-  operator const google::protobuf::RepeatedPtrField<Resource>&() const;
+  // Note that the google::protobuf::RepeatedPtrField<Resource> is
+  // generated at runtime.
+  operator const google::protobuf::RepeatedPtrField<Resource>() const;
 
   bool operator==(const Resources& that) const;
   bool operator!=(const Resources& that) const;
@@ -386,6 +460,9 @@ public:
   void add(const Resource& r);
   void subtract(const Resource& r);
 
+  friend std::ostream& operator<<(
+      std::ostream& stream, const Resource_& resource_);
+
 private:
   // Similar to 'contains(const Resource&)' but skips the validity
   // check. This can be used to avoid the performance overhead of
@@ -394,17 +471,36 @@ private:
   //
   // TODO(jieyu): Measure performance overhead of validity check to
   // ensure this is warranted.
-  bool _contains(const Resource& that) const;
+  bool _contains(const Resource_& that) const;
 
   // Similar to the public 'find', but only for a single Resource
   // object. The target resource may span multiple roles, so this
   // returns Resources.
   Option<Resources> find(const Resource& target) const;
 
-  google::protobuf::RepeatedPtrField<Resource> resources;
+  // The add and subtract methods and operators on Resource_ are only
+  // allowed from within the Resources class, so we hide them.
+
+  // Validation-free versions of += and -= `Resource_` operators.
+  // These can be used when `r` is already validated.
+  void add(const Resource_& r);
+  void subtract(const Resource_& r);
+
+  Resources operator+(const Resource_& that) const;
+  Resources& operator+=(const Resource_& that);
+
+  Resources operator-(const Resource_& that) const;
+  Resources& operator-=(const Resource_& that);
+
+  std::vector<Resource_> resources;
 };
 
 
+std::ostream& operator<<(
+    std::ostream& stream,
+    const Resources::Resource_& resource);
+
+
 std::ostream& operator<<(std::ostream& stream, const Resource& resource);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/31315957/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 230d55b..3f67e32 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -209,6 +209,11 @@ bool operator==(const Resource& left, const Resource& right)
     return false;
   }
 
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
   if (left.type() == Value::SCALAR) {
     return left.scalar() == right.scalar();
   } else if (left.type() == Value::RANGES) {
@@ -234,6 +239,17 @@ namespace internal {
 // different name, type or role are not addable.
 static bool addable(const Resource& left, const Resource& right)
 {
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
+  // For shared resources, they can be added only if left == right.
+  if (left.has_shared()) {
+    return left == right;
+  }
+
+  // Now, we verify if the two non-shared resources can be added.
   if (left.name() != right.name() ||
       left.type() != right.type() ||
       left.role() != right.role()) {
@@ -255,18 +271,18 @@ static bool addable(const Resource& left, const Resource& right)
   if (left.has_disk()) {
     if (left.disk() != right.disk()) { return false; }
 
-    // Two Resources that represent exclusive 'MOUNT' disks cannot be
-    // added together; this would defeat the exclusivity.
+    // Two non-shared resources that represent exclusive 'MOUNT' disks
+    // cannot be added together; this would defeat the exclusivity.
     if (left.disk().has_source() &&
         left.disk().source().type() == Resource::DiskInfo::Source::MOUNT) {
       return false;
     }
 
     // TODO(jieyu): Even if two Resource objects with DiskInfo have
-    // the same persistence ID, they cannot be added together. In
-    // fact, this shouldn't happen if we do not add resources from
-    // different namespaces (e.g., across slave). Consider adding a
-    // warning.
+    // the same persistence ID, they cannot be added together if they
+    // are non-shared. In fact, this shouldn't happen if we do not
+    // add resources from different namespaces (e.g., across slave).
+    // Consider adding a warning.
     if (left.disk().has_persistence()) {
       return false;
     }
@@ -291,6 +307,17 @@ static bool addable(const Resource& left, const Resource& right)
 // contain "right".
 static bool subtractable(const Resource& left, const Resource& right)
 {
+  // Check SharedInfo.
+  if (left.has_shared() != right.has_shared()) {
+    return false;
+  }
+
+  // For shared resources, they can be subtracted only if left == right.
+  if (left.has_shared()) {
+    return left == right;
+  }
+
+  // Now, we verify if the two non-shared resources can be subtracted.
   if (left.name() != right.name() ||
       left.type() != right.type() ||
       left.role() != right.role()) {
@@ -312,8 +339,8 @@ static bool subtractable(const Resource& left, const Resource& right)
   if (left.has_disk()) {
     if (left.disk() != right.disk()) { return false; }
 
-    // Two Resources that represent exclusive 'MOUNT' disks cannot be
-    // subtracted from eachother if they are not the exact same mount;
+    // Two resources that represent exclusive 'MOUNT' disks cannot be
+    // subtracted from each other if they are not the exact same mount;
     // this would defeat the exclusivity.
     if (left.disk().has_source() &&
         left.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
@@ -321,8 +348,8 @@ static bool subtractable(const Resource& left, const Resource& right)
       return false;
     }
 
-    // NOTE: For Resource objects that have DiskInfo, we can only do
-    // subtraction if they are equal.
+    // NOTE: For Resource objects that have DiskInfo, we can only subtract
+    // if they are equal.
     if (left.disk().has_persistence() && left != right) {
       return false;
     }
@@ -793,6 +820,106 @@ bool Resources::isRevocable(const Resource& resource)
 // Public member functions.
 /////////////////////////////////////////////////
 
+Option<Error> Resources::Resource_::validate() const
+{
+  if (isShared() && sharedCount.get() < 0) {
+    return Error("Invalid shared resource: count < 0");
+  }
+
+  return Resources::validate(resource);
+}
+
+
+bool Resources::Resource_::isEmpty() const
+{
+  if (isShared() && sharedCount.get() == 0) {
+    return true;
+  }
+
+  return Resources::isEmpty(resource);
+}
+
+
+bool Resources::Resource_::contains(const Resource_& that) const
+{
+  // Both Resource_ objects should have the same sharedness.
+  if (isShared() != that.isShared()) {
+    return false;
+  }
+
+  // Assuming the wrapped Resource objects are equal, the 'contains'
+  // relationship is determined by the relationship of the counters
+  // for shared resources.
+  if (isShared()) {
+    return sharedCount.get() >= that.sharedCount.get() &&
+           resource == that.resource;
+  }
+
+  // For non-shared resources just compare the protobufs.
+  return internal::contains(resource, that.resource);
+}
+
+
+Resources::Resource_& Resources::Resource_::operator+=(const Resource_& that)
+{
+  if (internal::addable(resource, that.resource)) {
+    if (!isShared()) {
+      resource += that.resource;
+    } else {
+      // 'addable' makes sure both 'resource' fields are shared and
+      // equal, so we just need to sum up the counters here.
+      CHECK_SOME(sharedCount);
+      CHECK_SOME(that.sharedCount);
+
+      sharedCount = sharedCount.get() + that.sharedCount.get();
+    }
+  }
+
+  return *this;
+}
+
+
+Resources::Resource_& Resources::Resource_::operator-=(const Resource_& that)
+{
+  if (internal::subtractable(resource, that.resource)) {
+    if (!isShared()) {
+      resource -= that.resource;
+    } else {
+      // 'subtractable' makes sure both 'resource' fields are shared and
+      // equal, so we just need to subtract the counters here.
+      CHECK_SOME(sharedCount);
+      CHECK_SOME(that.sharedCount);
+
+      sharedCount = sharedCount.get() - that.sharedCount.get();
+    }
+  }
+
+  return *this;
+}
+
+
+bool Resources::Resource_::operator==(const Resource_& that) const
+{
+  // Both Resource_ objects should have the same sharedness.
+  if (isShared() != that.isShared()) {
+    return false;
+  }
+
+  // For shared resources to be equal, the shared counts need to match.
+  if (isShared() && (sharedCount.get() != that.sharedCount.get())) {
+    return false;
+  }
+
+  return resource == that.resource;
+}
+
+
+bool Resources::Resource_::operator!=(const Resource_& that) const
+{
+  return !(*this == that);
+}
+
+
 Resources::Resources(const Resource& resource)
 {
   // NOTE: Invalid and zero Resource object will be ignored.
@@ -822,15 +949,15 @@ bool Resources::contains(const Resources& that) const
 {
   Resources remaining = *this;
 
-  foreach (const Resource& resource, that.resources) {
+  foreach (const Resource_& resource_, that.resources) {
     // NOTE: We use _contains because Resources only contain valid
     // Resource objects, and we don't want the performance hit of the
     // validity check.
-    if (!remaining._contains(resource)) {
+    if (!remaining._contains(resource_)) {
       return false;
     }
 
-    remaining.subtract(resource);
+    remaining.subtract(resource_);
   }
 
   return true;
@@ -842,7 +969,21 @@ bool Resources::contains(const Resource& that) const
   // NOTE: We must validate 'that' because invalid resources can lead
   // to false positives here (e.g., "cpus:-1" will return true). This
   // is because 'contains' assumes resources are valid.
-  return validate(that).isNone() && _contains(that);
+  return validate(that).isNone() && _contains(Resource_(that));
+}
+
+
+size_t Resources::count(const Resource& that) const
+{
+  foreach (const Resource_& resource_, resources) {
+    if (resource_.resource == that) {
+      // Return 1 for non-shared resources because non-shared
+      // Resource objects in Resources are unique.
+      return resource_.isShared() ? resource_.sharedCount.get() : 1;
+    }
+  }
+
+  return 0;
 }
 
 
@@ -850,9 +991,9 @@ Resources Resources::filter(
     const lambda::function<bool(const Resource&)>& predicate) const
 {
   Resources result;
-  foreach (const Resource& resource, resources) {
-    if (predicate(resource)) {
-      result.add(resource);
+  foreach (const Resource_& resource_, resources) {
+    if (predicate(resource_.resource)) {
+      result.add(resource_);
     }
   }
   return result;
@@ -863,9 +1004,9 @@ hashmap<string, Resources> Resources::reservations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource& resource, resources) {
-    if (isReserved(resource)) {
-      result[resource.role()].add(resource);
+  foreach (const Resource_& resource_, resources) {
+    if (isReserved(resource_.resource)) {
+      result[resource_.resource.role()].add(resource_);
     }
   }
 
@@ -910,14 +1051,14 @@ Resources Resources::flatten(
 {
   Resources flattened;
 
-  foreach (Resource resource, resources) {
-    resource.set_role(role);
+  foreach (Resource_ resource_, resources) {
+    resource_.resource.set_role(role);
     if (reservation.isNone()) {
-      resource.clear_reservation();
+      resource_.resource.clear_reservation();
     } else {
-      resource.mutable_reservation()->CopyFrom(reservation.get());
+      resource_.resource.mutable_reservation()->CopyFrom(reservation.get());
     }
-    flattened += resource;
+    flattened += resource_;
   }
 
   return flattened;
@@ -1287,10 +1428,10 @@ Option<Value::Ranges> Resources::ephemeral_ports() const
 // Private member functions.
 /////////////////////////////////////////////////
 
-bool Resources::_contains(const Resource& that) const
+bool Resources::_contains(const Resource_& that) const
 {
-  foreach (const Resource& resource, resources) {
-    if (internal::contains(resource, that)) {
+  foreach (const Resource_& resource_, resources) {
+    if (resource_.contains(that)) {
       return true;
     }
   }
@@ -1313,21 +1454,21 @@ Option<Resources> Resources::find(const Resource& target) const
   };
 
   foreach (const auto& predicate, predicates) {
-    foreach (const Resource& resource, total.filter(predicate)) {
+    foreach (const Resource_& resource_, total.filter(predicate)) {
       // Need to flatten to ignore the roles in contains().
-      Resources flattened = Resources(resource).flatten();
+      Resources flattened = Resources(resource_.resource).flatten();
 
       if (flattened.contains(remaining)) {
         // The target has been found, return the result.
-        if (!resource.has_reservation()) {
-          return found + remaining.flatten(resource.role());
+        if (!resource_.resource.has_reservation()) {
+          return found + remaining.flatten(resource_.resource.role());
         } else {
-          return found +
-                 remaining.flatten(resource.role(), resource.reservation());
+          return found + remaining.flatten(
+              resource_.resource.role(), resource_.resource.reservation());
         }
       } else if (remaining.contains(flattened)) {
-        found.add(resource);
-        total.subtract(resource);
+        found.add(resource_);
+        total.subtract(resource_);
         remaining -= flattened;
         break;
       }
@@ -1342,9 +1483,14 @@ Option<Resources> Resources::find(const Resource& target) const
 // Overloaded operators.
 /////////////////////////////////////////////////
 
-Resources::operator const RepeatedPtrField<Resource>&() const
+Resources::operator const RepeatedPtrField<Resource>() const
 {
-  return resources;
+  RepeatedPtrField<Resource> all;
+  foreach(const Resource& resource, resources) {
+    all.Add()->CopyFrom(resource);
+  }
+
+  return all;
 }
 
 
@@ -1360,6 +1506,14 @@ bool Resources::operator!=(const Resources& that) const
 }
 
 
+Resources Resources::operator+(const Resource_& that) const
+{
+  Resources result = *this;
+  result += that;
+  return result;
+}
+
+
 Resources Resources::operator+(const Resource& that) const
 {
   Resources result = *this;
@@ -1376,16 +1530,16 @@ Resources Resources::operator+(const Resources& that) const
 }
 
 
-void Resources::add(const Resource& that)
+void Resources::add(const Resource_& that)
 {
-  if (isEmpty(that)) {
+  if (that.isEmpty()) {
     return;
   }
 
   bool found = false;
-  foreach (Resource& resource, resources) {
-    if (internal::addable(resource, that)) {
-      resource += that;
+  foreach (Resource_& resource_, resources) {
+    if (internal::addable(resource_.resource, that)) {
+      resource_ += that;
       found = true;
       break;
     }
@@ -1393,14 +1547,20 @@ void Resources::add(const Resource& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.Add()->CopyFrom(that);
+    resources.push_back(that);
   }
 }
 
 
-Resources& Resources::operator+=(const Resource& that)
+void Resources::add(const Resource& that)
+{
+  add(Resource_(that));
+}
+
+
+Resources& Resources::operator+=(const Resource_& that)
 {
-  if (validate(that).isNone()) {
+  if (that.validate().isNone()) {
     add(that);
   }
 
@@ -1408,16 +1568,32 @@ Resources& Resources::operator+=(const Resource& that)
 }
 
 
+Resources& Resources::operator+=(const Resource& that)
+{
+  *this += Resource_(that);
+
+  return *this;
+}
+
+
 Resources& Resources::operator+=(const Resources& that)
 {
-  foreach (const Resource& resource, that.resources) {
-    add(resource);
+  foreach (const Resource_& resource_, that) {
+    add(resource_);
   }
 
   return *this;
 }
 
 
+Resources Resources::operator-(const Resource_& that) const
+{
+  Resources result = *this;
+  result -= that;
+  return result;
+}
+
+
 Resources Resources::operator-(const Resource& that) const
 {
   Resources result = *this;
@@ -1434,28 +1610,27 @@ Resources Resources::operator-(const Resources& that) const
 }
 
 
-void Resources::subtract(const Resource& that)
+void Resources::subtract(const Resource_& that)
 {
-  if (isEmpty(that)) {
+  if (that.isEmpty()) {
     return;
   }
 
-  for (int i = 0; i < resources.size(); i++) {
-    Resource* resource = resources.Mutable(i);
+  for (size_t i = 0; i < resources.size(); i++) {
+    Resource_& resource_ = resources[i];
 
-    if (internal::subtractable(*resource, that)) {
-      *resource -= that;
+    if (internal::subtractable(resource_.resource, that)) {
+      resource_ -= that;
 
       // Remove the resource if it becomes invalid or zero. We need
       // to do the validation because we want to strip negative
       // scalar Resource object.
-      if (validate(*resource).isSome() || isEmpty(*resource)) {
+      if (resource_.validate().isSome() || resource_.isEmpty()) {
         // As `resources` is not ordered, and erasing an element
-        // from the middle using `DeleteSubrange` is expensive, we
-        // swap with the last element and then shrink the
-        // 'RepeatedPtrField' by one.
-        resources.Mutable(i)->Swap(resources.Mutable(resources.size() - 1));
-        resources.RemoveLast();
+        // from the middle is expensive, we swap with the last element
+        // and then shrink the vector by one.
+        resources[i] = resources.back();
+        resources.pop_back();
       }
 
       break;
@@ -1464,9 +1639,15 @@ void Resources::subtract(const Resource& that)
 }
 
 
-Resources& Resources::operator-=(const Resource& that)
+void Resources::subtract(const Resource& that)
+{
+  subtract(Resource_(that));
+}
+
+
+Resources& Resources::operator-=(const Resource_& that)
 {
-  if (validate(that).isNone()) {
+  if (that.validate().isNone()) {
     subtract(that);
   }
 
@@ -1474,10 +1655,18 @@ Resources& Resources::operator-=(const Resource& that)
 }
 
 
+Resources& Resources::operator-=(const Resource& that)
+{
+  *this -= Resource_(that);
+
+  return *this;
+}
+
+
 Resources& Resources::operator-=(const Resources& that)
 {
-  foreach (const Resource& resource, that.resources) {
-    subtract(resource);
+  foreach (const Resource_& resource_, that) {
+    subtract(resource_);
   }
 
   return *this;
@@ -1591,6 +1780,17 @@ ostream& operator<<(ostream& stream, const Resource& resource)
 }
 
 
+ostream& operator<<(ostream& stream, const Resources::Resource_& resource_)
+{
+  stream << resource_.resource;
+  if (resource_.isShared()) {
+    stream << "<" << resource_.sharedCount.get() << ">";
+  }
+
+  return stream;
+}
+
+
 ostream& operator<<(ostream& stream, const Resources& resources)
 {
   Resources::const_iterator it = resources.begin();