You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/04 01:18:24 UTC

[01/13] mesos git commit: Ensured that agent does not delete volume upon grow or shrink.

Repository: mesos
Updated Branches:
  refs/heads/master 2402f99ca -> a483fb0d0


Ensured that agent does not delete volume upon grow or shrink.

Previously, the `Slave::syncCheckpointedResources` implementation decided
whether to delete a persistent volume using a `Resources::contains` check,
which could cause a resized volume to be deleted. The function was rewritten
to compute the set difference between the old and new paths of all
persistent volumes and perform creation/deletion accordingly.

Review: https://reviews.apache.org/r/66218/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57e705a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57e705a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57e705a4

Branch: refs/heads/master
Commit: 57e705a475ad03bfbef2605b3573a601239e6242
Parents: 2402f99
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:07 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:07 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 46 +++++++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/57e705a4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ca3d79..69280d9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4231,8 +4231,31 @@ void Slave::checkpointResourcesMessage(
 Try<Nothing> Slave::syncCheckpointedResources(
     const Resources& newCheckpointedResources)
 {
-  Resources oldVolumes = checkpointedResources.persistentVolumes();
-  Resources newVolumes = newCheckpointedResources.persistentVolumes();
+  auto toPathMap = [](const string& workDir, const Resources& resources) {
+    hashmap<string, Resource> pathMap;
+    const Resources& persistentVolumes = resources.persistentVolumes();
+
+    foreach (const Resource& volume, persistentVolumes) {
+      // This is validated in master.
+      CHECK(Resources::isReserved(volume));
+      string path = paths::getPersistentVolumePath(workDir, volume);
+      pathMap[path] = volume;
+    }
+
+    return pathMap;
+  };
+
+  const hashmap<string, Resource> oldPathMap =
+    toPathMap(flags.work_dir, checkpointedResources);
+
+  const hashmap<string, Resource> newPathMap =
+    toPathMap(flags.work_dir, newCheckpointedResources);
+
+  const hashset<string> oldPaths = oldPathMap.keys();
+  const hashset<string> newPaths = newPathMap.keys();
+
+  const hashset<string> createPaths = newPaths - oldPaths;
+  const hashset<string> deletePaths = oldPaths - newPaths;
 
   // Create persistent volumes that do not already exist.
   //
@@ -4240,15 +4263,8 @@ Try<Nothing> Slave::syncCheckpointedResources(
   // to support multiple disks, or raw disks. Depending on the
   // DiskInfo, we may want to create either directories under a root
   // directory, or LVM volumes from a given device.
-  foreach (const Resource& volume, newVolumes) {
-    // This is validated in master.
-    CHECK(Resources::isReserved(volume));
-
-    if (oldVolumes.contains(volume)) {
-      continue;
-    }
-
-    string path = paths::getPersistentVolumePath(flags.work_dir, volume);
+  foreach (const string& path, createPaths) {
+    const Resource& volume = newPathMap.at(path);
 
     // If creation of persistent volume fails, the agent exits.
     string volumeDescription = "persistent volume " +
@@ -4278,12 +4294,8 @@ Try<Nothing> Slave::syncCheckpointedResources(
   // remove the filesystem objects for the removed volume. Note that
   // for MOUNT disks, we don't remove the root directory (mount point)
   // of the volume.
-  foreach (const Resource& volume, oldVolumes) {
-    if (newVolumes.contains(volume)) {
-      continue;
-    }
-
-    string path = paths::getPersistentVolumePath(flags.work_dir, volume);
+  foreach (const string& path, deletePaths) {
+    const Resource& volume = oldPathMap.at(path);
 
     LOG(INFO) << "Deleting persistent volume '"
               << volume.disk().persistence().id()


[11/13] mesos git commit: Implemented operator API to grow and shrink persistent volume.

Posted by ch...@apache.org.
Implemented operator API to grow and shrink persistent volume.

These operator APIs are implemented as speculative for now, but
we plan to convert them to non-speculative in the future.

Review: https://reviews.apache.org/r/66051/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e2d3112f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e2d3112f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e2d3112f

Branch: refs/heads/master
Commit: e2d3112f3aaa19f33319db2dad16b2076a2df046
Parents: d418f15
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:05:02 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:05:02 2018 -0700

----------------------------------------------------------------------
 src/master/http.cpp       | 140 +++++++++++++++++++++++++++++++++++++++++
 src/master/master.hpp     |  10 +++
 src/master/validation.cpp |  26 ++++++++
 3 files changed, 176 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e2d3112f/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 135ae43..77cf47a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -773,6 +773,12 @@ Future<Response> Master::Http::api(
     case mesos::master::Call::DESTROY_VOLUMES:
       return destroyVolumes(call, principal, acceptType);
 
+    case mesos::master::Call::GROW_VOLUME:
+      return growVolume(call, principal, acceptType);
+
+    case mesos::master::Call::SHRINK_VOLUME:
+      return shrinkVolume(call, principal, acceptType);
+
     case mesos::master::Call::GET_MAINTENANCE_STATUS:
       return getMaintenanceStatus(call, principal, acceptType);
 
@@ -1490,6 +1496,140 @@ Future<Response> Master::Http::destroyVolumes(
 }
 
 
+Future<Response> Master::Http::growVolume(
+    const mesos::master::Call& call,
+    const Option<Principal>& principal,
+    ContentType /*contentType*/) const
+{
+  // TODO(greggomann): Remove this check once the `Principal` type is used in
+  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
+  // See MESOS-7202.
+  if (principal.isSome() && principal->value.isNone()) {
+    return Forbidden(
+        "The request's authenticated principal contains claims, but no value "
+        "string. The master currently requires that principals have a value");
+  }
+
+  CHECK_EQ(mesos::master::Call::GROW_VOLUME, call.type());
+  CHECK(call.has_grow_volume());
+
+  // Only agent default resources are supported right now.
+  CHECK(call.grow_volume().has_slave_id());
+
+  const SlaveID& slaveId = call.grow_volume().slave_id();
+
+  Slave* slave = master->slaves.registered.get(slaveId);
+  if (slave == nullptr) {
+    return BadRequest("No agent found with specified ID");
+  }
+
+  // Create an operation.
+  Offer::Operation operation;
+  operation.set_type(Offer::Operation::GROW_VOLUME);
+
+  operation.mutable_grow_volume()->mutable_volume()->CopyFrom(
+      call.grow_volume().volume());
+
+  operation.mutable_grow_volume()->mutable_addition()->CopyFrom(
+      call.grow_volume().addition());
+
+  Option<Error> error = validateAndUpgradeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
+      operation.grow_volume(), slave->capabilities);
+
+  if (error.isSome()) {
+    return BadRequest(
+        "Invalid GROW_VOLUME operation on agent " +
+        stringify(*slave) + ": " + error->message);
+  }
+
+  return master->authorizeResizeVolume(
+      operation.grow_volume().volume(), principal)
+    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
+      if (!authorized) {
+        return Forbidden();
+      }
+
+      // The `volume` and `addition` fields contain the resources required for
+      // this operation.
+      return _operation(
+          slaveId,
+          Resources(operation.grow_volume().volume()) +
+            Resources(operation.grow_volume().addition()),
+          operation);
+    }));
+}
+
+
+Future<Response> Master::Http::shrinkVolume(
+    const mesos::master::Call& call,
+    const Option<Principal>& principal,
+    ContentType /*contentType*/) const
+{
+  // TODO(greggomann): Remove this check once the `Principal` type is used in
+  // `ReservationInfo`, `DiskInfo`, and within the master's `principals` map.
+  // See MESOS-7202.
+  if (principal.isSome() && principal->value.isNone()) {
+    return Forbidden(
+        "The request's authenticated principal contains claims, but no value "
+        "string. The master currently requires that principals have a value");
+  }
+
+  CHECK_EQ(mesos::master::Call::SHRINK_VOLUME, call.type());
+  CHECK(call.has_shrink_volume());
+
+  // Only persistent volumes are supported right now.
+  CHECK(call.shrink_volume().has_slave_id());
+
+  const SlaveID& slaveId = call.shrink_volume().slave_id();
+
+  Slave* slave = master->slaves.registered.get(slaveId);
+  if (slave == nullptr) {
+    return BadRequest("No agent found with specified ID");
+  }
+
+  // Create an operation.
+  Offer::Operation operation;
+  operation.set_type(Offer::Operation::SHRINK_VOLUME);
+
+  operation.mutable_shrink_volume()->mutable_volume()->CopyFrom(
+      call.shrink_volume().volume());
+
+  operation.mutable_shrink_volume()->mutable_subtract()->CopyFrom(
+      call.shrink_volume().subtract());
+
+  Option<Error> error = validateAndUpgradeResources(&operation);
+  if (error.isSome()) {
+    return BadRequest(error->message);
+  }
+
+  error = validation::operation::validate(
+      operation.shrink_volume(), slave->capabilities);
+
+  if (error.isSome()) {
+    return BadRequest(
+        "Invalid SHRINK_VOLUME operation on agent " +
+        stringify(*slave) + ": " + error->message);
+  }
+
+  return master->authorizeResizeVolume(
+      operation.shrink_volume().volume(), principal)
+    .then(defer(master->self(), [=](bool authorized) -> Future<Response> {
+      if (!authorized) {
+        return Forbidden();
+      }
+
+      // The `volume` field contains the resources required for this operation.
+      return _operation(
+          slaveId, operation.shrink_volume().volume(), operation);
+    }));
+}
+
+
 string Master::Http::FRAMEWORKS_HELP()
 {
   return HELP(

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2d3112f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 270f60a..e4f7b45 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1700,6 +1700,16 @@ private:
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
 
+    process::Future<process::http::Response> growVolume(
+        const mesos::master::Call& call,
+        const Option<process::http::authentication::Principal>& principal,
+        ContentType contentType) const;
+
+    process::Future<process::http::Response> shrinkVolume(
+        const mesos::master::Call& call,
+        const Option<process::http::authentication::Principal>& principal,
+        ContentType contentType) const;
+
     process::Future<process::http::Response> reserveResources(
         const mesos::master::Call& call,
         const Option<process::http::authentication::Principal>& principal,

http://git-wip-us.apache.org/repos/asf/mesos/blob/e2d3112f/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 74ed171..0c1c924 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -190,6 +190,32 @@ Option<Error> validate(
       }
       return None();
 
+    case mesos::master::Call::GROW_VOLUME:
+      if (!call.has_grow_volume()) {
+        return Error("Expecting 'grow_volume' to be present");
+      }
+
+      if (!call.grow_volume().has_slave_id()) {
+        return Error(
+            "Expecting 'agent_id' to be present; only agent default resources "
+            "are supported right now");
+      }
+
+      return None();
+
+    case mesos::master::Call::SHRINK_VOLUME:
+      if (!call.has_shrink_volume()) {
+        return Error("Expecting 'shrink_volume' to be present");
+      }
+
+      if (!call.shrink_volume().has_slave_id()) {
+        return Error(
+            "Expecting 'agent_id' to be present; only agent default resources "
+            "are supported right now");
+      }
+
+      return None();
+
     case mesos::master::Call::GET_MAINTENANCE_STATUS:
       return None();
 


[05/13] mesos git commit: Added helper functions to create grow and shrink volume in test.

Posted by ch...@apache.org.
Added helper functions to create grow and shrink volume in test.

Review: https://reviews.apache.org/r/66219/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/71057f2c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/71057f2c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/71057f2c

Branch: refs/heads/master
Commit: 71057f2c34b0ee4278be20a1da55423a81fc30a0
Parents: 8251087
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:42 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:42 2018 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 56 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/71057f2c/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 907bca5..8da3b02 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1342,6 +1342,32 @@ inline typename TOffer::Operation DESTROY(const TResources& volumes)
 }
 
 
+template <typename TResource, typename TOffer>
+inline typename TOffer::Operation GROW_VOLUME(
+    const TResource& volume,
+    const TResource& addition)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::GROW_VOLUME);
+  operation.mutable_grow_volume()->mutable_volume()->CopyFrom(volume);
+  operation.mutable_grow_volume()->mutable_addition()->CopyFrom(addition);
+  return operation;
+}
+
+
+template <typename TResource, typename TOffer, typename TValueScalar>
+inline typename TOffer::Operation SHRINK_VOLUME(
+    const TResource& volume,
+    const TValueScalar& subtract)
+{
+  typename TOffer::Operation operation;
+  operation.set_type(TOffer::Operation::SHRINK_VOLUME);
+  operation.mutable_shrink_volume()->mutable_volume()->CopyFrom(volume);
+  operation.mutable_shrink_volume()->mutable_subtract()->CopyFrom(subtract);
+  return operation;
+}
+
+
 template <typename TOffer, typename TTaskInfo>
 inline typename TOffer::Operation LAUNCH(const std::vector<TTaskInfo>& tasks)
 {
@@ -1745,6 +1771,20 @@ inline Offer::Operation DESTROY(Args&&... args)
 }
 
 
+template <typename... Args>
+inline Offer::Operation GROW_VOLUME(Args&&... args)
+{
+  return common::GROW_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline Offer::Operation SHRINK_VOLUME(Args&&... args)
+{
+  return common::SHRINK_VOLUME<Resource, Offer>(std::forward<Args>(args)...);
+}
+
+
 // We specify the argument to allow brace initialized construction.
 inline Offer::Operation LAUNCH(const std::vector<TaskInfo>& tasks)
 {
@@ -2043,6 +2083,22 @@ inline mesos::v1::Offer::Operation DESTROY(Args&&... args)
 }
 
 
+template <typename... Args>
+inline mesos::v1::Offer::Operation GROW_VOLUME(Args&&... args)
+{
+  return common::GROW_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
+template <typename... Args>
+inline mesos::v1::Offer::Operation SHRINK_VOLUME(Args&&... args)
+{
+  return common::SHRINK_VOLUME<mesos::v1::Resource, mesos::v1::Offer>(
+      std::forward<Args>(args)...);
+}
+
+
 // We specify the argument to allow brace initialized construction.
 inline mesos::v1::Offer::Operation LAUNCH(
     const std::vector<mesos::v1::TaskInfo>& tasks)


[12/13] mesos git commit: Added test for `GROW_VOLUME` and `SHRINK_VOLUME` operator API.

Posted by ch...@apache.org.
Added test for `GROW_VOLUME` and `SHRINK_VOLUME` operator API.

Review: https://reviews.apache.org/r/66227/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f8d28f40
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f8d28f40
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f8d28f40

Branch: refs/heads/master
Commit: f8d28f409b4cfd6330e3063b59d5192a063f32aa
Parents: e2d3112
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:05:08 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:05:08 2018 -0700

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 221 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 221 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f8d28f40/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index dd8e221..1ed26a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -43,6 +43,7 @@
 #include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
 #include "common/recordio.hpp"
+#include "common/resources_utils.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -3680,6 +3681,226 @@ TEST_P(MasterAPITest, CreateAndDestroyVolumes)
 }
 
 
+// Test growing a persistent volume through the master operator API.
+TEST_P(MasterAPITest, GrowVolume)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID so we can use it in API calls.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  // Do Static reservation so we can create persistent volumes from it.
+  slaveFlags.resources = "disk(role1):192";
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  // Create the persistent volume.
+  v1::master::Call v1CreateVolumesCall;
+  v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
+  v1::master::Call_CreateVolumes* createVolumes =
+    v1CreateVolumesCall.mutable_create_volumes();
+
+  Resource volume = createPersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  createVolumes->add_volumes()->CopyFrom(evolve(volume));
+  createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+  ContentType contentType = GetParam();
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> v1CreateVolumesResponse = http::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1CreateVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      http::Accepted().status,
+      v1CreateVolumesResponse);
+
+  Resource addition = Resources::parse("disk", "128", "role1").get();
+
+  // Grow the persistent volume.
+  v1::master::Call v1GrowVolumeCall;
+  v1GrowVolumeCall.set_type(v1::master::Call::GROW_VOLUME);
+
+  v1::master::Call::GrowVolume* growVolume =
+    v1GrowVolumeCall.mutable_grow_volume();
+
+  growVolume->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  growVolume->mutable_volume()->CopyFrom(evolve(volume));
+  growVolume->mutable_addition()->CopyFrom(evolve(addition));
+
+  Future<http::Response> v1GrowVolumeResponse = http::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1GrowVolumeCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      http::Accepted().status,
+      v1GrowVolumeResponse);
+
+  Resource grownVolume = createPersistentVolume(
+      Megabytes(192),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  v1::master::Call v1GetAgentsCall;
+  v1GetAgentsCall.set_type(v1::master::Call::GET_AGENTS);
+
+  Future<v1::master::Response> v1GetAgentsResponse =
+    post(master.get()->pid, v1GetAgentsCall, contentType);
+
+  AWAIT_READY(v1GetAgentsResponse);
+  ASSERT_TRUE(v1GetAgentsResponse->IsInitialized());
+  ASSERT_EQ(v1::master::Response::GET_AGENTS, v1GetAgentsResponse->type());
+  ASSERT_EQ(1, v1GetAgentsResponse->get_agents().agents_size());
+
+  RepeatedPtrField<Resource> agentResources = devolve<Resource>(
+      v1GetAgentsResponse->get_agents().agents(0).total_resources());
+
+  upgradeResources(&agentResources);
+
+  EXPECT_EQ(
+      Resources(grownVolume),
+      Resources(agentResources).persistentVolumes());
+}
+
+
+// Test shrinking a persistent volume through the master operator API.
+TEST_P(MasterAPITest, ShrinkVolume)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // For capturing the SlaveID so we can use it in API calls.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  // Do static reservation so we can create persistent volumes from it.
+  slaveFlags.resources = "disk(role1):192";
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+
+  // Create a persistent volume with all disk space.
+  v1::master::Call v1CreateVolumesCall;
+  v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
+  v1::master::Call_CreateVolumes* createVolumes =
+    v1CreateVolumesCall.mutable_create_volumes();
+
+  Resource volume = createPersistentVolume(
+      Megabytes(192),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  createVolumes->add_volumes()->CopyFrom(evolve(volume));
+  createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+  ContentType contentType = GetParam();
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> v1CreateVolumesResponse = http::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1CreateVolumesCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      http::Accepted().status,
+      v1CreateVolumesResponse);
+
+  Resource shrunkVolume = createPersistentVolume(
+      Megabytes(128),
+      "role1",
+      "id1",
+      "path1",
+      None(),
+      None(),
+      DEFAULT_CREDENTIAL.principal());
+
+  // Shrink the persistent volume.
+  v1::master::Call v1ShrinkVolumeCall;
+  v1ShrinkVolumeCall.set_type(v1::master::Call::SHRINK_VOLUME);
+
+  v1::master::Call::ShrinkVolume* shrinkVolume =
+    v1ShrinkVolumeCall.mutable_shrink_volume();
+
+  shrinkVolume->mutable_agent_id()->CopyFrom(evolve(slaveId));
+  shrinkVolume->mutable_volume()->CopyFrom(evolve(volume));
+  shrinkVolume->mutable_subtract()->set_value(64);
+
+  Future<http::Response> v1ShrinkVolumeResponse = http::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1ShrinkVolumeCall),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      http::Accepted().status,
+      v1ShrinkVolumeResponse);
+
+  v1::master::Call v1GetAgentsCall;
+  v1GetAgentsCall.set_type(v1::master::Call::GET_AGENTS);
+
+  Future<v1::master::Response> v1GetAgentsResponse =
+    post(master.get()->pid, v1GetAgentsCall, contentType);
+
+  AWAIT_READY(v1GetAgentsResponse);
+  ASSERT_TRUE(v1GetAgentsResponse->IsInitialized());
+  ASSERT_EQ(v1::master::Response::GET_AGENTS, v1GetAgentsResponse->type());
+  ASSERT_EQ(1, v1GetAgentsResponse->get_agents().agents_size());
+
+  RepeatedPtrField<Resource> agentResources = devolve<Resource>(
+      v1GetAgentsResponse->get_agents().agents(0).total_resources());
+
+  upgradeResources(&agentResources);
+
+  EXPECT_EQ(
+      Resources(shrunkVolume),
+      Resources(agentResources).persistentVolumes());
+}
+
+
 TEST_P(MasterAPITest, GetWeights)
 {
   // Start a master with `--weights` flag.


[03/13] mesos git commit: Added a new `RESIZE_VOLUME` agent capability.

Posted by ch...@apache.org.
Added a new `RESIZE_VOLUME` agent capability.

This will be used as a feature flag to gate the new volume resize
feature. This feature will be turned on by default once released.

Review: https://reviews.apache.org/r/66733/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f2b4977
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f2b4977
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f2b4977

Branch: refs/heads/master
Commit: 9f2b4977c1fd6b1d0ab5cc5674fe825a46ae7961
Parents: 7d88e06
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:21 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:21 2018 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto     |  4 ++++
 include/mesos/v1/mesos.proto  |  4 ++++
 src/common/protobuf_utils.cpp |  3 ++-
 src/common/protobuf_utils.hpp |  7 +++++++
 src/slave/constants.cpp       |  3 ++-
 src/slave/flags.cpp           |  5 +++++
 src/tests/master_tests.cpp    | 15 +++++++++++----
 src/tests/slave_tests.cpp     | 16 +++++++++++-----
 8 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 9930bea..463e6ad 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -993,6 +993,10 @@ message SlaveInfo {
       //
       // (2) The ability to provide operation feedback.
       RESOURCE_PROVIDER = 4;
+
+      // This expresses the capability for the agent to handle persistent volume
+      // resize operations safely. This capability is turned on by default.
+      RESIZE_VOLUME = 5;
     }
 
     // Enum fields should be optional, see: MESOS-4997.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 4101243..8eaad9c 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -985,6 +985,10 @@ message AgentInfo {
       //
       // (2) The ability to provide operation feedback.
       RESOURCE_PROVIDER = 4;
+
+      // This expresses the capability for the agent to handle persistent volume
+      // resize operations safely. This capability is turned on by default.
+      RESIZE_VOLUME = 5;
     }
 
     // Enum fields should be optional, see: MESOS-4997.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 2e675c4..c5d873c 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -1061,7 +1061,8 @@ bool operator==(const Capabilities& left, const Capabilities& right)
   return left.multiRole == right.multiRole &&
          left.hierarchicalRole == right.hierarchicalRole &&
          left.reservationRefinement == right.reservationRefinement &&
-         left.resourceProvider == right.resourceProvider;
+         left.resourceProvider == right.resourceProvider &&
+         left.resizeVolume == right.resizeVolume;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index ae060f3..1662125 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -281,6 +281,9 @@ struct Capabilities
         case SlaveInfo::Capability::RESOURCE_PROVIDER:
           resourceProvider = true;
           break;
+        case SlaveInfo::Capability::RESIZE_VOLUME:
+          resizeVolume = true;
+          break;
         // If adding another case here be sure to update the
         // equality operator.
       }
@@ -292,6 +295,7 @@ struct Capabilities
   bool hierarchicalRole = false;
   bool reservationRefinement = false;
   bool resourceProvider = false;
+  bool resizeVolume = false;
 
   google::protobuf::RepeatedPtrField<SlaveInfo::Capability>
   toRepeatedPtrField() const
@@ -309,6 +313,9 @@ struct Capabilities
     if (resourceProvider) {
       result.Add()->set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
     }
+    if (resizeVolume) {
+      result.Add()->set_type(SlaveInfo::Capability::RESIZE_VOLUME);
+    }
 
     return result;
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 51de71b..103384c 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -39,7 +39,8 @@ vector<SlaveInfo::Capability> AGENT_CAPABILITIES()
     SlaveInfo::Capability::HIERARCHICAL_ROLE,
     SlaveInfo::Capability::MULTI_ROLE,
     SlaveInfo::Capability::RESERVATION_REFINEMENT,
-    SlaveInfo::Capability::RESOURCE_PROVIDER};
+    SlaveInfo::Capability::RESOURCE_PROVIDER,
+    SlaveInfo::Capability::RESIZE_VOLUME};
 
   vector<SlaveInfo::Capability> result;
   foreach (SlaveInfo::Capability::Type type, types) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 02e1a8b..a319b5e 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -740,6 +740,11 @@ mesos::internal::slave::Flags::Flags()
                 "At least the following agent features need to be enabled: "
                 "MULTI_ROLE, HIERARCHICAL_ROLE, RESERVATION_REFINEMENT");
           }
+
+          if (capabilities.resizeVolume && !capabilities.resourceProvider) {
+            return Error(
+                "RESIZE_VOLUME feature requires RESOURCE_PROVIDER feature");
+          }
         }
 
         return None();

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d5ce52c..e159573 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -5045,11 +5045,18 @@ TEST_F(MasterTest, StateEndpointAgentCapabilities)
   ASSERT_EQ(1u, slaveInfo.values.count("capabilities"));
   JSON::Value slaveCapabilities = slaveInfo.values.at("capabilities");
 
-  // Agents should have MULTI_ROLE, HIERARCHICAL_ROLE, RESERVATION_REFINEMENT,
-  // and RESOURCE_PROVIDER capabilities in current implementation.
+  // Agents should have the following capabilities in the current
+  // implementation.
   Try<JSON::Value> expectedCapabilities = JSON::parse(
-      "[\"MULTI_ROLE\",\"HIERARCHICAL_ROLE\",\"RESERVATION_REFINEMENT\","
-      "\"RESOURCE_PROVIDER\"]");
+    R"~(
+      [
+        "MULTI_ROLE",
+        "HIERARCHICAL_ROLE",
+        "RESERVATION_REFINEMENT",
+        "RESOURCE_PROVIDER",
+        "RESIZE_VOLUME"
+      ]
+    )~");
 
   ASSERT_SOME(expectedCapabilities);
   EXPECT_TRUE(slaveCapabilities.contains(expectedCapabilities.get()));

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f2b4977/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index ae82438..9e39884 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1699,12 +1699,18 @@ TEST_F(SlaveTest, StateEndpoint)
   EXPECT_FALSE(state.values["capabilities"].as<JSON::Array>().values.empty());
   JSON::Value slaveCapabilities = state.values.at("capabilities");
 
-  // Agents should always have MULTI_ROLE, HIERARCHICAL_ROLE,
-  // RESERVATION_REFINEMENT, and RESOURCE_PROVIDER capabilities
-  // in current implementation.
+  // Agents should have the following capabilities in the current
+  // implementation.
   Try<JSON::Value> expectedCapabilities = JSON::parse(
-      "[\"MULTI_ROLE\",\"HIERARCHICAL_ROLE\",\"RESERVATION_REFINEMENT\","
-      "\"RESOURCE_PROVIDER\"]");
+    R"~(
+      [
+        "MULTI_ROLE",
+        "HIERARCHICAL_ROLE",
+        "RESERVATION_REFINEMENT",
+        "RESOURCE_PROVIDER",
+        "RESIZE_VOLUME"
+      ]
+    )~");
 
   ASSERT_SOME(expectedCapabilities);
   EXPECT_TRUE(slaveCapabilities.contains(expectedCapabilities.get()));


[02/13] mesos git commit: Added offer operation to grow and shrink persistent volumes.

Posted by ch...@apache.org.
Added offer operation to grow and shrink persistent volumes.

Review: https://reviews.apache.org/r/66049/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7d88e06c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7d88e06c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7d88e06c

Branch: refs/heads/master
Commit: 7d88e06cb9752c55aa0c80f0989c58e05ee495f9
Parents: 57e705a
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:11 2018 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                  | 22 ++++++++++++++++++++++
 include/mesos/v1/mesos.proto               | 22 ++++++++++++++++++++++
 src/common/protobuf_utils.cpp              | 19 +++++++++++++++++++
 src/common/resources_utils.cpp             | 13 +++++++++++++
 src/master/master.cpp                      | 23 +++++++++++++++++++++++
 src/resource_provider/storage/provider.cpp |  6 ++++++
 src/tests/mesos.hpp                        |  6 ++++++
 src/tests/persistent_volume_tests.cpp      |  2 ++
 src/tests/reservation_tests.cpp            |  2 ++
 9 files changed, 115 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index a61de9d..9930bea 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1923,6 +1923,8 @@ message Offer {
       DESTROY_VOLUME = 8; // EXPERIMENTAL.
       CREATE_BLOCK = 9;   // EXPERIMENTAL.
       DESTROY_BLOCK = 10; // EXPERIMENTAL.
+      GROW_VOLUME = 11;   // EXPERIMENTAL.
+      SHRINK_VOLUME = 12; // EXPERIMENTAL.
     }
 
     // TODO(vinod): Deprecate this in favor of `LaunchGroup` below.
@@ -1959,6 +1961,24 @@ message Offer {
       repeated Resource volumes = 1;
     }
 
+    // Grow a volume by an additional disk resource.
+    // NOTE: This is currently experimental and only for persistent volumes
+    // created on ROOT/PATH disk.
+    message GrowVolume {
+      required Resource volume = 1;
+      required Resource addition = 2;
+    }
+
+    // Shrink a volume by the size specified in the `subtract` field.
+    // NOTE: This is currently experimental and only for persistent volumes
+    // created on ROOT/PATH disk.
+    message ShrinkVolume {
+      required Resource volume = 1;
+
+      // See comments in `Value.Scalar` for maximum precision supported.
+      required Value.Scalar subtract = 2;
+    }
+
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message CreateVolume {
@@ -1997,6 +2017,8 @@ message Offer {
     optional Unreserve unreserve = 4;
     optional Create create = 5;
     optional Destroy destroy = 6;
+    optional GrowVolume grow_volume = 13; // EXPERIMENTAL.
+    optional ShrinkVolume shrink_volume = 14; // EXPERIMENTAL.
     optional CreateVolume create_volume = 8;
     optional DestroyVolume destroy_volume = 9;
     optional CreateBlock create_block = 10;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 317d9ef..4101243 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1915,6 +1915,8 @@ message Offer {
       DESTROY_VOLUME = 8; // EXPERIMENTAL.
       CREATE_BLOCK = 9;   // EXPERIMENTAL.
       DESTROY_BLOCK = 10; // EXPERIMENTAL.
+      GROW_VOLUME = 11;   // EXPERIMENTAL.
+      SHRINK_VOLUME = 12; // EXPERIMENTAL.
     }
 
     // TODO(vinod): Deprecate this in favor of `LaunchGroup` below.
@@ -1951,6 +1953,24 @@ message Offer {
       repeated Resource volumes = 1;
     }
 
+    // Grow a volume by an additional disk resource.
+    // NOTE: This is currently experimental and only for persistent volumes
+    // created on ROOT/PATH disk.
+    message GrowVolume {
+      required Resource volume = 1;
+      required Resource addition = 2;
+    }
+
+    // Shrink a volume by the size specified in the `subtract` field.
+    // NOTE: This is currently experimental and only for persistent volumes
+    // created on ROOT/PATH disk.
+    message ShrinkVolume {
+      required Resource volume = 1;
+
+      // See comments in `Value.Scalar` for maximum precision supported.
+      required Value.Scalar subtract = 2;
+    }
+
     // NOTE: For the time being, this API is subject to change and the related
     // feature is experimental.
     message CreateVolume {
@@ -1989,6 +2009,8 @@ message Offer {
     optional Unreserve unreserve = 4;
     optional Create create = 5;
     optional Destroy destroy = 6;
+    optional GrowVolume grow_volume = 13; // EXPERIMENTAL.
+    optional ShrinkVolume shrink_volume = 14; // EXPERIMENTAL.
     optional CreateVolume create_volume = 8;
     optional DestroyVolume destroy_volume = 9;
     optional CreateBlock create_block = 10;

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 78bffd8..2e675c4 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -37,6 +37,7 @@
 #include <stout/foreach.hpp>
 #include <stout/net.hpp>
 #include <stout/stringify.hpp>
+#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/uuid.hpp>
 
@@ -707,6 +708,11 @@ void injectAllocationInfo(
       break;
     }
 
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
+
     case Offer::Operation::CREATE_VOLUME: {
       inject(
           *operation->mutable_create_volume()->mutable_source(),
@@ -822,6 +828,11 @@ void stripAllocationInfo(Offer::Operation* operation)
       break;
     }
 
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
+
     case Offer::Operation::CREATE_VOLUME: {
       strip(*operation->mutable_create_volume()->mutable_source());
 
@@ -867,6 +878,10 @@ bool isSpeculativeOperation(const Offer::Operation& operation)
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY:
       return true;
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
     case Offer::Operation::UNKNOWN:
       UNREACHABLE();
   }
@@ -1020,6 +1035,10 @@ Try<Resources> getConsumedResources(const Offer::Operation& operation)
 
       return consumed;
     }
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
     case Offer::Operation::LAUNCH:
     case Offer::Operation::LAUNCH_GROUP:
       // TODO(bbannier): Consider adding support for 'LAUNCH' and

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 9be01c1..6f56026 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -16,6 +16,7 @@
 
 #include <stout/foreach.hpp>
 #include <stout/stringify.hpp>
+#include <stout/unimplemented.hpp>
 
 #include "common/resources_utils.hpp"
 
@@ -195,6 +196,10 @@ Try<vector<TResourceConversion>> getResourceConversions(
       }
       break;
     }
+    case TOperation::GROW_VOLUME:
+    case TOperation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
   }
 
   return conversions;
@@ -259,6 +264,10 @@ Result<ResourceProviderID> getResourceProviderId(
       }
       resource = operation.destroy().volumes(0);
       break;
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
     case Offer::Operation::CREATE_VOLUME:
       resource = operation.create_volume().source();
       break;
@@ -631,6 +640,10 @@ Option<Error> validateAndUpgradeResources(Offer::Operation* operation)
 
       break;
     }
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      UNIMPLEMENTED;
+    }
     case Offer::Operation::LAUNCH: {
       // TODO(mpark): Once we perform a sanity check validation for
       // offer operations as specified in MESOS-7760, this should no

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c723a29..56cf61f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -69,6 +69,7 @@
 #include <stout/option.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
+#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -4132,6 +4133,10 @@ void Master::accept(
 
             break;
           }
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME: {
+            UNIMPLEMENTED;
+          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4203,6 +4208,10 @@ void Master::accept(
             accept.add_operations()->CopyFrom(operation);
             break;
           }
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME: {
+            UNIMPLEMENTED;
+          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4239,6 +4248,10 @@ void Master::accept(
         // No-op.
         break;
       }
+      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::SHRINK_VOLUME: {
+        UNIMPLEMENTED;
+      }
       case Offer::Operation::LAUNCH: {
         foreach (
             TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) {
@@ -4403,6 +4416,11 @@ void Master::accept(
         break;
       }
 
+      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::SHRINK_VOLUME: {
+        UNIMPLEMENTED;
+      }
+
       case Offer::Operation::CREATE_VOLUME: {
         // TODO(nfnt): Implement authorization for 'CREATE_VOLUME'.
         break;
@@ -4884,6 +4902,11 @@ void Master::_accept(
         break;
       }
 
+      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::SHRINK_VOLUME: {
+        UNIMPLEMENTED;
+      }
+
       case Offer::Operation::LAUNCH: {
         foreach (const TaskInfo& task, operation.launch().task_infos()) {
           Future<bool> authorization = authorizations.front();

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index d1267cf..eb35780 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -2845,6 +2845,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
 
       break;
     }
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      // TODO(chhsiao): These operations are currently not supported for
+      // resource providers, and should have been validated by the master.
+      UNREACHABLE();
+    }
     case Offer::Operation::UNKNOWN:
     case Offer::Operation::LAUNCH:
     case Offer::Operation::LAUNCH_GROUP: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 756a521..907bca5 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3126,6 +3126,12 @@ public:
         break;
       case Operation::DESTROY:
         break;
+      // TODO(zhitao): Implement default operation for `GROW_VOLUME` and
+      // `SHRINK_VOLUME` on mocked resource provider.
+      case Operation::GROW_VOLUME:
+        break;
+      case Operation::SHRINK_VOLUME:
+        break;
       case Operation::CREATE_VOLUME:
         update->mutable_status()->add_converted_resources()->CopyFrom(
             operation.info().create_volume().source());

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 4edf781..c3de96e 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -176,6 +176,8 @@ protected:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
           case Offer::Operation::DESTROY_BLOCK:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
             UNREACHABLE();
           case Offer::Operation::CREATE: {
             Resources resources = message.operation_info().create().volumes();

http://git-wip-us.apache.org/repos/asf/mesos/blob/7d88e06c/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index 5570df2..7d121bf 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -95,6 +95,8 @@ public:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
           case Offer::Operation::DESTROY_BLOCK:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
             UNREACHABLE();
           case Offer::Operation::RESERVE: {
             Resources resources =


[09/13] mesos git commit: Added test for authorization actions for `RESIZE_VOLUME`.

Posted by ch...@apache.org.
Added test for authorization actions for `RESIZE_VOLUME`.

Review: https://reviews.apache.org/r/66532/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/360ae2f9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/360ae2f9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/360ae2f9

Branch: refs/heads/master
Commit: 360ae2f95ef4850018843e88fd8adc1e847f6ce0
Parents: 9656329
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:57 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:57 2018 -0700

----------------------------------------------------------------------
 src/tests/authorization_tests.cpp     | 294 +++++++++++++++++++++++
 src/tests/persistent_volume_tests.cpp | 366 +++++++++++++++++++++++++++++
 2 files changed, 660 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/360ae2f9/src/tests/authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authorization_tests.cpp b/src/tests/authorization_tests.cpp
index a76ad18..f6f7769 100644
--- a/src/tests/authorization_tests.cpp
+++ b/src/tests/authorization_tests.cpp
@@ -1977,6 +1977,300 @@ TYPED_TEST(AuthorizationTest, DestroyVolume)
 }
 
 
+// Tests the authorization of ACLs used for resizing volumes.
+TYPED_TEST(AuthorizationTest, ResizeVolume)
+{
+  ACLs acls;
+
+  {
+    // Principal "foo" can resize volumes for any role.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("foo");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::ANY);
+  }
+
+  {
+    // Principal "bar" can only resize volumes for the "panda" role.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("bar");
+    acl->mutable_roles()->add_values("panda");
+  }
+
+  {
+    // Principal "baz" cannot resize volumes.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("baz");
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // Principal "elizabeth-ii" can resize volumes for the "king" role and its
+    // nested ones.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("elizabeth-ii");
+    acl->mutable_roles()->add_values("king/%");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // Principal "charles" can resize volumes for any role nested under
+    // "king", but not for "king" itself.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("charles");
+    acl->mutable_roles()->add_values("king/%");
+  }
+
+  {
+    // Principal "j-welby" can resize volumes only for the "king" role, but
+    // not for any of its nested roles.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->add_values("j-welby");
+    acl->mutable_roles()->add_values("king");
+  }
+
+  {
+    // No other principals can resize volumes.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  {
+    // No other principals can resize volumes.
+    mesos::ACL::ResizeVolume* acl = acls.add_resize_volumes();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  // Create an `Authorizer` with the ACLs.
+  Try<Authorizer*> create = TypeParam::create(parameterize(acls));
+  ASSERT_SOME(create);
+  Owned<Authorizer> authorizer(create.get());
+
+  // Principal "foo" can resize volumes for any role, so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()->set_value("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("foo");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("awesome_role");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" can resize volumes for the "panda" role,
+  // so this request will pass.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->set_value("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // Principal "bar" cannot resize volumes for the "giraffe" role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()->set_value("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("bar");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("giraffe");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "baz" cannot resize volumes for any role,
+  // so this request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()->set_value("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("baz");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // Principal "zelda" is not mentioned in the ACLs of the Authorizer, so it
+  // will be caught by the final ACL, which provides a default case that denies
+  // access for all other principals. This request will fail.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()->set_value("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("zelda");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("panda");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  // "elizabeth-ii" has full permissions for the "king" role as well as all
+  // its nested roles. She should be able to resize volumes in the next
+  // three blocks.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("elizabeth-ii");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // "charles" doesn't have permissions for the "king" role, so the first
+  // test should fail. However he has permissions for "king"'s nested roles
+  // so the next two tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("charles");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  // "j-welby" only has permissions for the role "king" itself, but not
+  // for its nested roles, therefore only the first of the following three
+  // tests should succeed.
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king");
+    AWAIT_EXPECT_TRUE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+
+  {
+    authorization::Request request;
+    request.set_action(authorization::RESIZE_VOLUME);
+    request.mutable_subject()->set_value("j-welby");
+    request.mutable_object()
+      ->mutable_resource()
+      ->mutable_reservations()
+      ->Add()
+      ->set_role("king/prince/duke");
+    AWAIT_EXPECT_FALSE(authorizer->authorized(request));
+  }
+}
+
+
 // This tests the authorization of requests to update quotas.
 TYPED_TEST(AuthorizationTest, UpdateQuota)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/360ae2f9/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 5b2a23c..477e6e2 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -985,6 +985,372 @@ TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
 }
 
 
+// This test verifies that grow and shrink operations can complete
+// successfully when authorization succeeds.
+TEST_P(PersistentVolumeTest, GoodACLGrowThenShrink)
+{
+  if (GetParam() == MOUNT) {
+    // It is not possible to have a valid `GrowVolume` on a MOUNT disk because
+    // the volume must use up all disk space at `Create` and no space will be
+    // left for `addition`. Therefore we skip this test.
+    // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or
+    // create a new fixture to avoid testing against it.
+    return;
+  }
+
+  Clock::pause();
+
+  ACLs acls;
+
+  // This ACL declares that the principal of `DEFAULT_CREDENTIAL`
+  // can resize persistent volumes for DEFAULT_TEST_ROLE.
+  mesos::ACL::ResizeVolume* resize = acls.add_resize_volumes();
+  resize->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+  resize->mutable_roles()->add_values(DEFAULT_TEST_ROLE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.acls = acls;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offersBeforeCreate;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeCreate));
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeCreate);
+  ASSERT_FALSE(offersBeforeCreate->empty());
+
+  Offer offer = offersBeforeCreate->at(0);
+
+  // The disk spaces will be merged if the fixture parameter is `NONE`.
+  Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
+
+  Bytes bytesDifference = Megabytes(512);
+
+  // Construct a persistent volume which does not use up all disk resources.
+  Resource volume = createPersistentVolume(
+      getDiskResource(totalBytes - bytesDifference, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Resource difference = getDiskResource(bytesDifference, 1);
+
+  Resource grownVolume = createPersistentVolume(
+      getDiskResource(totalBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Future<vector<Offer>> offersAfterGrow;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterGrow));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Create a persistent volume then grow it.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), GROW_VOLUME(volume, difference)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterGrow);
+  ASSERT_FALSE(offersAfterGrow->empty());
+
+  offer = offersAfterGrow->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_FALSE(
+      Resources(offer.resources()).contains(
+      allocatedResources(difference, frameworkInfo.roles(0))));
+
+  Future<vector<Offer>> offersAfterShrink;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterShrink));
+
+  // Shrink the volume back to original size.
+  driver.acceptOffers(
+      {offer.id()},
+      {SHRINK_VOLUME(grownVolume, difference.scalar())},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterShrink);
+  ASSERT_FALSE(offersAfterShrink->empty());
+  offer = offersAfterShrink->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_TRUE(
+      Resources(offer.resources()).contains(
+      allocatedResources(difference, frameworkInfo.roles(0))));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+// This test verifies that grow and shrink operations get dropped if
+// authorization fails, whether or not a principal is supplied.
+TEST_P(PersistentVolumeTest, BadACLDropGrowAndShrink)
+{
+  if (GetParam() == MOUNT) {
+    // It is not possible to have a valid `GrowVolume` on a MOUNT disk because
+    // the volume must use up all disk space at `Create` and no space will be
+    // left for `addition`. Therefore we skip this test.
+    // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or
+    // create a new fixture to avoid testing against it.
+    return;
+  }
+
+  Clock::pause();
+
+  ACLs acls;
+
+  // This ACL declares that no principal can resize any volume.
+  mesos::ACL::ResizeVolume* resize = acls.add_resize_volumes();
+  resize->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+  resize->mutable_roles()->set_type(mesos::ACL::Entity::NONE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.acls = acls;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // DEFAULT_FRAMEWORK_INFO uses DEFAULT_CREDENTIAL.
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_roles(0, DEFAULT_TEST_ROLE);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched1, registered(&driver1, _, _));
+
+  Future<vector<Offer>> offersBeforeCreate;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offersBeforeCreate));
+
+  driver1.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeCreate);
+  ASSERT_FALSE(offersBeforeCreate->empty());
+
+  Offer offer = offersBeforeCreate->at(0);
+
+  // The disk spaces will be merged if the fixture parameter is `NONE`.
+  Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
+
+  Bytes bytesDifference = Megabytes(512);
+
+  // Construct a persistent volume which does not use up all disk resources.
+  Resource volume = createPersistentVolume(
+      getDiskResource(totalBytes - bytesDifference, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo1.principal());
+
+  Resource difference = getDiskResource(bytesDifference, 1);
+
+  Resource grownVolume = createPersistentVolume(
+      getDiskResource(totalBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo1.principal());
+
+  Future<vector<Offer>> offersAfterGrow1;
+
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offersAfterGrow1));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Creating the persistent volume will succeed, but growing will fail due to
+  // ACL.
+  driver1.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), GROW_VOLUME(volume, difference)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterGrow1);
+  ASSERT_FALSE(offersAfterGrow1->empty());
+
+  offer = offersAfterGrow1->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), DEFAULT_TEST_ROLE),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_TRUE(
+      Resources(offer.resources()).contains(
+      allocatedResources(difference, DEFAULT_TEST_ROLE)));
+
+  Future<vector<Offer>> offersAfterShrink1;
+
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offersAfterShrink1));
+
+  driver1.acceptOffers(
+      {offer.id()},
+      {SHRINK_VOLUME(volume, difference.scalar())},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterShrink1);
+  ASSERT_FALSE(offersAfterShrink1->empty());
+
+  offer = offersAfterShrink1->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), DEFAULT_TEST_ROLE),
+      Resources(offer.resources()).persistentVolumes());
+
+  driver1.stop();
+  driver1.join();
+
+  // Start the second framework with no principal.
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.clear_principal();
+  frameworkInfo2.set_roles(0, DEFAULT_TEST_ROLE);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched2, registered(&driver2, _, _));
+
+  Future<vector<Offer>> offersBeforeGrow2;
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offersBeforeGrow2));
+
+  driver2.start();
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeGrow2);
+  ASSERT_FALSE(offersBeforeGrow2->empty());
+
+  offer = offersBeforeGrow2->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), DEFAULT_TEST_ROLE),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_TRUE(
+      Resources(offer.resources()).contains(
+      allocatedResources(difference, DEFAULT_TEST_ROLE)));
+
+  Future<vector<Offer>> offersAfterGrow2;
+
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offersAfterGrow2));
+
+  driver2.acceptOffers(
+      {offer.id()},
+      {GROW_VOLUME(volume, difference)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterGrow2);
+  ASSERT_FALSE(offersAfterGrow2->empty());
+  offer = offersAfterGrow2->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), DEFAULT_TEST_ROLE),
+      Resources(offer.resources()).persistentVolumes());
+
+  Future<vector<Offer>> offersAfterShrink2;
+
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _))
+    .WillOnce(FutureArg<1>(&offersAfterShrink2));
+
+  driver2.acceptOffers(
+      {offer.id()},
+      {SHRINK_VOLUME(volume, difference.scalar())},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterShrink2);
+  ASSERT_FALSE(offersAfterShrink2->empty());
+  offer = offersAfterShrink2->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), DEFAULT_TEST_ROLE),
+      Resources(offer.resources()).persistentVolumes());
+
+  driver2.stop();
+  driver2.join();
+
+  Clock::resume();
+}
+
+
 // This test verifies that the slave checkpoints the resources for
 // persistent volumes to the disk, recovers them upon restart, and
 // sends them to the master during re-registration.


[04/13] mesos git commit: Implemented grow and shrink of persistent volumes.

Posted by ch...@apache.org.
Implemented grow and shrink of persistent volumes.

The new offer operations are implemented as speculative operations, but
we will use validation to make them non-speculative at the API level so
that we can transition later without a breaking change.

Review: https://reviews.apache.org/r/66050/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8251087b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8251087b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8251087b

Branch: refs/heads/master
Commit: 8251087b041ebdfd70ad5aa1e0296e139d4663f0
Parents: 9f2b497
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:36 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:36 2018 -0700

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp  |  49 +++++++----
 src/common/resources_utils.cpp | 104 +++++++++++++++++++++--
 src/master/master.cpp          | 160 +++++++++++++++++++++++++++++++-----
 src/master/validation.cpp      | 129 +++++++++++++++++++++++++++++
 src/master/validation.hpp      |  10 +++
 5 files changed, 411 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index c5d873c..82ba141 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -37,7 +37,6 @@
 #include <stout/foreach.hpp>
 #include <stout/net.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/uuid.hpp>
 
@@ -708,9 +707,24 @@ void injectAllocationInfo(
       break;
     }
 
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      inject(
+          *operation->mutable_grow_volume()->mutable_volume(),
+          allocationInfo);
+
+      inject(
+          *operation->mutable_grow_volume()->mutable_addition(),
+          allocationInfo);
+
+      break;
+    }
+
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      inject(
+          *operation->mutable_shrink_volume()->mutable_volume(),
+          allocationInfo);
+
+      break;
     }
 
     case Offer::Operation::CREATE_VOLUME: {
@@ -828,9 +842,17 @@ void stripAllocationInfo(Offer::Operation* operation)
       break;
     }
 
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      strip(*operation->mutable_grow_volume()->mutable_volume());
+      strip(*operation->mutable_grow_volume()->mutable_addition());
+
+      break;
+    }
+
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      strip(*operation->mutable_shrink_volume()->mutable_volume());
+
+      break;
     }
 
     case Offer::Operation::CREATE_VOLUME: {
@@ -877,11 +899,12 @@ bool isSpeculativeOperation(const Offer::Operation& operation)
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY:
-      return true;
+    // TODO(zhitao): Convert `GROW_VOLUME` and `SHRINK_VOLUME` to
+    // non-speculative operations once we can support non-speculative operator
+    // API.
     case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
+    case Offer::Operation::SHRINK_VOLUME:
+      return true;
     case Offer::Operation::UNKNOWN:
       UNREACHABLE();
   }
@@ -1020,7 +1043,9 @@ Try<Resources> getConsumedResources(const Offer::Operation& operation)
     case Offer::Operation::RESERVE:
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
-    case Offer::Operation::DESTROY: {
+    case Offer::Operation::DESTROY:
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
       Try<vector<ResourceConversion>> conversions =
         getResourceConversions(operation);
 
@@ -1035,10 +1060,6 @@ Try<Resources> getConsumedResources(const Offer::Operation& operation)
 
       return consumed;
     }
-    case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
     case Offer::Operation::LAUNCH:
     case Offer::Operation::LAUNCH_GROUP:
       // TODO(bbannier): Consider adding support for 'LAUNCH' and

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 6f56026..eb72995 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -16,7 +16,6 @@
 
 #include <stout/foreach.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 
 #include "common/resources_utils.hpp"
 
@@ -196,9 +195,56 @@ Try<vector<TResourceConversion>> getResourceConversions(
       }
       break;
     }
-    case TOperation::GROW_VOLUME:
+
+    case TOperation::GROW_VOLUME: {
+      const TResource& volume = operation.grow_volume().volume();
+      const TResource& addition = operation.grow_volume().addition();
+
+      if (TResources::hasResourceProvider(volume)) {
+        return Error("Operation not supported for resource provider");
+      }
+
+      // To grow a persistent volume, we consume the original volume and the
+      // additional resource and convert into a single volume with the new size.
+      TResource converted = volume;
+      *converted.mutable_scalar() += addition.scalar();
+
+      conversions.emplace_back(TResources(volume) + addition, converted);
+      break;
+    }
+
     case TOperation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      const TResource& volume = operation.shrink_volume().volume();
+
+      if (TResources::hasResourceProvider(volume)) {
+        return Error("Operation not supported for resource provider");
+      }
+
+      // To shrink a persistent volume, we consume the original volume and
+      // convert into a new volume with reduced size and a freed disk resource
+      // without persistent volume info.
+      TResource freed = volume;
+
+      *freed.mutable_scalar() = operation.shrink_volume().subtract();
+
+      // TODO(zhitao): Move this to a helper function
+      // `Resources::stripPersistentVolume`.
+      if (freed.disk().has_source()) {
+        freed.mutable_disk()->clear_persistence();
+        freed.mutable_disk()->clear_volume();
+      } else {
+        freed.clear_disk();
+      }
+
+      // Since we only allow persistent volumes to be shared, the
+      // freed resource must be non-shared.
+      freed.clear_shared();
+
+      TResource shrunk = volume;
+      *shrunk.mutable_scalar() -= operation.shrink_volume().subtract();
+
+      conversions.emplace_back(volume, TResources(shrunk) + freed);
+      break;
     }
   }
 
@@ -265,9 +311,11 @@ Result<ResourceProviderID> getResourceProviderId(
       resource = operation.destroy().volumes(0);
       break;
     case Offer::Operation::GROW_VOLUME:
-    case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
-    }
+      resource = operation.grow_volume().volume();
+      break;
+    case Offer::Operation::SHRINK_VOLUME:
+      resource = operation.shrink_volume().volume();
+      break;
     case Offer::Operation::CREATE_VOLUME:
       resource = operation.create_volume().source();
       break;
@@ -640,9 +688,49 @@ Option<Error> validateAndUpgradeResources(Offer::Operation* operation)
 
       break;
     }
-    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::GROW_VOLUME: {
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_grow_volume()) {
+        return Error(
+            "A GROW_VOLUME operation must have"
+            " the Offer.Operation.grow_volume field set");
+      }
+
+      Option<Error> error = Resources::validate(
+          operation->grow_volume().volume());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      error = Resources::validate(operation->grow_volume().addition());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      break;
+    }
     case Offer::Operation::SHRINK_VOLUME: {
-      UNIMPLEMENTED;
+      // TODO(mpark): Once we perform a sanity check validation for
+      // offer operations as specified in MESOS-7760, this should no
+      // longer have to be handled in this function.
+      if (!operation->has_shrink_volume()) {
+        return Error(
+            "A SHRINK_VOLUME offer operation must have"
+            " the Offer.Operation.shrink_volume field set");
+      }
+
+      Option<Error> error = Resources::validate(
+          operation->shrink_volume().volume());
+
+      if (error.isSome()) {
+        return error;
+      }
+
+      break;
     }
     case Offer::Operation::LAUNCH: {
       // TODO(mpark): Once we perform a sanity check validation for

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 56cf61f..b9946b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -69,7 +69,6 @@
 #include <stout/option.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -4107,6 +4106,8 @@ void Master::accept(
           case Offer::Operation::UNRESERVE:
           case Offer::Operation::CREATE:
           case Offer::Operation::DESTROY:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
           case Offer::Operation::CREATE_VOLUME:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
@@ -4133,10 +4134,6 @@ void Master::accept(
 
             break;
           }
-          case Offer::Operation::GROW_VOLUME:
-          case Offer::Operation::SHRINK_VOLUME: {
-            UNIMPLEMENTED;
-          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4172,6 +4169,8 @@ void Master::accept(
           case Offer::Operation::UNRESERVE:
           case Offer::Operation::CREATE:
           case Offer::Operation::DESTROY:
+          case Offer::Operation::GROW_VOLUME:
+          case Offer::Operation::SHRINK_VOLUME:
           case Offer::Operation::CREATE_VOLUME:
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
@@ -4208,10 +4207,6 @@ void Master::accept(
             accept.add_operations()->CopyFrom(operation);
             break;
           }
-          case Offer::Operation::GROW_VOLUME:
-          case Offer::Operation::SHRINK_VOLUME: {
-            UNIMPLEMENTED;
-          }
           case Offer::Operation::UNKNOWN: {
             LOG(WARNING) << "Ignoring unknown operation";
             break;
@@ -4241,6 +4236,8 @@ void Master::accept(
       case Offer::Operation::UNRESERVE:
       case Offer::Operation::CREATE:
       case Offer::Operation::DESTROY:
+      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::SHRINK_VOLUME:
       case Offer::Operation::CREATE_VOLUME:
       case Offer::Operation::DESTROY_VOLUME:
       case Offer::Operation::CREATE_BLOCK:
@@ -4248,10 +4245,6 @@ void Master::accept(
         // No-op.
         break;
       }
-      case Offer::Operation::GROW_VOLUME:
-      case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
-      }
       case Offer::Operation::LAUNCH: {
         foreach (
             TaskInfo& task, *operation.mutable_launch()->mutable_task_infos()) {
@@ -4416,9 +4409,14 @@ void Master::accept(
         break;
       }
 
-      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::GROW_VOLUME: {
+        // TODO(zhitao): Add support for authorization of grow volume.
+        break;
+      }
+
       case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
+        // TODO(zhitao): Add support for authorization of shrink volume.
+        break;
       }
 
       case Offer::Operation::CREATE_VOLUME: {
@@ -4563,6 +4561,12 @@ void Master::_accept(
   // launched, we remove its resource from offered resources.
   Resources _offeredResources = offeredResources;
 
+  // Converted resources from volume resizes. These converted resources are not
+  // put into `_offeredResources`, so no other operations can consume them.
+  // TODO(zhitao): This will be unnecessary once `GROW_VOLUME` and
+  // `SHRINK_VOLUME` become non-speculative.
+  Resources resizedResources;
+
   // We keep track of the shared resources from the offers separately.
   // `offeredSharedResources` can be modified by CREATE/DESTROY but we
   // don't remove from it when a task is successfully launched so this
@@ -4902,9 +4906,124 @@ void Master::_accept(
         break;
       }
 
-      case Offer::Operation::GROW_VOLUME:
+      case Offer::Operation::GROW_VOLUME: {
+        // TODO(zhitao): Authorize GROW_VOLUME from `authorizations`.
+
+        // Make sure this grow volume operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.grow_volume(), slave->capabilities);
+
+        if (error.isSome()) {
+          drop(
+              framework,
+              operation,
+              error->message + "; on agent " + stringify(*slave));
+          continue;
+        }
+
+        // TODO(zhitao): Convert this operation to non-speculative once we can
+        // support that in the operator API.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        CHECK_EQ(1u, _conversions->size());
+        const Resources& consumed = _conversions->at(0).consumed;
+        const Resources& converted = _conversions->at(0).converted;
+
+        if (!_offeredResources.contains(consumed)) {
+          drop(
+              framework,
+              operation,
+              "Invalid GROW_VOLUME operation: " +
+              stringify(_offeredResources) + " does not contain " +
+              stringify(consumed));
+
+          continue;
+        }
+
+        _offeredResources -= consumed;
+        resizedResources += converted;
+
+        LOG(INFO) << "Processing GROW_VOLUME operation for volume "
+                  << operation.grow_volume().volume()
+                  << " with additional resource "
+                  << operation.grow_volume().addition()
+                  << " from framework "
+                  << *framework << " on agent " << *slave;
+
+        _apply(slave, framework, operation);
+
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
+
+        break;
+      }
+
       case Offer::Operation::SHRINK_VOLUME: {
-        UNIMPLEMENTED;
+        // TODO(zhitao): Authorize SHRINK_VOLUME from `authorizations`.
+
+        // Make sure this shrink volume operation is valid.
+        Option<Error> error = validation::operation::validate(
+            operation.shrink_volume(), slave->capabilities);
+
+        if (error.isSome()) {
+          drop(
+              framework,
+              operation,
+              error->message + "; on agent " + stringify(*slave));
+          continue;
+        }
+
+        // TODO(zhitao): Convert this operation to non-speculative once we can
+        // support that in the operator API.
+        Try<vector<ResourceConversion>> _conversions =
+          getResourceConversions(operation);
+
+        if (_conversions.isError()) {
+          drop(framework, operation, _conversions.error());
+          continue;
+        }
+
+        CHECK_EQ(1u, _conversions->size());
+        const Resources& consumed = _conversions->at(0).consumed;
+        const Resources& converted = _conversions->at(0).converted;
+
+        if (!_offeredResources.contains(consumed)) {
+          drop(
+              framework,
+              operation,
+              "Invalid SHRINK_VOLUME operation: " +
+              stringify(_offeredResources) + " does not contain " +
+              stringify(consumed));
+
+          continue;
+        }
+
+        _offeredResources -= consumed;
+        resizedResources += converted;
+
+        LOG(INFO) << "Processing SHRINK_VOLUME operation for volume "
+                  << operation.shrink_volume().volume()
+                  << " subtracting scalar value "
+                  << operation.shrink_volume().subtract()
+                  << " from framework "
+                  << *framework << " on agent " << *slave;
+
+        _apply(slave, framework, operation);
+
+        conversions.insert(
+            conversions.end(),
+            _conversions->begin(),
+            _conversions->end());
+
+        break;
       }
 
       case Offer::Operation::LAUNCH: {
@@ -5500,12 +5619,15 @@ void Master::_accept(
         conversions);
   }
 
-  if (!_offeredResources.empty()) {
+
+  // TODO(zhitao): Remove `resizedResources` once `GROW_VOLUME` and
+  // `SHRINK_VOLUME` become non-speculative.
+  if (!_offeredResources.empty() || !resizedResources.empty()) {
     // Tell the allocator about the unused (e.g., refused) resources.
     allocator->recoverResources(
         frameworkId,
         slaveId,
-        _offeredResources,
+        _offeredResources + resizedResources,
         accept.filters());
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 15dfa8a..74ed171 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2342,6 +2342,135 @@ Option<Error> validate(
 }
 
 
+Option<Error> validate(
+    const Offer::Operation::GrowVolume& growVolume,
+    const protobuf::slave::Capabilities& agentCapabilities)
+{
+  Option<Error> error = Resources::validate(growVolume.volume());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'GrowVolume.volume' field: " +
+        error->message);
+  }
+
+  error = Resources::validate(growVolume.addition());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'GrowVolume.addition' field: " +
+        error->message);
+  }
+
+  Value::Scalar zero;
+  zero.set_value(0);
+
+  // The `Scalar` comparison contains a fixed-point conversion.
+  if (growVolume.addition().scalar() <= zero) {
+    return Error(
+        "The size of 'GrowVolume.addition' field must be greater than zero");
+  }
+
+  if (Resources::hasResourceProvider(growVolume.volume())) {
+    return Error("Growing a volume from a resource provider is not supported");
+  }
+
+  error = resource::validatePersistentVolume(Resources(growVolume.volume()));
+  if (error.isSome()) {
+    return Error(
+        "Invalid persistent volume in the 'GrowVolume.volume' field: " +
+        error->message);
+  }
+
+  if (growVolume.volume().has_shared()) {
+    return Error("Growing a shared persistent volume is not supported");
+  }
+
+  // TODO(zhitao): Move this to a helper function
+  // `Resources::stripPersistentVolume`.
+  Resource stripped = growVolume.volume();
+
+  if (stripped.disk().has_source()) {
+    // PATH/MOUNT disk.
+    stripped.mutable_disk()->clear_persistence();
+    stripped.mutable_disk()->clear_volume();
+  } else {
+    // ROOT disk.
+    stripped.clear_disk();
+  }
+
+  if ((Resources(stripped) + growVolume.addition()).size() != 1) {
+    return Error(
+        "Incompatible resources in the 'GrowVolume.volume' and "
+        "'GrowVolume.addition' fields");
+  }
+
+  if (!agentCapabilities.resizeVolume) {
+    return Error(
+        "Volume " + stringify(growVolume.volume()) +
+        " cannot be grown on an agent without RESIZE_VOLUME capability");
+  }
+
+  return None();
+}
+
+
+Option<Error> validate(
+    const Offer::Operation::ShrinkVolume& shrinkVolume,
+    const protobuf::slave::Capabilities& agentCapabilities)
+{
+  Option<Error> error = Resources::validate(shrinkVolume.volume());
+  if (error.isSome()) {
+    return Error(
+        "Invalid resource in the 'ShrinkVolume.volume' field: " +
+        error->message);
+  }
+
+  Value::Scalar zero;
+  zero.set_value(0);
+
+  // The `Scalar` comparison contains a fixed-point conversion.
+  if (shrinkVolume.subtract() <= zero) {
+    return Error(
+        "Value of 'ShrinkVolume.subtract' must be greater than zero");
+  }
+
+  if (shrinkVolume.volume().scalar() <= shrinkVolume.subtract()) {
+    return Error(
+        "Value of 'ShrinkVolume.subtract' must be smaller than the size "
+        "of 'ShrinkVolume.volume'");
+  }
+
+  if (Resources::hasResourceProvider(shrinkVolume.volume())) {
+    return Error(
+        "Shrinking a volume from a resource provider is not supported");
+  }
+
+  if (shrinkVolume.volume().disk().source().type() ==
+      Resource::DiskInfo::Source::MOUNT) {
+    return Error(
+        "Shrinking a volume on a MOUNT disk is not supported");
+  }
+
+  error = resource::validatePersistentVolume(Resources(shrinkVolume.volume()));
+  if (error.isSome()) {
+    return Error(
+        "Invalid persistent volume in the 'ShrinkVolume.volume' field: " +
+        error->message);
+  }
+
+  if (shrinkVolume.volume().has_shared()) {
+    return Error("Shrinking a shared persistent volume is not supported");
+  }
+
+  if (!agentCapabilities.resizeVolume) {
+    return Error(
+        "Volume " + stringify(shrinkVolume.volume()) +
+        " cannot be shrunk on an agent without RESIZE_VOLUME capability");
+  }
+
+  return None();
+}
+
+
 Option<Error> validate(const Offer::Operation::CreateVolume& createVolume)
 {
   const Resource& source = createVolume.source();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8251087b/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index c1ab754..1ba6d65 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -302,6 +302,16 @@ Option<Error> validate(
     const Option<FrameworkInfo>& frameworkInfo = None());
 
 
+Option<Error> validate(
+    const Offer::Operation::GrowVolume& growVolume,
+    const protobuf::slave::Capabilities& agentCapabilities);
+
+
+Option<Error> validate(
+    const Offer::Operation::ShrinkVolume& shrinkVolume,
+    const protobuf::slave::Capabilities& agentCapabilities);
+
+
 Option<Error> validate(const Offer::Operation::CreateVolume& createVolume);
 Option<Error> validate(const Offer::Operation::DestroyVolume& destroyVolume);
 Option<Error> validate(const Offer::Operation::CreateBlock& createBlock);


[07/13] mesos git commit: Added tests for `GROW_VOLUME` and `SHRINK_VOLUME` operations.

Posted by ch...@apache.org.
Added tests for `GROW_VOLUME` and `SHRINK_VOLUME` operations.

Review: https://reviews.apache.org/r/66220/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/274f2e68
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/274f2e68
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/274f2e68

Branch: refs/heads/master
Commit: 274f2e6804b3f9322ae9e19c2c0acecd179cbd10
Parents: fa1ca07
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:48 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:48 2018 -0700

----------------------------------------------------------------------
 src/tests/persistent_volume_tests.cpp | 537 +++++++++++++++++++++++++++++
 1 file changed, 537 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/274f2e68/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index c3de96e..5b2a23c 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -448,6 +448,543 @@ TEST_P(PersistentVolumeTest, CreateAndDestroyPersistentVolumes)
 }
 
 
+// This test verifies that a framework can grow a persistent volume and receive
+// the grown volume in further offers.
+TEST_P(PersistentVolumeTest, GrowVolume)
+{
+  if (GetParam() == MOUNT) {
+    // It is not possible to have a valid `GrowVolume` on a MOUNT disk because
+    // the volume must use up all disk space at `Create` and no space will be
+    // left for `addition`. Therefore we skip this test.
+    // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or
+    // create a new fixture to avoid testing against it.
+    return;
+  }
+
+  Clock::pause();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offersBeforeCreate;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeCreate));
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeCreate);
+  ASSERT_FALSE(offersBeforeCreate->empty());
+
+  Offer offer = offersBeforeCreate->at(0);
+
+  // The disk spaces will be merged if the fixture parameter is `NONE`.
+  Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
+
+  Bytes additionBytes = Megabytes(512);
+
+  // Construct a persistent volume which does not use up all disk resources.
+  Resource volume = createPersistentVolume(
+      getDiskResource(totalBytes - additionBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Resource addition = getDiskResource(additionBytes, 1);
+
+  Resource grownVolume = createPersistentVolume(
+      getDiskResource(totalBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Future<vector<Offer>> offersBeforeGrow;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeGrow));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Create the persistent volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeGrow);
+  ASSERT_FALSE(offersBeforeGrow->empty());
+
+  offer = offersBeforeGrow->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_TRUE(
+      Resources(offer.resources()).contains(
+      allocatedResources(addition, frameworkInfo.roles(0))));
+
+  // Make sure the volume exists, and leave a non-empty file there.
+  string volumePath = slave::paths::getPersistentVolumePath(
+      slaveFlags.work_dir, volume);
+
+  EXPECT_TRUE(os::exists(volumePath));
+  string filePath = path::join(volumePath, "file");
+  ASSERT_SOME(os::write(filePath, "abc"));
+
+  Future<vector<Offer>> offersAfterGrow;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterGrow));
+
+  // Grow the volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {GROW_VOLUME(volume, addition)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterGrow);
+  ASSERT_FALSE(offersAfterGrow->empty());
+
+  EXPECT_TRUE(os::exists(volumePath));
+  EXPECT_SOME_EQ("abc", os::read(filePath));
+
+  offer = offersAfterGrow->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_FALSE(
+      Resources(offer.resources()).contains(
+      allocatedResources(addition, frameworkInfo.roles(0))));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a framework can shrink a persistent volume and see
+// the shrunk volume in further offers.
+TEST_P(PersistentVolumeTest, ShrinkVolume)
+{
+  Clock::pause();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offersBeforeCreate;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeCreate));
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeCreate);
+  ASSERT_FALSE(offersBeforeCreate->empty());
+
+  Offer offer = offersBeforeCreate->at(0);
+
+  // The disk spaces will be merged if the fixture parameter is `NONE`.
+  Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
+
+  Bytes shrinkBytes = Megabytes(512);
+
+  // Construct a persistent volume which uses up all disk resources.
+  Resource volume = createPersistentVolume(
+      getDiskResource(totalBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Resource subtract = getDiskResource(shrinkBytes, 1);
+
+  Resource shrunkVolume = createPersistentVolume(
+      getDiskResource(totalBytes - shrinkBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Future<vector<Offer>> offersBeforeShrink;
+
+  // Expect an offer containing the original volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeShrink));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Create the persistent volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeShrink);
+  ASSERT_FALSE(offersBeforeShrink->empty());
+
+  offer = offersBeforeShrink->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(volume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  EXPECT_FALSE(
+      Resources(offer.resources()).contains(
+      allocatedResources(subtract, frameworkInfo.roles(0))));
+
+  // Make sure the volume exists, and leave a non-empty file there.
+  string volumePath = slave::paths::getPersistentVolumePath(
+      slaveFlags.work_dir, volume);
+
+  EXPECT_TRUE(os::exists(volumePath));
+  string filePath = path::join(volumePath, "file");
+  ASSERT_SOME(os::write(filePath, "abc"));
+
+  Future<vector<Offer>> offersAfterShrink;
+
+  // Expect an offer containing shrunk volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterShrink));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {SHRINK_VOLUME(volume, subtract.scalar())},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterShrink);
+  ASSERT_FALSE(offersAfterShrink->empty());
+
+  EXPECT_TRUE(os::exists(volumePath));
+  EXPECT_SOME_EQ("abc", os::read(filePath));
+
+  offer = offersAfterShrink->at(0);
+
+  if (GetParam() != MOUNT) {
+    EXPECT_EQ(
+        allocatedResources(Resources(shrunkVolume), frameworkInfo.roles(0)),
+        Resources(offer.resources()).persistentVolumes());
+
+    EXPECT_TRUE(
+        Resources(offer.resources()).contains(
+        allocatedResources(subtract, frameworkInfo.roles(0))));
+  } else {
+    EXPECT_EQ(
+        allocatedResources(Resources(volume), frameworkInfo.roles(0)),
+        Resources(offer.resources()).persistentVolumes());
+
+    EXPECT_FALSE(
+        Resources(offer.resources()).contains(
+        allocatedResources(subtract, frameworkInfo.roles(0))));
+  }
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a subsequent `LAUNCH` depending on a grown volume
+// will be dropped because we intend to keep `GROW_VOLUME` non-speculative.
+TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch)
+{
+  if (GetParam() == MOUNT) {
+    // It is not possible to have a valid `GrowVolume` on a MOUNT disk because
+    // the volume must use up all disk space at `Create` and no space will be
+    // left for `addition`. Therefore we skip this test.
+    // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or
+    // create a new fixture to avoid testing against it.
+    return;
+  }
+
+  Clock::pause();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offersBeforeOperations;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeOperations));
+
+  driver.start();
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeOperations);
+  ASSERT_FALSE(offersBeforeOperations->empty());
+
+  Offer offer = offersBeforeOperations->at(0);
+
+  // The disk spaces will be merged if the fixture parameter is `NONE`.
+  Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
+
+  Bytes additionBytes = Megabytes(512);
+
+  // Construct a persistent volume which does not use up all disk resources.
+  Resource volume = createPersistentVolume(
+      getDiskResource(totalBytes - additionBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Resource addition = getDiskResource(additionBytes, 1);
+
+  Resource grownVolume = createPersistentVolume(
+      getDiskResource(totalBytes, 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Future<TaskStatus> taskError;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskError));
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + grownVolume,
+      "echo abc > path1/file");
+
+  Future<vector<Offer>> offersAfterOperations;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterOperations));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Create and grow will succeed, but launch will be dropped with
+  // `TASK_ERROR`.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), GROW_VOLUME(volume, addition), LAUNCH({task})},
+      filters);
+
+  AWAIT_READY(taskError);
+  EXPECT_EQ(task.task_id(), taskError->task_id());
+  EXPECT_EQ(TASK_ERROR, taskError->state());
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterOperations);
+  ASSERT_FALSE(offersAfterOperations->empty());
+
+  offer = offersAfterOperations->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a subsequent `LAUNCH` depending on a shrunk volume
+// will be dropped because we intend to keep `SHRINK_VOLUME` non-speculative.
+TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
+{
+  if (GetParam() == MOUNT) {
+    // It is not possible to have a valid `ShrinkVolume` on a MOUNT disk
+    // because validation rejects shrinking a volume on a MOUNT disk.
+    // Therefore we skip this test.
+    // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or
+    // create a new fixture to avoid testing against it.
+    return;
+  }
+
+  Clock::pause();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offersBeforeOperations;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersBeforeOperations));
+
+  driver.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersBeforeOperations);
+  ASSERT_FALSE(offersBeforeOperations->empty());
+
+  Offer offer = offersBeforeOperations->at(0);
+
+  Resource volume = createPersistentVolume(
+      getDiskResource(Megabytes(1024), 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Value::Scalar subtract;
+  subtract.set_value(512);
+
+  Resource shrunkVolume = createPersistentVolume(
+      getDiskResource(Megabytes(512), 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal());
+
+  Future<TaskStatus> taskError;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskError));
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + shrunkVolume,
+      "echo abc > path1/file");
+
+  Future<vector<Offer>> offersAfterOperations;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offersAfterOperations));
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Create and shrink volume will succeed, but launch will be dropped with
+  // `TASK_ERROR`.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume), SHRINK_VOLUME(volume, subtract), LAUNCH({task})},
+      filters);
+
+  AWAIT_READY(taskError);
+  EXPECT_EQ(task.task_id(), taskError->task_id());
+  EXPECT_EQ(TASK_ERROR, taskError->state());
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offersAfterOperations);
+  ASSERT_FALSE(offersAfterOperations->empty());
+  offer = offersAfterOperations->at(0);
+
+  EXPECT_EQ(
+      allocatedResources(Resources(shrunkVolume), frameworkInfo.roles(0)),
+      Resources(offer.resources()).persistentVolumes());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that the slave checkpoints the resources for
 // persistent volumes to the disk, recovers them upon restart, and
 // sends them to the master during re-registration.


[10/13] mesos git commit: Added new operator API to grow and shrink persistent volume.

Posted by ch...@apache.org.
Added new operator API to grow and shrink persistent volume.

The same API could be used in the future to grow or shrink CSI volumes,
but currently only persistent volumes are supported.

Review: https://reviews.apache.org/r/66052/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d418f155
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d418f155
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d418f155

Branch: refs/heads/master
Commit: d418f155271723903c70371c6be41de86d639ff5
Parents: 360ae2f
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:05:01 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:05:01 2018 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 31 +++++++++++++++++++++++++++++++
 include/mesos/v1/master/master.proto | 31 +++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d418f155/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index aa63904..54f8412 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -79,6 +79,9 @@ message Call {
     CREATE_VOLUMES = 21;     // See 'CreateVolumes' below.
     DESTROY_VOLUMES = 22;    // See 'DestroyVolumes' below.
 
+    GROW_VOLUME = 34;        // See 'GrowVolume' below.
+    SHRINK_VOLUME = 35;      // See 'ShrinkVolume' below.
+
     // Retrieves the cluster's maintenance status.
     GET_MAINTENANCE_STATUS = 23;
     // Retrieves the cluster's maintenance schedule.
@@ -169,6 +172,32 @@ message Call {
     repeated Resource volumes = 2;
   }
 
+  // Grow a volume by an additional disk resource.
+  // NOTE: This is currently experimental and only for persistent volumes
+  // created on ROOT/PATH disks.
+  message GrowVolume {
+    // `slave_id` must be set if `volume` is an agent-local resource, and must
+    // be unset if `volume` is an external resource.
+    optional SlaveID slave_id = 1;
+
+    required Resource volume = 2;
+    required Resource addition = 3;
+  }
+
+  // Shrink a volume by the size specified in the `subtract` field.
+  // NOTE: This is currently experimental and only for persistent volumes
+  // created on ROOT/PATH disks.
+  message ShrinkVolume {
+    // `slave_id` must be set if `volume` is an agent-local resource, and must
+    // be unset if `volume` is an external resource.
+    optional SlaveID slave_id = 1;
+
+    required Resource volume = 2;
+
+    // See comments in `Value.Scalar` for maximum precision supported.
+    required Value.Scalar subtract = 3;
+  }
+
   // Updates the cluster's maintenance schedule.
   message UpdateMaintenanceSchedule {
     required maintenance.Schedule schedule = 1;
@@ -227,6 +256,8 @@ message Call {
   optional UnreserveResources unreserve_resources = 8;
   optional CreateVolumes create_volumes = 9;
   optional DestroyVolumes destroy_volumes = 10;
+  optional GrowVolume grow_volume = 18;
+  optional ShrinkVolume shrink_volume = 19;
   optional UpdateMaintenanceSchedule update_maintenance_schedule = 11;
   optional StartMaintenance start_maintenance = 12;
   optional StopMaintenance stop_maintenance = 13;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d418f155/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index ddb28f9..12f019d 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -77,6 +77,9 @@ message Call {
     CREATE_VOLUMES = 21;     // See 'CreateVolumes' below.
     DESTROY_VOLUMES = 22;    // See 'DestroyVolumes' below.
 
+    GROW_VOLUME = 34;        // See 'GrowVolume' below.
+    SHRINK_VOLUME = 35;      // See 'ShrinkVolume' below.
+
     // Retrieves the cluster's maintenance status.
     GET_MAINTENANCE_STATUS = 23;
     // Retrieves the cluster's maintenance schedule.
@@ -167,6 +170,32 @@ message Call {
     repeated Resource volumes = 2;
   }
 
+  // Grow a volume by an additional disk resource.
+  // NOTE: This is currently experimental and only for persistent volumes
+  // created on ROOT/PATH disks.
+  message GrowVolume {
+    // `agent_id` must be set if `volume` is an agent-local resource, and must
+    // be unset if `volume` is an external resource.
+    optional AgentID agent_id = 1;
+
+    required Resource volume = 2;
+    required Resource addition = 3;
+  }
+
+  // Shrink a volume by the size specified in the `subtract` field.
+  // NOTE: This is currently experimental and only for persistent volumes
+  // created on ROOT/PATH disks.
+  message ShrinkVolume {
+    // `agent_id` must be set if `volume` is an agent-local resource, and must
+    // be unset if `volume` is an external resource.
+    optional AgentID agent_id = 1;
+
+    required Resource volume = 2;
+
+    // See comments in `Value.Scalar` for maximum precision supported.
+    required Value.Scalar subtract = 3;
+  }
+
   // Updates the cluster's maintenance schedule.
   message UpdateMaintenanceSchedule {
     required maintenance.Schedule schedule = 1;
@@ -225,6 +254,8 @@ message Call {
   optional UnreserveResources unreserve_resources = 8;
   optional CreateVolumes create_volumes = 9;
   optional DestroyVolumes destroy_volumes = 10;
+  optional GrowVolume grow_volume = 18;
+  optional ShrinkVolume shrink_volume = 19;
   optional UpdateMaintenanceSchedule update_maintenance_schedule = 11;
   optional StartMaintenance start_maintenance = 12;
   optional StopMaintenance stop_maintenance = 13;


[06/13] mesos git commit: Added tests for validation of `GrowVolume` and `ShrinkVolume`.

Posted by ch...@apache.org.
Added tests for validation of `GrowVolume` and `ShrinkVolume`.

Review: https://reviews.apache.org/r/66858/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fa1ca073
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fa1ca073
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fa1ca073

Branch: refs/heads/master
Commit: fa1ca073ceec0fe24073a1c873ebbb328f806885
Parents: 71057f2
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:44 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:44 2018 -0700

----------------------------------------------------------------------
 src/tests/master_validation_tests.cpp | 298 +++++++++++++++++++++++++++++
 1 file changed, 298 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fa1ca073/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index a522961..fb1d8bd 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1460,6 +1460,304 @@ TEST_F(DestroyOperationValidationTest, MultipleResourceProviders)
 }
 
 
+class GrowVolumeOperationValidationTest : public MesosTest {
+protected:
+  Offer::Operation::GrowVolume createGrowVolume()
+  {
+    Resource volume = createPersistentVolume(
+        Megabytes(128),
+        "role1",
+        "id1",
+        "path1");
+
+    Resource addition = Resources::parse("disk", "128", "role1").get();
+
+    Offer::Operation::GrowVolume growVolume;
+    growVolume.mutable_volume()->CopyFrom(volume);
+    growVolume.mutable_addition()->CopyFrom(addition);
+
+    return growVolume;
+  }
+};
+
+
+// This test verifies that validation succeeds on a valid operation.
+TEST_F(GrowVolumeOperationValidationTest, Valid)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_NONE(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.volume` is not a
+// persistent volume.
+TEST_F(GrowVolumeOperationValidationTest, NonPersistentVolume)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_volume()->mutable_disk()->clear_persistence();
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.addition` has a zero
+// value.
+TEST_F(GrowVolumeOperationValidationTest, ZeroAddition)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_addition()->mutable_scalar()->set_value(0);
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.volume` and
+// `GrowVolume.addition` are incompatible.
+TEST_F(GrowVolumeOperationValidationTest, IncompatibleDisk)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  // Make the volume on a PATH disk so it cannot be grown with a ROOT disk.
+  Resource pathVolume = createPersistentVolume(
+       Megabytes(128),
+        "role1",
+        "id1",
+        "path1",
+        None(),
+        createDiskSourcePath("root"));
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_volume()->CopyFrom(pathVolume);
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.volume` is a shared
+// persistent volume.
+TEST_F(GrowVolumeOperationValidationTest, Shared)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_volume()->mutable_shared();
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.volume` has a
+// resource provider id.
+TEST_F(GrowVolumeOperationValidationTest, ResourceProvider)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_volume()->mutable_provider_id()->set_value("provider");
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `GrowVolume.volume` and
+// `GrowVolume.addition` are on MOUNT disks, which are not addable.
+TEST_F(GrowVolumeOperationValidationTest, Mount)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Resource mountVolume = createPersistentVolume(
+       Megabytes(128),
+        "role1",
+        "id1",
+        "path1",
+        None(),
+        createDiskSourceMount());
+
+  Resource mountDisk = createDiskResource(
+      "128", "role1", None(), None(), createDiskSourceMount());
+
+  Offer::Operation::GrowVolume growVolume = createGrowVolume();
+  growVolume.mutable_volume()->CopyFrom(mountVolume);
+  growVolume.mutable_addition()->CopyFrom(mountDisk);
+
+  Option<Error> error = operation::validate(growVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if agent has no RESIZE_VOLUME
+// capability.
+TEST_F(GrowVolumeOperationValidationTest, MissingCapability)
+{
+  protobuf::slave::Capabilities capabilities;
+
+  Option<Error> error = operation::validate(createGrowVolume(), capabilities);
+  EXPECT_SOME(error);
+}
+
+
+class ShrinkVolumeOperationValidationTest : public MesosTest {
+protected:
+  Offer::Operation::ShrinkVolume createShrinkVolume()
+  {
+    Resource volume = createPersistentVolume(
+        Megabytes(128),
+        "role1",
+        "id1",
+        "path1");
+
+    Offer::Operation::ShrinkVolume shrinkVolume;
+    shrinkVolume.mutable_volume()->CopyFrom(volume);
+    shrinkVolume.mutable_subtract()->set_value(64);
+
+    return shrinkVolume;
+  }
+};
+
+
+// This test verifies that validation succeeds on a valid `ShrinkVolume`
+// operation.
+TEST_F(ShrinkVolumeOperationValidationTest, Valid)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_NONE(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.volume` is not a
+// persistent volume.
+TEST_F(ShrinkVolumeOperationValidationTest, NonPersistentVolume)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_volume()->mutable_disk()->clear_persistence();
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.subtract` has a
+// zero value.
+TEST_F(ShrinkVolumeOperationValidationTest, ZeroSubtract)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_subtract()->set_value(0);
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.subtract` has a
+// value equal to the size of `ShrinkVolume.volume`.
+TEST_F(ShrinkVolumeOperationValidationTest, EmptyAfterShrink)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_subtract()->CopyFrom(shrinkVolume.volume().scalar());
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.volume` is a
+// MOUNT disk.
+TEST_F(ShrinkVolumeOperationValidationTest, Mount)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Resource mountVolume = createPersistentVolume(
+       Megabytes(128),
+        "role1",
+        "id1",
+        "path1",
+        None(),
+        createDiskSourceMount());
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_volume()->CopyFrom(mountVolume);
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.volume` is a
+// shared volume.
+TEST_F(ShrinkVolumeOperationValidationTest, Shared)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_volume()->mutable_shared();
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if `ShrinkVolume.volume` has a
+// resource provider id.
+TEST_F(ShrinkVolumeOperationValidationTest, ResourceProvider)
+{
+  protobuf::slave::Capabilities capabilities;
+  capabilities.resizeVolume = true;
+
+  Offer::Operation::ShrinkVolume shrinkVolume = createShrinkVolume();
+  shrinkVolume.mutable_volume()->mutable_provider_id()->set_value("provider");
+
+  Option<Error> error = operation::validate(shrinkVolume, capabilities);
+  EXPECT_SOME(error);
+}
+
+
+// This test verifies that validation fails if agent has no RESIZE_VOLUME
+// capability.
+TEST_F(ShrinkVolumeOperationValidationTest, MissingCapability)
+{
+  protobuf::slave::Capabilities capabilities;
+
+  Option<Error> error = operation::validate(createShrinkVolume(), capabilities);
+  EXPECT_SOME(error);
+}
+
+
 TEST(OperationValidationTest, CreateVolume)
 {
   Resource disk1 = createDiskResource(


[08/13] mesos git commit: Added new authorization for `ResizeVolume`.

Posted by ch...@apache.org.
Added new authorization for `ResizeVolume`.

The new authorization action is modelled after `CreateVolume`, and will
be shared by both `GROW_VOLUME` and `SHRINK_VOLUME` operations and
corresponding operator APIs.

Review: https://reviews.apache.org/r/66531/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9656329c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9656329c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9656329c

Branch: refs/heads/master
Commit: 9656329cb313316e7e82ba5a73fae6ca0997e3af
Parents: 274f2e6
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 17:04:50 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:04:50 2018 -0700

----------------------------------------------------------------------
 include/mesos/authorizer/acls.proto       |  10 +++
 include/mesos/authorizer/authorizer.proto |   3 +
 src/authorizer/local/authorizer.cpp       |   9 +++
 src/master/master.cpp                     | 103 ++++++++++++++++++++++++-
 src/master/master.hpp                     |  23 ++++++
 5 files changed, 144 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9656329c/include/mesos/authorizer/acls.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/acls.proto b/include/mesos/authorizer/acls.proto
index 8ef3321..e488993 100644
--- a/include/mesos/authorizer/acls.proto
+++ b/include/mesos/authorizer/acls.proto
@@ -114,6 +114,15 @@ message ACL {
     required Entity creator_principals = 2;
   }
 
+  // Specifies which roles a principal can resize volumes for.
+  message ResizeVolume {
+    // Subjects: Framework principal or Operator username.
+    required Entity principals = 1;
+
+    // Objects: The principal(s) can resize volumes for these roles.
+    required Entity roles = 2;
+  }
+
   // Which principals are authorized to see quotas for the given roles.
   message GetQuota {
     // Subjects: Operator username.
@@ -589,4 +598,5 @@ message ACLs {
   repeated ACL.ViewStandaloneContainer view_standalone_containers = 46;
   repeated ACL.ModifyResourceProviderConfig modify_resource_provider_configs = 45;
   repeated ACL.PruneImages prune_images = 47;
+  repeated ACL.ResizeVolume resize_volumes = 48;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9656329c/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 1508c01..bb1010d 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -254,6 +254,9 @@ enum Action {
   // This action will not fill in any object fields. A principal is either
   // allowed to prune unused container images or is unauthorized.
   PRUNE_IMAGES = 41;
+
+  // `RESIZE_VOLUME` will have an object with `Resource` set.
+  RESIZE_VOLUME = 42;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9656329c/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index c098ba9..61e9ab5 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -417,6 +417,7 @@ public:
 
           break;
         case authorization::CREATE_VOLUME:
+        case authorization::RESIZE_VOLUME:
         case authorization::GET_QUOTA:
         case authorization::RESERVE_RESOURCES:
         case authorization::UPDATE_QUOTA:
@@ -594,6 +595,7 @@ public:
     } else {
       switch (action_) {
         case authorization::CREATE_VOLUME:
+        case authorization::RESIZE_VOLUME:
         case authorization::RESERVE_RESOURCES: {
           entityObject.set_type(ACL::Entity::SOME);
           if (object->resource) {
@@ -875,6 +877,11 @@ public:
             createHierarchicalRoleACLs(acls.create_volumes());
         break;
       }
+      case authorization::RESIZE_VOLUME: {
+        hierarchicalRoleACLs =
+            createHierarchicalRoleACLs(acls.resize_volumes());
+        break;
+      }
       case authorization::RESERVE_RESOURCES: {
         hierarchicalRoleACLs =
             createHierarchicalRoleACLs(acls.reserve_resources());
@@ -1113,6 +1120,7 @@ public:
         return getNestedContainerObjectApprover(subject, action);
       }
       case authorization::CREATE_VOLUME:
+      case authorization::RESIZE_VOLUME:
       case authorization::RESERVE_RESOURCES:
       case authorization::UPDATE_WEIGHT:
       case authorization::VIEW_ROLE:
@@ -1520,6 +1528,7 @@ private:
         return acls_;
       case authorization::REGISTER_FRAMEWORK:
       case authorization::CREATE_VOLUME:
+      case authorization::RESIZE_VOLUME:
       case authorization::RESERVE_RESOURCES:
       case authorization::UPDATE_WEIGHT:
       case authorization::VIEW_ROLE:

http://git-wip-us.apache.org/repos/asf/mesos/blob/9656329c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b9946b5..c117b8c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3801,6 +3801,43 @@ Future<bool> Master::authorizeDestroyVolume(
 }
 
 
+Future<bool> Master::authorizeResizeVolume(
+    const Resource& volume,
+    const Option<Principal>& principal)
+{
+  if (authorizer.isNone()) {
+    return true; // Authorization is disabled.
+  }
+
+  authorization::Request request;
+  request.set_action(authorization::RESIZE_VOLUME);
+
+  Option<authorization::Subject> subject = createSubject(principal);
+  if (subject.isSome()) {
+    request.mutable_subject()->CopyFrom(subject.get());
+  }
+
+  request.mutable_object()->mutable_resource()->CopyFrom(volume);
+
+  string role;
+  if (volume.reservations_size() > 0) {
+    // Check for role in the "post-reservation-refinement" format.
+    role = volume.reservations().rbegin()->role();
+  } else {
+    // Check for role in the "pre-reservation-refinement" format.
+    role = volume.role();
+  }
+
+  request.mutable_object()->set_value(role);
+
+  LOG(INFO) << "Authorizing principal '"
+            << (principal.isSome() ? stringify(principal.get()) : "ANY")
+            << "' to resize volume '" << volume << "'";
+
+  return authorizer.get()->authorized(request);
+}
+
+
 Future<bool> Master::authorizeSlave(
     const SlaveInfo& slaveInfo,
     const Option<Principal>& principal)
@@ -4410,12 +4447,26 @@ void Master::accept(
       }
 
       case Offer::Operation::GROW_VOLUME: {
-        // TODO(zhitao): Add support for authorization of grow volume.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeResizeVolume(
+                operation.grow_volume().volume(), principal));
+
         break;
       }
 
       case Offer::Operation::SHRINK_VOLUME: {
-        // TODO(zhitao): Add support for authorization of shrink volume.
+        Option<Principal> principal = framework->info.has_principal()
+          ? Principal(framework->info.principal())
+          : Option<Principal>::none();
+
+        futures.push_back(
+            authorizeResizeVolume(
+                operation.shrink_volume().volume(), principal));
+
         break;
       }
 
@@ -4907,7 +4958,29 @@ void Master::_accept(
       }
 
       case Offer::Operation::GROW_VOLUME: {
-        // TODO(zhitao): Authorize GROW_VOLUME from `authorizations`.
+        Future<bool> authorization = authorizations.front();
+        authorizations.pop_front();
+
+        CHECK(!authorization.isDiscarded());
+
+        if (authorization.isFailed()) {
+          // TODO(greggomann): We may want to retry this failed authorization
+          // request rather than dropping it immediately.
+          drop(framework,
+               operation,
+               "Authorization of principal '" + framework->info.principal() +
+               "' to grow a volume failed: " +
+               authorization.failure());
+
+          continue;
+        } else if (!authorization.get()) {
+          drop(framework,
+               operation,
+               "Not authorized to grow a volume as '" +
+                 framework->info.principal() + "'");
+
+          continue;
+        }
 
         // Make sure this grow volume operation is valid.
         Option<Error> error = validation::operation::validate(
@@ -4967,7 +5040,29 @@ void Master::_accept(
       }
 
       case Offer::Operation::SHRINK_VOLUME: {
-        // TODO(zhitao): Authorize SHRINK_VOLUME from `authorizations`.
+        Future<bool> authorization = authorizations.front();
+        authorizations.pop_front();
+
+        CHECK(!authorization.isDiscarded());
+
+        if (authorization.isFailed()) {
+          // TODO(greggomann): We may want to retry this failed authorization
+          // request rather than dropping it immediately.
+          drop(framework,
+               operation,
+               "Authorization of principal '" + framework->info.principal() +
+               "' to shrink a volume failed: " +
+               authorization.failure());
+
+          continue;
+        } else if (!authorization.get()) {
+          drop(framework,
+               operation,
+               "Not authorized to shrink a volume as '" +
+                 framework->info.principal() + "'");
+
+          continue;
+        }
 
         // Make sure this shrink volume operation is valid.
         Option<Error> error = validation::operation::validate(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9656329c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c30cf08..270f60a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -873,6 +873,29 @@ protected:
       const Offer::Operation::Destroy& destroy,
       const Option<process::http::authentication::Principal>& principal);
 
+  /**
+   * Authorizes resize of a volume triggered by either `GROW_VOLUME` or
+   * `SHRINK_VOLUME` operations.
+   *
+   * Returns whether the triggering operation is authorized with the provided
+   * principal. This function is used for authorization of operations
+   * originating both from frameworks and operators. Note that operations may be
+   * validated AFTER authorization, so it's possible that the operation could be
+   * malformed.
+   *
+   * @param volume The volume being resized.
+   * @param principal An `Option` containing the principal attempting this
+   *     operation.
+   *
+   * @return A `Future` containing a boolean value representing the success or
+   *     failure of this authorization. A failed `Future` implies that
+   *     validation of the operation did not succeed.
+   */
+  process::Future<bool> authorizeResizeVolume(
+      const Resource& volume,
+      const Option<process::http::authentication::Principal>& principal);
+
+
   // Determine if a new executor needs to be launched.
   bool isLaunchExecutor (
       const ExecutorID& executorId,


[13/13] mesos git commit: Improved tests for resizing persistent volumes.

Posted by ch...@apache.org.
Improved tests for resizing persistent volumes.

Now the `GrowVolume` and `ShrinkVolume` tests launch tasks after
resizing the volumes to ensure that the operations take effect on
agents. The `NonSpeculativeGrowAndLaunch` and
`NonSpeculativeShrinkAndLaunch` tests launch an additional task to
verify that the original volume consumed by the operations cannot be
used by subsequent tasks.

This patch also adjusted when the clock is resumed so schedulers will
not receive unexpected offers.

Review: https://reviews.apache.org/r/66920/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a483fb0d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a483fb0d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a483fb0d

Branch: refs/heads/master
Commit: a483fb0d0aa07ce061faa17336c453fa8d61c89c
Parents: f8d28f4
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu May 3 17:05:13 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 3 17:05:13 2018 -0700

----------------------------------------------------------------------
 src/tests/persistent_volume_tests.cpp | 213 ++++++++++++++++++++++-------
 1 file changed, 166 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a483fb0d/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 477e6e2..43f31b2 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -37,6 +37,8 @@
 
 #include <stout/os/exists.hpp>
 
+#include "common/resources_utils.hpp"
+
 #ifdef __linux__
 #include "linux/fs.hpp"
 #endif // __linux__
@@ -448,8 +450,8 @@ TEST_P(PersistentVolumeTest, CreateAndDestroyPersistentVolumes)
 }
 
 
-// This test verifies that a framework can grow a persistent volume and receive
-// the grown volume in further offers.
+// This test verifies that a framework can grow a persistent volume and use the
+// grown volume afterward.
 TEST_P(PersistentVolumeTest, GrowVolume)
 {
   if (GetParam() == MOUNT) {
@@ -463,8 +465,9 @@ TEST_P(PersistentVolumeTest, GrowVolume)
 
   Clock::pause();
 
+  // Register a framework with role "default-role/foo" for dynamic reservations.
   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+  frameworkInfo.set_roles(0, strings::join("/", DEFAULT_TEST_ROLE, "foo"));
 
   // Create a master.
   master::Flags masterFlags = CreateMasterFlags();
@@ -503,6 +506,16 @@ TEST_P(PersistentVolumeTest, GrowVolume)
 
   Bytes additionBytes = Megabytes(512);
 
+  // Construct a dynamic reservation for all disk resources.
+  // NOTE: We dynamically reserve all disk resources so they become checkpointed
+  // resources and thus will be verified on the agent when launching a task.
+  Resource::ReservationInfo dynamicReservation = createDynamicReservationInfo(
+      frameworkInfo.roles(0),
+      frameworkInfo.principal());
+
+  Resource dynamicallyReserved = getDiskResource(totalBytes, 1);
+  dynamicallyReserved.add_reservations()->CopyFrom(dynamicReservation);
+
   // Construct a persistent volume which does not use up all disk resources.
   Resource volume = createPersistentVolume(
       getDiskResource(totalBytes - additionBytes, 1),
@@ -510,8 +523,12 @@ TEST_P(PersistentVolumeTest, GrowVolume)
       "path1",
       None(),
       frameworkInfo.principal());
+  volume.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(volume));
 
   Resource addition = getDiskResource(additionBytes, 1);
+  addition.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(addition));
 
   Resource grownVolume = createPersistentVolume(
       getDiskResource(totalBytes, 1),
@@ -519,6 +536,8 @@ TEST_P(PersistentVolumeTest, GrowVolume)
       "path1",
       None(),
       frameworkInfo.principal());
+  grownVolume.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(grownVolume));
 
   Future<vector<Offer>> offersBeforeGrow;
 
@@ -533,7 +552,7 @@ TEST_P(PersistentVolumeTest, GrowVolume)
   // Create the persistent volume.
   driver.acceptOffers(
       {offer.id()},
-      {CREATE(volume)},
+      {RESERVE(dynamicallyReserved), CREATE(volume)},
       filters);
 
   Clock::settle();
@@ -577,9 +596,6 @@ TEST_P(PersistentVolumeTest, GrowVolume)
   AWAIT_READY(offersAfterGrow);
   ASSERT_FALSE(offersAfterGrow->empty());
 
-  EXPECT_TRUE(os::exists(volumePath));
-  EXPECT_SOME_EQ("abc", os::read(filePath));
-
   offer = offersAfterGrow->at(0);
 
   EXPECT_EQ(
@@ -590,21 +606,52 @@ TEST_P(PersistentVolumeTest, GrowVolume)
       Resources(offer.resources()).contains(
       allocatedResources(addition, frameworkInfo.roles(0))));
 
-  Clock::resume();
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      offer.resources(),
+      "test `cat path1/file` = abc");
+
+  // Launch a task to verify that `GROW_VOLUME` takes effect on the agent and
+  // the task can use the grown volume.
+  driver.acceptOffers({offer.id()}, {LAUNCH({task})}, filters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(task.task_id(), taskStarting->task_id());
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(task.task_id(), taskRunning->task_id());
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(task.task_id(), taskFinished->task_id());
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
 
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 
-// This test verifies that a framework can shrink a persistent volume and see
-// the shrunk volume in further offers.
+// This test verifies that a framework can shrink a persistent volume and use
+// the shrunk volume afterward.
 TEST_P(PersistentVolumeTest, ShrinkVolume)
 {
   Clock::pause();
 
+  // Register a framework with role "default-role/foo" for dynamic reservations.
   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+  frameworkInfo.set_roles(0, strings::join("/", DEFAULT_TEST_ROLE, "foo"));
 
   // Create a master.
   master::Flags masterFlags = CreateMasterFlags();
@@ -641,7 +688,17 @@ TEST_P(PersistentVolumeTest, ShrinkVolume)
   // The disk spaces will be merged if the fixture parameter is `NONE`.
   Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048);
 
-  Bytes shrinkBytes = Megabytes(512);
+  Bytes subtractBytes = Megabytes(512);
+
+  // Construct a dynamic reservation for all disk resources.
+  // NOTE: We dynamically reserve all disk resources so they become checkpointed
+  // resources and thus will be verified on the agent when launching a task.
+  Resource::ReservationInfo dynamicReservation = createDynamicReservationInfo(
+      frameworkInfo.roles(0),
+      frameworkInfo.principal());
+
+  Resource dynamicallyReserved = getDiskResource(totalBytes, 1);
+  dynamicallyReserved.add_reservations()->CopyFrom(dynamicReservation);
 
   // Construct a persistent volume which uses up all disk resources.
   Resource volume = createPersistentVolume(
@@ -650,15 +707,21 @@ TEST_P(PersistentVolumeTest, ShrinkVolume)
       "path1",
       None(),
       frameworkInfo.principal());
+  volume.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(volume));
 
-  Resource subtract = getDiskResource(shrinkBytes, 1);
+  Resource subtract = getDiskResource(subtractBytes, 1);
+  subtract.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(subtract));
 
   Resource shrunkVolume = createPersistentVolume(
-      getDiskResource(totalBytes - shrinkBytes, 1),
+      getDiskResource(totalBytes - subtractBytes, 1),
       "id1",
       "path1",
       None(),
       frameworkInfo.principal());
+  shrunkVolume.add_reservations()->CopyFrom(dynamicReservation);
+  ASSERT_TRUE(needCheckpointing(shrunkVolume));
 
   Future<vector<Offer>> offersBeforeShrink;
 
@@ -674,7 +737,7 @@ TEST_P(PersistentVolumeTest, ShrinkVolume)
   // Create the persistent volume.
   driver.acceptOffers(
       {offer.id()},
-      {CREATE(volume)},
+      {RESERVE(dynamicallyReserved), CREATE(volume)},
       filters);
 
   Clock::settle();
@@ -718,9 +781,6 @@ TEST_P(PersistentVolumeTest, ShrinkVolume)
   AWAIT_READY(offersAfterShrink);
   ASSERT_FALSE(offersAfterShrink->empty());
 
-  EXPECT_TRUE(os::exists(volumePath));
-  EXPECT_SOME_EQ("abc", os::read(filePath));
-
   offer = offersAfterShrink->at(0);
 
   if (GetParam() != MOUNT) {
@@ -741,15 +801,48 @@ TEST_P(PersistentVolumeTest, ShrinkVolume)
         allocatedResources(subtract, frameworkInfo.roles(0))));
   }
 
-  Clock::resume();
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      offer.resources(),
+      "test `cat path1/file` = abc");
+
+  // Launch a task to verify that: if the fixture parameter is NONE or PATH,
+  // `SHRINK_VOLUME` takes effect on the agent and the task can use the shrunk
+  // volume as well as the freed disk resource; otherwise, `SHRINK_VOLUME`
+  // takes no effect on the agent.
+  driver.acceptOffers({offer.id()}, {LAUNCH({task})}, filters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(task.task_id(), taskStarting->task_id());
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(task.task_id(), taskRunning->task_id());
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(task.task_id(), taskFinished->task_id());
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
 
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 
-// This test verifies that a subsequent `LAUNCH` depending on a grown volume
-// will be dropped because we intend to keep `GROW_VOLUME` non-speculative.
+// This test verifies that any task to launch after a `GROW_VOLUME` in the same
+// `ACCEPT` call is dropped if the task consumes the original or grown volume,
+// because we intend to make `GROW_VOLUME` non-speculative.
 TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch)
 {
   if (GetParam() == MOUNT) {
@@ -821,16 +914,23 @@ TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch)
       None(),
       frameworkInfo.principal());
 
-  Future<TaskStatus> taskError;
+  Future<TaskStatus> taskError1;
+  Future<TaskStatus> taskError2;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&taskError));
+    .WillOnce(FutureArg<1>(&taskError1))
+    .WillOnce(FutureArg<1>(&taskError2));
 
-  TaskInfo task = createTask(
+  TaskInfo task1 = createTask(
       offer.slave_id(),
       Resources::parse("cpus:1;mem:128").get() + grownVolume,
       "echo abc > path1/file");
 
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume,
+      "echo abc > path1/file");
+
   Future<vector<Offer>> offersAfterOperations;
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -841,16 +941,21 @@ TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch)
   Filters filters;
   filters.set_refuse_seconds(0);
 
-  // Create and grow will succeed, but launch will be droppd with
-  // `TASK_ERROR`.
+  // The create and grow volume operations will succeed, but the tasks will be
+  // dropped with `TASK_ERROR`.
   driver.acceptOffers(
       {offer.id()},
-      {CREATE(volume), GROW_VOLUME(volume, addition), LAUNCH({task})},
+      {CREATE(volume), GROW_VOLUME(volume, addition), LAUNCH({task1, task2})},
       filters);
 
-  AWAIT_READY(taskError);
-  EXPECT_EQ(task.task_id(), taskError->task_id());
-  EXPECT_EQ(TASK_ERROR, taskError->state());
+  AWAIT_READY(taskError1);
+  AWAIT_READY(taskError2);
+
+  hashset<TaskID> expectedTasks = {task1.task_id(), task2.task_id()};
+  hashset<TaskID> actualTasks = {taskError1->task_id(), taskError2->task_id()};
+  EXPECT_EQ(expectedTasks, actualTasks);
+  EXPECT_EQ(TASK_ERROR, taskError1->state());
+  EXPECT_EQ(TASK_ERROR, taskError2->state());
 
   Clock::settle();
   Clock::advance(masterFlags.allocation_interval);
@@ -864,15 +969,16 @@ TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch)
       allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)),
       Resources(offer.resources()).persistentVolumes());
 
-  Clock::resume();
-
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 
-// This test verifies that a subsequent `LAUNCH` depends on a shrunk volume
-// will be dropped because we intend to keep `SHRINK_VOLUME` non-speculative.
+// This test verifies that any task to be launched after a `SHRINK_VOLUME` in
+// the same `ACCEPT` call is dropped if the task consumes the original or
+// shrunk volume, because we intend to make `SHRINK_VOLUME` non-speculative.
 TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
 {
   if (GetParam() == MOUNT) {
@@ -937,15 +1043,23 @@ TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
       None(),
       frameworkInfo.principal());
 
-  Future<TaskStatus> taskError;
+  Future<TaskStatus> taskError1;
+  Future<TaskStatus> taskError2;
+
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&taskError));
+    .WillOnce(FutureArg<1>(&taskError1))
+    .WillOnce(FutureArg<1>(&taskError2));
 
-  TaskInfo task = createTask(
+  TaskInfo task1 = createTask(
       offer.slave_id(),
       Resources::parse("cpus:1;mem:128").get() + shrunkVolume,
       "echo abc > path1/file");
 
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume,
+      "echo abc > path1/file");
+
   Future<vector<Offer>> offersAfterOperations;
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -956,16 +1070,21 @@ TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
   Filters filters;
   filters.set_refuse_seconds(0);
 
-  // Create and shrink volume will succeed, but launch will be dropped with
-  // `TASK_ERROR`.
+  // The create and shrink volume operations will succeed, but the tasks will be
+  // dropped with `TASK_ERROR`.
   driver.acceptOffers(
       {offer.id()},
-      {CREATE(volume), SHRINK_VOLUME(volume, subtract), LAUNCH({task})},
+      {CREATE(volume), SHRINK_VOLUME(volume, subtract), LAUNCH({task1, task2})},
       filters);
 
-  AWAIT_READY(taskError);
-  EXPECT_EQ(task.task_id(), taskError->task_id());
-  EXPECT_EQ(TASK_ERROR, taskError->state());
+  AWAIT_READY(taskError1);
+  AWAIT_READY(taskError2);
+
+  hashset<TaskID> expectedTasks = {task1.task_id(), task2.task_id()};
+  hashset<TaskID> actualTasks = {taskError1->task_id(), taskError2->task_id()};
+  EXPECT_EQ(expectedTasks, actualTasks);
+  EXPECT_EQ(TASK_ERROR, taskError1->state());
+  EXPECT_EQ(TASK_ERROR, taskError2->state());
 
   Clock::settle();
   Clock::advance(masterFlags.allocation_interval);
@@ -978,10 +1097,10 @@ TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch)
       allocatedResources(Resources(shrunkVolume), frameworkInfo.roles(0)),
       Resources(offer.resources()).persistentVolumes());
 
-  Clock::resume();
-
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 
@@ -1124,10 +1243,10 @@ TEST_P(PersistentVolumeTest, GoodACLGrowThenShrink)
       Resources(offer.resources()).contains(
       allocatedResources(difference, frameworkInfo.roles(0))));
 
-  Clock::resume();
-
   driver.stop();
   driver.join();
+
+  Clock::resume();
 }
 
 // This test verifies that grow and shrink operations get dropped if