You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/05/02 22:47:00 UTC

[mesos] 03/03: Added SLRP unit tests for destroying unpublished persistent volumes.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 18bc6c95a67e2ac2dd4d5557608d75b7fb01d383
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Feb 11 20:52:26 2019 -0800

    Added SLRP unit tests for destroying unpublished persistent volumes.
    
    This patch adds 3 unit tests: `DestroyUnpublishedPersistentVolume`,
    `DestroyUnpublishedPersistentVolumeWithRecovery`, and
    `DestroyUnpublishedPersistentVolumeWithReboot` to test that the SLRP is
    resilient to misbehaved CSI plugins that fail to publish volumes.
    
    Review: https://reviews.apache.org/r/69955
---
 .../storage_local_resource_provider_tests.cpp      | 667 +++++++++++++++++++++
 1 file changed, 667 insertions(+)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index efc03c2..a2d2705 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -416,6 +416,46 @@ public:
     UNREACHABLE();
   }
 
+  // Set up an expected `NodePublishVolume` CSI call for a given mock CSI
+  // plugin. When the call is made, the mock plugin responds with `result`.
+  // When the response is received by the volume manager, the returned future
+  // will be satisfied.
+  Future<Nothing> futureNodePublishVolumeCall(
+      MockCSIPlugin* plugin, const Try<Nothing, StatusError>& result)
+  {
+    if (GetParam() == csi::v0::API_VERSION) {
+      EXPECT_CALL(*plugin, NodePublishVolume(
+          _, _, A<csi::v0::NodePublishVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v0::NodePublishVolumeRequest* request,
+            csi::v0::NodePublishVolumeResponse* response) {
+          return result.isError() ? result.error().status : grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call<
+          csi::v0::NodePublishVolumeResponse>);
+    } else if (GetParam() == csi::v1::API_VERSION) {
+      EXPECT_CALL(*plugin, NodePublishVolume(
+          _, _, A<csi::v1::NodePublishVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v1::NodePublishVolumeRequest* request,
+            csi::v1::NodePublishVolumeResponse* response) {
+          return result.isError() ? result.error().status : grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call<
+          csi::v1::NodePublishVolumeResponse>);
+    }
+
+    // This extra closure is necessary in order to use `FAIL` as it requires a
+    // void return type.
+    [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }();
+
+    UNREACHABLE();
+  }
+
   // Create a JSON string representing a disk profile mapping containing the
   // given profile-parameter pairs.
   static string createDiskProfileMapping(
@@ -3130,6 +3170,633 @@ TEST_P(StorageLocalResourceProviderTest, CreatePersistentBlockVolume)
 }
 
 
+// This test verifies that if a persistent volume is never published by the
+// storage local resource provider, the volume can be destroyed.
+//
+// To accomplish this:
+//   1. Create a MOUNT disk from a RAW disk resource.
+//   2. Create a persistent volume on the MOUNT disk then launch a task to
+//      write a file into it.
+//   3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will
+//      fail to launch.
+//   4. Destroy the persistent volume and the MOUNT disk.
+TEST_P(StorageLocalResourceProviderTest, DestroyUnpublishedPersistentVolume)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // We use the following filter to filter offers that do not have wanted
+  // resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline unwanted offers. The master can send such offers before the
+  // resource provider receives profile updates.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Offer offer = offers->at(0);
+
+  // Create a MOUNT disk.
+  Resource raw = *Resources(offer.resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .begin();
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  Resource created = *Resources(offer.resources())
+    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test"))
+    .begin();
+
+  // Create a persistent MOUNT volume then launch a task to write a file.
+  Resource persistentVolume = created;
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<Nothing> taskFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED)))
+    .WillOnce(FutureSatisfy(&taskFailed));
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(persistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Fail resource publishing.
+  Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           offer.slave_id(),
+           persistentVolume,
+           createCommandInfo("touch " + path::join("volume", "file")))})});
+
+  AWAIT_READY(nodePublishVolumeCall);
+
+  AWAIT_READY(taskFailed);
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  // Destroy the persistent volume and the MOUNT disk.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateOperationStatusMessage> destroyDiskOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  Future<UpdateOperationStatusMessage> destroyOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)});
+
+  AWAIT_READY(destroyOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state());
+
+  AWAIT_READY(destroyDiskOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state());
+
+  AWAIT_EXPECT_READY(offers)
+    << "Failed to wait for an offer containing resource '" << raw << "'";
+}
+
+
+// This test verifies that if a persistent volume is never published by the
+// storage local resource provider, the volume can be destroyed after recovery.
+//
+// To accomplish this:
+//   1. Create a MOUNT disk from a RAW disk resource.
+//   2. Create a persistent volume on the MOUNT disk then launch a task to
+//      write a file into it.
+//   3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will
+//      fail to launch.
+//   4. Restart the agent.
+//   5. Destroy the persistent volume and the MOUNT disk.
+TEST_P(
+    StorageLocalResourceProviderTest,
+    DestroyUnpublishedPersistentVolumeWithRecovery)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // We use the following filter to filter offers that do not have wanted
+  // resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline unwanted offers. The master can send such offers before the
+  // resource provider receives profile updates.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Offer offer = offers->at(0);
+
+  // Create a MOUNT disk.
+  Resource raw = *Resources(offer.resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .begin();
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  Resource created = *Resources(offer.resources())
+    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test"))
+    .begin();
+
+  // Create a persistent MOUNT volume then launch a task to write a file.
+  Resource persistentVolume = created;
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<Nothing> taskFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED)))
+    .WillOnce(FutureSatisfy(&taskFailed));
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(persistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Fail resource publishing.
+  Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           offer.slave_id(),
+           persistentVolume,
+           createCommandInfo("touch " + path::join("volume", "file")))})});
+
+  AWAIT_READY(nodePublishVolumeCall);
+
+  AWAIT_READY(taskFailed);
+
+  AWAIT_READY(offers);
+
+  // Restart the agent.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave.get()->terminate();
+
+  // NOTE: The mock CSI plugin always returns 4GB for `GetCapacity` calls, hence
+  // the resource provider would report an extra storage pool, and when it shows
+  // up, we know that the resource provider has finished reconciling storage
+  // pools and thus operations won't be dropped.
+  //
+  // To achieve this, we drop `SlaveReregisteredMessage`s other than the first
+  // one to avoid unexpected `UpdateSlaveMessage`s. Then, we drop the first two
+  // `UpdateSlaveMessage`s (one sent after agent reregistration and one after
+  // resource provider reregistration) and wait for the third one, which should
+  // contain the extra storage pool. We let it fall through to trigger an offer
+  // allocation for the persistent volume.
+  //
+  // Since the extra storage pool is never used, we reject the offers if only
+  // the storage pool is present.
+  //
+  // TODO(chhsiao): Remove this workaround once MESOS-9553 is done.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  // NOTE: The order of these expectations is reversed because Google Mock will
+  // search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _);
+  FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(persistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+  ASSERT_FALSE(Resources(
+      updateSlaveMessage->resource_providers().providers(0).total_resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .empty());
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  // Destroy the persistent volume and the MOUNT disk.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateOperationStatusMessage> destroyDiskOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  Future<UpdateOperationStatusMessage> destroyOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)});
+
+  AWAIT_READY(destroyOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state());
+
+  AWAIT_READY(destroyDiskOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state());
+
+  AWAIT_EXPECT_READY(offers)
+    << "Failed to wait for an offer containing resource '" << raw << "'";
+}
+
+
+// This test verifies that if a persistent volume is never published by the
+// storage local resource provider, the volume can be destroyed after agent
+// reboot.
+//
+// To accomplish this:
+//   1. Create a MOUNT disk from a RAW disk resource.
+//   2. Create a persistent volume on the MOUNT disk then launch a task to
+//      write a file into it.
+//   3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will
+//      fail to launch.
+//   4. Simulate an agent reboot.
+//   5. Destroy the persistent volume and the MOUNT disk.
+TEST_P(
+    StorageLocalResourceProviderTest,
+    DestroyUnpublishedPersistentVolumeWithReboot)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // We use the following filter to filter offers that do not have wanted
+  // resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline unwanted offers. The master can send such offers before the
+  // resource provider receives profile updates.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Offer offer = offers->at(0);
+
+  // Create a MOUNT disk.
+  Resource raw = *Resources(offer.resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .begin();
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  Resource created = *Resources(offer.resources())
+    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test"))
+    .begin();
+
+  // Create a persistent MOUNT volume then launch a task to write a file.
+  Resource persistentVolume = created;
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<Nothing> taskFailed;
+  EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED)))
+    .WillOnce(FutureSatisfy(&taskFailed));
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(persistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Fail resource publishing.
+  Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           offer.slave_id(),
+           persistentVolume,
+           createCommandInfo("touch " + path::join("volume", "file")))})});
+
+  AWAIT_READY(nodePublishVolumeCall);
+
+  AWAIT_READY(taskFailed);
+
+  AWAIT_READY(offers);
+
+  // Shutdown the agent and unmount all CSI volumes to simulate a reboot.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave->reset();
+
+  const string csiRootDir = slave::paths::getCsiRootDir(slaveFlags.work_dir);
+  ASSERT_SOME(fs::unmountAll(csiRootDir));
+
+  // Inject the boot IDs to simulate a reboot.
+  ASSERT_SOME(os::write(
+      slave::paths::getBootIdPath(
+          slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+      "rebooted! ;)"));
+
+  Try<list<string>> volumePaths =
+    csi::paths::getVolumePaths(csiRootDir, "*", "*");
+  ASSERT_SOME(volumePaths);
+  ASSERT_FALSE(volumePaths->empty());
+
+  foreach (const string& path, volumePaths.get()) {
+    Try<csi::paths::VolumePath> volumePath =
+      csi::paths::parseVolumePath(csiRootDir, path);
+    ASSERT_SOME(volumePath);
+
+    const string volumeStatePath = csi::paths::getVolumeStatePath(
+        csiRootDir, volumePath->type, volumePath->name, volumePath->volumeId);
+
+    Result<csi::state::VolumeState> volumeState =
+      slave::state::read<csi::state::VolumeState>(volumeStatePath);
+
+    ASSERT_SOME(volumeState);
+
+    if (volumeState->state() == csi::state::VolumeState::PUBLISHED) {
+      volumeState->set_boot_id("rebooted! ;)");
+      ASSERT_SOME(slave::state::checkpoint(volumeStatePath, volumeState.get()));
+    }
+  }
+
+  // NOTE: The mock CSI plugin always returns 4GB for `GetCapacity` calls, hence
+  // the resource provider would report an extra storage pool, and when it shows
+  // up, we know that the resource provider has finished reconciling storage
+  // pools and thus operations won't be dropped.
+  //
+  // To achieve this, we drop `SlaveReregisteredMessage`s other than the first
+  // one to avoid unexpected `UpdateSlaveMessage`s. Then, we drop the first two
+  // `UpdateSlaveMessage`s (one sent after agent reregistration and one after
+  // resource provider reregistration) and wait for the third one, which should
+  // contain the extra storage pool. We let it fall through to trigger an offer
+  // allocation for the persistent volume.
+  //
+  // Since the extra storage pool is never used, we reject the offers if only
+  // the storage pool is present.
+  //
+  // TODO(chhsiao): Remove this workaround once MESOS-9553 is done.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  // NOTE: The order of these expectations is reversed because Google Mock will
+  // search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
+  DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _);
+  FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Restart the agent.
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(persistentVolume)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+  ASSERT_FALSE(Resources(
+      updateSlaveMessage->resource_providers().providers(0).total_resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .empty());
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  offer = offers->at(0);
+
+  // Destroy the persistent volume and the MOUNT disk.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateOperationStatusMessage> destroyDiskOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+  Future<UpdateOperationStatusMessage> destroyOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  EXPECT_CALL(
+      sched, resourceOffers(&driver, OffersHaveResource(raw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.acceptOffers(
+      {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)});
+
+  AWAIT_READY(destroyOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state());
+
+  AWAIT_READY(destroyDiskOperationStatus);
+  EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state());
+
+  AWAIT_EXPECT_READY(offers)
+    << "Failed to wait for an offer containing resource '" << raw << "'";
+}
+
+
 // This test verifies that if the storage local resource provider fails to clean
 // up a persistent volume, the volume will not be destroyed.
 //