You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:03 UTC

[mesos] 06/15: Cleanup volume attaching and publishing for SLRP.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 287dbaa65429c8a18e12151e054d6652c75d5181
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:48 2019 -0700

    Cleanup volume attaching and publishing for SLRP.
    
    This patch introduces methods for volume attaching and publishing that
    conform to `VolumeManager`'s public interface in SLRP, and cleans up
    SLRP based on these functions. They will be moved out from SLRP to v0
    `VolumeManager` later.
    
    Volume attaching and publishing are implemented with internal helpers,
    each individually deals with one steady and two transient states, and
    makes a proper CSI call to achieve its goal state. If the given volume
    is not in the designed state, it would recursively call other helpers
    to transition the volume to the designed state first.
    
    These helper functions are serialized through the volume's own sequence
    to avoid racing operations on the same volume.
    
    Review: https://reviews.apache.org/r/70215/
---
 src/resource_provider/storage/provider.cpp         | 560 ++++++++++-----------
 src/resource_provider/storage/provider_process.hpp |  69 +--
 2 files changed, 300 insertions(+), 329 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 428d239..84e5557 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1550,130 +1550,80 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareServices()
 Future<Nothing> StorageLocalResourceProviderProcess::publishVolume(
     const string& volumeId)
 {
-  CHECK(volumes.contains(volumeId));
-
-  // To avoid any race with, e.g., `deleteVolume` calls, we sequentialize this
-  // lambda with any other operation on the same volume below, so the volume is
-  // guaranteed to exist in the deferred execution.
-  std::function<Future<Nothing>()> controllerAndNodePublish =
-    defer(self(), [this, volumeId] {
-      CHECK(volumes.contains(volumeId));
-      const VolumeData& volume = volumes.at(volumeId);
-
-      Future<Nothing> published = Nothing();
-
-      CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-      switch (volume.state.state()) {
-        case VolumeState::CONTROLLER_UNPUBLISH: {
-          published = published
-            .then(defer(self(), &Self::controllerUnpublish, volumeId));
-
-          // NOTE: We continue to the next case to publish the volume in
-          // `CREATED` state once the above is done.
-        }
-        case VolumeState::CREATED:
-        case VolumeState::CONTROLLER_PUBLISH: {
-          published = published
-            .then(defer(self(), &Self::controllerPublish, volumeId))
-            .then(defer(self(), &Self::nodeStage, volumeId))
-            .then(defer(self(), &Self::nodePublish, volumeId));
-
-          break;
-        }
-        case VolumeState::NODE_UNSTAGE: {
-          published = published
-            .then(defer(self(), &Self::nodeUnstage, volumeId));
-
-          // NOTE: We continue to the next case to publish the volume in
-          // `NODE_READY` state once the above is done.
-        }
-        case VolumeState::NODE_READY:
-        case VolumeState::NODE_STAGE: {
-          published = published
-            .then(defer(self(), &Self::nodeStage, volumeId))
-            .then(defer(self(), &Self::nodePublish, volumeId));
-
-          break;
-        }
-        case VolumeState::NODE_UNPUBLISH: {
-          published = published
-            .then(defer(self(), &Self::nodeUnpublish, volumeId));
-
-          // NOTE: We continue to the next case to publish the volume in
-          // `VOL_READY` state once the above is done.
-        }
-        case VolumeState::VOL_READY:
-        case VolumeState::NODE_PUBLISH: {
-          published = published
-            .then(defer(self(), &Self::nodePublish, volumeId));
-
-          break;
-        }
-        case VolumeState::PUBLISHED: {
-          break;
-        }
-        case VolumeState::UNKNOWN: {
-          UNREACHABLE();
-        }
+  if (!volumes.contains(volumeId)) {
+    return Failure("Cannot publish unknown volume '" + volumeId + "'");
+  }
 
-        // NOTE: We avoid using a default clause for the following
-        // values in proto3's open enum to enable the compiler to detect
-        // missing enum cases for us. See:
-        // https://github.com/google/protobuf/issues/3917
-        case google::protobuf::kint32min:
-        case google::protobuf::kint32max: {
-          UNREACHABLE();
-        }
-      }
+  VolumeData& volume = volumes.at(volumeId);
 
-      return published;
-    });
+  LOG(INFO) << "Publishing volume '" << volumeId << "' in "
+            << volume.state.state() << " state";
 
-    return volumes.at(volumeId).sequence->add(controllerAndNodePublish);
+  // Volume publishing is serialized with other operations on the same volume to
+  // avoid races.
+  return volume.sequence->add(std::function<Future<Nothing>()>(
+      process::defer(self(), &Self::_publishVolume, volumeId)));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
+Future<Nothing> StorageLocalResourceProviderProcess::_attachVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  if (!controllerCapabilities->publishUnpublishVolume) {
-    CHECK_EQ(VolumeState::CREATED, volume.state.state());
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    return Nothing();
+  }
 
-    volume.state.set_state(VolumeState::NODE_READY);
-    checkpointVolumeState(volumeId);
+  if (volumeState.state() != VolumeState::CREATED &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    return Failure(
+        "Cannot attach volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + " state");
+  }
 
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
     return Nothing();
   }
 
-  if (volume.state.state() == VolumeState::CREATED) {
-    volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
-    checkpointVolumeState(volumeId);
+  // A previously failed `ControllerUnpublishVolume` call can be recovered
+  // through an extra `ControllerUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after recovering the volume to `CREATED` state.
+    return _detachVolume(volumeId)
+      .then(process::defer(self(), &Self::_attachVolume, volumeId));
   }
 
-  CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
+  if (volumeState.state() == VolumeState::CREATED) {
+    volumeState.set_state(VolumeState::CONTROLLER_PUBLISH);
+    checkpointVolumeState(volumeId);
+  }
 
-  CHECK_SOME(nodeId);
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerPublishVolume' for volume '"
+    << volumeId << "'";
 
-  csi::v0::ControllerPublishVolumeRequest request;
+  ControllerPublishVolumeRequest request;
   request.set_volume_id(volumeId);
-  request.set_node_id(nodeId.get());
+  request.set_node_id(CHECK_NOTNONE(nodeId));
   *request.mutable_volume_capability() =
-    csi::v0::evolve(volume.state.volume_capability());
+    csi::v0::evolve(volumeState.volume_capability());
   request.set_readonly(false);
-  *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
 
-  return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
-      csi::CONTROLLER_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId](
-        const csi::v0::ControllerPublishVolumeResponse& response) {
-      VolumeData& volume = volumes.at(volumeId);
+  return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId](
+        const ControllerPublishVolumeResponse& response) {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      *volumeState.mutable_publish_info() = response.publish_info();
 
-      volume.state.set_state(VolumeState::NODE_READY);
-      *volume.state.mutable_publish_info() = response.publish_info();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1681,45 +1631,55 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
+Future<Nothing> StorageLocalResourceProviderProcess::_detachVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  if (!controllerCapabilities->publishUnpublishVolume) {
-    CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
+  if (volumeState.state() == VolumeState::CREATED) {
+    return Nothing();
+  }
 
-    volume.state.set_state(VolumeState::CREATED);
-    checkpointVolumeState(volumeId);
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::CONTROLLER_PUBLISH &&
+      volumeState.state() != VolumeState::CONTROLLER_UNPUBLISH) {
+    // Retry after transitioning the volume to `CREATED` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_detachVolume, volumeId));
+  }
 
+  if (!controllerCapabilities->publishUnpublishVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::CREATED);
     return Nothing();
   }
 
   // A previously failed `ControllerPublishVolume` call can be recovered through
   // the current `ControllerUnpublishVolume` call. See:
   // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
-  if (volume.state.state() == VolumeState::NODE_READY ||
-      volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
-    volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+  if (volumeState.state() == VolumeState::NODE_READY ||
+      volumeState.state() == VolumeState::CONTROLLER_PUBLISH) {
+    volumeState.set_state(VolumeState::CONTROLLER_UNPUBLISH);
     checkpointVolumeState(volumeId);
   }
 
-  CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
-
-  CHECK_SOME(nodeId);
+  LOG(INFO)
+    << "Calling '/csi.v0.Controller/ControllerUnpublishVolume' for volume '"
+    << volumeId << "'";
 
-  csi::v0::ControllerUnpublishVolumeRequest request;
+  ControllerUnpublishVolumeRequest request;
   request.set_volume_id(volumeId);
-  request.set_node_id(nodeId.get());
+  request.set_node_id(CHECK_NOTNONE(nodeId));
 
-  return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
-      csi::CONTROLLER_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId] {
-      VolumeData& volume = volumes.at(volumeId);
+  return call<CONTROLLER_UNPUBLISH_VOLUME>(
+      CONTROLLER_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::CREATED);
+      volumeState.mutable_publish_info()->clear();
 
-      volume.state.set_state(VolumeState::CREATED);
-      volume.state.mutable_publish_info()->clear();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1727,58 +1687,85 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
+Future<Nothing> StorageLocalResourceProviderProcess::_publishVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  if (!nodeCapabilities->stageUnstageVolume) {
-    CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
+  if (volumeState.state() == VolumeState::PUBLISHED) {
+    CHECK(volumeState.node_publish_required());
+    return Nothing();
+  }
 
-    volume.state.set_state(VolumeState::VOL_READY);
-    volume.state.set_boot_id(CHECK_NOTNONE(bootId));
-    checkpointVolumeState(volumeId);
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __publishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
+  }
 
-    return Nothing();
+  // A previously failed `NodeUnpublishVolume` call can be recovered through an
+  // extra `NodeUnpublishVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunpublishvolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNPUBLISH) {
+    // Retry after recovering the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_publishVolume, volumeId));
   }
 
-  const string stagingPath = csi::paths::getMountStagingPath(
-      csi::paths::getMountRootDir(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name()),
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
       volumeId);
 
-  // NOTE: The staging path will be cleaned up in `deleteVolume`.
-  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  // NOTE: The target path will be cleaned up during volume removal.
+  Try<Nothing> mkdir = os::mkdir(targetPath);
   if (mkdir.isError()) {
     return Failure(
-        "Failed to create mount staging path '" + stagingPath +
+        "Failed to create mount target path '" + targetPath +
         "': " + mkdir.error());
   }
 
-  if (volume.state.state() == VolumeState::NODE_READY) {
-    volume.state.set_state(VolumeState::NODE_STAGE);
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    volumeState.set_state(VolumeState::NODE_PUBLISH);
     checkpointVolumeState(volumeId);
   }
 
-  CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+  LOG(INFO) << "Calling '/csi.v0.Node/NodePublishVolume' for volume '"
+            << volumeId << "'";
 
-  csi::v0::NodeStageVolumeRequest request;
+  NodePublishVolumeRequest request;
   request.set_volume_id(volumeId);
-  *request.mutable_publish_info() = volume.state.publish_info();
-  request.set_staging_target_path(stagingPath);
+  *request.mutable_publish_info() = volumeState.publish_info();
+  request.set_target_path(targetPath);
   *request.mutable_volume_capability() =
-    csi::v0::evolve(volume.state.volume_capability());
-  *request.mutable_volume_attributes() = volume.state.volume_attributes();
+    csi::v0::evolve(volumeState.volume_capability());
+  request.set_readonly(false);
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
 
-  return call<csi::v0::NODE_STAGE_VOLUME>(csi::NODE_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId] {
-      VolumeData& volume = volumes.at(volumeId);
+  if (nodeCapabilities->stageUnstageVolume) {
+    const string stagingPath = paths::getMountStagingPath(
+        paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
+        volumeId);
+
+    CHECK(os::exists(stagingPath));
+    request.set_staging_target_path(stagingPath);
+  }
+
+  return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(defer(self(), [this, volumeId, targetPath] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+
+      volumeState.set_state(VolumeState::PUBLISHED);
+
+      // NOTE: This is the first time a container is going to consume the
+      // persistent volume, so the `node_publish_required` field is set to
+      // indicate that this volume must remain published so it can be
+      // synchronously cleaned up when the persistent volume is destroyed.
+      volumeState.set_node_publish_required(true);
 
-      volume.state.set_state(VolumeState::VOL_READY);
-      volume.state.set_boot_id(CHECK_NOTNONE(bootId));
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1786,53 +1773,76 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
+Future<Nothing> StorageLocalResourceProviderProcess::__publishVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  if (!nodeCapabilities->stageUnstageVolume) {
-    CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    CHECK(!volumeState.boot_id().empty());
+    return Nothing();
+  }
 
-    volume.state.set_state(VolumeState::NODE_READY);
-    volume.state.clear_boot_id();
-    checkpointVolumeState(volumeId);
+  if (volumeState.state() != VolumeState::NODE_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `NODE_READY` state.
+    return _attachVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
 
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::VOL_READY);
+    volumeState.set_boot_id(CHECK_NOTNONE(bootId));
     return Nothing();
   }
 
-  const string stagingPath = csi::paths::getMountStagingPath(
-      csi::paths::getMountRootDir(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name()),
+  // A previously failed `NodeUnstageVolume` call can be recovered through an
+  // extra `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodeunstagevolume // NOLINT
+  if (volumeState.state() == VolumeState::NODE_UNSTAGE) {
+    // Retry after recovering the volume to `NODE_READY` state.
+    return _unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::__publishVolume, volumeId));
+  }
+
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
       volumeId);
 
-  CHECK(os::exists(stagingPath));
+  // NOTE: The staging path will be cleaned up in during volume removal.
+  Try<Nothing> mkdir = os::mkdir(stagingPath);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create mount staging path '" + stagingPath +
+        "': " + mkdir.error());
+  }
 
-  // A previously failed `NodeStageVolume` call can be recovered through the
-  // current `NodeUnstageVolume` call. See:
-  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
-  if (volume.state.state() == VolumeState::VOL_READY ||
-      volume.state.state() == VolumeState::NODE_STAGE) {
-    volume.state.set_state(VolumeState::NODE_UNSTAGE);
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    volumeState.set_state(VolumeState::NODE_STAGE);
     checkpointVolumeState(volumeId);
   }
 
-  CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeStageVolume' for volume '" << volumeId
+            << "'";
 
-  csi::v0::NodeUnstageVolumeRequest request;
+  NodeStageVolumeRequest request;
   request.set_volume_id(volumeId);
+  *request.mutable_publish_info() = volumeState.publish_info();
   request.set_staging_target_path(stagingPath);
+  *request.mutable_volume_capability() =
+    csi::v0::evolve(volumeState.volume_capability());
+  *request.mutable_volume_attributes() = volumeState.volume_attributes();
 
-  return call<csi::v0::NODE_UNSTAGE_VOLUME>(
-      csi::NODE_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId] {
-      VolumeData& volume = volumes.at(volumeId);
+  return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
+      volumeState.set_boot_id(CHECK_NOTNONE(bootId));
 
-      volume.state.set_state(VolumeState::NODE_READY);
-      volume.state.clear_boot_id();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -1840,68 +1850,60 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
+Future<Nothing> StorageLocalResourceProviderProcess::_unpublishVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
-
-  const string targetPath = csi::paths::getMountTargetPath(
-      csi::paths::getMountRootDir(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name()),
-      volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  // NOTE: The target path will be cleaned up in `deleteVolume`.
-  Try<Nothing> mkdir = os::mkdir(targetPath);
-  if (mkdir.isError()) {
-    return Failure(
-        "Failed to create mount target path '" + targetPath +
-        "': " + mkdir.error());
+  if (volumeState.state() == VolumeState::NODE_READY) {
+    CHECK(volumeState.boot_id().empty());
+    return Nothing();
   }
 
-  if (volume.state.state() == VolumeState::VOL_READY) {
-    volume.state.set_state(VolumeState::NODE_PUBLISH);
-    checkpointVolumeState(volumeId);
+  if (volumeState.state() != VolumeState::VOL_READY &&
+      volumeState.state() != VolumeState::NODE_STAGE &&
+      volumeState.state() != VolumeState::NODE_UNSTAGE) {
+    // Retry after transitioning the volume to `VOL_READY` state.
+    return __unpublishVolume(volumeId)
+      .then(process::defer(self(), &Self::_unpublishVolume, volumeId));
   }
 
-  CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
-
-  csi::v0::NodePublishVolumeRequest request;
-  request.set_volume_id(volumeId);
-  *request.mutable_publish_info() = volume.state.publish_info();
-  request.set_target_path(targetPath);
-  *request.mutable_volume_capability() =
-    csi::v0::evolve(volume.state.volume_capability());
-  request.set_readonly(false);
-  *request.mutable_volume_attributes() = volume.state.volume_attributes();
+  if (!nodeCapabilities->stageUnstageVolume) {
+    // Since this is a no-op, no need to checkpoint here.
+    volumeState.set_state(VolumeState::NODE_READY);
+    volumeState.clear_boot_id();
+    return Nothing();
+  }
 
-  if (nodeCapabilities->stageUnstageVolume) {
-    const string stagingPath = csi::paths::getMountStagingPath(
-        csi::paths::getMountRootDir(
-            slave::paths::getCsiRootDir(workDir),
-            info.storage().plugin().type(),
-            info.storage().plugin().name()),
-        volumeId);
+  // A previously failed `NodeStageVolume` call can be recovered through the
+  // current `NodeUnstageVolume` call. See:
+  // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+  if (volumeState.state() == VolumeState::VOL_READY ||
+      volumeState.state() == VolumeState::NODE_STAGE) {
+    volumeState.set_state(VolumeState::NODE_UNSTAGE);
+    checkpointVolumeState(volumeId);
+  }
 
-    CHECK(os::exists(stagingPath));
+  const string stagingPath = paths::getMountStagingPath(
+      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
+      volumeId);
 
-    request.set_staging_target_path(stagingPath);
-  }
+  CHECK(os::exists(stagingPath));
 
-  return call<csi::v0::NODE_PUBLISH_VOLUME>(
-      csi::NODE_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId] {
-      VolumeData& volume = volumes.at(volumeId);
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnstageVolume' for volume '"
+            << volumeId << "'";
 
-      volume.state.set_state(VolumeState::PUBLISHED);
+  NodeUnstageVolumeRequest request;
+  request.set_volume_id(volumeId);
+  request.set_staging_target_path(stagingPath);
 
-      // NOTE: The `node_publish_required` field is always set up by the
-      // successful `nodePublish` call, as it indicates that a container is
-      // going to use the volume. However, it will not cleared by a
-      // `nodeUnpublish` call, but by a `deleteVolume` call instead.
-      volume.state.set_node_publish_required(true);
+  return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::NODE_READY);
+      volumeState.clear_boot_id();
 
       checkpointVolumeState(volumeId);
 
@@ -1910,42 +1912,52 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
+Future<Nothing> StorageLocalResourceProviderProcess::__unpublishVolume(
     const string& volumeId)
 {
   CHECK(volumes.contains(volumeId));
-  VolumeData& volume = volumes.at(volumeId);
+  VolumeState& volumeState = volumes.at(volumeId).state;
 
-  const string targetPath = csi::paths::getMountTargetPath(
-      csi::paths::getMountRootDir(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name()),
-      volumeId);
+  if (volumeState.state() == VolumeState::VOL_READY) {
+    return Nothing();
+  }
 
-  CHECK(os::exists(targetPath));
+  if (volumeState.state() != VolumeState::PUBLISHED &&
+      volumeState.state() != VolumeState::NODE_PUBLISH &&
+      volumeState.state() != VolumeState::NODE_UNPUBLISH) {
+    return Failure(
+        "Cannot unpublish volume '" + volumeId + "' in " +
+        stringify(volumeState.state()) + "state");
+  }
 
   // A previously failed `NodePublishVolume` call can be recovered through the
   // current `NodeUnpublishVolume` call. See:
   // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
-  if (volume.state.state() == VolumeState::PUBLISHED ||
-      volume.state.state() == VolumeState::NODE_PUBLISH) {
-    volume.state.set_state(VolumeState::NODE_UNPUBLISH);
+  if (volumeState.state() == VolumeState::PUBLISHED ||
+      volumeState.state() == VolumeState::NODE_PUBLISH) {
+    volumeState.set_state(VolumeState::NODE_UNPUBLISH);
     checkpointVolumeState(volumeId);
   }
 
-  CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
+  const string targetPath = paths::getMountTargetPath(
+      paths::getMountRootDir(rootDir, pluginInfo.type(), pluginInfo.name()),
+      volumeId);
+
+  CHECK(os::exists(targetPath));
+
+  LOG(INFO) << "Calling '/csi.v0.Node/NodeUnpublishVolume' for volume '"
+            << volumeId << "'";
 
-  csi::v0::NodeUnpublishVolumeRequest request;
+  NodeUnpublishVolumeRequest request;
   request.set_volume_id(volumeId);
   request.set_target_path(targetPath);
 
-  return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
-      csi::NODE_SERVICE, std::move(request))
-    .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
-      VolumeData& volume = volumes.at(volumeId);
+  return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+    .then(process::defer(self(), [this, volumeId] {
+      CHECK(volumes.contains(volumeId));
+      VolumeState& volumeState = volumes.at(volumeId).state;
+      volumeState.set_state(VolumeState::VOL_READY);
 
-      volume.state.set_state(VolumeState::VOL_READY);
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -2032,63 +2044,21 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
       checkpointVolumeState(volumeId);
     }
 
-    CHECK(VolumeState::State_IsValid(volume.state.state()));
-
-    switch (volume.state.state()) {
-      case VolumeState::PUBLISHED:
-      case VolumeState::NODE_PUBLISH:
-      case VolumeState::NODE_UNPUBLISH: {
-        deleted = deleted.then(defer(self(), &Self::nodeUnpublish, volumeId));
-
-        // NOTE: We continue to the next case to delete the volume in
-        // `VOL_READY` state once the above is done.
-      }
-      case VolumeState::VOL_READY:
-      case VolumeState::NODE_STAGE:
-      case VolumeState::NODE_UNSTAGE: {
-        deleted = deleted.then(defer(self(), &Self::nodeUnstage, volumeId));
-
-        // NOTE: We continue to the next case to delete the volume in
-        // `NODE_READY` state once the above is done.
-      }
-      case VolumeState::NODE_READY:
-      case VolumeState::CONTROLLER_PUBLISH:
-      case VolumeState::CONTROLLER_UNPUBLISH: {
-        deleted =
-          deleted.then(defer(self(), &Self::controllerUnpublish, volumeId));
-
-        // NOTE: We continue to the next case to delete the volume in `CREATED`
-        // state once the above is done.
-      }
-      case VolumeState::CREATED: {
-        break;
-      }
-      case VolumeState::UNKNOWN: {
-        UNREACHABLE();
-      }
-
-      // NOTE: We avoid using a default clause for the following values in
-      // proto3's open enum to enable the compiler to detect missing enum cases
-      // for us. See:
-      // https://github.com/google/protobuf/issues/3917
-      case google::protobuf::kint32min:
-      case google::protobuf::kint32max: {
-        UNREACHABLE();
-      }
-    }
+    deleted = _detachVolume(volumeId);
   }
 
   // We only delete the volume if the `CREATE_DELETE_VOLUME` capability is
   // supported. Otherwise, we simply leave it as a preprovisioned volume.
   if (controllerCapabilities->createDeleteVolume) {
-    deleted = deleted.then(defer(self(), [this, volumeId] {
-      csi::v0::DeleteVolumeRequest request;
-      request.set_volume_id(volumeId);
-
-      return call<csi::v0::DELETE_VOLUME>(
-          csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
-        .then([] { return Nothing(); });
-    }));
+    deleted = deleted
+      .then(defer(self(), [this, volumeId] {
+        csi::v0::DeleteVolumeRequest request;
+        request.set_volume_id(volumeId);
+
+        return call<csi::v0::DELETE_VOLUME>(
+            csi::CONTROLLER_SERVICE, std::move(request), true) // Retry.
+          .then([] { return Nothing(); });
+      }));
   }
 
   // NOTE: The last asynchronous continuation of `deleteVolume`, which is
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 359db0c..0b73531 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -192,41 +192,42 @@ private:
 
   process::Future<Nothing> publishVolume(const std::string& volumeId);
 
-  // Transitions the state of the specified volume from `CREATED` or
-  // `CONTROLLER_PUBLISH` to `NODE_READY`.
+  // The following methods are used to manage volume lifecycles. Transient
+  // states are omitted.
   //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> controllerPublish(const std::string& volumeId);
-
-  // Transitions the state of the specified volume from `NODE_READY`,
-  // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
-  //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> controllerUnpublish(const std::string& volumeId);
-
-  // Transitions the state of the specified volume from `NODE_READY` or
-  // `NODE_STAGE` to `VOL_READY`.
-  //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> nodeStage(const std::string& volumeId);
-
-  // Transitions the state of the specified volume from `VOL_READY`,
-  // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`.
-  //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> nodeUnstage(const std::string& volumeId);
-
-  // Transitions the state of the specified volume from `VOL_READY` or
-  // `NODE_PUBLISH` to `PUBLISHED`.
-  //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> nodePublish(const std::string& volumeId);
-
-  // Transitions the state of the specified volume from `PUBLISHED`,
-  // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
-  //
-  // NOTE: This can only be called after `prepareServices`.
-  process::Future<Nothing> nodeUnpublish(const std::string& volumeId);
+  //                          +------------+
+  //                 +  +  +  |  CREATED   |  ^
+  //   _attachVolume |  |  |  +---+----^---+  |
+  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---v----+---+  |
+  //                 v  +  +  | NODE_READY |  +  ^
+  //                    |  |  +---+----^---+  |  |
+  //    __publishVolume |  |      |    |      |  | _unpublishVolume
+  //                    |  |  +---v----+---+  |  |
+  //                    v  +  | VOL_READY  |  +  +  ^
+  //                       |  +---+----^---+  |  |  |
+  //        _publishVolume |      |    |      |  |  | __unpublishVolume
+  //                       |  +---v----+---+  |  |  |
+  //                       V  | PUBLISHED  |  +  +  +
+  //                          +------------+
+
+  // Transition a volume to `NODE_READY` state from any state above.
+  process::Future<Nothing> _attachVolume(const std::string& volumeId);
+
+  // Transition a volume to `CREATED` state from any state below.
+  process::Future<Nothing> _detachVolume(const std::string& volumeId);
+
+  // Transition a volume to `PUBLISHED` state from any state above.
+  process::Future<Nothing> _publishVolume(const std::string& volumeId);
+
+  // Transition a volume to `VOL_READY` state from any state above.
+  process::Future<Nothing> __publishVolume(const std::string& volumeId);
+
+  // Transition a volume to `NODE_READY` state from any state below.
+  process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
+
+  // Transition a volume to `VOL_READY` state from any state below.
+  process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
 
   // Returns a CSI volume ID.
   //