You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2018/10/05 18:05:32 UTC

[mesos] 02/04: Optimized `class Resources` with copy-on-write.

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 291dcc3a4218bdf1ac9124da903f80891875afe8
Author: Meng Zhu <mz...@mesosphere.io>
AuthorDate: Thu Sep 20 13:20:06 2018 -0700

    Optimized `class Resources` with copy-on-write.
    
    This patch lets `class Resources` only store shared
    pointers to the underlying resource objects, so that
    read-only filter operations such as `reserved()`,
    `unreserved()`, etc. can avoid making copies of
    the whole resource objects. Instead, only shared pointers
    are copied.
    
    In write operations, we check if there is more than one
    reference to the resource object. If so, a copy is made
    for safe mutation without affecting the other owners.
    
    To maintain the usage abstraction that `class Resources`
    still holds resource objects, we utilize
    `boost::indirect_iterator` iterator adapter to dereference
    the shared pointers as we iterate.
    
    Review: https://reviews.apache.org/r/68490/
---
 include/mesos/resources.hpp    |  52 +++++++--
 include/mesos/v1/resources.hpp |  52 +++++++--
 src/common/resources.cpp       | 248 ++++++++++++++++++++++++++---------------
 src/v1/resources.cpp           | 236 +++++++++++++++++++++++++--------------
 4 files changed, 398 insertions(+), 190 deletions(-)

diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 6f81b14..914b7be 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -23,6 +23,8 @@
 #include <string>
 #include <vector>
 
+#include <boost/iterator/indirect_iterator.hpp>
+
 #include <google/protobuf/repeated_field.h>
 
 #include <mesos/mesos.hpp>
@@ -585,20 +587,29 @@ public:
   // which holds the ephemeral ports allocation logic.
   Option<Value::Ranges> ephemeral_ports() const;
 
-  // NOTE: Non-`const` `iterator`, `begin()` and `end()` are __intentionally__
-  // defined with `const` semantics in order to prevent mutable access to the
-  // `Resource` objects within `resources`.
-  typedef std::vector<Resource_>::const_iterator iterator;
-  typedef std::vector<Resource_>::const_iterator const_iterator;
+  // We use `boost::indirect_iterator` to expose `Resource` (implicitly
+  // converted from `Resource_`) iteration, while actually storing
+  // `shared_ptr<Resource_>`.
+  //
+  // NOTE: Non-const `begin()` and `end()` intentionally return const
+  // iterators to prevent mutable access to the `Resource` objects.
+
+  typedef boost::indirect_iterator<
+      std::vector<std::shared_ptr<Resource_>>::const_iterator>
+    const_iterator;
 
   const_iterator begin()
   {
-    return static_cast<const std::vector<Resource_>&>(resources).begin();
+    return static_cast<const std::vector<std::shared_ptr<Resource_>>&>(
+               resources)
+      .begin();
   }
 
   const_iterator end()
   {
-    return static_cast<const std::vector<Resource_>&>(resources).end();
+    return static_cast<const std::vector<std::shared_ptr<Resource_>>&>(
+               resources)
+      .end();
   }
 
   const_iterator begin() const { return resources.begin(); }
@@ -666,6 +677,10 @@ private:
   // objects, so here the API can also accept a `Resource` object.
   void add(const Resource_& r);
   void add(Resource_&& r);
+
+  // TODO(mzhu): Add move support.
+  void add(const std::shared_ptr<Resource_>& that);
+
   void subtract(const Resource_& r);
 
   Resources& operator+=(const Resource_& that);
@@ -673,7 +688,28 @@ private:
 
   Resources& operator-=(const Resource_& that);
 
-  std::vector<Resource_> resources;
+  // Resources are stored using copy-on-write:
+  //
+  //   (1) Copies are done by copying the `shared_ptr`. This
+  //       makes read-only filtering (e.g. `unreserved()`)
+  //       inexpensive as we do not have to perform copies
+  //       of the resource objects.
+  //
+  //   (2) When a write occurs:
+  //      (a) If there's a single reference to the resource
+  //          object, we mutate directly.
+  //      (b) If there's more than a single reference to the
+  //          resource object, we copy first, then mutate the copy.
+  //
+  // TODO(mzhu): When mutating the resource objects, we need to manually
+  // conduct the copy-on-write check as described in (2). This is
+  // brittle. Explore more robust designs such as introducing a customized
+  // copy-on-write abstraction that hides direct setters and only allows
+  // mutations in a controlled fashion.
+  //
+  // TODO(mzhu): Consider using `boost::intrusive_ptr` for
+  // possibly better performance.
+  std::vector<std::shared_ptr<Resource_>> resources;
 };
 
 
diff --git a/include/mesos/v1/resources.hpp b/include/mesos/v1/resources.hpp
index 0911053..07d6092 100644
--- a/include/mesos/v1/resources.hpp
+++ b/include/mesos/v1/resources.hpp
@@ -23,6 +23,8 @@
 #include <string>
 #include <vector>
 
+#include <boost/iterator/indirect_iterator.hpp>
+
 #include <google/protobuf/repeated_field.h>
 
 #include <mesos/v1/mesos.hpp>
@@ -579,20 +581,29 @@ public:
   // which holds the ephemeral ports allocation logic.
   Option<Value::Ranges> ephemeral_ports() const;
 
-  // NOTE: Non-`const` `iterator`, `begin()` and `end()` are __intentionally__
-  // defined with `const` semantics in order to prevent mutable access to the
-  // `Resource` objects within `resources`.
-  typedef std::vector<Resource_>::const_iterator iterator;
-  typedef std::vector<Resource_>::const_iterator const_iterator;
+  // We use `boost::indirect_iterator` to expose `Resource` (implicitly
+  // converted from `Resource_`) iteration, while actually storing
+  // `shared_ptr<Resource_>`.
+  //
+  // NOTE: Non-const `begin()` and `end()` intentionally return const
+  // iterators to prevent mutable access to the `Resource` objects.
+
+  typedef boost::indirect_iterator<
+      std::vector<std::shared_ptr<Resource_>>::const_iterator>
+    const_iterator;
 
   const_iterator begin()
   {
-    return static_cast<const std::vector<Resource_>&>(resources).begin();
+    return static_cast<const std::vector<std::shared_ptr<Resource_>>&>(
+               resources)
+      .begin();
   }
 
   const_iterator end()
   {
-    return static_cast<const std::vector<Resource_>&>(resources).end();
+    return static_cast<const std::vector<std::shared_ptr<Resource_>>&>(
+               resources)
+      .end();
   }
 
   const_iterator begin() const { return resources.begin(); }
@@ -660,6 +671,10 @@ private:
   // objects, so here the API can also accept a `Resource` object.
   void add(const Resource_& r);
   void add(Resource_&& r);
+
+  // TODO(mzhu): Add move support.
+  void add(const std::shared_ptr<Resource_>& that);
+
   void subtract(const Resource_& r);
 
   Resources& operator+=(const Resource_& that);
@@ -667,7 +682,28 @@ private:
 
   Resources& operator-=(const Resource_& that);
 
-  std::vector<Resource_> resources;
+  // Resources are stored using copy-on-write:
+  //
+  //   (1) Copies are done by copying the `shared_ptr`. This
+  //       makes read-only filtering (e.g. `unreserved()`)
+  //       inexpensive as we do not have to perform copies
+  //       of the resource objects.
+  //
+  //   (2) When a write occurs:
+  //      (a) If there's a single reference to the resource
+  //          object, we mutate directly.
+  //      (b) If there's more than a single reference to the
+  //          resource object, we copy first, then mutate the copy.
+  //
+  // TODO(mzhu): When mutating the resource objects, we need to manually
+  // conduct the copy-on-write check as described in (2). This is
+  // brittle. Explore more robust designs such as introducing a customized
+  // copy-on-write abstraction that hides direct setters and only allows
+  // mutations in a controlled fashion.
+  //
+  // TODO(mzhu): Consider using `boost::intrusive_ptr` for
+  // possibly better performance.
+  std::vector<std::shared_ptr<Resource_>> resources;
 };
 
 
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 0110c0e..4719ea5 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -40,9 +40,11 @@
 
 #include "common/resources_utils.hpp"
 
+using std::make_shared;
 using std::map;
 using std::ostream;
 using std::set;
+using std::shared_ptr;
 using std::string;
 using std::vector;
 
@@ -1468,16 +1470,16 @@ bool Resources::contains(const Resources& that) const
 {
   Resources remaining = *this;
 
-  foreach (const Resource_& resource_, that.resources) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     // NOTE: We use _contains because Resources only contain valid
     // Resource objects, and we don't want the performance hit of the
     // validity check.
-    if (!remaining._contains(resource_)) {
+    if (!remaining._contains(*resource_)) {
       return false;
     }
 
-    if (isPersistentVolume(resource_.resource)) {
-      remaining.subtract(resource_);
+    if (isPersistentVolume(resource_->resource)) {
+      remaining.subtract(*resource_);
     }
   }
 
@@ -1496,11 +1498,11 @@ bool Resources::contains(const Resource& that) const
 
 size_t Resources::count(const Resource& that) const
 {
-  foreach (const Resource_& resource_, resources) {
-    if (resource_.resource == that) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource == that) {
       // Return 1 for non-shared resources because non-shared
       // Resource objects in Resources are unique.
-      return resource_.isShared() ? resource_.sharedCount.get() : 1;
+      return resource_->isShared() ? CHECK_NOTNONE(resource_->sharedCount) : 1;
     }
   }
 
@@ -1510,17 +1512,25 @@ size_t Resources::count(const Resource& that) const
 
 void Resources::allocate(const string& role)
 {
-  foreach (Resource_& resource_, resources) {
-    resource_.resource.mutable_allocation_info()->set_role(role);
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    // Copy-on-write (if more than 1 reference).
+    if (resource_.use_count() > 1) {
+      resource_ = make_shared<Resource_>(*resource_);
+    }
+    resource_->resource.mutable_allocation_info()->set_role(role);
   }
 }
 
 
 void Resources::unallocate()
 {
-  foreach (Resource_& resource_, resources) {
-    if (resource_.resource.has_allocation_info()) {
-      resource_.resource.clear_allocation_info();
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.has_allocation_info()) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+      resource_->resource.clear_allocation_info();
     }
   }
 }
@@ -1530,8 +1540,10 @@ Resources Resources::filter(
     const lambda::function<bool(const Resource&)>& predicate) const
 {
   Resources result;
-  foreach (const Resource_& resource_, resources) {
-    if (predicate(resource_.resource)) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (predicate(resource_->resource)) {
+      // TODO(mzhu): `add` is O(n), explore whether we can simply
+      // do `push_back` by assuming resources are already combined.
       result.add(resource_);
     }
   }
@@ -1543,9 +1555,9 @@ hashmap<string, Resources> Resources::reservations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource_& resource_, resources) {
-    if (isReserved(resource_.resource)) {
-      result[reservationRole(resource_.resource)].add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (isReserved(resource_->resource)) {
+      result[reservationRole(resource_->resource)].add(resource_);
     }
   }
 
@@ -1607,27 +1619,30 @@ hashmap<string, Resources> Resources::allocations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource_& resource_, resources) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
     // We require that this is called only when
     // the resources are allocated.
-    CHECK(resource_.resource.has_allocation_info());
-    CHECK(resource_.resource.allocation_info().has_role());
-    result[resource_.resource.allocation_info().role()].add(resource_);
+    CHECK(resource_->resource.has_allocation_info());
+    CHECK(resource_->resource.allocation_info().has_role());
+    result[resource_->resource.allocation_info().role()].add(resource_);
   }
 
   return result;
 }
 
 
+// TODO(mzhu): Introduce `pushReservations`, so that for O(n) reservations
+// we only need to copy `Resources` once.
 Resources Resources::pushReservation(
     const Resource::ReservationInfo& reservation) const
 {
   Resources result;
 
-  foreach (Resource_ resource_, *this) {
-    resource_.resource.add_reservations()->CopyFrom(reservation);
-    CHECK_NONE(Resources::validate(resource_.resource));
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    Resource_ r_ = *resource_;
+    r_.resource.add_reservations()->CopyFrom(reservation);
+    CHECK_NONE(Resources::validate(r_.resource));
+    result.add(std::move(r_));
   }
 
   return result;
@@ -1638,10 +1653,11 @@ Resources Resources::popReservation() const
 {
   Resources result;
 
-  foreach (Resource_ resource_, resources) {
-    CHECK_GT(resource_.resource.reservations_size(), 0);
-    resource_.resource.mutable_reservations()->RemoveLast();
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    CHECK_GT(resource_->resource.reservations_size(), 0);
+    Resource_ r_ = *resource_;
+    r_.resource.mutable_reservations()->RemoveLast();
+    result.add(std::move(r_));
   }
 
   return result;
@@ -1652,9 +1668,14 @@ Resources Resources::toUnreserved() const
 {
   Resources result;
 
-  foreach (Resource_ resource_, *this) {
-    resource_.resource.clear_reservations();
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (isReserved(resource_->resource)) {
+      Resource_ r_ = *resource_;
+      r_.resource.clear_reservations();
+      result.add(std::move(r_));
+    } else {
+      result.add(resource_);
+    }
   }
 
   return result;
@@ -1665,15 +1686,15 @@ Resources Resources::createStrippedScalarQuantity() const
 {
   Resources stripped;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.type() == Value::SCALAR) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.type() == Value::SCALAR) {
       Resource scalar;
 
-      scalar.set_name(resource.name());
-      scalar.set_type(resource.type());
-      scalar.mutable_scalar()->CopyFrom(resource.scalar());
+      scalar.set_name(resource_->resource.name());
+      scalar.set_type(resource_->resource.type());
+      scalar.mutable_scalar()->CopyFrom(resource_->resource.scalar());
 
-      stripped.add(scalar);
+      stripped.add(std::move(scalar));
     }
   }
 
@@ -1685,6 +1706,7 @@ Option<Resources> Resources::find(const Resources& targets) const
 {
   Resources total;
 
+  // TODO(mzhu): Traverse `Resource_` to preserve `sharedCount`.
   foreach (const Resource& target, targets) {
     Option<Resources> found = find(target);
 
@@ -1740,10 +1762,10 @@ Option<Value::Scalar> Resources::get(const string& name) const
   Value::Scalar total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::SCALAR) {
-      total += resource.scalar();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::SCALAR) {
+      total += resource_->resource.scalar();
       found = true;
     }
   }
@@ -1762,10 +1784,10 @@ Option<Value::Set> Resources::get(const string& name) const
   Value::Set total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::SET) {
-      total += resource.set();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::SET) {
+      total += resource_->resource.set();
       found = true;
     }
   }
@@ -1784,10 +1806,10 @@ Option<Value::Ranges> Resources::get(const string& name) const
   Value::Ranges total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::RANGES) {
-      total += resource.ranges();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::RANGES) {
+      total += resource_->resource.ranges();
       found = true;
     }
   }
@@ -1819,8 +1841,8 @@ Resources Resources::scalars() const
 set<string> Resources::names() const
 {
   set<string> result;
-  foreach (const Resource& resource, resources) {
-    result.insert(resource.name());
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    result.insert(resource_->resource.name());
   }
 
   return result;
@@ -1830,8 +1852,8 @@ set<string> Resources::names() const
 map<string, Value_Type> Resources::types() const
 {
   map<string, Value_Type> result;
-  foreach (const Resource& resource, resources) {
-    result[resource.name()] = resource.type();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    result[resource_->resource.name()] = resource_->resource.type();
   }
 
   return result;
@@ -1904,11 +1926,11 @@ Option<Value::Ranges> Resources::ephemeral_ports() const
 }
 
 
-Option<Resource> Resources::match(const Resource& resource) const
+Option<Resource> Resources::match(const Resource& that) const
 {
-  foreach (const Resource_& resource_, resources) {
-    if (compareResourceMetadata(resource_.resource, resource)) {
-      return resource_.resource;
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (compareResourceMetadata(resource_->resource, that)) {
+      return resource_->resource;
     }
   }
 
@@ -1921,8 +1943,8 @@ Option<Resource> Resources::match(const Resource& resource) const
 
 bool Resources::_contains(const Resource_& that) const
 {
-  foreach (const Resource_& resource_, resources) {
-    if (resource_.contains(that)) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->contains(that)) {
       return true;
     }
   }
@@ -1949,23 +1971,27 @@ Option<Resources> Resources::find(const Resource& target) const
   predicates.push_back([](const Resource&) { return true; });
 
   foreach (const auto& predicate, predicates) {
-    foreach (const Resource_& resource_, total.filter(predicate)) {
+    foreach (
+        const shared_ptr<Resource_>& resource_,
+        total.filter(predicate).resources) {
       // Need to `toUnreserved` to ignore the roles in contains().
-      Resources unreserved = Resources(resource_.resource).toUnreserved();
+      Resources unreserved;
+      unreserved.add(resource_);
+      unreserved = unreserved.toUnreserved();
 
       if (unreserved.contains(remaining)) {
         // The target has been found, return the result.
-        foreach (Resource_ r, remaining) {
-          r.resource.mutable_reservations()->CopyFrom(
-              resource_.resource.reservations());
-
-          found.add(r);
+        // TODO(mzhu): Traverse `Resource_` to preserve `sharedCount`.
+        foreach (Resource r, remaining) {
+          r.mutable_reservations()->CopyFrom(
+              resource_->resource.reservations());
+          found.add(std::move(r));
         }
 
         return found;
       } else if (remaining.contains(unreserved)) {
         found.add(resource_);
-        total.subtract(resource_);
+        total.subtract(*resource_);
         remaining -= unreserved;
         break;
       }
@@ -1983,8 +2009,8 @@ Option<Resources> Resources::find(const Resource& target) const
 Resources::operator RepeatedPtrField<Resource>() const
 {
   RepeatedPtrField<Resource> all;
-  foreach (const Resource& resource, resources) {
-    all.Add()->CopyFrom(resource);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    all.Add()->CopyFrom(resource_->resource);
   }
 
   return all;
@@ -2074,9 +2100,14 @@ void Resources::add(const Resource_& that)
   }
 
   bool found = false;
-  foreach (Resource_& resource_, resources) {
-    if (internal::addable(resource_.resource, that)) {
-      resource_ += that;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that.resource)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+
+      *resource_ += that;
       found = true;
       break;
     }
@@ -2084,7 +2115,7 @@ void Resources::add(const Resource_& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.push_back(that);
+    resources.push_back(make_shared<Resource_>(that));
   }
 }
 
@@ -2096,9 +2127,43 @@ void Resources::add(Resource_&& that)
   }
 
   bool found = false;
-  foreach (Resource_& resource_, resources) {
-    if (internal::addable(resource_.resource, that)) {
-      resource_ += that;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that.resource)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        that += *resource_;
+        resource_ = make_shared<Resource_>(std::move(that));
+      } else {
+        *resource_ += that;
+      }
+
+      found = true;
+      break;
+    }
+  }
+
+  // Cannot be combined with any existing Resource object.
+  if (!found) {
+    resources.push_back(make_shared<Resource_>(std::move(that)));
+  }
+}
+
+
+void Resources::add(const shared_ptr<Resource_>& that)
+{
+  if (that->isEmpty()) {
+    return;
+  }
+
+  bool found = false;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that->resource)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+
+      *resource_ += *that;
       found = true;
       break;
     }
@@ -2106,7 +2171,7 @@ void Resources::add(Resource_&& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.push_back(std::move(that));
+    resources.push_back(that);
   }
 }
 
@@ -2149,7 +2214,7 @@ Resources& Resources::operator+=(Resource&& that)
 
 Resources& Resources::operator+=(const Resources& that)
 {
-  foreach (const Resource_& resource_, that) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     add(resource_);
   }
 
@@ -2159,7 +2224,7 @@ Resources& Resources::operator+=(const Resources& that)
 
 Resources& Resources::operator+=(Resources&& that)
 {
-  foreach (Resource_& resource_, that.resources) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     add(std::move(resource_));
   }
 
@@ -2190,10 +2255,15 @@ void Resources::subtract(const Resource_& that)
   }
 
   for (size_t i = 0; i < resources.size(); i++) {
-    Resource_& resource_ = resources[i];
+    shared_ptr<Resource_>& resource_ = resources[i];
+
+    if (internal::subtractable(resource_->resource, that)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
 
-    if (internal::subtractable(resource_.resource, that)) {
-      resource_ -= that;
+      *resource_ -= that;
 
       // Remove the resource if it has become negative or empty.
       // Note that a negative resource means the caller is
@@ -2205,11 +2275,11 @@ void Resources::subtract(const Resource_& that)
       // A "negative" Resource_ either has a negative sharedCount or
       // a negative scalar value.
       bool negative =
-        (resource_.isShared() && resource_.sharedCount.get() < 0) ||
-        (resource_.resource.type() == Value::SCALAR &&
-         resource_.resource.scalar().value() < 0);
+        (resource_->isShared() && resource_->sharedCount.get() < 0) ||
+        (resource_->resource.type() == Value::SCALAR &&
+         resource_->resource.scalar().value() < 0);
 
-      if (negative || resource_.isEmpty()) {
+      if (negative || resource_->isEmpty()) {
         // As `resources` is not ordered, and erasing an element
         // from the middle is expensive, we swap with the last element
         // and then shrink the vector by one.
@@ -2243,8 +2313,8 @@ Resources& Resources::operator-=(const Resource& that)
 
 Resources& Resources::operator-=(const Resources& that)
 {
-  foreach (const Resource_& resource_, that) {
-    subtract(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
+    subtract(*resource_);
   }
 
   return *this;
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 228a732..49f3e0f 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -41,9 +41,11 @@
 
 #include "common/resources_utils.hpp"
 
+using std::make_shared;
 using std::map;
 using std::ostream;
 using std::set;
+using std::shared_ptr;
 using std::string;
 using std::vector;
 
@@ -1486,16 +1488,16 @@ bool Resources::contains(const Resources& that) const
 {
   Resources remaining = *this;
 
-  foreach (const Resource_& resource_, that.resources) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     // NOTE: We use _contains because Resources only contain valid
     // Resource objects, and we don't want the performance hit of the
     // validity check.
-    if (!remaining._contains(resource_)) {
+    if (!remaining._contains(*resource_)) {
       return false;
     }
 
-    if (isPersistentVolume(resource_.resource)) {
-      remaining.subtract(resource_);
+    if (isPersistentVolume(resource_->resource)) {
+      remaining.subtract(*resource_);
     }
   }
 
@@ -1514,11 +1516,11 @@ bool Resources::contains(const Resource& that) const
 
 size_t Resources::count(const Resource& that) const
 {
-  foreach (const Resource_& resource_, resources) {
-    if (resource_.resource == that) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource == that) {
       // Return 1 for non-shared resources because non-shared
       // Resource objects in Resources are unique.
-      return resource_.isShared() ? resource_.sharedCount.get() : 1;
+      return resource_->isShared() ? CHECK_NOTNONE(resource_->sharedCount) : 1;
     }
   }
 
@@ -1528,17 +1530,25 @@ size_t Resources::count(const Resource& that) const
 
 void Resources::allocate(const string& role)
 {
-  foreach (Resource_& resource_, resources) {
-    resource_.resource.mutable_allocation_info()->set_role(role);
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    // Copy-on-write (if more than 1 reference).
+    if (resource_.use_count() > 1) {
+      resource_ = make_shared<Resource_>(*resource_);
+    }
+    resource_->resource.mutable_allocation_info()->set_role(role);
   }
 }
 
 
 void Resources::unallocate()
 {
-  foreach (Resource_& resource_, resources) {
-    if (resource_.resource.has_allocation_info()) {
-      resource_.resource.clear_allocation_info();
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.has_allocation_info()) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+      resource_->resource.clear_allocation_info();
     }
   }
 }
@@ -1548,8 +1558,10 @@ Resources Resources::filter(
     const lambda::function<bool(const Resource&)>& predicate) const
 {
   Resources result;
-  foreach (const Resource_& resource_, resources) {
-    if (predicate(resource_.resource)) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (predicate(resource_->resource)) {
+      // TODO(mzhu): `add` is O(n), investigate whether we can simply
+      // do `push_back` by assuming resources are already combined.
       result.add(resource_);
     }
   }
@@ -1561,9 +1573,9 @@ hashmap<string, Resources> Resources::reservations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource_& resource_, resources) {
-    if (isReserved(resource_.resource)) {
-      result[reservationRole(resource_.resource)].add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (isReserved(resource_->resource)) {
+      result[reservationRole(resource_->resource)].add(resource_);
     }
   }
 
@@ -1625,12 +1637,12 @@ hashmap<string, Resources> Resources::allocations() const
 {
   hashmap<string, Resources> result;
 
-  foreach (const Resource_& resource_, resources) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
     // We require that this is called only when
     // the resources are allocated.
-    CHECK(resource_.resource.has_allocation_info());
-    CHECK(resource_.resource.allocation_info().has_role());
-    result[resource_.resource.allocation_info().role()].add(resource_);
+    CHECK(resource_->resource.has_allocation_info());
+    CHECK(resource_->resource.allocation_info().has_role());
+    result[resource_->resource.allocation_info().role()].add(resource_);
   }
 
   return result;
@@ -1642,10 +1654,11 @@ Resources Resources::pushReservation(
 {
   Resources result;
 
-  foreach (Resource_ resource_, *this) {
-    resource_.resource.add_reservations()->CopyFrom(reservation);
-    CHECK_NONE(Resources::validate(resource_.resource));
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    Resource_ r_ = *resource_;
+    r_.resource.add_reservations()->CopyFrom(reservation);
+    CHECK_NONE(Resources::validate(r_.resource));
+    result.add(std::move(r_));
   }
 
   return result;
@@ -1656,10 +1669,11 @@ Resources Resources::popReservation() const
 {
   Resources result;
 
-  foreach (Resource_ resource_, resources) {
-    CHECK_GT(resource_.resource.reservations_size(), 0);
-    resource_.resource.mutable_reservations()->RemoveLast();
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    CHECK_GT(resource_->resource.reservations_size(), 0);
+    Resource_ r_ = *resource_;
+    r_.resource.mutable_reservations()->RemoveLast();
+    result.add(std::move(r_));
   }
 
   return result;
@@ -1670,9 +1684,14 @@ Resources Resources::toUnreserved() const
 {
   Resources result;
 
-  foreach (Resource_ resource_, *this) {
-    resource_.resource.clear_reservations();
-    result.add(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (isReserved(resource_->resource)) {
+      Resource_ r_ = *resource_;
+      r_.resource.clear_reservations();
+      result.add(std::move(r_));
+    } else {
+      result.add(resource_);
+    }
   }
 
   return result;
@@ -1683,15 +1702,15 @@ Resources Resources::createStrippedScalarQuantity() const
 {
   Resources stripped;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.type() == Value::SCALAR) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.type() == Value::SCALAR) {
       Resource scalar;
 
-      scalar.set_name(resource.name());
-      scalar.set_type(resource.type());
-      scalar.mutable_scalar()->CopyFrom(resource.scalar());
+      scalar.set_name(resource_->resource.name());
+      scalar.set_type(resource_->resource.type());
+      scalar.mutable_scalar()->CopyFrom(resource_->resource.scalar());
 
-      stripped.add(scalar);
+      stripped.add(std::move(scalar));
     }
   }
 
@@ -1758,10 +1777,10 @@ Option<Value::Scalar> Resources::get(const string& name) const
   Value::Scalar total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::SCALAR) {
-      total += resource.scalar();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::SCALAR) {
+      total += resource_->resource.scalar();
       found = true;
     }
   }
@@ -1780,10 +1799,10 @@ Option<Value::Set> Resources::get(const string& name) const
   Value::Set total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::SET) {
-      total += resource.set();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::SET) {
+      total += resource_->resource.set();
       found = true;
     }
   }
@@ -1802,10 +1821,10 @@ Option<Value::Ranges> Resources::get(const string& name) const
   Value::Ranges total;
   bool found = false;
 
-  foreach (const Resource& resource, resources) {
-    if (resource.name() == name &&
-        resource.type() == Value::RANGES) {
-      total += resource.ranges();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->resource.name() == name &&
+        resource_->resource.type() == Value::RANGES) {
+      total += resource_->resource.ranges();
       found = true;
     }
   }
@@ -1837,8 +1856,8 @@ Resources Resources::scalars() const
 set<string> Resources::names() const
 {
   set<string> result;
-  foreach (const Resource& resource, resources) {
-    result.insert(resource.name());
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    result.insert(resource_->resource.name());
   }
 
   return result;
@@ -1848,8 +1867,8 @@ set<string> Resources::names() const
 map<string, Value_Type> Resources::types() const
 {
   map<string, Value_Type> result;
-  foreach (const Resource& resource, resources) {
-    result[resource.name()] = resource.type();
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    result[resource_->resource.name()] = resource_->resource.type();
   }
 
   return result;
@@ -1928,8 +1947,8 @@ Option<Value::Ranges> Resources::ephemeral_ports() const
 
 bool Resources::_contains(const Resource_& that) const
 {
-  foreach (const Resource_& resource_, resources) {
-    if (resource_.contains(that)) {
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    if (resource_->contains(that)) {
       return true;
     }
   }
@@ -1956,23 +1975,26 @@ Option<Resources> Resources::find(const Resource& target) const
   predicates.push_back([](const Resource&) { return true; });
 
   foreach (const auto& predicate, predicates) {
-    foreach (const Resource_& resource_, total.filter(predicate)) {
+    foreach (
+        const shared_ptr<Resource_>& resource_,
+        total.filter(predicate).resources) {
       // Need to `toUnreserved` to ignore the roles in contains().
-      Resources unreserved = Resources(resource_.resource).toUnreserved();
+      Resources unreserved;
+      unreserved.add(resource_);
+      unreserved = unreserved.toUnreserved();
 
       if (unreserved.contains(remaining)) {
         // The target has been found, return the result.
-        foreach (Resource_ r, remaining) {
-          r.resource.mutable_reservations()->CopyFrom(
-              resource_.resource.reservations());
-
-          found.add(r);
+        foreach (Resource r, remaining) {
+          r.mutable_reservations()->CopyFrom(
+              resource_->resource.reservations());
+          found.add(std::move(r));
         }
 
         return found;
       } else if (remaining.contains(unreserved)) {
         found.add(resource_);
-        total.subtract(resource_);
+        total.subtract(*resource_);
         remaining -= unreserved;
         break;
       }
@@ -1990,8 +2012,8 @@ Option<Resources> Resources::find(const Resource& target) const
 Resources::operator RepeatedPtrField<Resource>() const
 {
   RepeatedPtrField<Resource> all;
-  foreach (const Resource& resource, resources) {
-    all.Add()->CopyFrom(resource);
+  foreach (const shared_ptr<Resource_>& resource_, resources) {
+    all.Add()->CopyFrom(resource_->resource);
   }
 
   return all;
@@ -2081,9 +2103,14 @@ void Resources::add(const Resource_& that)
   }
 
   bool found = false;
-  foreach (Resource_& resource_, resources) {
-    if (internal::addable(resource_.resource, that)) {
-      resource_ += that;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that.resource)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+
+      *resource_ += that;
       found = true;
       break;
     }
@@ -2091,7 +2118,7 @@ void Resources::add(const Resource_& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.push_back(that);
+    resources.push_back(make_shared<Resource_>(that));
   }
 }
 
@@ -2103,9 +2130,16 @@ void Resources::add(Resource_&& that)
   }
 
   bool found = false;
-  foreach (Resource_& resource_, resources) {
-    if (internal::addable(resource_.resource, that)) {
-      resource_ += that;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that.resource)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        that += *resource_;
+        resource_ = make_shared<Resource_>(std::move(that));
+      } else {
+        *resource_ += that;
+      }
+
       found = true;
       break;
     }
@@ -2113,7 +2147,34 @@ void Resources::add(Resource_&& that)
 
   // Cannot be combined with any existing Resource object.
   if (!found) {
-    resources.push_back(std::move(that));
+    resources.push_back(make_shared<Resource_>(std::move(that)));
+  }
+}
+
+
+void Resources::add(const shared_ptr<Resource_>& that)
+{
+  if (that->isEmpty()) {  // An empty resource contributes nothing.
+    return;
+  }
+
+  bool found = false;
+  foreach (shared_ptr<Resource_>& resource_, resources) {
+    if (internal::addable(resource_->resource, that->resource)) {
+      // Copy-on-write: clone before mutating if the object is shared.
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
+
+      *resource_ += *that;  // Merge `that` into the matching entry.
+      found = true;
+      break;  // Stop at the first addable entry.
+    }
+  }
+
+  // No addable entry exists: append `that`, sharing it instead of copying.
+  if (!found) {
+    resources.push_back(that);
+  }
 }
 
@@ -2156,7 +2217,7 @@ Resources& Resources::operator+=(Resource&& that)
 
 Resources& Resources::operator+=(const Resources& that)
 {
-  foreach (const Resource_& resource_, that) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     add(resource_);
   }
 
@@ -2166,7 +2227,7 @@ Resources& Resources::operator+=(const Resources& that)
 
 Resources& Resources::operator+=(Resources&& that)
 {
-  foreach (Resource_& resource_, that.resources) {
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
     add(std::move(resource_));
   }
 
@@ -2197,10 +2258,15 @@ void Resources::subtract(const Resource_& that)
   }
 
   for (size_t i = 0; i < resources.size(); i++) {
-    Resource_& resource_ = resources[i];
+    shared_ptr<Resource_>& resource_ = resources[i];
+
+    if (internal::subtractable(resource_->resource, that)) {
+      // Copy-on-write (if more than 1 reference).
+      if (resource_.use_count() > 1) {
+        resource_ = make_shared<Resource_>(*resource_);
+      }
 
-    if (internal::subtractable(resource_.resource, that)) {
-      resource_ -= that;
+      *resource_ -= that;
 
       // Remove the resource if it has become negative or empty.
       // Note that a negative resource means the caller is
@@ -2212,11 +2278,11 @@ void Resources::subtract(const Resource_& that)
       // A "negative" Resource_ either has a negative sharedCount or
       // a negative scalar value.
       bool negative =
-        (resource_.isShared() && resource_.sharedCount.get() < 0) ||
-        (resource_.resource.type() == Value::SCALAR &&
-         resource_.resource.scalar().value() < 0);
+        (resource_->isShared() && resource_->sharedCount.get() < 0) ||
+        (resource_->resource.type() == Value::SCALAR &&
+         resource_->resource.scalar().value() < 0);
 
-      if (negative || resource_.isEmpty()) {
+      if (negative || resource_->isEmpty()) {
         // As `resources` is not ordered, and erasing an element
         // from the middle is expensive, we swap with the last element
         // and then shrink the vector by one.
@@ -2250,8 +2316,8 @@ Resources& Resources::operator-=(const Resource& that)
 
 Resources& Resources::operator-=(const Resources& that)
 {
-  foreach (const Resource_& resource_, that) {
-    subtract(resource_);
+  foreach (const shared_ptr<Resource_>& resource_, that.resources) {
+    subtract(*resource_);
   }
 
   return *this;