You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/04/13 03:39:19 UTC

[1/3] mesos git commit: Updated filesystem layout for staging and mounting CSI volumes.

Repository: mesos
Updated Branches:
  refs/heads/master 554503985 -> 5a105893a


Updated filesystem layout for staging and mounting CSI volumes.

To support the `STAGE_UNSTAGE_VOLUME` in CSI v0.2, a CSI volume is now
staged at `<work_dir>/csi/<type>/<name>/mounts/<volume_id>/staging` and
mounted at `<work_dir>/csi/<type>/<name>/mounts/<volume_id>/target`.

The `DiskInfo.Source.Path.root` and `DiskInfo.Source.Mount.root` fields
now stores `./csi/<type>/<name>/mounts` for PATH and MOUNT disk
resources respectively, and the actual mount point is carved out from
the `root` and `id` fields. In the future, the `SharedInfo` might also
be used to construct the mount point for shared volumes.

Review: https://reviews.apache.org/r/66574/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02f9cb12
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02f9cb12
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02f9cb12

Branch: refs/heads/master
Commit: 02f9cb127893b794e9fcc8b529f45bee33dd2e4c
Parents: 5545039
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:15 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:15 2018 -0700

----------------------------------------------------------------------
 src/CMakeLists.txt                         |  4 ++
 src/Makefile.am                            | 19 +++++----
 src/csi/paths.cpp                          | 36 +++++++++-------
 src/csi/paths.hpp                          | 15 ++++---
 src/resource_provider/storage/provider.cpp | 56 +++++++++++++------------
 src/slave/paths.cpp                        | 18 ++++++++
 6 files changed, 92 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ac176d1..31af9ea 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -226,6 +226,9 @@ set(COMMON_SRC
   common/validation.cpp
   common/values.cpp)
 
+set(CSI_SRC
+  csi/paths.cpp)
+
 set(DOCKER_SRC
   docker/docker.cpp
   docker/spec.cpp)
@@ -428,6 +431,7 @@ set(MESOS_SRC
   ${AUTHENTICATION_SRC}
   ${AUTHORIZER_SRC}
   ${COMMON_SRC}
+  ${CSI_SRC}
   ${DOCKER_SRC}
   ${EXECUTOR_SRC}
   ${FILES_SRC}

http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b590928..07eb138 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1524,28 +1524,29 @@ libbuild_la_CPPFLAGS += -DBUILD_FLAGS="\"$$BUILD_FLAGS\""
 libmesos_no_3rdparty_la_LIBADD += libbuild.la
 
 
-if ENABLE_GRPC
 # Convenience library for build the CSI client.
 noinst_LTLIBRARIES += libcsi.la
 libcsi_la_SOURCES =							\
-  csi/paths.cpp								\
-  csi/utils.cpp
+  csi/paths.cpp
+
+libcsi_la_SOURCES +=							\
+  csi/paths.hpp
+
+if ENABLE_GRPC
 libcsi_la_SOURCES +=							\
   ../include/csi/spec.hpp						\
-  csi/paths.hpp								\
+  csi/client.cpp							\
+  csi/client.hpp							\
   csi/state.hpp								\
   csi/state.proto							\
+  csi/utils.cpp								\
   csi/utils.hpp
-
-libcsi_la_SOURCES +=							\
-  csi/client.cpp							\
-  csi/client.hpp
+endif
 
 nodist_libcsi_la_SOURCES = $(CXX_CSI_PROTOS)
 libcsi_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 
 libmesos_no_3rdparty_la_LIBADD += libcsi.la
-endif
 
 
 # Convenience library for building the replicated log in order to

http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/csi/paths.cpp
----------------------------------------------------------------------
diff --git a/src/csi/paths.cpp b/src/csi/paths.cpp
index 0d42db2..cc5b9ec 100644
--- a/src/csi/paths.cpp
+++ b/src/csi/paths.cpp
@@ -42,18 +42,20 @@ namespace csi {
 namespace paths {
 
 // File names.
-const char CONTAINER_INFO_FILE[] = "container.info";
-const char ENDPOINT_SOCKET_FILE[] = "endpoint.sock";
-const char VOLUME_STATE_FILE[] = "volume.state";
+constexpr char CONTAINER_INFO_FILE[] = "container.info";
+constexpr char ENDPOINT_SOCKET_FILE[] = "endpoint.sock";
+constexpr char VOLUME_STATE_FILE[] = "volume.state";
 
 
-const char CONTAINERS_DIR[] = "containers";
-const char VOLUMES_DIR[] = "volumes";
-const char MOUNTS_DIR[] = "mounts";
+constexpr char CONTAINERS_DIR[] = "containers";
+constexpr char VOLUMES_DIR[] = "volumes";
+constexpr char MOUNTS_DIR[] = "mounts";
+constexpr char STAGING_DIR[] = "staging";
+constexpr char TARGET_DIR[] = "target";
 
 
-const char ENDPOINT_DIR_SYMLINK[] = "endpoint";
-const char ENDPOINT_DIR[] = "mesos-csi-XXXXXX";
+constexpr char ENDPOINT_DIR_SYMLINK[] = "endpoint";
+constexpr char ENDPOINT_DIR[] = "mesos-csi-XXXXXX";
 
 
 Try<list<string>> getContainerPaths(
@@ -276,15 +278,19 @@ string getMountRootDir(
 }
 
 
-string getMountPath(
-    const string& rootDir,
-    const string& type,
-    const string& name,
+string getMountStagingPath(
+    const string& mountRootDir,
     const string& volumeId)
 {
-  return path::join(
-      getMountRootDir(rootDir, type, name),
-      http::encode(volumeId));
+  return path::join(mountRootDir, http::encode(volumeId), STAGING_DIR);
+}
+
+
+string getMountTargetPath(
+    const string& mountRootDir,
+    const string& volumeId)
+{
+  return path::join(mountRootDir, http::encode(volumeId), TARGET_DIR);
 }
 
 } // namespace paths {

http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/csi/paths.hpp
----------------------------------------------------------------------
diff --git a/src/csi/paths.hpp b/src/csi/paths.hpp
index 7892e5e..7a4e9e0 100644
--- a/src/csi/paths.hpp
+++ b/src/csi/paths.hpp
@@ -42,7 +42,9 @@ namespace paths {
 //           |   |-- <volume_id>
 //           |        |-- volume.state
 //           |-- mounts
-//               |-- <volume_id> (mount point)
+//               |-- <volume_id>
+//                    |- staging (staging mount point)
+//                    |- target (mount point)
 
 
 struct ContainerPath
@@ -134,10 +136,13 @@ std::string getMountRootDir(
     const std::string& name);
 
 
-std::string getMountPath(
-    const std::string& rootDir,
-    const std::string& type,
-    const std::string& name,
+std::string getMountStagingPath(
+    const std::string& mountRootDir,
+    const std::string& volumeId);
+
+
+std::string getMountTargetPath(
+    const std::string& mountRootDir,
     const std::string& volumeId);
 
 } // namespace paths {

http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ddbaa0f..ce94393 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1785,24 +1785,23 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
   endpointVolume->set_host_path(endpointDir);
 
   // Prepare the directory where the mount points will be placed.
-  const string mountDir = csi::paths::getMountRootDir(
+  const string mountRootDir = csi::paths::getMountRootDir(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
       info.storage().plugin().name());
 
-  Try<Nothing> mkdir = os::mkdir(mountDir);
+  Try<Nothing> mkdir = os::mkdir(mountRootDir);
   if (mkdir.isError()) {
     return Failure(
-        "Failed to create directory '" + mountDir +
-        "': " + mkdir.error());
+        "Failed to create directory '" + mountRootDir + "': " + mkdir.error());
   }
 
   // Prepare a volume where the mount points will be placed.
   Volume* mountVolume = containerInfo.add_volumes();
   mountVolume->set_mode(Volume::RW);
-  mountVolume->set_container_path(mountDir);
+  mountVolume->set_container_path(mountRootDir);
   mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
-  mountVolume->mutable_source()->mutable_host_path()->set_path(mountDir);
+  mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
   mountVolume->mutable_source()->mutable_host_path()
     ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
 
@@ -2194,16 +2193,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
         csi::v0::Client client) -> Future<Nothing> {
       VolumeData& volume = volumes.at(volumeId);
 
-      const string mountPath = csi::paths::getMountPath(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name(),
+      const string targetPath = csi::paths::getMountTargetPath(
+          csi::paths::getMountRootDir(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().plugin().type(),
+              info.storage().plugin().name()),
           volumeId);
 
-      Try<Nothing> mkdir = os::mkdir(mountPath);
+      Try<Nothing> mkdir = os::mkdir(targetPath);
       if (mkdir.isError()) {
         return Failure(
-            "Failed to create mount point '" + mountPath + "': " +
+            "Failed to create mount target path '" + targetPath + "': " +
             mkdir.error());
       }
 
@@ -2217,7 +2217,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
       csi::v0::NodePublishVolumeRequest request;
       request.set_volume_id(volumeId);
       *request.mutable_publish_info() = volume.state.publish_info();
-      request.set_target_path(mountPath);
+      request.set_target_path(targetPath);
       request.mutable_volume_capability()
         ->CopyFrom(volume.state.volume_capability());
       request.set_readonly(false);
@@ -2252,13 +2252,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     .then(defer(self(), [this, volumeId](csi::v0::Client client) {
       VolumeData& volume = volumes.at(volumeId);
 
-      const string mountPath = csi::paths::getMountPath(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name(),
+      const string targetPath = csi::paths::getMountTargetPath(
+          csi::paths::getMountRootDir(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().plugin().type(),
+              info.storage().plugin().name()),
           volumeId);
 
-      CHECK(os::exists(mountPath));
+      CHECK(os::exists(targetPath));
 
       // A previously failed `NodePublishVolume` call can be recovered through
       // the current `NodeUnpublishVolume` call. See:
@@ -2273,20 +2274,20 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 
       csi::v0::NodeUnpublishVolumeRequest request;
       request.set_volume_id(volumeId);
-      request.set_target_path(mountPath);
+      request.set_target_path(targetPath);
 
       return client.NodeUnpublishVolume(request)
-        .then(defer(self(), [this, volumeId, mountPath]() -> Future<Nothing> {
+        .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
           VolumeData& volume = volumes.at(volumeId);
 
           volume.state.set_state(VolumeState::NODE_READY);
           volume.state.clear_boot_id();
           checkpointVolumeState(volumeId);
 
-          Try<Nothing> rmdir = os::rmdir(mountPath);
+          Try<Nothing> rmdir = os::rmdir(targetPath);
           if (rmdir.isError()) {
             return Failure(
-                "Failed to remove mount point '" + mountPath + "': " +
+                "Failed to remove mount point '" + targetPath + "': " +
                 rmdir.error());
           }
 
@@ -2885,23 +2886,24 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
           ->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
       }
 
-      const string mountPath = csi::paths::getMountPath(
+      const string mountRootDir = csi::paths::getMountRootDir(
           slave::paths::getCsiRootDir("."),
           info.storage().plugin().type(),
-          info.storage().plugin().name(),
-          volumeId);
+          info.storage().plugin().name());
 
       switch (type) {
         case Resource::DiskInfo::Source::PATH: {
           // Set the root path relative to agent work dir.
           converted.mutable_disk()->mutable_source()->mutable_path()
-            ->set_root(mountPath);
+            ->set_root(mountRootDir);
+
           break;
         }
         case Resource::DiskInfo::Source::MOUNT: {
           // Set the root path relative to agent work dir.
           converted.mutable_disk()->mutable_source()->mutable_mount()
-            ->set_root(mountPath);
+            ->set_root(mountRootDir);
+
           break;
         }
         case Resource::DiskInfo::Source::BLOCK: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 8a7e162..690bfe3 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -36,6 +36,8 @@
 
 #include "common/validation.hpp"
 
+#include "csi/paths.hpp"
+
 #include "messages/messages.hpp"
 
 #include "slave/paths.hpp"
@@ -673,10 +675,18 @@ string getPersistentVolumePath(
       CHECK(volume.disk().source().has_path());
       CHECK(volume.disk().source().path().has_root());
       string root = volume.disk().source().path().root();
+
       if (!path::absolute(root)) {
         // A relative path in `root` is relative to agent work dir.
         root = path::join(workDir, root);
       }
+
+      if (volume.disk().source().has_id()) {
+        // For a CSI volume the mount point is derived from `root` and `id`.
+        root =
+          csi::paths::getMountTargetPath(root, volume.disk().source().id());
+      }
+
       return getPersistentVolumePath(
           root,
           role,
@@ -687,10 +697,18 @@ string getPersistentVolumePath(
       CHECK(volume.disk().source().has_mount());
       CHECK(volume.disk().source().mount().has_root());
       string root = volume.disk().source().mount().root();
+
       if (!path::absolute(root)) {
         // A relative path in `root` is relative to agent work dir.
         root = path::join(workDir, root);
       }
+
+      if (volume.disk().source().has_id()) {
+        // For a CSI volume the mount point is derived from `root` and `id`.
+        root =
+          csi::paths::getMountTargetPath(root, volume.disk().source().id());
+      }
+
       return root;
     }
     case Resource::DiskInfo::Source::BLOCK:


[2/3] mesos git commit: Supported `STAGE_UNSTAGE_VOLUME` CSI node capability in SLRP.

Posted by ch...@apache.org.
Supported `STAGE_UNSTAGE_VOLUME` CSI node capability in SLRP.

The storage local resource provider now properly calls `NodeStageVolume`
or `NodeUnstageVolume` when publishing or deleting volumes for CSI
plugins support the `STAGE_UNSTAGE_VOLUME` capability.

Review: https://reviews.apache.org/r/66575/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9aa3ad8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9aa3ad8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9aa3ad8

Branch: refs/heads/master
Commit: a9aa3ad8599cdafbd8d36620887c7f9002ff56d2
Parents: 02f9cb1
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:26 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:26 2018 -0700

----------------------------------------------------------------------
 src/csi/state.proto                        |  33 +--
 src/resource_provider/storage/provider.cpp | 350 +++++++++++++++++++-----
 2 files changed, 293 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/csi/state.proto
----------------------------------------------------------------------
diff --git a/src/csi/state.proto b/src/csi/state.proto
index a252cb2..8445399 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -26,10 +26,13 @@ message VolumeState {
   enum State {
     UNKNOWN = 0;
     CREATED = 1;              // The volume is provisioned but not published.
-    NODE_READY = 2;           // The volume is attached to the node.
-    PUBLISHED =  3;           // The volume is mounted on the node.
+    NODE_READY = 2;           // The volume is made available on the node.
+    VOL_READY = 8;            // The volume is made publishable on the node.
+    PUBLISHED = 3;            // The volume is published on the node.
     CONTROLLER_PUBLISH = 4;   // `ControllerPublishVolume` is being called.
     CONTROLLER_UNPUBLISH = 5; // `ControllerUnpublishVolume` is being called.
+    NODE_STAGE = 9;           // `NodeStageVolume` is being called.
+    NODE_UNSTAGE = 10;        // `NodeUnstageVolume` is being called.
     NODE_PUBLISH = 6;         // `NodePublishVolume` is being called.
     NODE_UNPUBLISH = 7;       // `NodeUnpublishVolume` is being called.
   }
@@ -37,25 +40,23 @@ message VolumeState {
   // The state of the volume. This is a REQUIRED field.
   State state = 1;
 
-  // The capability used to publish the volume. This is a
-  // REQUIRED field.
+  // The capability used to publish the volume. This is a REQUIRED field.
   .csi.v0.VolumeCapability volume_capability = 2;
 
-  // Attributes of the volume to be used on the node. This field MUST
-  // match the attributes of the `Volume` returned by `CreateVolume`.
-  // This is an OPTIONAL field.
+  // Attributes of the volume to be used on the node. This field MUST match the
+  // attributes of the `Volume` returned by `CreateVolume`. This is an OPTIONAL
+  // field.
   map<string, string> volume_attributes = 3;
 
-  // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller
-  // capability, this field MUST be set to the value returned by
-  // `ControllerPublishVolume`. Otherwise, this field MUST remain unset.
-  // This is an OPTIONAL field.
+  // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller capability,
+  // this field MUST be set to the value returned by `ControllerPublishVolume`.
+  // Otherwise, this field MUST remain unset. This is an OPTIONAL field.
   map<string, string> publish_info = 4;
 
-  // This field is used to check if the node has been rebooted since the
-  // last time the volume is mounted. If yes, `NodePublishVolume` needs
-  // to be called to mount the volume again. It MUST be set to the boot
-  // ID of the node if the volume is mounted, and SHOULD remain unset
-  // otherwise. This is an OPTIONAL field.
+  // This field is used to check if the node has been rebooted since the volume
+  // was transitioned to `VOL_READY` state. If yes, `NodeStageVolume` needs to
+  // be called to make the volume publishable again. It MUST be set to the boot
+  // ID of the node if the volume is in `VOL_READY` state, and SHOULD remain
+  // unset otherwise. This is an OPTIONAL field.
   string boot_id = 5;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ce94393..8ca2d3a 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -326,7 +326,7 @@ public:
 private:
   struct VolumeData
   {
-    VolumeData(const VolumeState& _state)
+    VolumeData(VolumeState&& _state)
       : state(_state), sequence(new Sequence("volume-sequence")) {}
 
     VolumeState state;
@@ -376,6 +376,8 @@ private:
   Future<Nothing> prepareNodeService();
   Future<Nothing> controllerPublish(const string& volumeId);
   Future<Nothing> controllerUnpublish(const string& volumeId);
+  Future<Nothing> nodeStage(const string& volumeId);
+  Future<Nothing> nodeUnstage(const string& volumeId);
   Future<Nothing> nodePublish(const string& volumeId);
   Future<Nothing> nodeUnpublish(const string& volumeId);
   Future<string> createVolume(
@@ -640,7 +642,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
       auto err = [](const string& message) {
         LOG(ERROR)
-          << "Failed to watch for VolumeprofileAdaptor: " << message;
+          << "Failed to watch for DiskProfileAdaptor: " << message;
       };
 
       // Start watching the DiskProfileAdaptor.
@@ -843,59 +845,94 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 
       Future<Nothing> recovered = Nothing();
 
-      switch (volume.state.state()) {
-        case VolumeState::CREATED:
-        case VolumeState::NODE_READY: {
-          break;
-        }
-        case VolumeState::PUBLISHED: {
-          if (volume.state.boot_id() != bootId) {
-            // The node has been restarted since the volume is mounted,
-            // so it is no longer in the `PUBLISHED` state.
-            volume.state.set_state(VolumeState::NODE_READY);
-            volume.state.clear_boot_id();
-            checkpointVolumeState(volumeId);
+      if (VolumeState::State_IsValid(volume.state.state())) {
+        switch (volume.state.state()) {
+          case VolumeState::CREATED:
+          case VolumeState::NODE_READY: {
+            break;
           }
-          break;
-        }
-        case VolumeState::CONTROLLER_PUBLISH: {
-          recovered =
-            volume.sequence->add(std::function<Future<Nothing>()>(
+          case VolumeState::VOL_READY:
+          case VolumeState::PUBLISHED: {
+            if (volume.state.boot_id() != bootId) {
+              // The node has been restarted since the volume is made
+              // publishable, so it is reset to `NODE_READY` state.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
+              checkpointVolumeState(volumeId);
+            }
+
+            break;
+          }
+          case VolumeState::CONTROLLER_PUBLISH: {
+            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
                 defer(self(), &Self::controllerPublish, volumeId)));
-          break;
-        }
-        case VolumeState::CONTROLLER_UNPUBLISH: {
-          recovered =
-            volume.sequence->add(std::function<Future<Nothing>()>(
+
+            break;
+          }
+          case VolumeState::CONTROLLER_UNPUBLISH: {
+            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
                 defer(self(), &Self::controllerUnpublish, volumeId)));
-          break;
-        }
-        case VolumeState::NODE_PUBLISH: {
-          recovered =
-            volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::nodePublish, volumeId)));
-          break;
-        }
-        case VolumeState::NODE_UNPUBLISH: {
-          recovered =
-            volume.sequence->add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::nodeUnpublish, volumeId)));
-          break;
-        }
-        case VolumeState::UNKNOWN: {
-          recovered = Failure(
-              "Volume '" + volumeId + "' is in " +
-              stringify(volume.state.state()) + " state");
-        }
 
-        // NOTE: We avoid using a default clause for the following
-        // values in proto3's open enum to enable the compiler to detect
-        // missing enum cases for us. See:
-        // https://github.com/google/protobuf/issues/3917
-        case google::protobuf::kint32min:
-        case google::protobuf::kint32max: {
-          UNREACHABLE();
+            break;
+          }
+          case VolumeState::NODE_STAGE: {
+            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+                defer(self(), &Self::nodeStage, volumeId)));
+
+            break;
+          }
+          case VolumeState::NODE_UNSTAGE: {
+            recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+                defer(self(), &Self::nodeUnstage, volumeId)));
+
+            break;
+          }
+          case VolumeState::NODE_PUBLISH: {
+            if (volume.state.boot_id() != bootId) {
+              // The node has been restarted since `NodePublishVolume` was
+              // called, so it is reset to `NODE_READY` state.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
+              checkpointVolumeState(volumeId);
+            } else {
+              recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+                  defer(self(), &Self::nodePublish, volumeId)));
+            }
+
+            break;
+          }
+          case VolumeState::NODE_UNPUBLISH: {
+            if (volume.state.boot_id() != bootId) {
+              // The node has been restarted since `NodeUnpublishVolume` was
+              // called, so it is reset to `NODE_READY` state.
+              volume.state.set_state(VolumeState::NODE_READY);
+              volume.state.clear_boot_id();
+              checkpointVolumeState(volumeId);
+            } else {
+              recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+                  defer(self(), &Self::nodeUnpublish, volumeId)));
+            }
+
+            break;
+          }
+          case VolumeState::UNKNOWN: {
+            recovered = Failure(
+                "Volume '" + volumeId + "' is in " +
+                stringify(volume.state.state()) + " state");
+
+            break;
+          }
+
+          // NOTE: We avoid using a default clause for the following values in
+          // proto3's open enum to enable the compiler to detect missing enum
+          // cases for us. See: https://github.com/google/protobuf/issues/3917
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max: {
+            UNREACHABLE();
+          }
         }
+      } else {
+        recovered = Failure("Volume '" + volumeId + "' is in UNDEFINED state");
       }
 
       futures.push_back(recovered);
@@ -1511,10 +1548,13 @@ void StorageLocalResourceProviderProcess::publishResources(
       std::function<Future<Nothing>()> controllerAndNodePublish =
         defer(self(), [=] {
           CHECK(volumes.contains(volumeId));
+          const VolumeData& volume = volumes.at(volumeId);
 
           Future<Nothing> published = Nothing();
 
-          switch (volumes.at(volumeId).state.state()) {
+          CHECK(VolumeState::State_IsValid(volume.state.state()));
+
+          switch (volume.state.state()) {
             case VolumeState::CONTROLLER_UNPUBLISH: {
               published = published
                 .then(defer(self(), &Self::controllerUnpublish, volumeId));
@@ -1526,18 +1566,34 @@ void StorageLocalResourceProviderProcess::publishResources(
             case VolumeState::CONTROLLER_PUBLISH: {
               published = published
                 .then(defer(self(), &Self::controllerPublish, volumeId))
+                .then(defer(self(), &Self::nodeStage, volumeId))
                 .then(defer(self(), &Self::nodePublish, volumeId));
 
               break;
             }
-            case VolumeState::NODE_UNPUBLISH: {
+            case VolumeState::NODE_UNSTAGE: {
               published = published
-                .then(defer(self(), &Self::nodeUnpublish, volumeId));
+                .then(defer(self(), &Self::nodeUnstage, volumeId));
 
               // NOTE: We continue to the next case to publish the volume in
               // `NODE_READY` state once the above is done.
             }
             case VolumeState::NODE_READY:
+            case VolumeState::NODE_STAGE: {
+              published = published
+                .then(defer(self(), &Self::nodeStage, volumeId))
+                .then(defer(self(), &Self::nodePublish, volumeId));
+
+              break;
+            }
+            case VolumeState::NODE_UNPUBLISH: {
+              published = published
+                .then(defer(self(), &Self::nodeUnpublish, volumeId));
+
+              // NOTE: We continue to the next case to publish the volume in
+              // `VOL_READY` state once the above is done.
+            }
+            case VolumeState::VOL_READY:
             case VolumeState::NODE_PUBLISH: {
               published = published
                 .then(defer(self(), &Self::nodePublish, volumeId));
@@ -2038,12 +2094,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
             -> Future<csi::v0::Client> {
           nodeCapabilities = response.capabilities();
 
-          // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
-          if (nodeCapabilities.stageUnstageVolume) {
-            return Failure(
-                "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported");
-          }
-
           // Get the latest service future before proceeding to the next step.
           return getService(nodeContainerId.get());
         }))
@@ -2177,14 +2227,144 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 }
 
 
-// Transition the state of the specified volume from `NODE_READY` or
+// Transitions the state of the specified volume from `NODE_READY` or
+// `NODE_STAGE` to `VOL_READY`.
+// NOTE: This can only be called after `prepareNodeService`.
+Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
+    const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeData& volume = volumes.at(volumeId);
+
+  if (!nodeCapabilities.stageUnstageVolume) {
+    CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
+
+    volume.state.set_state(VolumeState::VOL_READY);
+    volume.state.set_boot_id(bootId);
+    checkpointVolumeState(volumeId);
+
+    return Nothing();
+  }
+
+  CHECK_SOME(nodeContainerId);
+
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [this, volumeId](
+        csi::v0::Client client) -> Future<Nothing> {
+      VolumeData& volume = volumes.at(volumeId);
+
+      const string stagingPath = csi::paths::getMountStagingPath(
+          csi::paths::getMountRootDir(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().plugin().type(),
+              info.storage().plugin().name()),
+          volumeId);
+
+      Try<Nothing> mkdir = os::mkdir(stagingPath);
+      if (mkdir.isError()) {
+        return Failure(
+            "Failed to create mount staging path '" + stagingPath + "': " +
+            mkdir.error());
+      }
+
+      if (volume.state.state() == VolumeState::NODE_READY) {
+        volume.state.set_state(VolumeState::NODE_STAGE);
+        checkpointVolumeState(volumeId);
+      }
+
+      CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+
+      csi::v0::NodeStageVolumeRequest request;
+      request.set_volume_id(volumeId);
+      *request.mutable_publish_info() = volume.state.publish_info();
+      request.set_staging_target_path(stagingPath);
+      request.mutable_volume_capability()
+        ->CopyFrom(volume.state.volume_capability());
+      *request.mutable_volume_attributes() = volume.state.volume_attributes();
+
+      return client.NodeStageVolume(request)
+        .then(defer(self(), [this, volumeId] {
+          VolumeData& volume = volumes.at(volumeId);
+
+          volume.state.set_state(VolumeState::VOL_READY);
+          volume.state.set_boot_id(bootId);
+          checkpointVolumeState(volumeId);
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
+// or `NODE_UNSTAGE` to `NODE_READY`.
+// NOTE: This can only be called after `prepareNodeService`.
+Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
+    const string& volumeId)
+{
+  CHECK(volumes.contains(volumeId));
+  VolumeData& volume = volumes.at(volumeId);
+
+  if (!nodeCapabilities.stageUnstageVolume) {
+    CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
+
+    volume.state.set_state(VolumeState::NODE_READY);
+    volume.state.clear_boot_id();
+    checkpointVolumeState(volumeId);
+
+    return Nothing();
+  }
+
+  CHECK_SOME(nodeContainerId);
+
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [this, volumeId](csi::v0::Client client) {
+      VolumeData& volume = volumes.at(volumeId);
+
+      const string stagingPath = csi::paths::getMountStagingPath(
+          csi::paths::getMountRootDir(
+              slave::paths::getCsiRootDir(workDir),
+              info.storage().plugin().type(),
+              info.storage().plugin().name()),
+          volumeId);
+
+      CHECK(os::exists(stagingPath));
+
+      // A previously failed `NodeStageVolume` call can be recovered through the
+      // current `NodeUnstageVolume` call. See:
+      // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+      if (volume.state.state() == VolumeState::VOL_READY ||
+          volume.state.state() == VolumeState::NODE_STAGE) {
+        volume.state.set_state(VolumeState::NODE_UNSTAGE);
+        checkpointVolumeState(volumeId);
+      }
+
+      CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+
+      csi::v0::NodeUnstageVolumeRequest request;
+      request.set_volume_id(volumeId);
+      request.set_staging_target_path(stagingPath);
+
+      return client.NodeUnstageVolume(request)
+        .then(defer(self(), [this, volumeId] {
+          VolumeData& volume = volumes.at(volumeId);
+
+          volume.state.set_state(VolumeState::NODE_READY);
+          volume.state.clear_boot_id();
+          checkpointVolumeState(volumeId);
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+// Transitions the state of the specified volume from `VOL_READY` or
 // `NODE_PUBLISH` to `PUBLISHED`.
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
-  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
-
   CHECK(volumes.contains(volumeId));
   CHECK_SOME(nodeContainerId);
 
@@ -2207,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
             mkdir.error());
       }
 
-      if (volume.state.state() == VolumeState::NODE_READY) {
+      if (volume.state.state() == VolumeState::VOL_READY) {
         volume.state.set_state(VolumeState::NODE_PUBLISH);
         checkpointVolumeState(volumeId);
       }
@@ -2223,12 +2403,24 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
       request.set_readonly(false);
       *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
+      if (nodeCapabilities.stageUnstageVolume) {
+        const string stagingPath = csi::paths::getMountStagingPath(
+            csi::paths::getMountRootDir(
+                slave::paths::getCsiRootDir(workDir),
+                info.storage().plugin().type(),
+                info.storage().plugin().name()),
+            volumeId);
+
+        CHECK(os::exists(stagingPath));
+
+        request.set_staging_target_path(stagingPath);
+      }
+
       return client.NodePublishVolume(request)
         .then(defer(self(), [this, volumeId] {
           VolumeData& volume = volumes.at(volumeId);
 
           volume.state.set_state(VolumeState::PUBLISHED);
-          volume.state.set_boot_id(bootId);
           checkpointVolumeState(volumeId);
 
           return Nothing();
@@ -2237,14 +2429,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 }
 
 
-// Transition the state of the specified volume from `PUBLISHED`,
-// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `NODE_READY`.
+// Transitions the state of the specified volume from `PUBLISHED`,
+// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
 // NOTE: This can only be called after `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
-  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
-
   CHECK(volumes.contains(volumeId));
   CHECK_SOME(nodeContainerId);
 
@@ -2280,8 +2470,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
         .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
           VolumeData& volume = volumes.at(volumeId);
 
-          volume.state.set_state(VolumeState::NODE_READY);
-          volume.state.clear_boot_id();
+          volume.state.set_state(VolumeState::VOL_READY);
           checkpointVolumeState(volumeId);
 
           Try<Nothing> rmdir = os::rmdir(targetPath);
@@ -2362,6 +2551,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
         "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  CHECK_SOME(controllerContainerId);
+
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2376,17 +2567,28 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     return Nothing();
   }
 
-  CHECK_SOME(controllerContainerId);
+  const VolumeData& volume = volumes.at(volumeId);
 
   Future<Nothing> deleted = Nothing();
 
-  switch (volumes.at(volumeId).state.state()) {
+  CHECK(VolumeState::State_IsValid(volume.state.state()));
+
+  switch (volume.state.state()) {
     case VolumeState::PUBLISHED:
     case VolumeState::NODE_PUBLISH:
     case VolumeState::NODE_UNPUBLISH: {
       deleted = deleted
         .then(defer(self(), &Self::nodeUnpublish, volumeId));
 
+      // NOTE: We continue to the next case to delete the volume in `VOL_READY`
+      // state once the above is done.
+    }
+    case VolumeState::VOL_READY:
+    case VolumeState::NODE_STAGE:
+    case VolumeState::NODE_UNSTAGE: {
+      deleted = deleted
+        .then(defer(self(), &Self::nodeUnstage, volumeId));
+
       // NOTE: We continue to the next case to delete the volume in `NODE_READY`
       // state once the above is done.
     }
@@ -2403,7 +2605,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       if (!preExisting) {
         deleted = deleted
           .then(defer(self(), &Self::getService, controllerContainerId.get()))
-          .then(defer(self(), [=](csi::v0::Client client) {
+          .then(defer(self(), [volumeId](csi::v0::Client client) {
             csi::v0::DeleteVolumeRequest request;
             request.set_volume_id(volumeId);
 
@@ -2434,7 +2636,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
   // the continuation would have already been run, the returned future will
   // become ready, making the future returned by the sequence ready as well.
   return deleted
-    .then(defer(self(), [=] {
+    .then(defer(self(), [this, volumeId, volumePath] {
       volumes.erase(volumeId);
       CHECK_SOME(os::rmdir(volumePath));
 


[3/3] mesos git commit: Added `STAGE_UNSTAGE_VOLUME` capability to the test CSI plugin.

Posted by ch...@apache.org.
Added `STAGE_UNSTAGE_VOLUME` capability to the test CSI plugin.

Now it is required to call `NodeStageVolume` before `NodePublishVolume`
for the test CSI plugin. `NodeStageVolume` would bind-mount the volume
directory to the specified staging path, and then `NodePublishVolume`
would bind-mount the staging path to the target path.

Review: https://reviews.apache.org/r/66576/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a105893
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a105893
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a105893

Branch: refs/heads/master
Commit: 5a105893acbae841f68fe95c109a6ff3c3fd63a9
Parents: a9aa3ad
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:29 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:29 2018 -0700

----------------------------------------------------------------------
 src/examples/test_csi_plugin.cpp | 144 +++++++++++++++++++++++++++++++++-
 1 file changed, 140 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5a105893/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 357e022..9c4da88 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -216,6 +216,16 @@ public:
       const csi::v0::ControllerGetCapabilitiesRequest* request,
       csi::v0::ControllerGetCapabilitiesResponse* response) override;
 
+  virtual Status NodeStageVolume(
+      ServerContext* context,
+      const csi::v0::NodeStageVolumeRequest* request,
+      csi::v0::NodeStageVolumeResponse* response) override;
+
+  virtual Status NodeUnstageVolume(
+      ServerContext* context,
+      const csi::v0::NodeUnstageVolumeRequest* request,
+      csi::v0::NodeUnstageVolumeResponse* response) override;
+
   virtual Status NodePublishVolume(
       ServerContext* context,
       const csi::v0::NodePublishVolumeRequest* request,
@@ -594,6 +604,112 @@ Status TestCSIPlugin::ControllerGetCapabilities(
 }
 
 
+Status TestCSIPlugin::NodeStageVolume(
+    ServerContext* context,
+    const csi::v0::NodeStageVolumeRequest* request,
+    csi::v0::NodeStageVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
+
+  // TODO(chhsiao): Validate required fields.
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
+
+  auto it = request->volume_attributes().find("path");
+  if (it == request->volume_attributes().end() || it->second != path) {
+    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  }
+
+  if (!os::exists(request->staging_target_path())) {
+    return Status(
+        grpc::INVALID_ARGUMENT,
+        "Target path '" + request->staging_target_path() + "' is not found");
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to get mount table: " + table.error());
+  }
+
+  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+    if (entry.target == request->staging_target_path()) {
+      return Status::OK;
+    }
+  }
+
+  Try<Nothing> mount = fs::mount(
+      path,
+      request->staging_target_path(),
+      None(),
+      MS_BIND,
+      None());
+
+  if (mount.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to mount from '" + path + "' to '" +
+        request->staging_target_path() + "': " + mount.error());
+  }
+
+  return Status::OK;
+}
+
+
+Status TestCSIPlugin::NodeUnstageVolume(
+    ServerContext* context,
+    const csi::v0::NodeUnstageVolumeRequest* request,
+    csi::v0::NodeUnstageVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
+
+  // TODO(chhsiao): Validate required fields.
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to get mount table: " + table.error());
+  }
+
+  bool found = false;
+  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+    if (entry.target == request->staging_target_path()) {
+      found = true;
+      break;
+    }
+  }
+
+  if (!found) {
+    return Status::OK;
+  }
+
+  Try<Nothing> unmount = fs::unmount(request->staging_target_path());
+  if (unmount.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to unmount '" + request->staging_target_path() +
+        "': " + unmount.error());
+  }
+
+  return Status::OK;
+}
+
+
 Status TestCSIPlugin::NodePublishVolume(
     ServerContext* context,
     const csi::v0::NodePublishVolumeRequest* request,
@@ -623,6 +739,12 @@ Status TestCSIPlugin::NodePublishVolume(
         "Target path '" + request->target_path() + "' is not found");
   }
 
+  if (request->staging_target_path().empty()) {
+    return Status(
+        grpc::FAILED_PRECONDITION,
+        "Expecting 'staging_target_path' to be set");
+  }
+
   Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
   if (table.isError()) {
     return Status(
@@ -630,6 +752,20 @@ Status TestCSIPlugin::NodePublishVolume(
         "Failed to get mount table: " + table.error());
   }
 
+  bool found = false;
+  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+    if (entry.target == request->staging_target_path()) {
+      found = true;
+      break;
+    }
+  }
+
+  if (!found) {
+    return Status(
+        grpc::FAILED_PRECONDITION,
+        "Volume '" + request->volume_id() + "' has not been staged yet");
+  }
+
   foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
     if (entry.target == request->target_path()) {
       return Status::OK;
@@ -637,7 +773,7 @@ Status TestCSIPlugin::NodePublishVolume(
   }
 
   Try<Nothing> mount = fs::mount(
-      path,
+      request->staging_target_path(),
       request->target_path(),
       None(),
       MS_BIND,
@@ -700,9 +836,6 @@ Status TestCSIPlugin::NodeUnpublishVolume(
     return Status::OK;
   }
 
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
-
   Try<Nothing> unmount = fs::unmount(request->target_path());
   if (unmount.isError()) {
     return Status(
@@ -735,6 +868,9 @@ Status TestCSIPlugin::NodeGetCapabilities(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
+  response->add_capabilities()->mutable_rpc()->set_type(
+      csi::v0::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);
+
   return Status::OK;
 }