You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/02/03 22:35:50 UTC

[1/2] mesos git commit: Fix handling of total resources in role and quote role sorters.

Repository: mesos
Updated Branches:
  refs/heads/master eec6f32ad -> 3a537a1d1


Fix handling of total resources in role and quote role sorters.

Currently the total resources in role and quota role sorters are tied to
the agent's total resources (which we don't update during allocation
and recovery of resources). To be consistent with this we shouldn't use
the framework's allocation (which could contain additional allocations
for shared resources) to update them in `updateAllocation()`. We
instead use the agent's total resources to update the two sorters'.

Review: https://reviews.apache.org/r/53096/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24b50202
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24b50202
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24b50202

Branch: refs/heads/master
Commit: 24b502021bd29f37f7d6cc196e47eada4eb19e93
Parents: eec6f32
Author: Anindya Sinha <an...@apple.com>
Authored: Fri Feb 3 14:05:16 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Feb 3 14:26:40 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp |  76 ++---
 src/master/allocator/mesos/hierarchical.hpp |   6 +
 src/tests/persistent_volume_tests.cpp       | 344 +++++++++++++++++++++++
 3 files changed, 391 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/24b50202/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 5f54056..df8db93 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -719,20 +719,14 @@ void HierarchicalAllocatorProcess::updateAllocation(
     // resources in the allocator and each of the sorters. The available
     // resource quantities remain unchanged.
 
-    // Update the per-slave allocation.
-    Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operation);
-
-    CHECK_SOME(updatedSlaveAllocation);
-
-    slave.allocated = updatedSlaveAllocation.get();
-
-    // Update the total resources.
+    // Update the total resources in the allocator and the role and
+    // quota sorters.
     Try<Resources> updatedTotal = slave.total.apply(operation);
     CHECK_SOME(updatedTotal);
 
-    slave.total = updatedTotal.get();
+    updateSlaveTotal(slaveId, updatedTotal.get());
 
-    // Update the total and allocated resources in each sorter.
+    // Update the total resources in the framework sorter.
     Resources frameworkAllocation =
       frameworkSorter->allocation(frameworkId.value(), slaveId);
 
@@ -741,35 +735,31 @@ void HierarchicalAllocatorProcess::updateAllocation(
 
     CHECK_SOME(updatedFrameworkAllocation);
 
-    // Update the total and allocated resources in the framework sorter
-    // for the current role.
     frameworkSorter->remove(slaveId, frameworkAllocation);
     frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
 
+    // Update the per-slave allocation.
+    Try<Resources> updatedSlaveAllocation = slave.allocated.apply(operation);
+    CHECK_SOME(updatedSlaveAllocation);
+
+    slave.allocated = updatedSlaveAllocation.get();
+
+    // Update the allocation in the framework sorter.
     frameworkSorter->update(
         frameworkId.value(),
         slaveId,
         frameworkAllocation,
         updatedFrameworkAllocation.get());
 
-    // Update the total and allocated resources in the role sorter.
-    roleSorter->remove(slaveId, frameworkAllocation);
-    roleSorter->add(slaveId, updatedFrameworkAllocation.get());
-
+    // Update the allocation in the role sorter.
     roleSorter->update(
         framework.role,
         slaveId,
         frameworkAllocation,
         updatedFrameworkAllocation.get());
 
-    // Update the total and allocated resources in the quota role
-    // sorter. Note that we always update the quota role sorter's total
-    // resources; we only update its allocated resources if this role
-    // has quota set.
-    quotaRoleSorter->remove(slaveId, frameworkAllocation.nonRevocable());
-    quotaRoleSorter->add(
-        slaveId, updatedFrameworkAllocation.get().nonRevocable());
-
+    // Update the allocated resources in the quota sorter. We only update
+    // the allocated resources if this role has quota set.
     if (quotas.contains(framework.role)) {
       // See comment at `quotaRoleSorter` declaration regarding non-revocable.
       quotaRoleSorter->update(
@@ -818,17 +808,8 @@ Future<Nothing> HierarchicalAllocatorProcess::updateAvailable(
   Try<Resources> updatedTotal = slave.total.apply(operations);
   CHECK_SOME(updatedTotal);
 
-  const Resources oldTotal = slave.total;
-  slave.total = updatedTotal.get();
-
-  // Now, update the total resources in the role sorters by removing
-  // the previous resources at this slave and adding the new resources.
-  roleSorter->remove(slaveId, oldTotal);
-  roleSorter->add(slaveId, updatedTotal.get());
-
-  // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-  quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable());
-  quotaRoleSorter->add(slaveId, updatedTotal.get().nonRevocable());
+  // Update the total resources in the allocator and role and quota sorters.
+  updateSlaveTotal(slaveId, updatedTotal.get());
 
   return Nothing();
 }
@@ -2055,6 +2036,31 @@ double HierarchicalAllocatorProcess::_offer_filters_active(
   return result;
 }
 
+
+void HierarchicalAllocatorProcess::updateSlaveTotal(
+    const SlaveID& slaveId,
+    const Resources& total)
+{
+  CHECK(slaves.contains(slaveId));
+
+  Slave& slave = slaves.at(slaveId);
+
+  const Resources oldTotal = slave.total;
+  slave.total = total;
+
+  // Currently `roleSorter` and `quotaRoleSorter`, being the root-level
+  // sorters, maintain all of `slaves[slaveId].total` (or the `nonRevocable()`
+  // portion in the case of `quotaRoleSorter`) in their own totals (which
+  // don't get updated in the allocation runs or during recovery of allocated
+  // resources). So, we update them using the resources in `slave.total`.
+  roleSorter->remove(slaveId, oldTotal);
+  roleSorter->add(slaveId, total);
+
+  // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+  quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable());
+  quotaRoleSorter->add(slaveId, total.nonRevocable());
+}
+
 } // namespace internal {
 } // namespace allocator {
 } // namespace master {

http://git-wip-us.apache.org/repos/asf/mesos/blob/24b50202/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index a99ed35..331b85b 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -493,6 +493,12 @@ protected:
 
   // Factory function for framework sorters.
   const std::function<Sorter*()> frameworkSorterFactory;
+
+private:
+  // Helper to update the agent's total resources maintained in the allocator
+  // and the role and quota sorters (whose total resources match the agent's
+  // total resources).
+  void updateSlaveTotal(const SlaveID& slaveId, const Resources& total);
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/24b50202/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 468a85b..842e113 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -1011,6 +1011,7 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
   driver.join();
 }
 
+
 // This test verifies that pending offers with shared persistent volumes
 // are rescinded when the volumes are destroyed.
 TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy)
@@ -1289,6 +1290,349 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
 }
 
 
+// This test verifies that DESTROY of non-shared persistent volume succeeds
+// when a shared persistent volume is in use. This is to catch any
+// regression to MESOS-6444.
+TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
+{
+  // Manipulate the clock manually in order to
+  // control the timing of the offer cycle.
+  Clock::pause();
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  // Create a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Create a slave. Resources are being statically reserved because persistent
+  // volume creation requires reserved resources.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Create a scheduler/framework.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Expect an offer from the slave.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  Resource sharedVolume = createPersistentVolume(
+      getDiskResource(Megabytes(2048), 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal(),
+      true); // Shared.
+
+  Resource nonSharedVolume = createPersistentVolume(
+      getDiskResource(Megabytes(2048), 2),
+      "id2",
+      "path2",
+      None(),
+      frameworkInfo.principal());
+
+  // Create a long-lived task using a shared volume.
+  Resources taskResources1 = Resources::parse(
+      "cpus:1;mem:128").get() + sharedVolume;
+
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      taskResources1,
+      "sleep 1000");
+
+  // Create a short-lived task using a non-shared volume.
+  Resources taskResources2 = Resources::parse(
+      "cpus:1;mem:256").get() + nonSharedVolume;
+
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      taskResources2,
+      "exit 0");
+
+  const hashset<TaskID> tasks{task1.task_id(), task2.task_id()};
+
+  // Expect an offer containing the persistent volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // We should receive a TASK_RUNNING each of the 2 tasks. We track task
+  // termination by a TASK_FINISHED for the short-lived task.
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  Future<TaskStatus> status3;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(sharedVolume),
+       CREATE(nonSharedVolume),
+       LAUNCH({task1, task2})},
+      filters);
+
+  // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
+  // the short-lived task.
+  AWAIT_READY(status1);
+  AWAIT_READY(status2);
+  AWAIT_READY(status3);
+
+  hashset<TaskID> tasksRunning;
+  hashset<TaskID> tasksFinished;
+  vector<Future<TaskStatus>> statuses{status1, status2, status3};
+
+  foreach (const Future<TaskStatus>& status, statuses) {
+    if (status.get().state() == TASK_RUNNING) {
+      tasksRunning.insert(status.get().task_id());
+    } else {
+      tasksFinished.insert(status.get().task_id());
+    }
+  }
+
+  ASSERT_EQ(tasks, tasksRunning);
+  EXPECT_EQ(1u, tasksFinished.size());
+  EXPECT_EQ(task2.task_id(), *(tasksFinished.begin()));
+
+  // Advance the clock to generate an offer.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  // Await the offer containing the persistent volume.
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  offer = offers.get()[0];
+
+  // Check that the persistent volumes are offered back. The shared volume
+  // is offered since it can be used in multiple tasks; the non-shared
+  // volume is offered since there are no tasks using it.
+  EXPECT_TRUE(Resources(offer.resources()).contains(sharedVolume));
+  EXPECT_TRUE(Resources(offer.resources()).contains(nonSharedVolume));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Destroy the non-shared persistent volume since no task is using it.
+  driver.acceptOffers(
+      {offer.id()},
+      {DESTROY(nonSharedVolume)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  offer = offers.get()[0];
+
+  // Check that the shared persistent volume is in the offer, but the
+  // non-shared volume is not in the offer.
+  EXPECT_TRUE(Resources(offer.resources()).contains(sharedVolume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(nonSharedVolume));
+
+  // We kill the long-lived task and wait for TASK_KILLED, so we can
+  // DESTROY the persistent volume once the task terminates.
+  Future<TaskStatus> status4;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status4));
+
+  driver.killTask(task1.task_id());
+
+  AWAIT_READY(status4);
+  EXPECT_EQ(task1.task_id(), status4.get().task_id());
+  EXPECT_EQ(TASK_KILLED, status4.get().state());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Destroy the shared persistent volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {DESTROY(sharedVolume)},
+      filters);
+
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::resume();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  offer = offers.get()[0];
+
+  // Check that the persistent volumes are not in the offer.
+  EXPECT_FALSE(Resources(offer.resources()).contains(sharedVolume));
+  EXPECT_FALSE(Resources(offer.resources()).contains(nonSharedVolume));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that multiple iterations of CREATE and LAUNCH
+// for the same framework is successfully handled in different
+// ACCEPT calls.
+TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleIterations)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  slaveFlags.resources = getSlaveResources();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // 1. Create framework so that all resources are offered to this framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  // 2. The framework CREATEs the 1st shared volume, and LAUNCHes a task
+  //    which uses this shared volume.
+  Resource volume1 = createPersistentVolume(
+      getDiskResource(Megabytes(2048), 1),
+      "id1",
+      "path1",
+      None(),
+      frameworkInfo.principal(),
+      true); // Shared volume.
+
+  TaskInfo task1 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume1,
+      "sleep 1000");
+
+  // Expect an offer containing the persistent volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // We use a filter of 0 seconds so the resources will be available
+  // in the next allocation cycle.
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume1),
+       LAUNCH({task1})},
+      filters);
+
+  // Advance the clock to generate an offer from the recovered resources.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(volume1));
+
+  // 3. The framework CREATEs the 2nd shared volume, and LAUNCHes a task
+  //    using this shared volume.
+  Resource volume2 = createPersistentVolume(
+      getDiskResource(Megabytes(2048), 2),
+      "id2",
+      "path2",
+      None(),
+      frameworkInfo.principal(),
+      true); // Shared volume.
+
+  TaskInfo task2 = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128").get() + volume2,
+      "sleep 1000");
+
+  // Expect an offer containing the persistent volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(volume2),
+       LAUNCH({task2})},
+      filters);
+
+  // Advance the clock to generate an offer from the recovered resources.
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(volume1));
+  EXPECT_TRUE(Resources(offer.resources()).contains(volume2));
+
+  driver.stop();
+  driver.join();
+
+  // Resume the clock so the terminating task and executor can be reaped.
+  Clock::resume();
+}
+
+
 // This test verifies that persistent volumes are recovered properly
 // after the slave restarts. The idea is to launch a command which
 // keeps testing if the persistent volume exists, and fails if it does


[2/2] mesos git commit: Updated the persistent volume test framework to include shared volumes.

Posted by ya...@apache.org.
Updated the persistent volume test framework to include shared volumes.

Review: https://reviews.apache.org/r/45962/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a537a1d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a537a1d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a537a1d

Branch: refs/heads/master
Commit: 3a537a1d18e500fb1efe2f4d4f0718fb25c51848
Parents: 24b5020
Author: Anindya Sinha <an...@apple.com>
Authored: Fri Feb 3 14:29:54 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Feb 3 14:34:33 2017 -0800

----------------------------------------------------------------------
 src/examples/persistent_volume_framework.cpp | 248 ++++++++++++++++------
 1 file changed, 185 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3a537a1d/src/examples/persistent_volume_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/persistent_volume_framework.cpp b/src/examples/persistent_volume_framework.cpp
index 9d45bb4..222018e 100644
--- a/src/examples/persistent_volume_framework.cpp
+++ b/src/examples/persistent_volume_framework.cpp
@@ -29,6 +29,7 @@
 #include <mesos/authorizer/acls.hpp>
 
 #include <stout/flags.hpp>
+#include <stout/hashset.hpp>
 #include <stout/json.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -64,7 +65,8 @@ static Resource SHARD_PERSISTENT_VOLUME(
     const string& role,
     const string& persistenceId,
     const string& containerPath,
-    const string& principal)
+    const string& principal,
+    bool isShared)
 {
   Volume volume;
   volume.set_container_path(containerPath);
@@ -78,6 +80,10 @@ static Resource SHARD_PERSISTENT_VOLUME(
   Resource resource = Resources::parse("disk", "8", role).get();
   resource.mutable_disk()->CopyFrom(info);
 
+  if (isShared) {
+    resource.mutable_shared();
+  }
+
   return resource;
 }
 
@@ -91,6 +97,15 @@ static Offer::Operation CREATE(const Resources& volumes)
 }
 
 
+static Offer::Operation DESTROY(const Resources& volumes)
+{
+  Offer::Operation operation;
+  operation.set_type(Offer::Operation::DESTROY);
+  operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
+  return operation;
+}
+
+
 static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
 {
   Offer::Operation operation;
@@ -104,24 +119,39 @@ static Offer::Operation LAUNCH(const vector<TaskInfo>& tasks)
 }
 
 
-// The framework launches a task on each registered slave using a
-// persistent volume. It restarts the task once the previous one on
-// the slave finishes. The framework terminates once the number of
-// tasks launched on each slave reaches a limit.
+// The framework launches a task on each registered agent using a
+// (possibly shared) persistent volume. In the case of regular
+// persistent volumes, the next task is started once the previous
+// one terminates; and in the case of shared persistent volumes,
+// tasks can use the same shared volume simultaneously. The
+// framework terminates once the number of tasks launched on each
+// agent reaches a limit.
 class PersistentVolumeScheduler : public Scheduler
 {
 public:
   PersistentVolumeScheduler(
       const FrameworkInfo& _frameworkInfo,
       size_t numShards,
+      size_t numSharedShards,
       size_t tasksPerShard)
     : frameworkInfo(_frameworkInfo)
   {
+    // Initialize the shards using regular persistent volume.
     for (size_t i = 0; i < numShards; i++) {
       shards.push_back(Shard(
           "shard-" + stringify(i),
           frameworkInfo.role(),
-          tasksPerShard));
+          tasksPerShard,
+          false));
+    }
+
+    // Initialize the shards using shared persistent volume.
+    for (size_t i = 0; i < numSharedShards; i++) {
+      shards.push_back(Shard(
+          "shared-shard-" + stringify(i),
+          frameworkInfo.role(),
+          tasksPerShard,
+          true));
     }
   }
 
@@ -166,12 +196,15 @@ public:
       foreach (Shard& shard, shards) {
         switch (shard.state) {
           case Shard::INIT:
+            CHECK_EQ(0u, shard.launched);
+
             if (offered.contains(shard.resources)) {
               Resource volume = SHARD_PERSISTENT_VOLUME(
                   frameworkInfo.role(),
                   UUID::random().toString(),
                   "volume",
-                  frameworkInfo.principal());
+                  frameworkInfo.principal(),
+                  shard.volume.isShared);
 
               Try<Resources> resources = shard.resources.apply(CREATE(volume));
               CHECK_SOME(resources);
@@ -181,14 +214,17 @@ public:
               task.mutable_task_id()->set_value(UUID::random().toString());
               task.mutable_slave_id()->CopyFrom(offer.slave_id());
               task.mutable_resources()->CopyFrom(resources.get());
-              task.mutable_command()->set_value("touch volume/persisted");
+
+              // TODO(anindya_sinha): Add a flag to allow specifying a
+              // custom write command for the consumer task.
+              task.mutable_command()->set_value(
+                  "echo hello > volume/persisted");
 
               // Update the shard.
               shard.state = Shard::STAGING;
-              shard.taskId = task.task_id();
-              shard.volume.id = volume.disk().persistence().id();
-              shard.volume.slave = offer.slave_id().value();
+              shard.taskIds.insert(task.task_id());
               shard.resources = resources.get();
+              shard.volume.resource = volume;
               shard.launched++;
 
               operations.push_back(CREATE(volume));
@@ -201,31 +237,81 @@ public:
               CHECK_SOME(resources);
               offered = resources.get();
             }
+
             break;
-          case Shard::WAITING:
+
+          case Shard::STAGING:
+            CHECK_LE(shard.launched, shard.tasks);
+
+            if (shard.launched == shard.tasks) {
+              LOG(INFO) << "All tasks launched, but one or more yet to run";
+              break;
+            }
+
             if (offered.contains(shard.resources)) {
-              CHECK_EQ(shard.volume.slave, offer.slave_id().value());
+              // Set mode to RO for persistent volume resource since we are
+              // just reading from the persistent volume.
+              CHECK_SOME(shard.volume.resource);
+
+              Resource volume = shard.volume.resource.get();
+              Resources taskResources = shard.resources - volume;
+              volume.mutable_disk()->mutable_volume()->set_mode(Volume::RO);
+              taskResources += volume;
 
               TaskInfo task;
               task.set_name(shard.name);
               task.mutable_task_id()->set_value(UUID::random().toString());
               task.mutable_slave_id()->CopyFrom(offer.slave_id());
-              task.mutable_resources()->CopyFrom(shard.resources);
-              task.mutable_command()->set_value("test -f volume/persisted");
+              task.mutable_resources()->CopyFrom(taskResources);
+
+              // The read task tries to access the content written in the
+              // persistent volume for up to 15 seconds. This is to handle
+              // the scenario where the writer task may not have done the
+              // write to the volume even though it was launched earlier.
+              // TODO(anindya_sinha): Add a flag to allow specifying a
+              // custom read command for the consumer task.
+              task.mutable_command()->set_value(R"~(
+                  COUNTER=0
+                  while [ $COUNTER -lt 15 ]; do
+                    cat volume/persisted
+                    if [ $? -eq 0 ]; then
+                      exit 0
+                    fi
+                    COUNTER=$[COUNTER+1]
+                    sleep 1
+                  done
+                  exit 1
+                  )~");
 
               // Update the shard.
-              shard.state = Shard::STAGING;
-              shard.taskId = task.task_id();
+              shard.taskIds.insert(task.task_id());
               shard.launched++;
 
+              // Launch the next instance of this task with the already
+              // created volume.
               operations.push_back(LAUNCH({task}));
             }
+
             break;
-          case Shard::STAGING:
+
+          case Shard::TERMINATING:
+            CHECK_EQ(shard.terminated, shard.launched);
+            CHECK_SOME(shard.volume.resource);
+
+            // Send (or resend) DESTROY of the volume here.
+            if (offered.contains(shard.volume.resource.get())) {
+              operations.push_back(DESTROY(shard.volume.resource.get()));
+            } else {
+              shard.state = Shard::DONE;
+            }
+
+            break;
+
           case Shard::RUNNING:
           case Shard::DONE:
             // Ignore the offer.
             break;
+
           default:
             LOG(ERROR) << "Unexpected shard state: " << shard.state;
             driver->abort();
@@ -233,6 +319,19 @@ public:
         }
       }
 
+      // Check the terminal condition.
+      bool terminal = true;
+      foreach (const Shard& shard, shards) {
+        if (shard.state != Shard::DONE) {
+          terminal = false;
+          break;
+        }
+      }
+
+      if (terminal) {
+        driver->stop();
+      }
+
       driver->acceptOffers({offer.id()}, operations);
     }
   }
@@ -252,16 +351,22 @@ public:
               << status.state();
 
     foreach (Shard& shard, shards) {
-      if (shard.taskId == status.task_id()) {
+      if (shard.taskIds.contains(status.task_id())) {
         switch (status.state()) {
           case TASK_RUNNING:
-            shard.state = Shard::RUNNING;
+            CHECK(shard.launched <= shard.tasks);
+            if (shard.launched == shard.tasks &&
+                shard.state == Shard::STAGING) {
+              shard.state = Shard::RUNNING;
+            }
             break;
           case TASK_FINISHED:
-            if (shard.launched >= shard.tasks) {
-              shard.state = Shard::DONE;
-            } else {
-              shard.state = Shard::WAITING;
+            ++shard.terminated;
+
+            CHECK(shard.terminated <= shard.tasks);
+            if (shard.terminated == shard.tasks &&
+                shard.state == Shard::RUNNING) {
+              shard.state = Shard::TERMINATING;
             }
             break;
           case TASK_STAGING:
@@ -278,19 +383,6 @@ public:
         break;
       }
     }
-
-    // Check the terminal condition.
-    bool terminal = true;
-    foreach (const Shard& shard, shards) {
-      if (shard.state != Shard::DONE) {
-        terminal = false;
-        break;
-      }
-    }
-
-    if (terminal) {
-      driver->stop();
-    }
   }
 
   virtual void frameworkMessage(
@@ -330,45 +422,66 @@ public:
 private:
   struct Shard
   {
+    // States that the state machine for each shard goes through.
+    //
+    // The state machine per shard runs as follows:
+    // 1. The shard is STAGING when it launches a writer task that writes
+    //    to the persistent volume.
+    // 2. The shard launches one or more reader tasks that read the same file
+    //    from the persistent volume to confirm the write operation. Once all
+    //    tasks are launched, the shard is RUNNING.
+    // 3. When all tasks finish, the shard starts TERMINATING by destroying
+    //    the persistent volume.
+    // 4. Once the persistent volume is successfully destroyed, the shard
+    //    is DONE and terminates.
+    //
+    // In the case of regular persistent volumes, the tasks run in sequence;
+    // whereas in the case of shared persistent volumes, the tasks run in
+    // parallel.
     enum State
     {
-      INIT = 0,   // The shard hasn't been launched yet.
-      STAGING,    // The shard has been launched.
-      RUNNING,    // The shard is running.
-      WAITING,    // The shard is waiting to be re-launched.
-      DONE,       // The shard has finished all tasks.
-
-      // TODO(jieyu): Add another state so that we can track the
-      // destroy of the volume once all tasks finish.
+      INIT = 0,    // The shard is in initialized state.
+      STAGING,     // The shard is awaiting offers to launch more tasks.
+      RUNNING,     // The shard is running (i.e., all tasks have
+                   // successfully launched).
+      TERMINATING, // All tasks are finished and needs cleanup.
+      DONE,        // The shard has finished cleaning up.
     };
 
     // The persistent volume associated with this shard.
     struct Volume
     {
-      // The persistence ID.
-      string id;
-
-      // An identifier used to uniquely identify a slave (even across
-      // reboot). In the test, we use the slave ID since slaves will not
-      // be rebooted. Note that we cannot use hostname as the identifier
-      // in a local cluster because all slaves share the same hostname.
-      string slave;
+      explicit Volume(bool _isShared)
+        : isShared(_isShared) {}
+
+      // `Resource` object for this volume.
+      Option<Resource> resource;
+
+      // Flag to indicate if this is a regular or shared persistent volume.
+      bool isShared;
     };
 
-    Shard(const string& _name, const string& role, size_t _tasks)
+    Shard(
+        const string& _name,
+        const string& role,
+        size_t _tasks,
+        bool isShared)
       : name(_name),
         state(INIT),
+        volume(isShared),
         resources(SHARD_INITIAL_RESOURCES(role)),
         launched(0),
+        terminated(0),
         tasks(_tasks) {}
 
     string name;
-    State state;          // The current state of this shard.
-    TaskID taskId;        // The ID of the current task.
-    Volume volume;        // The persistent volume associated with the shard.
-    Resources resources;  // Resources required to launch the shard.
-    size_t launched;      // How many tasks this shard has launched.
-    size_t tasks;         // How many tasks this shard should launch.
+    State state;             // The current state of this shard.
+    hashset<TaskID> taskIds; // The IDs of the tasks running on this shard.
+    Volume volume;           // The persistent volume associated with the shard.
+    Resources resources;     // Resources required to launch the shard.
+    size_t launched;         // How many tasks this shard has launched.
+    size_t terminated;       // How many tasks this shard has terminated.
+    size_t tasks;            // How many tasks this shard should launch.
   };
 
   FrameworkInfo frameworkInfo;
@@ -401,8 +514,13 @@ public:
 
     add(&Flags::num_shards,
         "num_shards",
-        "The number of shards the framework will run.",
-        3);
+        "The number of shards the framework will run using regular volume.",
+        2);
+
+    add(&Flags::num_shared_shards,
+        "num_shared_shards",
+        "The number of shards the framework will run using shared volume.",
+        2);
 
     add(&Flags::tasks_per_shard,
         "tasks_per_shard",
@@ -414,6 +532,7 @@ public:
   string role;
   string principal;
   size_t num_shards;
+  size_t num_shared_shards;
   size_t tasks_per_shard;
 };
 
@@ -452,6 +571,8 @@ int main(int argc, char** argv)
   framework.set_role(flags.role);
   framework.set_checkpoint(true);
   framework.set_principal(flags.principal);
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::SHARED_RESOURCES);
 
   if (flags.master.get() == "local") {
     // Configure master.
@@ -465,13 +586,14 @@ int main(int argc, char** argv)
 
     os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
 
-    // Configure slave.
+    // Configure agent.
     os::setenv("MESOS_DEFAULT_ROLE", flags.role);
   }
 
   PersistentVolumeScheduler scheduler(
       framework,
       flags.num_shards,
+      flags.num_shared_shards,
       flags.tasks_per_shard);
 
   MesosSchedulerDriver* driver = new MesosSchedulerDriver(