Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:06 UTC

[mesos] 09/15: Refactored SLRP to use v0 `VolumeManager`.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit b2f1d3bbf0fdae99c5945df15323bbd28a067b79
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:55 2019 -0700

    Refactored SLRP to use v0 `VolumeManager`.
    
    This patch moves volume management code from SLRP to the v0
    `VolumeManager`, and makes SLRP use the `VolumeManager` interface
    polymorphically.
    
    However, since SLRP no longer keeps track of CSI volume states, it
    cannot verify that a persistent volume is published before being
    destroyed (although this should be guaranteed by volume manager
    recovery).
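    
    As a rough sketch (hypothetical call site; `agentUrl`, `workDir`, and
    the surrounding variables are placeholders, not part of this patch),
    the polymorphic use looks like:
    
        // SLRP only sees the abstract interface; `VolumeManager::create()`
        // selects the concrete (here, v0) implementation internally.
        Try<process::Owned<mesos::csi::VolumeManager>> manager =
          mesos::csi::VolumeManager::create(
              agentUrl,
              slave::paths::getCsiRootDir(workDir),
              pluginInfo,
              {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
              containerPrefix,
              authToken,
              &metrics);
    
    so the provider code no longer references any `csi::v0` types directly.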
    
    Review: https://reviews.apache.org/r/70222/
---
 src/csi/v0_volume_manager.cpp                      | 1015 ++++++++++++++++-
 src/csi/v0_volume_manager_process.hpp              |  107 ++
 src/resource_provider/storage/provider.cpp         | 1190 +-------------------
 src/resource_provider/storage/provider_process.hpp |  151 +--
 .../storage_local_resource_provider_tests.cpp      |   25 +-
 5 files changed, 1183 insertions(+), 1305 deletions(-)

diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 2a4d3eb..c6112be 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -16,26 +16,57 @@
 
 #include "csi/v0_volume_manager.hpp"
 
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <list>
+
+#include <process/after.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 #include <process/process.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/none.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+#include <stout/some.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
 
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
 #include "csi/v0_volume_manager_process.hpp"
 
+#include "slave/state.hpp"
+
 namespace http = process::http;
+namespace slave = mesos::internal::slave;
 
+using std::list;
 using std::string;
 using std::vector;
 
 using google::protobuf::Map;
 
+using mesos::csi::state::VolumeState;
+
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::ProcessBase;
 
+using process::grpc::StatusError;
+
 using process::grpc::client::Runtime;
 
namespace mesos {
@@ -76,13 +107,163 @@ VolumeManagerProcess::VolumeManagerProcess(
 
 Future<Nothing> VolumeManagerProcess::recover()
 {
-  return Failure("Unimplemented");
+  Try<string> bootId_ = os::bootId();
+  if (bootId_.isError()) {
+    return Failure("Failed to get boot ID: " + bootId_.error());
+  }
+
+  bootId = bootId_.get();
+
+  return serviceManager->recover()
+    .then(process::defer(self(), &Self::prepareServices))
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      // Recover the states of CSI volumes.
+      Try<list<string>> volumePaths =
+        paths::getVolumePaths(rootDir, info.type(), info.name());
+
+      if (volumePaths.isError()) {
+        return Failure(
+            "Failed to find volumes for CSI plugin type '" + info.type() +
+            "' and name '" + info.name() + "': " + volumePaths.error());
+      }
+
+      vector<Future<Nothing>> futures;
+
+      foreach (const string& path, volumePaths.get()) {
+        Try<paths::VolumePath> volumePath =
+          paths::parseVolumePath(rootDir, path);
+
+        if (volumePath.isError()) {
+          return Failure(
+              "Failed to parse volume path '" + path +
+              "': " + volumePath.error());
+        }
+
+        CHECK_EQ(info.type(), volumePath->type);
+        CHECK_EQ(info.name(), volumePath->name);
+
+        const string& volumeId = volumePath->volumeId;
+        const string statePath = paths::getVolumeStatePath(
+            rootDir, info.type(), info.name(), volumeId);
+
+        if (!os::exists(statePath)) {
+          continue;
+        }
+
+        Result<VolumeState> volumeState =
+          slave::state::read<VolumeState>(statePath);
+
+        if (volumeState.isError()) {
+          return Failure(
+              "Failed to read volume state from '" + statePath +
+              "': " + volumeState.error());
+        }
+
+        if (volumeState.isNone()) {
+          continue;
+        }
+
+        volumes.put(volumeId, std::move(volumeState.get()));
+        VolumeData& volume = volumes.at(volumeId);
+
+        if (!VolumeState::State_IsValid(volume.state.state())) {
+          return Failure("Volume '" + volumeId + "' is in INVALID state");
+        }
+
+        // First, if the node has rebooted since the volume was made
+        // publishable, the volume should be reset to `NODE_READY`.
+        switch (volume.state.state()) {
+          case VolumeState::CREATED:
+          case VolumeState::NODE_READY:
+          case VolumeState::CONTROLLER_PUBLISH:
+          case VolumeState::CONTROLLER_UNPUBLISH:
+          case VolumeState::NODE_STAGE: {
+            break;
+          }
+          case VolumeState::VOL_READY:
+          case VolumeState::PUBLISHED:
+          case VolumeState::NODE_UNSTAGE:
+          case VolumeState::NODE_PUBLISH:
+          case VolumeState::NODE_UNPUBLISH: {
+            if (bootId != volume.state.boot_id()) {
+              // Since this is a no-op, no need to checkpoint here.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
+            }
+
+            break;
+          }
+          case VolumeState::UNKNOWN: {
+            return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
+          }
+
+          // NOTE: We avoid using a default clause for the following values in
+          // proto3's open enum to enable the compiler to detect missing enum
+          // cases for us. See: https://github.com/google/protobuf/issues/3917
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max: {
+            UNREACHABLE();
+          }
+        }
+
+        // Second, if the volume has been used by a container before recovery,
+        // we have to bring the volume back to `PUBLISHED` so data can be
+        // cleaned up synchronously when needed.
+        if (volume.state.node_publish_required()) {
+          futures.push_back(publishVolume(volumeId));
+        }
+      }
+
+      // Garbage collect leftover mount paths that we failed to remove before.
+      const string mountRootDir =
+        paths::getMountRootDir(rootDir, info.type(), info.name());
+
+      Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
+      if (mountPaths.isError()) {
+        // TODO(chhsiao): This could indicate that something is seriously wrong.
+        // To help debug the problem, we should surface the error via
+        // MESOS-8745.
+        return Failure(
+            "Failed to find mount paths for CSI plugin type '" + info.type() +
+            "' and name '" + info.name() + "': " + mountPaths.error());
+      }
+
+      foreach (const string& path, mountPaths.get()) {
+        Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
+        if (volumeId.isError()) {
+          return Failure(
+              "Failed to parse mount path '" + path + "': " + volumeId.error());
+        }
+
+        if (!volumes.contains(volumeId.get())) {
+          garbageCollectMountPath(volumeId.get());
+        }
+      }
+
+      return process::collect(futures).then([] { return Nothing(); });
+    }));
 }
 
 
 Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes()
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->listVolumes) {
+    return vector<VolumeInfo>();
+  }
+
+  // TODO(chhsiao): Set the max entries and use a loop to do multiple
+  // `ListVolumes` calls.
+  return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+    .then(process::defer(self(), [](const ListVolumesResponse& response) {
+      vector<VolumeInfo> result;
+      foreach (const auto& entry, response.entries()) {
+        result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
+                                    entry.volume().id(),
+                                    entry.volume().attributes()});
+      }
+
+      return result;
+    }));
 }
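
A possible shape for the paging TODO in `listVolumes` above (a sketch only,
assuming CSI v0's `max_entries`/`starting_token`/`next_token` pagination
fields; not part of this patch), after which `*entries` would be converted
to `vector<VolumeInfo>` as in the non-paging version:

    // Accumulate entries across multiple `ListVolumes` calls, feeding each
    // response's `next_token` back as the next request's `starting_token`
    // until the plugin returns an empty token.
    auto entries = std::make_shared<std::vector<ListVolumesResponse::Entry>>();
    auto token = std::make_shared<std::string>();

    return process::loop(
        self(),
        [=] {
          ListVolumesRequest request;
          request.set_max_entries(100);
          request.set_starting_token(*token);
          return call<LIST_VOLUMES>(CONTROLLER_SERVICE, std::move(request));
        },
        [=](const ListVolumesResponse& response)
            -> Future<ControlFlow<Nothing>> {
          foreach (const auto& entry, response.entries()) {
            entries->push_back(entry);
          }

          *token = response.next_token();
          if (token->empty()) {
            return Break(Nothing());
          }

          return Continue();
        });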
 
 
@@ -90,7 +271,18 @@ Future<Bytes> VolumeManagerProcess::getCapacity(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->getCapacity) {
+    return Bytes(0);
+  }
+
+  GetCapacityRequest request;
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_parameters() = parameters;
+
+  return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+    .then([](const GetCapacityResponse& response) {
+      return Bytes(response.available_capacity());
+    });
 }
 
 
@@ -100,7 +292,46 @@ Future<VolumeInfo> VolumeManagerProcess::createVolume(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  if (!controllerCapabilities->createDeleteVolume) {
+    return Failure(
+        "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
+        "plugin type '" + info.type() + "' and name '" + info.name());
+  }
+
+  LOG(INFO) << "Creating volume with name '" << name << "'";
+
+  CreateVolumeRequest request;
+  request.set_name(name);
+  request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+  request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_parameters() = parameters;
+
+  // We retry the `CreateVolume` call for MESOS-9517.
+  return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then(process::defer(self(), [=](
+        const CreateVolumeResponse& response) -> Future<VolumeInfo> {
+      const string& volumeId = response.volume().id();
+
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeId)) {
+        return Failure("Volume with name '" + name + "' already exists");
+      }
+
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = response.volume().attributes();
+
+      volumes.put(volumeId, std::move(volumeState));
+      checkpointVolumeState(volumeId);
+
+      return VolumeInfo{capacity, volumeId, response.volume().attributes()};
+    }));
 }
 
 
@@ -109,13 +340,85 @@ Future<Option<Error>> VolumeManagerProcess::validateVolume(
     const types::VolumeCapability& capability,
     const Map<string, string>& parameters)
 {
-  return Failure("Unimplemented");
+  // If the volume has been checkpointed, the validation succeeds only if the
+  // capability and parameters of the specified profile are the same as those in
+  // the checkpoint.
+  if (volumes.contains(volumeInfo.id)) {
+    const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
+
+    if (volumeState.volume_capability() != capability) {
+      return Some(
+          Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
+    }
+
+    if (volumeState.parameters() != parameters) {
+      return Some(
+          Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
+    }
+
+    return None();
+  }
+
+  if (!parameters.empty()) {
+    LOG(WARNING)
+      << "Validating volumes against parameters is not supported in CSI v0";
+  }
+
+  LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
+
+  ValidateVolumeCapabilitiesRequest request;
+  request.set_volume_id(volumeInfo.id);
+  *request.add_volume_capabilities() = evolve(capability);
+  *request.mutable_volume_attributes() = volumeInfo.context;
+
+  return call<VALIDATE_VOLUME_CAPABILITIES>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [=](
+        const ValidateVolumeCapabilitiesResponse& response)
+        -> Future<Option<Error>> {
+      if (!response.supported()) {
+        return Error(
+            "Unsupported volume capability for volume '" + volumeInfo.id +
+            "': " + response.message());
+      }
+
+      // NOTE: If the volume is already tracked, there might already be
+      // operations running in its sequence. Since this continuation runs
+      // outside the sequence, we fail the call here to avoid any race issue.
+      // This also means that this call is not idempotent.
+      if (volumes.contains(volumeInfo.id)) {
+        return Failure("Volume '" + volumeInfo.id + "' already validated");
+      }
+
+      VolumeState volumeState;
+      volumeState.set_state(VolumeState::CREATED);
+      *volumeState.mutable_volume_capability() = capability;
+      *volumeState.mutable_parameters() = parameters;
+      *volumeState.mutable_volume_attributes() = volumeInfo.context;
+
+      volumes.put(volumeInfo.id, std::move(volumeState));
+      checkpointVolumeState(volumeInfo.id);
+
+      return None();
+    }));
 }
 
 
 Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId)
 {
-  return Failure("Unimplemented");
+  if (!volumes.contains(volumeId)) {
+    return __deleteVolume(volumeId);
+  }
+
+  VolumeData& volume = volumes.at(volumeId);
+
+  LOG(INFO) << "Deleting volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
+
+  // Volume deletion is serialized with other operations on the same volume
+  // to avoid races.
+  return volume.sequence->add(std::function<Future<bool>()>(
+      process::defer(self(), &Self::_deleteVolume, volumeId)));
 }
 
 
@@ -133,7 +436,19 @@ Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
 
 Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
 {
-  return Failure("Unimplemented");
+  if (!volumes.contains(volumeId)) {
+    return Failure("Cannot publish unknown volume '" + volumeId + "'");
+  }
+
+  VolumeData& volume = volumes.at(volumeId);
+
+  LOG(INFO) << "Publishing volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
+
+  // Volume publishing is serialized with other operations on the same volume to
+  // avoid races.
+  return volume.sequence->add(std::function<Future<Nothing>()>(
+      process::defer(self(), &Self::_publishVolume, volumeId)));
 }
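
For background, a toy standalone sketch (not part of this patch) of the
serialization that `process::Sequence` provides here:

    #include <functional>

    #include <process/future.hpp>
    #include <process/owned.hpp>
    #include <process/sequence.hpp>

    #include <stout/nothing.hpp>

    using process::Future;
    using process::Owned;
    using process::Sequence;

    // Each closure added to a sequence runs only after the future returned
    // by the previous closure has transitioned, so operations enqueued
    // concurrently on the same volume never interleave. (In this patch the
    // sequence is kept alive in `VolumeData`.)
    Future<Nothing> serialized()
    {
      Owned<Sequence> sequence(new Sequence("csi-volume-sequence"));

      sequence->add(std::function<Future<Nothing>()>(
          [] { return Nothing(); }));  // Runs first.

      return sequence->add(std::function<Future<Nothing>()>(
          [] { return Nothing(); }));  // Runs strictly after the first.
    }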
 
 
@@ -143,6 +458,692 @@ Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId)
 }
 
 
+template <RPC Rpc>
+Future<Response<Rpc>> VolumeManagerProcess::call(
+    const Service& service,
+    const Request<Rpc>& request,
+    const bool retry) // Made immutable in the following mutable lambda.
+{
+  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+
+  return process::loop(
+      self(),
+      [=] {
+        // Make the call to the latest service endpoint.
+        return serviceManager->getServiceEndpoint(service)
+          .then(process::defer(
+              self(), &VolumeManagerProcess::_call<Rpc>, lambda::_1, request));
+      },
+      [=](const Try<Response<Rpc>, StatusError>& result) mutable
+          -> Future<ControlFlow<Response<Rpc>>> {
+        Option<Duration> backoff = retry
+          ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
+          : Option<Duration>::none();
+
+        maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+
+        // We dispatch `__call` for testing purposes.
+        return process::dispatch(
+            self(), &VolumeManagerProcess::__call<Rpc>, result, backoff);
+      });
+}
+
+
+template <RPC Rpc>
+Future<Try<Response<Rpc>, StatusError>> VolumeManagerProcess::_call(
+    const string& endpoint, const Request<Rpc>& request)
+{
+  ++metrics->csi_plugin_rpcs_pending.at(Rpc);
+
+  return Client(endpoint, runtime).call<Rpc>(request)
+    .onAny(defer(self(), [=](
+        const Future<Try<Response<Rpc>, StatusError>>& future) {
+      --metrics->csi_plugin_rpcs_pending.at(Rpc);
+      if (future.isReady() && future->isSome()) {
+        ++metrics->csi_plugin_rpcs_successes.at(Rpc);
+      } else if (future.isDiscarded()) {
+        ++metrics->csi_plugin_rpcs_cancelled.at(Rpc);
+      } else {
+        ++metrics->csi_plugin_rpcs_errors.at(Rpc);
+      }
+    }));
+}
+
+
+template <RPC Rpc>
+Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call(
+    const Try<Response<Rpc>, StatusError>& result,
+    const Option<Duration>& backoff)
+{
+  if (result.isSome()) {
+    return Break(result.get());
+  }
+
+  if (backoff.isNone()) {
+    return Failure(result.error());
+  }
+
+  // See the link below for retryable status codes:
+  // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
+  switch (result.error().status.error_code()) {
+    case grpc::DEADLINE_EXCEEDED:
+    case grpc::UNAVAILABLE: {
+      LOG(ERROR) << "Received '" << result.error() << "' while calling " << Rpc
+                 << ". Retrying in " << backoff.get();
+
+      return process::after(backoff.get())
+        .then([]() -> Future<ControlFlow<Response<Rpc>>> {
+          return Continue();
+        });
+    }
+    case grpc::CANCELLED:
+    case grpc::UNKNOWN:
+    case grpc::INVALID_ARGUMENT:
+    case grpc::NOT_FOUND:
+    case grpc::ALREADY_EXISTS:
+    case grpc::PERMISSION_DENIED:
+    case grpc::UNAUTHENTICATED:
+    case grpc::RESOURCE_EXHAUSTED:
+    case grpc::FAILED_PRECONDITION:
+    case grpc::ABORTED:
+    case grpc::OUT_OF_RANGE:
+    case grpc::UNIMPLEMENTED:
+    case grpc::INTERNAL:
+    case grpc::DATA_LOSS: {
+      return Failure(result.error());
+    }
+    case grpc::OK:
+    case grpc::DO_NOT_USE: {
+      UNREACHABLE();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
+Future<Nothing> VolumeManagerProcess::prepareServices()
+{
+  CHECK(!services.empty());
+
+  // Get the plugin capabilities.
+  return call<GET_PLUGIN_CAPABILITIES>(
+      *services.begin(), GetPluginCapabilitiesRequest())
+    .then(process::defer(self(), [=](
+        const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
+      pluginCapabilities = response.capabilities();
+
+      if (services.contains(CONTROLLER_SERVICE) &&
+          !pluginCapabilities->controllerService) {
+        return Failure(
+            "CONTROLLER_SERVICE plugin capability is not supported for CSI "
+            "plugin type '" + info.type() + "' and name '" + info.name() + "'");
+      }
+
+      return Nothing();
+    }))
+    // Check if all services have consistent plugin infos.
+    .then(process::defer(self(), [this] {
+      vector<Future<GetPluginInfoResponse>> futures;
+      foreach (const Service& service, services) {
+        futures.push_back(
+            call<GET_PLUGIN_INFO>(service, GetPluginInfoRequest())
+              .onReady([service](const GetPluginInfoResponse& response) {
+                LOG(INFO) << service << " loaded: " << stringify(response);
+              }));
+      }
+
+      return process::collect(futures)
+        .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
+          for (size_t i = 1; i < pluginInfos.size(); ++i) {
+            if (pluginInfos[i].name() != pluginInfos[0].name() ||
+                pluginInfos[i].vendor_version() !=
+                  pluginInfos[0].vendor_version()) {
+              LOG(WARNING) << "Inconsistent plugin services. Please check with "
+                              "the plugin vendor to ensure compatibility.";
+            }
+          }
+
+          return Nothing();
+        });
+    }))
+    // Get the controller capabilities.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(CONTROLLER_SERVICE)) {
+        controllerCapabilities = ControllerCapabilities();
+        return Nothing();
+      }
+
+      return call<CONTROLLER_GET_CAPABILITIES>(
+          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+          return Nothing();
+        }));
+    }))
+    // Get the node capabilities and ID.
+    .then(process::defer(self(), [this]() -> Future<Nothing> {
+      if (!services.contains(NODE_SERVICE)) {
+        nodeCapabilities = NodeCapabilities();
+        return Nothing();
+      }
+
+      return call<NODE_GET_CAPABILITIES>(
+          NODE_SERVICE, NodeGetCapabilitiesRequest())
+        .then(process::defer(self(), [this](
+            const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
+          nodeCapabilities = response.capabilities();
+
+          if (controllerCapabilities->publishUnpublishVolume) {
+            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+              .then(process::defer(self(), [this](
+                  const NodeGetIdResponse& response) {
+                nodeId = response.node_id();
+                return Nothing();
+              }));
+          }
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<bool> VolumeManagerProcess::_deleteVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.node_publish_required()) {
+    CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+
+    const string targetPath = paths::getMountTargetPath(
+        paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+    // NOTE: Normally the volume should have been cleaned up. However this may
+    // not be true for preprovisioned volumes (e.g., leftover from a previous
+    // resource provider instance). To prevent data leakage in such cases, we
+    // clean up the data (but not the target path) here.
+    Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+    if (rmdir.isError()) {
+      return Failure(
+          "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
+    }
+
+    volumeState.set_node_publish_required(false);
+    checkpointVolumeState(volumeId);
+  }
+
+  if (volumeState.state() != VolumeState::CREATED) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_deleteVolume, volumeId));
+  }
+
+  // NOTE: The last asynchronous continuation, which is supposed to be run in
+  // the volume's sequence, would cause the sequence to be destructed, which
+  // would in turn discard the returned future. However, since the continuation
+  // would have already been run, the returned future will become ready, making
+  // the future returned by the sequence ready as well.
+  return __deleteVolume(volumeId)
+    .then(process::defer(self(), [this, volumeId](bool deleted) {
+      volumes.erase(volumeId);
+
+      const string volumePath =
+        paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
+
+      Try<Nothing> rmdir = os::rmdir(volumePath);
+      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                        << volumePath << "': " << rmdir.error();
+
+      garbageCollectMountPath(volumeId);
+
+      return deleted;
+    }));
+}
+
+
+Future<bool> VolumeManagerProcess::__deleteVolume(
+    const string& volumeId)
+{
+  if (!controllerCapabilities->createDeleteVolume) {
+    return false;
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
+            << volumeId << "'";
+
+  DeleteVolumeRequest request;
+  request.set_volume_id(volumeId);
+
+  // We retry the `DeleteVolume` call for MESOS-9517.
+  return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+    .then([] { return true; });
+}
+
+
+Future<Nothing> VolumeManagerProcess::_attachVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::CREATED &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    return Failure(
+        "Cannot attach volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + " state");
+  }
+
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
+    return Nothing();
+  }
+
+  // A previously failed `ControllerUnpublishVolume` call can be recovered
+  // through an extra `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after recovering the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_attachVolume, volumeId));
+  }
+
+  if (volumeState.state() == VolumeState::CREATED) {
+    volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
+    << volumeId << "'";
+
+  ControllerPublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(CHECK_NOTNONE(nodeId));
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId](
+        const ControllerPublishVolumeResponse& response) {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      *volumeState.mutable_publish_info() = response.publish_info();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_detachVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::CREATED) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_detachVolume, volumeId));
+  }
+
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::CREATED);
+    return Nothing();
+  }
+
+  // A previously failed `ControllerPublishVolume` call can be recovered through
+  // the current `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_READY ||
+      volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
+    volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
+    << volumeId << "'";
+
+  ControllerUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_node_id(CHECK_NOTNONE(nodeId));
+
+  return call<CONTROLLER_UNPUBLISH_VOLUME>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::CREATED);
+      volumeState.mutable_publish_info()->clear();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::PUBLISHED) {
+    CHECK(volumeState.node_publish_required());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __publishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
+  }
+
+  // A previously failed `NodeUnpublishVolume` call can be recovered through an
+  // extra `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
+    // Retry after recovering the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
+  }
+
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  // NOTE: The target path will be cleaned up during volume removal.
+  Try<Nothing> mkdir = os::mkdir(targetPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount target path '" + targetPath +
+        "': " + mkdir.error());
+  }
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    volumeState.set_state(VolumeState::NODE_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
+            << volumeId << "'";
+
+  NodePublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volumeState.publish_info();
+  request.set_target_path(targetPath);
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  if (nodeCapabilities->stageUnstageVolume) {
+    const string stagingPath = paths::getMountStagingPath(
+        paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+    CHECK(os::exists(stagingPath));
+    request.set_staging_target_path(stagingPath);
+  }
+
+  return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(defer(self(), [this, volumeId, targetPath] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+
+      volumeState.set_state(VolumeState::PUBLISHED);
+
+      // NOTE: This is the first time a container is going to consume the
+      // persistent volume, so the `node_publish_required` field is set to
+      // indicate that this volume must remain published so it can be
+      // synchronously cleaned up when the persistent volume is destroyed.
+      volumeState.set_node_publish_required(true);
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    CHECK(!volumeState.boot_id().empty());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `NODE_READY` state.
+    return _attachVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
+
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::VOL_READY);
+    volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+    return Nothing();
+  }
+
+  // A previously failed `NodeUnstageVolume` call can be recovered through an
+  // extra `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
+    // Retry after recovering the volume to `NODE_READY` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
+
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  // NOTE: The staging path will be cleaned up during volume removal.
+  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount staging path '" + stagingPath +
+        "': " + mkdir.error());
+  }
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    volumeState.set_state(VolumeState::NODE_STAGE);
+    checkpointVolumeState(volumeId);
+  }
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
+            << "'";
+
+  NodeStageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volumeState.publish_info();
+  request.set_staging_target_path(stagingPath);
+  *request.mutable_volume_capability() =
+    evolve(volumeState.volume_capability());
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
+
+  return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
+      volumeState.set_boot_id(CHECK_NOTNONE(bootId));
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    CHECK(volumeState.boot_id().empty());
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
+  }
+
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
+    volumeState.clear_boot_id();
+    return Nothing();
+  }
+
+  // A previously failed `NodeStageVolume` call can be recovered through the
+  // current `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+  if (volumeState.state() == VolumeState::VOL_READY ||
+      volumeState.state() == VolumeState::NODE_STAGE) {
+    volumeState.set_state(VolumeState::NODE_UNSTAGE);
+    checkpointVolumeState(volumeId);
+  }
+
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  CHECK(os::exists(stagingPath));
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
+            << volumeId << "'";
+
+  NodeUnstageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_staging_target_path(stagingPath);
+
+  return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      volumeState.clear_boot_id();
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+Future<Nothing> VolumeManagerProcess::__unpublishVolume(const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeState& volumeState = volumes.at(volumeId).state;
+
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    return Nothing();
+  }
+
+  if (volumeState.state() != VolumeState::PUBLISHED &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    return Failure(
+        "Cannot unpublish volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + "state");
+  }
+
+  // A previously failed `NodePublishVolume` call can be recovered through the
+  // current `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::PUBLISHED ||
+      volumeState.state() == VolumeState::NODE_PUBLISH) {
+    volumeState.set_state(VolumeState::NODE_UNPUBLISH);
+    checkpointVolumeState(volumeId);
+  }
+
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  CHECK(os::exists(targetPath));
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
+            << volumeId << "'";
+
+  NodeUnpublishVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_target_path(targetPath);
+
+  return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
+
+      checkpointVolumeState(volumeId);
+
+      return Nothing();
+    }));
+}
+
+
+void VolumeManagerProcess::checkpointVolumeState(const string& volumeId)
+{
+  const string statePath =
+    paths::getVolumeStatePath(rootDir, info.type(), info.name(), volumeId);
+
+  // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
+  // ending up with a stale or empty checkpoint if a system crash happens.
+  Try<Nothing> checkpoint =
+    slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
+
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint volume state to '" << statePath << "':"
+    << checkpoint.error();
+}
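
For illustration, a standalone POSIX sketch (an assumed pattern, not the
actual `slave::state::checkpoint` implementation used above) of why syncing
before renaming prevents a crash from leaving a stale or empty checkpoint:

    #include <cstdio>
    #include <string>

    #include <fcntl.h>
    #include <unistd.h>

    // Write the new state to a temporary file, fsync it so the bytes are
    // durable, then atomically rename it over the target: after a crash,
    // readers see either the old checkpoint or the complete new one, never
    // a truncated mix.
    bool checkpoint(const std::string& path, const std::string& data)
    {
      const std::string temp = path + ".tmp";

      int fd = ::open(temp.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
      if (fd < 0) {
        return false;
      }

      bool ok =
        ::write(fd, data.data(), data.size()) ==
          static_cast<ssize_t>(data.size()) &&
        ::fsync(fd) == 0;

      ok = (::close(fd) == 0) && ok;

      return ok && std::rename(temp.c_str(), path.c_str()) == 0;
    }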
+
+
+void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
+{
+  CHECK(!volumes.contains(volumeId));
+
+  const string path = paths::getMountPath(
+      paths::getMountRootDir(rootDir, info.type(), info.name()), volumeId);
+
+  if (os::exists(path)) {
+    Try<Nothing> rmdir = os::rmdir(path);
+    if (rmdir.isError()) {
+      LOG(ERROR) << "Failed to remove directory '" << path
+                 << "': " << rmdir.error();
+    }
+  }
+}
+
+
 VolumeManager::VolumeManager(
     const http::URL& agentUrl,
     const string& rootDir,
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
index 9db99de..214fc1f 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -29,17 +29,25 @@
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
+#include <process/sequence.hpp>
 
 #include <stout/bytes.hpp>
+#include <stout/duration.hpp>
 #include <stout/error.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
+#include <stout/try.hpp>
 
 #include "csi/metrics.hpp"
+#include "csi/rpc.hpp"
 #include "csi/service_manager.hpp"
+#include "csi/state.hpp"
+#include "csi/utils.hpp"
 #include "csi/v0_volume_manager.hpp"
 #include "csi/volume_manager.hpp"
 
@@ -47,6 +55,16 @@ namespace mesos {
 namespace csi {
 namespace v0 {
 
+// The CSI volume manager initially picks a random amount of time between
+// `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI calls.
+// Subsequent retries are exponentially backed off based on this interval (e.g.,
+// 2nd retry uses a random value between `[0, b * 2^1]`, 3rd retry between
+// `[0, b * 2^2]`, etc.) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
+//
+// TODO(chhsiao): Make the retry parameters configurable.
+constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
+constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
+
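
To make the retry schedule concrete, a standalone sketch (plain seconds in
place of stout's `Duration`; not part of this patch) of the jittered
exponential backoff described above:

    #include <algorithm>
    #include <cstdlib>
    #include <iostream>

    int main()
    {
      const double factor = 10.0;        // DEFAULT_CSI_RETRY_BACKOFF_FACTOR.
      const double intervalMax = 600.0;  // DEFAULT_CSI_RETRY_INTERVAL_MAX.

      double maxBackoff = factor;

      for (int retry = 1; retry <= 7; ++retry) {
        // Pick a uniformly random wait in `[0, maxBackoff]`, then double
        // the cap for the next retry, saturating at `intervalMax`.
        const double backoff =
          maxBackoff * (static_cast<double>(std::rand()) / RAND_MAX);

        maxBackoff = std::min(maxBackoff * 2, intervalMax);

        std::cout << "retry " << retry << ": wait " << backoff << "s"
                  << std::endl;
      }

      return 0;
    }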
 
 class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
 {
@@ -90,7 +108,76 @@ public:
 
   process::Future<Nothing> unpublishVolume(const std::string& volumeId);
 
+  // Wrapper functions to make CSI calls and update RPC metrics. Made public
+  // for testing purposes.
+  //
+  // The call is made asynchronously and thus no guarantee is provided on the
+  // order in which calls are sent. Callers need to either ensure that no
+  // conflicting calls are in flight, or treat results idempotently.
+  //
+  // NOTE: We currently ensure this by 1) resource locking to forbid
+  // concurrent calls on the same volume, and 2) disallowing profile updates
+  // while there are ongoing `CREATE_DISK` or `DESTROY_DISK` operations.
+  template <RPC Rpc>
+  process::Future<Response<Rpc>> call(
+      const Service& service, const Request<Rpc>& request, bool retry = false);
+
+  template <RPC Rpc>
+  process::Future<Try<Response<Rpc>, process::grpc::StatusError>>
+  _call(const std::string& endpoint, const Request<Rpc>& request);
+
+  template <RPC Rpc>
+  process::Future<process::ControlFlow<Response<Rpc>>> __call(
+      const Try<Response<Rpc>, process::grpc::StatusError>& result,
+      const Option<Duration>& backoff);
+
 private:
+  process::Future<Nothing> prepareServices();
+
+  process::Future<bool> _deleteVolume(const std::string& volumeId);
+  process::Future<bool> __deleteVolume(const std::string& volumeId);
+
+  // The following methods are used to manage volume lifecycles. Transient
+  // states are omitted.
+  //
+  //                          +------------+
+  //                 +  +  +  |  CREATED   |  ^
+  //   _attachVolume |  |  |  +---+----^---+  |
+  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---v----+---+  |
+  //                 v  +  +  | NODE_READY |  +  ^
+  //                    |  |  +---+----^---+  |  |
+  //    __publishVolume |  |      |    |      |  | _unpublishVolume
+  //                    |  |  +---v----+---+  |  |
+  //                    v  +  | VOL_READY  |  +  +  ^
+  //                       |  +---+----^---+  |  |  |
+  //        _publishVolume |      |    |      |  |  | __unpublishVolume
+  //                       |  +---v----+---+  |  |  |
+  //                       v  | PUBLISHED  |  +  +  +
+  //                          +------------+
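+  //
+  // For example, calling `publishVolume` on a volume still in `CREATED`
+  // state chains `_publishVolume` -> `__publishVolume` -> `_attachVolume`,
+  // taking the volume through CREATED -> NODE_READY -> VOL_READY ->
+  // PUBLISHED.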
+
+  // Transition a volume to `NODE_READY` state from any state above.
+  process::Future<Nothing> _attachVolume(const std::string& volumeId);
+
+  // Transition a volume to `CREATED` state from any state below.
+  process::Future<Nothing> _detachVolume(const std::string& volumeId);
+
+  // Transition a volume to `PUBLISHED` state from any state above.
+  process::Future<Nothing> _publishVolume(const std::string& volumeId);
+
+  // Transition a volume to `VOL_READY` state from any state above.
+  process::Future<Nothing> __publishVolume(const std::string& volumeId);
+
+  // Transition a volume to `NODE_READY` state from any state below.
+  process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
+
+  // Transition a volume to `VOL_READY` state from any state below.
+  process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
+
+  void checkpointVolumeState(const std::string& volumeId);
+
+  void garbageCollectMountPath(const std::string& volumeId);
+
   const std::string rootDir;
   const CSIPluginInfo info;
   const hashset<Service> services;
@@ -98,6 +185,26 @@ private:
   process::grpc::client::Runtime runtime;
   Metrics* metrics;
   process::Owned<ServiceManager> serviceManager;
+
+  Option<std::string> bootId;
+  Option<PluginCapabilities> pluginCapabilities;
+  Option<ControllerCapabilities> controllerCapabilities;
+  Option<NodeCapabilities> nodeCapabilities;
+  Option<std::string> nodeId;
+
+  struct VolumeData
+  {
+    VolumeData(state::VolumeState&& _state)
+      : state(std::move(_state)),
+        sequence(new process::Sequence("csi-volume-sequence")) {}
+
+    state::VolumeState state;
+
+    // We call all CSI operations on the same volume in a sequence to ensure
+    // that they are processed in a sequential order.
+    process::Owned<process::Sequence> sequence;
+  };
+
+  hashmap<std::string, VolumeData> volumes;
 };
 
 } // namespace v0 {
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index c5a5213..fba5b18 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -18,29 +18,24 @@
 
 #include <algorithm>
 #include <cctype>
-#include <cstdlib>
 #include <functional>
 #include <list>
 #include <memory>
 #include <numeric>
 #include <queue>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include <glog/logging.h>
 
-#include <process/after.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
-#include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/id.hpp>
 #include <process/loop.hpp>
 #include <process/process.hpp>
-#include <process/sequence.hpp>
 
 #include <process/metrics/counter.hpp>
 #include <process/metrics/metrics.hpp>
@@ -74,13 +69,8 @@
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
-#include "csi/client.hpp"
 #include "csi/metrics.hpp"
 #include "csi/paths.hpp"
-#include "csi/rpc.hpp"
-#include "csi/service_manager.hpp"
-#include "csi/state.hpp"
-#include "csi/utils.hpp"
 #include "csi/volume_manager.hpp"
 
 #include "internal/devolve.hpp"
@@ -98,10 +88,6 @@
 
 namespace http = process::http;
 
-// TODO(chhsiao): Remove `using namespace` statements after refactoring.
-using namespace mesos::csi;
-using namespace mesos::csi::v0;
-
 using std::accumulate;
 using std::find;
 using std::list;
@@ -110,24 +96,18 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
-using google::protobuf::Map;
-
-using process::after;
 using process::await;
-using process::Break;
 using process::collect;
 using process::Continue;
 using process::ControlFlow;
 using process::defer;
 using process::delay;
-using process::dispatch;
 using process::Failure;
 using process::Future;
 using process::loop;
 using process::Owned;
 using process::ProcessBase;
 using process::Promise;
-using process::Sequence;
 using process::spawn;
 
 using process::grpc::StatusError;
@@ -137,9 +117,8 @@ using process::http::authentication::Principal;
 using process::metrics::Counter;
 using process::metrics::PushGauge;
 
-using mesos::csi::ServiceManager;
-
-using mesos::csi::state::VolumeState;
+using mesos::csi::VolumeInfo;
+using mesos::csi::VolumeManager;
 
 using mesos::internal::protobuf::convertLabelsToStringMap;
 using mesos::internal::protobuf::convertStringMapToLabels;
@@ -339,140 +318,31 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 }
 
 
-template <
-    csi::v0::RPC rpc,
-    typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
-Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call(
-    const csi::Service& service,
-    const csi::v0::Request<rpc>& request,
-    const bool retry) // Make immutable in the following mutable lambda.
-{
-  Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
-
-  return loop(
-      self(),
-      [=] {
-        // Make the call to the latest service endpoint.
-        return serviceManager->getServiceEndpoint(service)
-          .then(defer(
-              self(),
-              &StorageLocalResourceProviderProcess::_call<rpc>,
-              lambda::_1,
-              request));
-      },
-      [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable
-          -> Future<ControlFlow<csi::v0::Response<rpc>>> {
-        Option<Duration> backoff = retry
-          ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
-          : Option<Duration>::none();
-
-        maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
-        // We dispatch `__call` for testing purpose.
-        return dispatch(
-            self(),
-            &StorageLocalResourceProviderProcess::__call<rpc>,
-            result,
-            backoff);
-      });
-}
-
-
-template <csi::v0::RPC rpc>
-Future<Try<csi::v0::Response<rpc>, StatusError>>
-StorageLocalResourceProviderProcess::_call(
-    const string& endpoint, const csi::v0::Request<rpc>& request)
-{
-  ++metrics.csi_plugin_rpcs_pending.at(rpc);
-
-  return csi::v0::Client(endpoint, runtime).call<rpc>(request)
-    .onAny(defer(self(), [=](
-        const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) {
-      --metrics.csi_plugin_rpcs_pending.at(rpc);
-      if (future.isReady() && future->isSome()) {
-        ++metrics.csi_plugin_rpcs_successes.at(rpc);
-      } else if (future.isDiscarded()) {
-        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
-      } else {
-        ++metrics.csi_plugin_rpcs_errors.at(rpc);
-      }
-    }));
-}
-
-
-template <csi::v0::RPC rpc>
-Future<ControlFlow<csi::v0::Response<rpc>>>
-StorageLocalResourceProviderProcess::__call(
-    const Try<csi::v0::Response<rpc>, StatusError>& result,
-    const Option<Duration>& backoff)
-{
-  if (result.isSome()) {
-    return Break(result.get());
-  }
-
-  if (backoff.isNone()) {
-    return Failure(result.error());
-  }
-
-  // See the link below for retryable status codes:
-  // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT
-  switch (result.error().status.error_code()) {
-    case grpc::DEADLINE_EXCEEDED:
-    case grpc::UNAVAILABLE: {
-      LOG(ERROR)
-        << "Received '" << result.error() << "' while calling " << rpc
-        << ". Retrying in " << backoff.get();
-
-      return after(backoff.get())
-        .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> {
-          return Continue();
-        });
-    }
-    case grpc::CANCELLED:
-    case grpc::UNKNOWN:
-    case grpc::INVALID_ARGUMENT:
-    case grpc::NOT_FOUND:
-    case grpc::ALREADY_EXISTS:
-    case grpc::PERMISSION_DENIED:
-    case grpc::UNAUTHENTICATED:
-    case grpc::RESOURCE_EXHAUSTED:
-    case grpc::FAILED_PRECONDITION:
-    case grpc::ABORTED:
-    case grpc::OUT_OF_RANGE:
-    case grpc::UNIMPLEMENTED:
-    case grpc::INTERNAL:
-    case grpc::DATA_LOSS: {
-      return Failure(result.error());
-    }
-    case grpc::OK:
-    case grpc::DO_NOT_USE: {
-      UNREACHABLE();
-    }
-  }
-
-  UNREACHABLE();
-}
-
-
 void StorageLocalResourceProviderProcess::initialize()
 {
   const Principal principal = LocalResourceProvider::principal(info);
   CHECK(principal.claims.contains("cid_prefix"));
   const string& containerPrefix = principal.claims.at("cid_prefix");
 
-  rootDir = slave::paths::getCsiRootDir(workDir);
-  pluginInfo = info.storage().plugin();
-  services = {CONTROLLER_SERVICE, NODE_SERVICE};
-
-  serviceManager.reset(new ServiceManager(
+  Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
       extractParentEndpoint(url),
-      rootDir,
-      pluginInfo,
-      services,
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin(),
+      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
       containerPrefix,
       authToken,
-      runtime,
-      &metrics));
+      &metrics);
+
+  if (volumeManager_.isError()) {
+    LOG(ERROR)
+      << "Failed to create CSI volume manager for resource provider with type '"
+      << info.type() << "' and name '" << info.name()
+      << "': " << volumeManager_.error();
+
+    fatal();
+  }
+
+  volumeManager = std::move(volumeManager_).get();
 
   auto die = [=](const string& message) {
     LOG(ERROR)
@@ -503,7 +373,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  return recoverVolumes()
+  return volumeManager->recover()
     .then(defer(self(), [=]() -> Future<Nothing> {
       // Recover the resource provider ID and state from the latest symlink. If
       // the symlink does not exist, this is a new resource provider, and the
@@ -604,147 +474,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
-{
-  Try<string> bootId_ = os::bootId();
-  if (bootId_.isError()) {
-    return Failure("Failed to get boot ID: " + bootId_.error());
-  }
-
-  bootId = bootId_.get();
-
-  return serviceManager->recover()
-    .then(process::defer(self(), &Self::prepareServices))
-    .then(process::defer(self(), [this]() -> Future<Nothing> {
-      // Recover the states of CSI volumes.
-      Try<list<string>> volumePaths =
-        paths::getVolumePaths(rootDir, pluginInfo.type(), pluginInfo.name());
-
-      if (volumePaths.isError()) {
-        return Failure(
-            "Failed to find volumes for CSI plugin type '" + pluginInfo.type() +
-            "' and name '" + pluginInfo.name() + "': " + volumePaths.error());
-      }
-
-      vector<Future<Nothing>> futures;
-
-      foreach (const string& path, volumePaths.get()) {
-        Try<paths::VolumePath> volumePath =
-          paths::parseVolumePath(rootDir, path);
-
-        if (volumePath.isError()) {
-          return Failure(
-              "Failed to parse volume path '" + path +
-              "': " + volumePath.error());
-        }
-
-        CHECK_EQ(pluginInfo.type(), volumePath->type);
-        CHECK_EQ(pluginInfo.name(), volumePath->name);
-
-        const string& volumeId = volumePath->volumeId;
-        const string statePath = paths::getVolumeStatePath(
-            rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
-
-        if (!os::exists(statePath)) {
-          continue;
-        }
-
-        Result<VolumeState> volumeState =
-          slave::state::read<VolumeState>(statePath);
-
-        if (volumeState.isError()) {
-          return Failure(
-              "Failed to read volume state from '" + statePath +
-              "': " + volumeState.error());
-        }
-
-        if (volumeState.isNone()) {
-          continue;
-        }
-
-        volumes.put(volumeId, std::move(volumeState.get()));
-        VolumeData& volume = volumes.at(volumeId);
-
-        if (!VolumeState::State_IsValid(volume.state.state())) {
-          return Failure("Volume '" + volumeId + "' is in INVALID state");
-        }
-
-        // First, if there is a node reboot after the volume is made
-        // publishable, it should be reset to `NODE_READY`.
-        switch (volume.state.state()) {
-          case VolumeState::CREATED:
-          case VolumeState::NODE_READY:
-          case VolumeState::CONTROLLER_PUBLISH:
-          case VolumeState::CONTROLLER_UNPUBLISH:
-          case VolumeState::NODE_STAGE: {
-            break;
-          }
-          case VolumeState::VOL_READY:
-          case VolumeState::PUBLISHED:
-          case VolumeState::NODE_UNSTAGE:
-          case VolumeState::NODE_PUBLISH:
-          case VolumeState::NODE_UNPUBLISH: {
-            if (bootId != volume.state.boot_id()) {
-              // Since this is a no-op, no need to checkpoint here.
-              volume.state.set_state(VolumeState::NODE_READY);
-              volume.state.clear_boot_id();
-            }
-
-            break;
-          }
-          case VolumeState::UNKNOWN: {
-            return Failure("Volume '" + volumeId + "' is in UNKNOWN state");
-          }
-
-          // NOTE: We avoid using a default clause for the following values in
-          // proto3's open enum to enable the compiler to detect missing enum
-          // cases for us. See: https://github.com/google/protobuf/issues/3917
-          case google::protobuf::kint32min:
-          case google::protobuf::kint32max: {
-            UNREACHABLE();
-          }
-        }
-
-        // Second, if the volume has been used by a container before recovery,
-        // we have to bring the volume back to `PUBLISHED` so data can be
-        // cleaned up synchronously when needed.
-        if (volume.state.node_publish_required()) {
-          futures.push_back(publishVolume(volumeId));
-        }
-      }
-
-      // Garbage collect leftover mount paths that we previously failed to remove.
-      const string mountRootDir =
-        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name());
-
-      Try<list<string>> mountPaths = paths::getMountPaths(mountRootDir);
-      if (mountPaths.isError()) {
-        // TODO(chhsiao): This could indicate that something is seriously wrong.
-        // To help debug the problem, we should surface the error via
-        // MESOS-8745.
-        return Failure(
-            "Failed to find mount paths for CSI plugin type '" +
-            pluginInfo.type() + "' and name '" + pluginInfo.name() +
-            "': " + mountPaths.error());
-      }
-
-      foreach (const string& path, mountPaths.get()) {
-        Try<string> volumeId = paths::parseMountPath(mountRootDir, path);
-        if (volumeId.isError()) {
-          return Failure(
-              "Failed to parse mount path '" + path + "': " + volumeId.error());
-        }
-
-        if (!volumes.contains(volumeId.get())) {
-          garbageCollectMountPath(volumeId.get());
-        }
-      }
-
-      return process::collect(futures).then([] { return Nothing(); });
-    }));
-}
-
-
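
The reboot check in the removed recovery code above compares the boot ID
checkpointed with each volume against the host's current boot ID: if they
differ, any node-local mount is known to be gone, so the volume is reset to
`NODE_READY`. A minimal sketch of that comparison, assuming only stout's
`os::bootId()` (the same helper the code above uses); `hasRebooted` is a
hypothetical name:

    #include <stout/os.hpp>
    #include <stout/try.hpp>

    // Returns true if the host rebooted since `checkpointedBootId` was
    // recorded, i.e., any mounts made before the checkpoint are gone.
    Try<bool> hasRebooted(const std::string& checkpointedBootId)
    {
      Try<std::string> bootId = os::bootId();
      if (bootId.isError()) {
        return Error("Failed to get boot ID: " + bootId.error());
      }

      return bootId.get() != checkpointedBootId;
    }
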
 void StorageLocalResourceProviderProcess::doReliableRegistration()
 {
   if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -1113,7 +842,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes()
 {
   CHECK(info.has_id());
 
-  return listVolumes()
+  return volumeManager->listVolumes()
     .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
       Resources resources;
 
@@ -1156,15 +885,15 @@ Future<Resources> StorageLocalResourceProviderProcess::getStoragePools()
   foreachpair (const string& profile,
                const DiskProfileAdaptor::ProfileInfo& profileInfo,
                profileInfos) {
-    futures.push_back(
-        getCapacity(profileInfo.capability, profileInfo.parameters)
-          .then(defer(self(), [=](const Bytes& capacity) -> Resources {
-            if (capacity == 0) {
-              return Resources();
-            }
+    futures.push_back(volumeManager->getCapacity(
+        profileInfo.capability, profileInfo.parameters)
+      .then(defer(self(), [=](const Bytes& capacity) -> Resources {
+        if (capacity == 0) {
+          return Resources();
+        }
 
-            return createRawDiskResource(info, capacity, profile, vendor);
-          })));
+        return createRawDiskResource(info, capacity, profile, vendor);
+      })));
   }
 
   return collect(futures)
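
The per-profile capacity queries above fan out into one future per profile
and are then joined with `process::collect`, which yields a single future
that becomes ready once every input future is ready (and fails if any of
them fails). A minimal sketch of the pattern; `getPoolA` and `getPoolB` are
hypothetical helpers returning `Future<Resources>`:

    #include <process/collect.hpp>
    #include <process/future.hpp>

    #include <stout/foreach.hpp>

    std::vector<process::Future<Resources>> futures{getPoolA(), getPoolB()};

    // Becomes ready with all results, in order, once both futures resolve.
    process::Future<Resources> total = process::collect(futures)
      .then([](const std::vector<Resources>& results) {
        Resources merged;
        foreach (const Resources& resources, results) {
          merged += resources;
        }
        return merged;
      });
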
@@ -1395,7 +1124,6 @@ void StorageLocalResourceProviderProcess::publishResources(
         case Resource::DiskInfo::Source::MOUNT:
         case Resource::DiskInfo::Source::BLOCK: {
           CHECK(resource.disk().source().has_id());
-          CHECK(volumes.contains(resource.disk().source().id()));
           volumeIds.insert(resource.disk().source().id());
           break;
         }
@@ -1418,7 +1146,7 @@ void StorageLocalResourceProviderProcess::publishResources(
     vector<Future<Nothing>> futures;
 
     foreach (const string& volumeId, volumeIds) {
-      futures.push_back(publishVolume(volumeId));
+      futures.push_back(volumeManager->publishVolume(volumeId));
     }
 
     allPublished = collect(futures);
@@ -1526,766 +1254,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareServices()
-{
-  CHECK(!services.empty());
-
-  // Get the plugin capabilities.
-  return call<GET_PLUGIN_CAPABILITIES>(
-      *services.begin(), GetPluginCapabilitiesRequest())
-    .then(process::defer(self(), [=](
-        const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
-      pluginCapabilities = response.capabilities();
-
-      if (services.contains(CONTROLLER_SERVICE) &&
-          !pluginCapabilities->controllerService) {
-        return Failure(
-            "CONTROLLER_SERVICE plugin capability is not supported for CSI "
-            "plugin type '" +
-            pluginInfo.type() + "' and name '" + pluginInfo.name() + "'");
-      }
-
-      return Nothing();
-    }))
-    // Check if all services have consistent plugin infos.
-    .then(process::defer(self(), [this] {
-      vector<Future<GetPluginInfoResponse>> futures;
-      foreach (const Service& service, services) {
-        futures.push_back(
-            call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest())
-              .onReady([service](const GetPluginInfoResponse& response) {
-                LOG(INFO) << service << " loaded: " << stringify(response);
-              }));
-      }
-
-      return process::collect(futures)
-        .then([](const vector<GetPluginInfoResponse>& pluginInfos) {
-          for (size_t i = 1; i < pluginInfos.size(); ++i) {
-            if (pluginInfos[i].name() != pluginInfos[0].name() ||
-                pluginInfos[i].vendor_version() !=
-                  pluginInfos[0].vendor_version()) {
-              LOG(WARNING) << "Inconsistent plugin services. Please check with "
-                              "the plugin vendor to ensure compatibility.";
-            }
-          }
-
-          return Nothing();
-        });
-    }))
-    // Get the controller capabilities.
-    .then(process::defer(self(), [this]() -> Future<Nothing> {
-      if (!services.contains(CONTROLLER_SERVICE)) {
-        controllerCapabilities = ControllerCapabilities();
-        return Nothing();
-      }
-
-      return call<CONTROLLER_GET_CAPABILITIES>(
-          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
-        .then(process::defer(self(), [this](
-            const ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
-          return Nothing();
-        }));
-    }))
-    // Get the node capabilities and ID.
-    .then(process::defer(self(), [this]() -> Future<Nothing> {
-      if (!services.contains(NODE_SERVICE)) {
-        nodeCapabilities = NodeCapabilities();
-        return Nothing();
-      }
-
-      return call<NODE_GET_CAPABILITIES>(
-          NODE_SERVICE, NodeGetCapabilitiesRequest())
-        .then(process::defer(self(), [this](
-            const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
-          nodeCapabilities = response.capabilities();
-
-          if (controllerCapabilities->publishUnpublishVolume) {
-            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
-              .then(process::defer(self(), [this](
-                  const NodeGetIdResponse& response) {
-                nodeId = response.node_id();
-                return Nothing();
-              }));
-          }
-
-          return Nothing();
-        }));
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::publishVolume(
-    const string& volumeId)
-{
-  if (!volumes.contains(volumeId)) {
-    return Failure("Cannot publish unknown volume '" + volumeId + "'");
-  }
-
-  VolumeData& volume = volumes.at(volumeId);
-
-  LOG(INFO) << "Publishing volume '" << volumeId << "' in "
-            << volume.state.state() << " state";
-
-  // Volume publishing is serialized with other operations on the same volume to
-  // avoid races.
-  return volume.sequence->add(std::function<Future<Nothing>()>(
-      process::defer(self(), &Self::_publishVolume, volumeId)));
-}
-
-
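
The `volume.sequence->add(...)` call above is what rules out races between
concurrent operations on the same volume: a `process::Sequence` runs each
added closure only after the future returned by the previous one has
settled. A minimal sketch; `doPublish` and `doDelete` are hypothetical
callbacks:

    #include <process/sequence.hpp>

    process::Sequence sequence("volume-sequence");

    // Even though both closures are added back to back, `doDelete` does not
    // start until the future returned by `doPublish` is ready, failed, or
    // discarded, so a publish and a delete can never interleave.
    process::Future<Nothing> published =
      sequence.add(std::function<process::Future<Nothing>()>(doPublish));
    process::Future<bool> deleted =
      sequence.add(std::function<process::Future<bool>()>(doDelete));
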
-Future<Nothing> StorageLocalResourceProviderProcess::_attachVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::NODE_READY) {
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::CREATED &&
-      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
-      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
-    return Failure(
-        "Cannot attach volume '" + volumeId + "' in " +
-        stringify(volumeState.state()) + " state");
-  }
-
-  if (!controllerCapabilities->publishUnpublishVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::NODE_READY);
-    return Nothing();
-  }
-
-  // A previously failed `ControllerUnpublishVolume` call can be recovered
-  // through an extra `ControllerUnpublishVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
-  if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
-    // Retry after recovering the volume to `CREATED` state.
-    return _detachVolume(volumeId)
-      .then(process::defer(self(), &Self::_attachVolume, volumeId));
-  }
-
-  if (volumeState.state() == VolumeState::CREATED) {
-    volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
-    checkpointVolumeState(volumeId);
-  }
-
-  LOG(INFO)
-    << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
-    << volumeId << "'";
-
-  ControllerPublishVolumeRequest request;
-  request.set_volume_id(volumeId);
-  request.set_node_id(CHECK_NOTNONE(nodeId));
-  *request.mutable_volume_capability() =
-    csi::v0::evolve(volumeState.volume_capability());
-  request.set_readonly(false);
-  *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
-  return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
-    .then(process::defer(self(), [this, volumeId](
-        const ControllerPublishVolumeResponse& response) {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::NODE_READY);
-      *volumeState.mutable_publish_info() = response.publish_info();
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_detachVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::CREATED) {
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::NODE_READY &&
-      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
-      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
-    // Retry after transitioning the volume to `CREATED` state.
-    return _unpublishVolume(volumeId)
-      .then(process::defer(self(), &Self::_detachVolume, volumeId));
-  }
-
-  if (!controllerCapabilities->publishUnpublishVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::CREATED);
-    return Nothing();
-  }
-
-  // A previously failed `ControllerPublishVolume` call can be recovered through
-  // the current `ControllerUnpublishVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
-  if (volumeState.state() == VolumeState::NODE_READY ||
-      volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
-    volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
-    checkpointVolumeState(volumeId);
-  }
-
-  LOG(INFO)
-    << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
-    << volumeId << "'";
-
-  ControllerUnpublishVolumeRequest request;
-  request.set_volume_id(volumeId);
-  request.set_node_id(CHECK_NOTNONE(nodeId));
-
-  return call<CONTROLLER_UNPUBLISH_VOLUME>(
-      CONTROLLER_SERVICE, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::CREATED);
-      volumeState.mutable_publish_info()->clear();
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_publishVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::PUBLISHED) {
-    CHECK(volumeState.node_publish_required());
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::VOL_READY &&
-      volumeState.state() != VolumeState::NODE_PUBLISH &&
-      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
-    // Retry after transitioning the volume to `VOL_READY` state.
-    return __publishVolume(volumeId)
-      .then(process::defer(self(), &Self::_publishVolume, volumeId));
-  }
-
-  // A previously failed `NodeUnpublishVolume` call can be recovered through an
-  // extra `NodeUnpublishVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
-  if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
-    // Retry after recovering the volume to `VOL_READY` state.
-    return __unpublishVolume(volumeId)
-      .then(process::defer(self(), &Self::_publishVolume, volumeId));
-  }
-
-  const string targetPath = paths::getMountTargetPath(
-      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-      volumeId);
-
-  // NOTE: The target path will be cleaned up during volume removal.
-  Try<Nothing> mkdir = os::mkdir(targetPath);
-  if (mkdir.isError()) {
-    return Failure(
-        "Failed to create mount target path '" + targetPath +
-        "': " + mkdir.error());
-  }
-
-  if (volumeState.state() == VolumeState::VOL_READY) {
-    volumeState.set_state(VolumeState::NODE_PUBLISH);
-    checkpointVolumeState(volumeId);
-  }
-
-  LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
-            << volumeId << "'";
-
-  NodePublishVolumeRequest request;
-  request.set_volume_id(volumeId);
-  *request.mutable_publish_info() = volumeState.publish_info();
-  request.set_target_path(targetPath);
-  *request.mutable_volume_capability() =
-    csi::v0::evolve(volumeState.volume_capability());
-  request.set_readonly(false);
-  *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
-  if (nodeCapabilities->stageUnstageVolume) {
-    const string stagingPath = paths::getMountStagingPath(
-        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-        volumeId);
-
-    CHECK(os::exists(stagingPath));
-    request.set_staging_target_path(stagingPath);
-  }
-
-  return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId, targetPath] {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-
-      volumeState.set_state(VolumeState::PUBLISHED);
-
-      // NOTE: This is the first time a container is going to consume the
-      // persistent volume, so the `node_publish_required` field is set to
-      // indicate that this volume must remain published so it can be
-      // synchronously cleaned up when the persistent volume is destroyed.
-      volumeState.set_node_publish_required(true);
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::__publishVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::VOL_READY) {
-    CHECK(!volumeState.boot_id().empty());
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::NODE_READY &&
-      volumeState.state() != VolumeState::NODE_STAGE &&
-      volumeState.state() != VolumeState::NODE_UNSTAGE) {
-    // Retry after transitioning the volume to `NODE_READY` state.
-    return _attachVolume(volumeId)
-      .then(process::defer(self(), &Self::__publishVolume, volumeId));
-  }
-
-  if (!nodeCapabilities->stageUnstageVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::VOL_READY);
-    volumeState.set_boot_id(CHECK_NOTNONE(bootId));
-    return Nothing();
-  }
-
-  // A previously failed `NodeUnstageVolume` call can be recovered through an
-  // extra `NodeUnstageVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
-  if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
-    // Retry after recovering the volume to `NODE_READY` state.
-    return _unpublishVolume(volumeId)
-      .then(process::defer(self(), &Self::__publishVolume, volumeId));
-  }
-
-  const string stagingPath = paths::getMountStagingPath(
-      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-      volumeId);
-
-  // NOTE: The staging path will be cleaned up during volume removal.
-  Try<Nothing> mkdir = os::mkdir(stagingPath);
-  if (mkdir.isError()) {
-    return Failure(
-        "Failed to create mount staging path '" + stagingPath +
-        "': " + mkdir.error());
-  }
-
-  if (volumeState.state() == VolumeState::NODE_READY) {
-    volumeState.set_state(VolumeState::NODE_STAGE);
-    checkpointVolumeState(volumeId);
-  }
-
-  LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
-            << "'";
-
-  NodeStageVolumeRequest request;
-  request.set_volume_id(volumeId);
-  *request.mutable_publish_info() = volumeState.publish_info();
-  request.set_staging_target_path(stagingPath);
-  *request.mutable_volume_capability() =
-    csi::v0::evolve(volumeState.volume_capability());
-  *request.mutable_volume_attributes() = volumeState.volume_attributes();
-
-  return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::VOL_READY);
-      volumeState.set_boot_id(CHECK_NOTNONE(bootId));
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::_unpublishVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::NODE_READY) {
-    CHECK(volumeState.boot_id().empty());
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::VOL_READY &&
-      volumeState.state() != VolumeState::NODE_STAGE &&
-      volumeState.state() != VolumeState::NODE_UNSTAGE) {
-    // Retry after transitioning the volume to `VOL_READY` state.
-    return __unpublishVolume(volumeId)
-      .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
-  }
-
-  if (!nodeCapabilities->stageUnstageVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::NODE_READY);
-    volumeState.clear_boot_id();
-    return Nothing();
-  }
-
-  // A previously failed `NodeStageVolume` call can be recovered through the
-  // current `NodeUnstageVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
-  if (volumeState.state() == VolumeState::VOL_READY ||
-      volumeState.state() == VolumeState::NODE_STAGE) {
-    volumeState.set_state(VolumeState::NODE_UNSTAGE);
-    checkpointVolumeState(volumeId);
-  }
-
-  const string stagingPath = paths::getMountStagingPath(
-      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-      volumeId);
-
-  CHECK(os::exists(stagingPath));
-
-  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
-            << volumeId << "'";
-
-  NodeUnstageVolumeRequest request;
-  request.set_volume_id(volumeId);
-  request.set_staging_target_path(stagingPath);
-
-  return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::NODE_READY);
-      volumeState.clear_boot_id();
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume(
-    const string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.state() == VolumeState::VOL_READY) {
-    return Nothing();
-  }
-
-  if (volumeState.state() != VolumeState::PUBLISHED &&
-      volumeState.state() != VolumeState::NODE_PUBLISH &&
-      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
-    return Failure(
-        "Cannot unpublish volume '" + volumeId + "' in " +
-        stringify(volumeState.state()) + "state");
-  }
-
-  // A previously failed `NodePublishVolume` call can be recovered through the
-  // current `NodeUnpublishVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
-  if (volumeState.state() == VolumeState::PUBLISHED ||
-      volumeState.state() == VolumeState::NODE_PUBLISH) {
-    volumeState.set_state(VolumeState::NODE_UNPUBLISH);
-    checkpointVolumeState(volumeId);
-  }
-
-  const string targetPath = paths::getMountTargetPath(
-      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-      volumeId);
-
-  CHECK(os::exists(targetPath));
-
-  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
-            << volumeId << "'";
-
-  NodeUnpublishVolumeRequest request;
-  request.set_volume_id(volumeId);
-  request.set_target_path(targetPath);
-
-  return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
-      CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::VOL_READY);
-
-      checkpointVolumeState(volumeId);
-
-      return Nothing();
-    }));
-}
-
-
-Future<VolumeInfo> StorageLocalResourceProviderProcess::createVolume(
-    const string& name,
-    const Bytes& capacity,
-    const types::VolumeCapability& capability,
-    const Map<string, string>& parameters)
-{
-  if (!controllerCapabilities->createDeleteVolume) {
-    return Failure(
-        "CREATE_DELETE_VOLUME controller capability is not supported for CSI "
-        "plugin type '" + info.type() + "' and name '" + info.name());
-  }
-
-  LOG(INFO) << "Creating volume with name '" << name << "'";
-
-  CreateVolumeRequest request;
-  request.set_name(name);
-  request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
-  request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
-  *request.add_volume_capabilities() = csi::v0::evolve(capability);
-  *request.mutable_parameters() = parameters;
-
-  // We retry the `CreateVolume` call for MESOS-9517.
-  return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
-    .then(process::defer(self(), [=](
-        const CreateVolumeResponse& response) -> Future<VolumeInfo> {
-      const string& volumeId = response.volume().id();
-
-      // NOTE: If the volume is already tracked, there might already be
-      // operations running in its sequence. Since this continuation runs
-      // outside the sequence, we fail the call here to avoid any race issue.
-      // This also means that this call is not idempotent.
-      if (volumes.contains(volumeId)) {
-        return Failure("Volume with name '" + name + "' already exists");
-      }
-
-      VolumeState volumeState;
-      volumeState.set_state(VolumeState::CREATED);
-      *volumeState.mutable_volume_capability() = capability;
-      *volumeState.mutable_parameters() = parameters;
-      *volumeState.mutable_volume_attributes() = response.volume().attributes();
-
-      volumes.put(volumeId, std::move(volumeState));
-      checkpointVolumeState(volumeId);
-
-      return VolumeInfo{capacity, volumeId, response.volume().attributes()};
-    }));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
-    const string& volumeId)
-{
-  if (!volumes.contains(volumeId)) {
-    return __deleteVolume(volumeId);
-  }
-
-  VolumeData& volume = volumes.at(volumeId);
-
-  LOG(INFO) << "Deleting volume '" << volumeId << "' in "
-            << volume.state.state() << " state";
-
-  // Volume deletion is serialized with other operations on the same volume
-  // to avoid races.
-  return volume.sequence->add(std::function<Future<bool>()>(
-      process::defer(self(), &Self::_deleteVolume, volumeId)));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::_deleteVolume(
-    const std::string& volumeId)
-{
-  CHECK(volumes.contains(volumeId));
-  VolumeState& volumeState = volumes.at(volumeId).state;
-
-  if (volumeState.node_publish_required()) {
-    CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
-
-    const string targetPath = paths::getMountTargetPath(
-        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
-        volumeId);
-
-    // NOTE: Normally the volume should have been cleaned up. However this may
-    // not be true for preprovisioned volumes (e.g., leftover from a previous
-    // resource provider instance). To prevent data leakage in such cases, we
-    // clean up the data (but not the target path) here.
-    Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
-    if (rmdir.isError()) {
-      return Failure(
-          "Failed to clean up volume '" + volumeId + "': " + rmdir.error());
-    }
-
-    volumeState.set_node_publish_required(false);
-    checkpointVolumeState(volumeId);
-  }
-
-  if (volumeState.state() != VolumeState::CREATED) {
-    // Retry after transitioning the volume to `CREATED` state.
-    return _detachVolume(volumeId)
-      .then(process::defer(self(), &Self::_deleteVolume, volumeId));
-  }
-
-  // NOTE: The last asynchronous continuation, which is supposed to be run in
-  // the volume's sequence, would cause the sequence to be destructed, which
-  // would in turn discard the returned future. However, since the continuation
-  // would have already been run, the returned future will become ready, making
-  // the future returned by the sequence ready as well.
-  return __deleteVolume(volumeId)
-    .then(process::defer(self(), [this, volumeId](bool deleted) {
-      volumes.erase(volumeId);
-
-      const string volumePath = paths::getVolumePath(
-          rootDir, pluginInfo.type(), pluginInfo.name(), volumeId);
-
-      Try<Nothing> rmdir = os::rmdir(volumePath);
-      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
-                        << volumePath << "': " << rmdir.error();
-
-      garbageCollectMountPath(volumeId);
-
-      return deleted;
-    }));
-}
-
-
-Future<bool> StorageLocalResourceProviderProcess::__deleteVolume(
-    const string& volumeId)
-{
-  if (!controllerCapabilities->createDeleteVolume) {
-    return false;
-  }
-
-  LOG(INFO) << "Calling '/csi.v0.Controller/DeleteVolume' for volume '"
-            << volumeId << "'";
-
-  DeleteVolumeRequest request;
-  request.set_volume_id(volumeId);
-
-  // We retry the `DeleteVolume` call for MESOS-9517.
-  return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
-    .then([] { return true; });
-}
-
-
-Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
-    const VolumeInfo& volumeInfo,
-    const types::VolumeCapability& capability,
-    const Map<string, string>& parameters)
-{
-  // If the volume has been checkpointed, the validation succeeds only if the
-  // capability and parameters of the specified profile are the same as those in
-  // the checkpoint.
-  if (volumes.contains(volumeInfo.id)) {
-    const VolumeState& volumeState = volumes.at(volumeInfo.id).state;
-
-    if (volumeState.volume_capability() != capability) {
-      return Some(
-          Error("Mismatched capability for volume '" + volumeInfo.id + "'"));
-    }
-
-    if (volumeState.parameters() != parameters) {
-      return Some(
-          Error("Mismatched parameters for volume '" + volumeInfo.id + "'"));
-    }
-
-    return None();
-  }
-
-  if (!parameters.empty()) {
-    LOG(WARNING)
-      << "Validating volumes against parameters is not supported in CSI v0";
-  }
-
-  LOG(INFO) << "Validating volume '" << volumeInfo.id << "'";
-
-  ValidateVolumeCapabilitiesRequest request;
-  request.set_volume_id(volumeInfo.id);
-  *request.add_volume_capabilities() = csi::v0::evolve(capability);
-  *request.mutable_volume_attributes() = volumeInfo.context;
-
-  return call<VALIDATE_VOLUME_CAPABILITIES>(
-      CONTROLLER_SERVICE, std::move(request))
-    .then(process::defer(self(), [=](
-        const ValidateVolumeCapabilitiesResponse& response)
-        -> Future<Option<Error>> {
-      if (!response.supported()) {
-        return Error(
-            "Unsupported volume capability for volume '" + volumeInfo.id +
-            "': " + response.message());
-      }
-
-      // NOTE: If the volume is already tracked, there might already be
-      // operations running in its sequence. Since this continuation runs
-      // outside the sequence, we fail the call here to avoid any race issue.
-      // This also means that this call is not idempotent.
-      if (volumes.contains(volumeInfo.id)) {
-        return Failure("Volume '" + volumeInfo.id + "' already validated");
-      }
-
-      VolumeState volumeState;
-      volumeState.set_state(VolumeState::CREATED);
-      *volumeState.mutable_volume_capability() = capability;
-      *volumeState.mutable_parameters() = parameters;
-      *volumeState.mutable_volume_attributes() = volumeInfo.context;
-
-      volumes.put(volumeInfo.id, std::move(volumeState));
-      checkpointVolumeState(volumeInfo.id);
-
-      return None();
-    }));
-}
-
-
-Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes()
-{
-  if (!controllerCapabilities->listVolumes) {
-    return vector<VolumeInfo>();
-  }
-
-  // TODO(chhsiao): Set the max entries and use a loop to do multiple
-  // `ListVolumes` calls.
-  return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
-    .then(process::defer(self(), [](const ListVolumesResponse& response) {
-      vector<VolumeInfo> result;
-      foreach (const auto& entry, response.entries()) {
-        result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
-                                    entry.volume().id(),
-                                    entry.volume().attributes()});
-      }
-
-      return result;
-    }));
-}
-
-
-Future<Bytes> StorageLocalResourceProviderProcess::getCapacity(
-    const types::VolumeCapability& capability,
-    const Map<string, string>& parameters)
-{
-  if (!controllerCapabilities->getCapacity) {
-    return Bytes(0);
-  }
-
-  GetCapacityRequest request;
-  *request.add_volume_capabilities() = csi::v0::evolve(capability);
-  *request.mutable_parameters() = parameters;
-
-  return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
-    .then([](const GetCapacityResponse& response) {
-      return Bytes(response.available_capacity());
-    });
-}
-
-
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     const id::UUID& operationUuid)
 {
@@ -2519,7 +1487,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
   // afterward. See MESOS-9254.
   Future<VolumeInfo> created;
   if (resource.disk().source().has_profile()) {
-    created = createVolume(
+    created = volumeManager->createVolume(
         operationUuid.toString(),
         resource.scalar().value() * Bytes::MEGABYTES,
         profileInfo.capability,
@@ -2532,7 +1500,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
           resource.disk().source().metadata()))
     };
 
-    created = validateVolume(
+    created = volumeManager->validateVolume(
         volumeInfo, profileInfo.capability, profileInfo.parameters)
       .then([resource, profile, volumeInfo](
           const Option<Error>& error) -> Future<VolumeInfo> {
@@ -2596,7 +1564,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
   CHECK(!Resources::isPersistentVolume(resource));
   CHECK(resource.disk().source().has_id());
 
-  return deleteVolume(resource.disk().source().id())
+  return volumeManager->deleteVolume(resource.disk().source().id())
     .then(defer(self(), [=](bool deprovisioned) {
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
@@ -2669,6 +1637,7 @@ StorageLocalResourceProviderProcess::applyCreate(
 
   foreach (const Resource& resource, operation.create().volumes()) {
     CHECK(Resources::isPersistentVolume(resource));
+    CHECK(resource.disk().source().has_id());
 
     // TODO(chhsiao): Support persistent BLOCK volumes.
     if (resource.disk().source().type() != Resource::DiskInfo::Source::MOUNT) {
@@ -2677,6 +1646,12 @@ StorageLocalResourceProviderProcess::applyCreate(
           stringify(resource.disk().persistence().id()) + "' on a " +
           stringify(resource.disk().source().type()) + " disk");
     }
+
+    // TODO(chhsiao): Ideally, we could perform a sanity check to verify that
+    // the target path is empty before creating a new persistent volume.
+    // However, right now we cannot distinguish the case where a framework is
+    // recreating its own persistent volume after the agent ID changes from the
+    // case where existing data is being leaked to another framework.
   }
 
   return getResourceConversions(operation);
@@ -2690,43 +1665,32 @@ StorageLocalResourceProviderProcess::applyDestroy(
   CHECK(operation.has_destroy());
 
   foreach (const Resource& resource, operation.destroy().volumes()) {
-    // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
-    // with `dd` or any other utility to zero out the block device.
     CHECK(Resources::isPersistentVolume(resource));
-    CHECK(resource.disk().source().type() == Resource::DiskInfo::Source::MOUNT);
     CHECK(resource.disk().source().has_id());
 
-    const string& volumeId = resource.disk().source().id();
-    CHECK(volumes.contains(volumeId));
-
-    const VolumeState& volumeState = volumes.at(volumeId).state;
-
-    // NOTE: Data can only be written to the persistent volume when it is
-    // in `PUBLISHED` state (i.e., mounted). Once a volume has been transitioned
-    // to `PUBLISHED`, we will set the `node_publish_required` field and always
-    // recover it back to `PUBLISHED` after a failover, until a `DESTROY_DISK`
-    // is applied, which only comes after `DESTROY`. So we only need to clean up
-    // the volume if it has the field set.
-    if (!volumeState.node_publish_required()) {
-      continue;
-    }
-
-    CHECK_EQ(VolumeState::PUBLISHED, volumeState.state());
+    // TODO(chhsiao): Support cleaning up persistent BLOCK volumes, presumably
+    // with `dd` or any other utility to zero out the block device.
+    CHECK_EQ(Resource::DiskInfo::Source::MOUNT,
+             resource.disk().source().type());
 
     const string targetPath = csi::paths::getMountTargetPath(
         csi::paths::getMountRootDir(
             slave::paths::getCsiRootDir(workDir),
             info.storage().plugin().type(),
             info.storage().plugin().name()),
-        volumeId);
-
-    // Only the data in the target path, but not itself, should be removed.
-    Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
-    if (rmdir.isError()) {
-      return Error(
-          "Failed to remove persistent volume '" +
-          stringify(resource.disk().persistence().id()) + "' at '" +
-          targetPath + "': " + rmdir.error());
+        resource.disk().source().id());
+
+    if (os::exists(targetPath)) {
+      // NOTE: We always clean up the data in the target path (but not the
+      // directory itself) even if the volume is not published, in which case
+      // this should be a no-op.
+      Try<Nothing> rmdir = os::rmdir(targetPath, true, false);
+      if (rmdir.isError()) {
+        return Error(
+            "Failed to remove persistent volume '" +
+            stringify(resource.disk().persistence().id()) + "' at '" +
+            targetPath + "': " + rmdir.error());
+      }
     }
   }
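
The three-argument `os::rmdir` above is what keeps the mount point alive:
with `recursive = true` and `removeRoot = false`, stout deletes everything
inside the directory but leaves the directory entry itself in place, so the
volume can still be unpublished through it later. A minimal sketch (the path
shown is hypothetical):

    // Removes the *contents* of the target path, but not the path itself.
    Try<Nothing> rmdir = os::rmdir("/path/to/mounts/volume-1", true, false);
    if (rmdir.isError()) {
      // Data could not be removed, so the `DESTROY` must be failed.
    }
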
 
@@ -2854,28 +1818,6 @@ void StorageLocalResourceProviderProcess::garbageCollectOperationPath(
 }
 
 
-void StorageLocalResourceProviderProcess::garbageCollectMountPath(
-    const string& volumeId)
-{
-  CHECK(!volumes.contains(volumeId));
-
-  const string path = csi::paths::getMountPath(
-      csi::paths::getMountRootDir(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name()),
-      volumeId);
-
-  if (os::exists(path)) {
-    Try<Nothing> rmdir = os::rmdir(path);
-    if (rmdir.isError()) {
-      LOG(ERROR)
-        << "Failed to remove directory '" << path << "': " << rmdir.error();
-    }
-  }
-}
-
-
 void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 {
   ResourceProviderState state;
@@ -2925,26 +1867,6 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 }
 
 
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
-    const string& volumeId)
-{
-  const string statePath = csi::paths::getVolumeStatePath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      volumeId);
-
-  // NOTE: We ensure the checkpoint is synced to the filesystem to avoid
-  // resulting in a stale or empty checkpoint when a system crash happens.
-  Try<Nothing> checkpoint =
-    slave::state::checkpoint(statePath, volumes.at(volumeId).state, true);
-
-  CHECK_SOME(checkpoint)
-    << "Failed to checkpoint volume state to '" << statePath << "':"
-    << checkpoint.error();
-}
-
-
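
The trailing `true` passed to `slave::state::checkpoint` above requests that
the checkpoint be synced to the filesystem. The underlying crash-safety
recipe is the usual write-fsync-rename dance; a minimal sketch of the idea
(`atomicCheckpoint` is a hypothetical helper, not the actual Mesos
implementation, and the fsync step is elided for brevity):

    Try<Nothing> atomicCheckpoint(
        const std::string& path, const std::string& data)
    {
      const std::string temp = path + ".tmp";

      // Write the new state to a temporary file first, so a crash can never
      // leave a half-written file at `path`. A production version would also
      // fsync `temp` here before renaming.
      Try<Nothing> write = os::write(temp, data);
      if (write.isError()) {
        return Error("Failed to write '" + temp + "': " + write.error());
      }

      // On POSIX filesystems the rename is atomic: readers observe either
      // the old checkpoint or the new one, never a mix of both.
      Try<Nothing> rename = os::rename(temp, path);
      if (rename.isError()) {
        return Error("Failed to rename '" + temp + "': " + rename.error());
      }

      return Nothing();
    }
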
 void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 {
   Call call;
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 56d3682..6e14d90 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -19,7 +19,6 @@
 
 #include <memory>
 #include <string>
-#include <type_traits>
 #include <vector>
 
 #include <mesos/mesos.hpp>
@@ -32,9 +31,7 @@
 #include <mesos/v1/resource_provider.hpp>
 
 #include <process/future.hpp>
-#include <process/grpc.hpp>
 #include <process/http.hpp>
-#include <process/loop.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/sequence.hpp>
@@ -42,8 +39,6 @@
 #include <process/metrics/counter.hpp>
 #include <process/metrics/push_gauge.hpp>
 
-#include <stout/bytes.hpp>
-#include <stout/duration.hpp>
 #include <stout/hashset.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/nothing.hpp>
@@ -52,10 +47,6 @@
 #include <stout/uuid.hpp>
 
 #include "csi/metrics.hpp"
-#include "csi/rpc.hpp"
-#include "csi/service_manager.hpp"
-#include "csi/state.hpp"
-#include "csi/utils.hpp"
 #include "csi/volume_manager.hpp"
 
 #include "status_update_manager/operation.hpp"
@@ -63,18 +54,6 @@
 namespace mesos {
 namespace internal {
 
-// Storage local resource provider initially picks a random amount of time
-// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI
-// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent
-// retries are exponentially backed off based on this interval (e.g., 2nd retry
-// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`,
-// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`.
-//
-// TODO(chhsiao): Make the retry parameters configurable.
-constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10);
-constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10);
-
-
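
Concretely, the k-th retry waits a uniformly random duration in
`[0, min(b * 2^(k-1), DEFAULT_CSI_RETRY_INTERVAL_MAX)]`. A minimal sketch of
such a computation (`retryBackoff` is a hypothetical helper; the real retry
loop lives in the volume manager):

    #include <algorithm>
    #include <cmath>
    #include <cstdlib>

    Duration retryBackoff(int retry)  // `retry` starts at 1.
    {
      // The upper bound doubles with every retry, capped at the maximum.
      const Duration bound = std::min(
          DEFAULT_CSI_RETRY_BACKOFF_FACTOR * std::pow(2.0, retry - 1),
          DEFAULT_CSI_RETRY_INTERVAL_MAX);

      // Pick a uniformly random point in `[0, bound]`.
      return bound * (static_cast<double>(::rand()) / RAND_MAX);
    }
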
 class StorageLocalResourceProviderProcess
   : public process::Process<StorageLocalResourceProviderProcess>
 {
@@ -97,57 +76,11 @@ public:
   void disconnected();
   void received(const resource_provider::Event& event);
 
-  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
-  // testing purpose.
-  //
-  // The call is made asynchronously and thus no guarantee is provided on the
-  // order in which calls are sent. Callers need to either ensure to not have
-  // multiple conflicting calls in flight, or treat results idempotently.
-  //
-  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
-  // calls on the same volume, and 2) no profile update while there are ongoing
-  // `CREATE_DISK` or `DESTROY_DISK` operations.
-  //
-  // NOTE: Since this function uses `getService` to obtain the latest service
-  // future, which depends on probe results, it is disabled for making probe
-  // calls; `_call` should be used directly instead.
-  template <
-      csi::v0::RPC rpc,
-      typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
-  process::Future<csi::v0::Response<rpc>> call(
-      const csi::Service& service,
-      const csi::v0::Request<rpc>& request,
-      bool retry = false);
-
-  template <csi::v0::RPC rpc>
-  process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>>
-  _call(const std::string& endpoint, const csi::v0::Request<rpc>& request);
-
-  template <csi::v0::RPC rpc>
-  process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call(
-      const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result,
-      const Option<Duration>& backoff);
-
 private:
-  struct VolumeData
-  {
-    VolumeData(csi::state::VolumeState&& _state)
-      : state(_state), sequence(new process::Sequence("volume-sequence")) {}
-
-    csi::state::VolumeState state;
-
-    // We run all CSI operations for the same volume on a sequence to
-    // ensure that they are processed in a sequential order.
-    process::Owned<process::Sequence> sequence;
-  };
-
   void initialize() override;
   void fatal();
 
-  // The recover functions are responsible to recover the state of the
-  // resource provider and CSI volumes from checkpointed data.
   process::Future<Nothing> recover();
-  process::Future<Nothing> recoverVolumes();
 
   void doReliableRegistration();
 
@@ -192,73 +125,6 @@ private:
   void reconcileOperations(
       const resource_provider::Event::ReconcileOperations& reconcile);
 
-  process::Future<Nothing> prepareServices();
-
-  process::Future<Nothing> publishVolume(const std::string& volumeId);
-
-  // The following methods are used to manage volume lifecycles. Transient
-  // states are omitted.
-  //
-  //                          +------------+
-  //                 +  +  +  |  CREATED   |  ^
-  //   _attachVolume |  |  |  +---+----^---+  |
-  //                 |  |  |      |    |      | _detachVolume
-  //                 |  |  |  +---v----+---+  |
-  //                 v  +  +  | NODE_READY |  +  ^
-  //                    |  |  +---+----^---+  |  |
-  //    __publishVolume |  |      |    |      |  | _unpublishVolume
-  //                    |  |  +---v----+---+  |  |
-  //                    v  +  | VOL_READY  |  +  +  ^
-  //                       |  +---+----^---+  |  |  |
-  //        _publishVolume |      |    |      |  |  | __unpublishVolume
-  //                       |  +---v----+---+  |  |  |
-  //                       V  | PUBLISHED  |  +  +  +
-  //                          +------------+
-
-  // Transition a volume to `NODE_READY` state from any state above.
-  process::Future<Nothing> _attachVolume(const std::string& volumeId);
-
-  // Transition a volume to `CREATED` state from any state below.
-  process::Future<Nothing> _detachVolume(const std::string& volumeId);
-
-  // Transition a volume to `PUBLISHED` state from any state above.
-  process::Future<Nothing> _publishVolume(const std::string& volumeId);
-
-  // Transition a volume to `VOL_READY` state from any state above.
-  process::Future<Nothing> __publishVolume(const std::string& volumeId);
-
-  // Transition a volume to `NODE_READY` state from any state below.
-  process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
-
-  // Transition a volume to `VOL_READY` state from any state below.
-  process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
-
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<csi::VolumeInfo> createVolume(
-      const std::string& name,
-      const Bytes& capacity,
-      const csi::types::VolumeCapability& capability,
-      const google::protobuf::Map<std::string, std::string>& parameters);
-
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<bool> deleteVolume(const std::string& volumeId);
-  process::Future<bool> _deleteVolume(const std::string& volumeId);
-  process::Future<bool> __deleteVolume(const std::string& volumeId);
-
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Option<Error>> validateVolume(
-      const csi::VolumeInfo& volumeInfo,
-      const csi::types::VolumeCapability& capability,
-      const google::protobuf::Map<std::string, std::string>& parameters);
-
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<std::vector<csi::VolumeInfo>> listVolumes();
-
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Bytes> getCapacity(
-      const csi::types::VolumeCapability& capability,
-      const google::protobuf::Map<std::string, std::string>& parameters);
-
   // Applies the operation. Speculative operations will be synchronously
   // applied. Do nothing if the operation is already in a terminal state.
   process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
@@ -295,10 +161,8 @@ private:
       const Try<std::vector<ResourceConversion>>& conversions);
 
   void garbageCollectOperationPath(const id::UUID& operationUuid);
-  void garbageCollectMountPath(const std::string& volumeId);
 
   void checkpointResourceProviderState();
-  void checkpointVolumeState(const std::string& volumeId);
 
   void sendResourceProviderStateUpdate();
 
@@ -326,25 +190,13 @@ private:
 
   std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  process::grpc::client::Runtime runtime;
   process::Owned<v1::resource_provider::Driver> driver;
   OperationStatusUpdateManager statusUpdateManager;
 
   // The mapping of known profiles fetched from the DiskProfileAdaptor.
   hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos;
 
-  process::Owned<csi::ServiceManager> serviceManager;
-
-  // TODO(chhsiao): Remove the following variables after refactoring.
-  std::string rootDir;
-  CSIPluginInfo pluginInfo;
-  hashset<csi::Service> services;
-
-  Option<std::string> bootId;
-  Option<csi::v0::PluginCapabilities> pluginCapabilities;
-  Option<csi::v0::ControllerCapabilities> controllerCapabilities;
-  Option<csi::v0::NodeCapabilities> nodeCapabilities;
-  Option<std::string> nodeId;
+  process::Owned<csi::VolumeManager> volumeManager;
 
   // We maintain the following invariant: if one operation depends on
   // another, they cannot be in PENDING state at the same time, i.e.,
@@ -356,7 +208,6 @@ private:
   LinkedHashMap<id::UUID, Operation> operations;
   Resources totalResources;
   id::UUID resourceVersion;
-  hashmap<std::string, VolumeData> volumes;
 
   // If pending, it means that the storage pools are being reconciled, and all
   // incoming operations that disallow reconciliation will be dropped.
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 0fbd602..bb71935 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -43,6 +43,7 @@
 #include "csi/paths.hpp"
 #include "csi/rpc.hpp"
 #include "csi/state.hpp"
+#include "csi/v0_volume_manager_process.hpp"
 
 #include "linux/fs.hpp"
 
@@ -50,8 +51,6 @@
 
 #include "module/manager.hpp"
 
-#include "resource_provider/storage/provider_process.hpp"
-
 #include "slave/container_daemon_process.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
@@ -5054,7 +5053,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
   ASSERT_EQ(0u, createVolumeRequests.size());
 
   Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
-      _, &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+      _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
 
   // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
   createVolumeResults.put(
@@ -5062,7 +5061,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
 
   AWAIT_READY(createVolumeCall);
 
-  Duration createVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+  Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
 
   // Settle the clock to ensure that the retry timer has been set, then advance
   // the clock by the maximum backoff to trigger a retry.
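
The settle-then-advance dance in this test is the standard way to exercise
libprocess timers deterministically; a minimal sketch of the pattern:

    Clock::pause();    // Take control of libprocess timers (test setup).
    Clock::settle();   // Drain pending events so the retry timer is armed.
    Clock::advance(createVolumeBackoff);  // Fire the timer without waiting.
    Clock::settle();
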
@@ -5080,15 +5079,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
     ASSERT_EQ(0u, createVolumeRequests.size());
 
     createVolumeCall = FUTURE_DISPATCH(
-        _,
-        &StorageLocalResourceProviderProcess::__call<csi::v0::CREATE_VOLUME>);
+        _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
 
     createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
     AWAIT_READY(createVolumeCall);
 
-    createVolumeBackoff =
-      std::min(createVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+    createVolumeBackoff = std::min(
+        createVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
 
     // Settle the clock to ensure that the retry timer has been set, then
     // advance the clock by the maximum backoff to trigger a retry.
@@ -5142,7 +5140,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
   ASSERT_EQ(0u, deleteVolumeRequests.size());
 
   Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
-      _, &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+      _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
 
   // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
   deleteVolumeResults.put(
@@ -5150,7 +5148,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
 
   AWAIT_READY(deleteVolumeCall);
 
-  Duration deleteVolumeBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+  Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
 
   // Settle the clock to ensure that the retry timer has been set, then advance
   // the clock by the maximum backoff to trigger a retry.
@@ -5168,15 +5166,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
     ASSERT_EQ(0u, deleteVolumeRequests.size());
 
     deleteVolumeCall = FUTURE_DISPATCH(
-        _,
-        &StorageLocalResourceProviderProcess::__call<csi::v0::DELETE_VOLUME>);
+        _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
 
     deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
     AWAIT_READY(deleteVolumeCall);
 
-    deleteVolumeBackoff =
-      std::min(deleteVolumeBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX);
+    deleteVolumeBackoff = std::min(
+        deleteVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
 
     // Settle the clock to ensure that the retry timer has been set, then
     // advance the clock by the maximum backoff to trigger a retry.