You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/02/02 22:32:16 UTC

mesos git commit: Updated Resources::apply to handle Resource.AllocationInfo.

Repository: mesos
Updated Branches:
  refs/heads/master 074113537 -> 36d191b02


Updated Resources::apply to handle Resource.AllocationInfo.

Note that previously, `Resource` did not contain the
`AllocationInfo` field.

For backwards compatibility with old schedulers and tooling,
the master injects the allocation info into operations to
ensure that operations can apply correctly to the offered
resources (which are allocated).

When applying these operations with allocated resources to
resources that are unallocated (e.g. applying an operation
to update the agent's total resources), the caller must either
strip the allocation info from the operations prior to applying,
or the apply code needs to ignore the allocation info. For now,
we took the approach of ignoring the allocation info within the
`apply()` function in order to simplify the call-sites.

Note that we assume no `Resources` store a mix of allocated and
unallocated resources, which is brittle and enforcement of this
invariant should be added.

We should consider moving the adjustment code from
`Resources::apply()` up into the call sites. We do this already
via injecting the allocation info into operations, but we don't
have a helper for stripping the allocation info from operations.

Review: https://reviews.apache.org/r/55828


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/36d191b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/36d191b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/36d191b0

Branch: refs/heads/master
Commit: 36d191b02907907b9f0c3154213f82bc1039d902
Parents: 0741135
Author: Benjamin Mahler <bm...@apache.org>
Authored: Sat Jan 21 22:50:09 2017 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Feb 2 14:31:45 2017 -0800

----------------------------------------------------------------------
 src/common/resources.cpp      | 100 ++++++++++++++++++++++++++++++++-----
 src/tests/resources_tests.cpp |  77 ++++++++++++++++++++++++++++
 src/tests/resources_utils.cpp |  14 ++++++
 src/tests/resources_utils.hpp |  16 ++++++
 src/v1/resources.cpp          |  97 ++++++++++++++++++++++++++++++-----
 5 files changed, 279 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/36d191b0/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index cdf6a84..388e3ef 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -1254,6 +1254,55 @@ Option<Resources> Resources::find(const Resources& targets) const
 
 Try<Resources> Resources::apply(const Offer::Operation& operation) const
 {
+  // Note that previously, `Resource` did not contain the
+  // `AllocationInfo` field.
+  //
+  // For backwards compatibility with old schedulers and
+  // tooling, the master injects the allocation info into
+  // operations to ensure that operations can apply correctly
+  // to the offered resources (which are allocated).
+  //
+  // When applying these operations with allocated resources
+  // to resources that are unallocated (e.g. applying an operation
+  // to update the agent's total resources), the caller must
+  // either strip the allocation info from the operations prior
+  // to applying, or the apply code needs to ignore the allocation
+  // info. For now, we took the approach of ignoring the
+  // allocation info within the `apply()` function in order to
+  // simplify the call-sites.
+  //
+  // Note that we assume no `Resources` store a mix of
+  // allocated and unallocated resources, which is brittle
+  // and enforcement of this invariant should be added.
+  //
+  // TODO(bmahler): Consider removing the adjustment code
+  // here. The call sites that apply operations with
+  // allocated resources to resources that are unallocated
+  // would need to be updated to manually strip the
+  // allocation info from the operation prior to applying.
+
+  const bool isAllocated = [](const Resources& resources) {
+    foreach (const Resource& resource, resources) {
+      if (resource.has_allocation_info()) {
+        return true;
+      }
+    }
+    return false;
+  }(*this);
+
+  // Returns a resource adjusted per the comment above: if the
+  // operation's resource is allocated and the operation is
+  // being applied to unallocated resources, the allocation
+  // info in the operation's resource is stripped.
+  auto adjustedResource = [isAllocated](Resource operationResource)
+      -> Try<Resource> {
+    if (operationResource.has_allocation_info() && !isAllocated) {
+      operationResource.clear_allocation_info();
+    }
+
+    return operationResource;
+  };
+
   Resources result = *this;
 
   switch (operation.type()) {
@@ -1278,15 +1327,22 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid RESERVE Operation: Missing 'reservation'");
         }
 
-        Resources unreserved = Resources(reserved).flatten();
+        Try<Resource> adjustedReservation = adjustedResource(reserved);
+        if (adjustedReservation.isError()) {
+          return Error("Invalid RESERVE Operation: " +
+                       adjustedReservation.error());
+        }
+
+        Resources unreserved = Resources(adjustedReservation.get()).flatten();
 
         if (!result.contains(unreserved)) {
           return Error("Invalid RESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(unreserved));
+                       " does not contain " +
+                       stringify(adjustedReservation.get()));
         }
 
         result -= unreserved;
-        result.add(reserved);
+        result.add(adjustedReservation.get());
       }
       break;
     }
@@ -1304,14 +1360,21 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid UNRESERVE Operation: Missing 'reservation'");
         }
 
-        if (!result.contains(reserved)) {
+        Try<Resource> adjustedReservation = adjustedResource(reserved);
+        if (adjustedReservation.isError()) {
+          return Error("Invalid UNRESERVE Operation: " +
+                       adjustedReservation.error());
+        }
+
+        if (!result.contains(adjustedReservation.get())) {
           return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
+                       " does not contain " +
+                       stringify(adjustedReservation.get()));
         }
 
-        Resources unreserved = Resources(reserved).flatten();
+        Resources unreserved = Resources(adjustedReservation.get()).flatten();
 
-        result.subtract(reserved);
+        result.subtract(adjustedReservation.get());
         result += unreserved;
       }
       break;
@@ -1330,13 +1393,18 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid CREATE Operation: Missing 'persistence'");
         }
 
+        Try<Resource> adjustedVolume = adjustedResource(volume);
+        if (adjustedVolume.isError()) {
+          return Error("Invalid CREATE Operation: " + adjustedVolume.error());
+        }
+
         // Strip persistence and volume from the disk info so that we
         // can subtract it from the original resources.
         // TODO(jieyu): Non-persistent volumes are not supported for
         // now. Persistent volumes can only be be created from regular
         // disk resources. Revisit this once we start to support
         // non-persistent volumes.
-        Resource stripped = volume;
+        Resource stripped = adjustedVolume.get();
 
         if (stripped.disk().has_source()) {
           stripped.mutable_disk()->clear_persistence();
@@ -1351,11 +1419,12 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
 
         if (!result.contains(stripped)) {
           return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
+                       " for persistent volume " +
+                       stringify(adjustedVolume.get()));
         }
 
         result.subtract(stripped);
-        result.add(volume);
+        result.add(adjustedVolume.get());
       }
       break;
     }
@@ -1373,14 +1442,19 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid DESTROY Operation: Missing 'persistence'");
         }
 
-        if (!result.contains(volume)) {
+        Try<Resource> adjustedVolume = adjustedResource(volume);
+        if (adjustedVolume.isError()) {
+          return Error("Invalid DESTROY Operation: " + adjustedVolume.error());
+        }
+
+        if (!result.contains(adjustedVolume.get())) {
           return Error(
               "Invalid DESTROY Operation: Persistent volume does not exist");
         }
 
         // Strip persistence and volume from the disk info so that we
         // can subtract it from the original resources.
-        Resource stripped = volume;
+        Resource stripped = adjustedVolume.get();
 
         if (stripped.disk().has_source()) {
           stripped.mutable_disk()->clear_persistence();
@@ -1393,7 +1467,7 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
         // return the resource to non-shared state after destroy.
         stripped.clear_shared();
 
-        result.subtract(volume);
+        result.subtract(adjustedVolume.get());
         result.add(stripped);
       }
       break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/36d191b0/src/tests/resources_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_tests.cpp b/src/tests/resources_tests.cpp
index 33f0ee0..2bdce3c 100644
--- a/src/tests/resources_tests.cpp
+++ b/src/tests/resources_tests.cpp
@@ -2392,6 +2392,83 @@ TEST(ResourcesOperationTest, CreateSharedPersistentVolume)
 }
 
 
+// Operations can contain allocated or unallocated resources.
+// When applying an operation, `Resources::apply()` will handle
+// the case where the operation contains allocated resources and
+// is being applied to unallocated resources. Here the allocation
+// info is simply ignored.
+//
+// TODO(bmahler): Instead of having `Resources::apply()` handle
+// this, consider requiring that callers strip the allocation
+// info when applying an operation on allocated resources to
+// resources that are unallocated.
+TEST(ResourcesOperationTest, ApplyWithAllocationInfo)
+{
+  auto unallocated = [](const Resources& resources) {
+    Resources result = resources;
+    result.unallocate();
+    return result;
+  };
+
+  // Apply a RESERVE -> CREATE -> DESTROY -> UNRESERVE sequence
+  // of operations with allocated resources to unallocated resources.
+  Resources total = Resources::parse("cpus:1;mem:500;disk:1000").get();
+
+  // Apply the RESERVE with allocated resources.
+  Offer::Operation reserve;
+  reserve.set_type(Offer::Operation::RESERVE);
+  reserve.mutable_reserve()->mutable_resources()->CopyFrom(total);
+
+  for (int i = 0; i < reserve.reserve().resources_size(); ++i) {
+    Resource* resource = reserve.mutable_reserve()->mutable_resources(i);
+
+    resource->mutable_reservation();
+    resource->set_role("role");
+
+    resource->mutable_allocation_info()->set_role("role");
+  }
+
+  Try<Resources> applied = total.apply(reserve);
+  EXPECT_SOME_EQ(unallocated(reserve.reserve().resources()), applied);
+
+  // Apply the CREATE with allocated resources.
+  Resources disk = Resources(reserve.reserve().resources()).filter(
+      [](const Resource& r) { return r.name() == "disk"; });
+  Resources nonDisk = Resources(reserve.reserve().resources()).filter(
+      [](const Resource& r) { return r.name() != "disk"; });
+
+  ASSERT_EQ(1u, disk.size());
+
+  Resource volume = *disk.begin();
+  volume.mutable_disk()->mutable_persistence()->set_id("id");
+
+  Offer::Operation create;
+  create.set_type(Offer::Operation::CREATE);
+  create.mutable_create()->add_volumes()->CopyFrom(volume);
+
+  applied = applied->apply(create);
+  EXPECT_SOME_EQ(unallocated(nonDisk + volume), applied);
+
+  // Apply the DESTROY with allocated resources.
+  Offer::Operation destroy;
+  destroy.set_type(Offer::Operation::DESTROY);
+  destroy.mutable_destroy()->mutable_volumes()->CopyFrom(
+      create.create().volumes());
+
+  applied = applied->apply(destroy);
+  EXPECT_SOME_EQ(unallocated(reserve.reserve().resources()), applied);
+
+  // Apply the UNRESERVE with allocated resources.
+  Offer::Operation unreserve;
+  unreserve.set_type(Offer::Operation::UNRESERVE);
+  unreserve.mutable_unreserve()->mutable_resources()->CopyFrom(
+      reserve.reserve().resources());
+
+  applied = applied->apply(unreserve);
+  EXPECT_SOME_EQ(total, applied);
+}
+
+
 TEST(ResourcesOperationTest, FlattenResources)
 {
   Resources unreservedCpus = Resources::parse("cpus:1").get();

http://git-wip-us.apache.org/repos/asf/mesos/blob/36d191b0/src/tests/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_utils.cpp b/src/tests/resources_utils.cpp
index be1feb9..2cef55f 100644
--- a/src/tests/resources_utils.cpp
+++ b/src/tests/resources_utils.cpp
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <string>
+
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
@@ -22,10 +24,22 @@
 
 #include "tests/resources_utils.hpp"
 
+using std::string;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
+Resources allocatedResources(
+    const Resources& resources,
+    const string& role)
+{
+  Resources result = resources;
+  result.allocate(role);
+  return result;
+}
+
+
 Resource createPorts(const ::mesos::Value::Ranges& ranges)
 {
   Value value;

http://git-wip-us.apache.org/repos/asf/mesos/blob/36d191b0/src/tests/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/tests/resources_utils.hpp b/src/tests/resources_utils.hpp
index 18dcca7..1f41f02 100644
--- a/src/tests/resources_utils.hpp
+++ b/src/tests/resources_utils.hpp
@@ -17,6 +17,9 @@
 #ifndef __TESTS_RESOURCES_UTILS_HPP__
 #define __TESTS_RESOURCES_UTILS_HPP__
 
+#include <string>
+#include <ostream>
+
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
@@ -26,6 +29,19 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
+// Returns a copy of the resources that are allocated to the role.
+//
+// TODO(bmahler): Consider adding a top-level `AllocatedResources`
+// that has the invariant that all resources contained within it
+// have an `AllocationInfo` set. This class could prevent
+// malformed operations between `Resources` and
+// `AllocatedResources`, and could clarify interfaces that take
+// allocated resources (e.g. allocator, sorter, etc).
+Resources allocatedResources(
+    const Resources& resources,
+    const std::string& role);
+
+
 // Creates a "ports(*)" resource for the given ranges.
 Resource createPorts(const ::mesos::Value::Ranges& ranges);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/36d191b0/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 6fd8687..e47c4d4 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -1243,6 +1243,55 @@ Option<Resources> Resources::find(const Resources& targets) const
 
 Try<Resources> Resources::apply(const Offer::Operation& operation) const
 {
+  // Note that previously, `Resource` did not contain the
+  // `AllocationInfo` field.
+  //
+  // For backwards compatibility with old schedulers and
+  // tooling, the master injects the allocation info into
+  // operations to ensure that operations can apply correctly
+  // to the offered resources (which are allocated).
+  //
+  // When applying these operations with allocated resources
+  // to resources that are unallocated (e.g. applying an operation
+  // to update the agent's total resources), the caller must
+  // either strip the allocation info from the operations prior
+  // to applying, or the apply code needs to ignore the allocation
+  // info. For now, we took the approach of ignoring the
+  // allocation info within the `apply()` function in order to
+  // simplify the call-sites.
+  //
+  // Note that we assume no `Resources` store a mix of
+  // allocated and unallocated resources, which is brittle
+  // and enforcement of this invariant should be added.
+  //
+  // TODO(bmahler): Consider removing the adjustment code
+  // here. The call sites that apply operations with
+  // allocated resources to resources that are unallocated
+  // would need to be updated to manually strip the
+  // allocation info from the operation prior to applying.
+
+  const bool isAllocated = [](const Resources& resources) {
+    foreach (const Resource& resource, resources) {
+      if (resource.has_allocation_info()) {
+        return true;
+      }
+    }
+    return false;
+  }(*this);
+
+  // Returns a resource adjusted per the comment above: if the
+  // operation's resource is allocated and the operation is
+  // being applied to unallocated resources, the allocation
+  // info in the operation's resource is stripped.
+  auto adjustedResource = [isAllocated](Resource operationResource)
+      -> Try<Resource> {
+    if (operationResource.has_allocation_info() && !isAllocated) {
+      operationResource.clear_allocation_info();
+    }
+
+    return operationResource;
+  };
+
   Resources result = *this;
 
   switch (operation.type()) {
@@ -1267,7 +1316,13 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid RESERVE Operation: Missing 'reservation'");
         }
 
-        Resources unreserved = Resources(reserved).flatten();
+        Try<Resource> adjustedReservation = adjustedResource(reserved);
+        if (adjustedReservation.isError()) {
+          return Error("Invalid RESERVE Operation: " +
+                       adjustedReservation.error());
+        }
+
+        Resources unreserved = Resources(adjustedReservation.get()).flatten();
 
         if (!result.contains(unreserved)) {
           return Error("Invalid RESERVE Operation: " + stringify(result) +
@@ -1275,7 +1330,7 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
         }
 
         result -= unreserved;
-        result.add(reserved);
+        result.add(adjustedReservation.get());
       }
       break;
     }
@@ -1293,14 +1348,21 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid UNRESERVE Operation: Missing 'reservation'");
         }
 
-        if (!result.contains(reserved)) {
+        Try<Resource> adjustedReservation = adjustedResource(reserved);
+        if (adjustedReservation.isError()) {
+          return Error("Invalid UNRESERVE Operation: " +
+                       adjustedReservation.error());
+        }
+
+        if (!result.contains(adjustedReservation.get())) {
           return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
+                       " does not contain " +
+                       stringify(adjustedReservation.get()));
         }
 
-        Resources unreserved = Resources(reserved).flatten();
+        Resources unreserved = Resources(adjustedReservation.get()).flatten();
 
-        result.subtract(reserved);
+        result.subtract(adjustedReservation.get());
         result += unreserved;
       }
       break;
@@ -1319,13 +1381,18 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid CREATE Operation: Missing 'persistence'");
         }
 
+        Try<Resource> adjustedVolume = adjustedResource(volume);
+        if (adjustedVolume.isError()) {
+          return Error("Invalid CREATE Operation: " + adjustedVolume.error());
+        }
+
         // Strip persistence and volume from the disk info so that we
         // can subtract it from the original resources.
         // TODO(jieyu): Non-persistent volumes are not supported for
         // now. Persistent volumes can only be be created from regular
         // disk resources. Revisit this once we start to support
         // non-persistent volumes.
-        Resource stripped = volume;
+        Resource stripped = adjustedVolume.get();
 
         if (stripped.disk().has_source()) {
           stripped.mutable_disk()->clear_persistence();
@@ -1340,11 +1407,12 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
 
         if (!result.contains(stripped)) {
           return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
+                       " for persistent volume " +
+                       stringify(adjustedVolume.get()));
         }
 
         result.subtract(stripped);
-        result.add(volume);
+        result.add(adjustedVolume.get());
       }
       break;
     }
@@ -1362,14 +1430,19 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
           return Error("Invalid DESTROY Operation: Missing 'persistence'");
         }
 
-        if (!result.contains(volume)) {
+        Try<Resource> adjustedVolume = adjustedResource(volume);
+        if (adjustedVolume.isError()) {
+          return Error("Invalid DESTROY Operation: " + adjustedVolume.error());
+        }
+
+        if (!result.contains(adjustedVolume.get())) {
           return Error(
               "Invalid DESTROY Operation: Persistent volume does not exist");
         }
 
         // Strip persistence and volume from the disk info so that we
         // can subtract it from the original resources.
-        Resource stripped = volume;
+        Resource stripped = adjustedVolume.get();
 
         if (stripped.disk().has_source()) {
           stripped.mutable_disk()->clear_persistence();
@@ -1382,7 +1455,7 @@ Try<Resources> Resources::apply(const Offer::Operation& operation) const
         // return the resource to non-shared state after destroy.
         stripped.clear_shared();
 
-        result.subtract(volume);
+        result.subtract(adjustedVolume.get());
         result.add(stripped);
       }
       break;