You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/04/13 03:39:19 UTC
[1/3] mesos git commit: Updated filesystem layout for staging and
mounting CSI volumes.
Repository: mesos
Updated Branches:
refs/heads/master 554503985 -> 5a105893a
Updated filesystem layout for staging and mounting CSI volumes.
To support the `STAGE_UNSTAGE_VOLUME` capability in CSI v0.2, a CSI volume is now
staged at `<work_dir>/csi/<type>/<name>/mounts/<volume_id>/staging` and
mounted at `<work_dir>/csi/<type>/<name>/mounts/<volume_id>/target`.
The `DiskInfo.Source.Path.root` and `DiskInfo.Source.Mount.root` fields
now store `./csi/<type>/<name>/mounts` for PATH and MOUNT disk
resources respectively, and the actual mount point is carved out from
the `root` and `id` fields. In the future, the `SharedInfo` might also
be used to construct the mount point for shared volumes.
Review: https://reviews.apache.org/r/66574/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02f9cb12
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02f9cb12
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02f9cb12
Branch: refs/heads/master
Commit: 02f9cb127893b794e9fcc8b529f45bee33dd2e4c
Parents: 5545039
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:15 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:15 2018 -0700
----------------------------------------------------------------------
src/CMakeLists.txt | 4 ++
src/Makefile.am | 19 +++++----
src/csi/paths.cpp | 36 +++++++++-------
src/csi/paths.hpp | 15 ++++---
src/resource_provider/storage/provider.cpp | 56 +++++++++++++------------
src/slave/paths.cpp | 18 ++++++++
6 files changed, 92 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ac176d1..31af9ea 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -226,6 +226,9 @@ set(COMMON_SRC
common/validation.cpp
common/values.cpp)
+set(CSI_SRC
+ csi/paths.cpp)
+
set(DOCKER_SRC
docker/docker.cpp
docker/spec.cpp)
@@ -428,6 +431,7 @@ set(MESOS_SRC
${AUTHENTICATION_SRC}
${AUTHORIZER_SRC}
${COMMON_SRC}
+ ${CSI_SRC}
${DOCKER_SRC}
${EXECUTOR_SRC}
${FILES_SRC}
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b590928..07eb138 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1524,28 +1524,29 @@ libbuild_la_CPPFLAGS += -DBUILD_FLAGS="\"$$BUILD_FLAGS\""
libmesos_no_3rdparty_la_LIBADD += libbuild.la
-if ENABLE_GRPC
# Convenience library for build the CSI client.
noinst_LTLIBRARIES += libcsi.la
libcsi_la_SOURCES = \
- csi/paths.cpp \
- csi/utils.cpp
+ csi/paths.cpp
+
+libcsi_la_SOURCES += \
+ csi/paths.hpp
+
+if ENABLE_GRPC
libcsi_la_SOURCES += \
../include/csi/spec.hpp \
- csi/paths.hpp \
+ csi/client.cpp \
+ csi/client.hpp \
csi/state.hpp \
csi/state.proto \
+ csi/utils.cpp \
csi/utils.hpp
-
-libcsi_la_SOURCES += \
- csi/client.cpp \
- csi/client.hpp
+endif
nodist_libcsi_la_SOURCES = $(CXX_CSI_PROTOS)
libcsi_la_CPPFLAGS = $(MESOS_CPPFLAGS)
libmesos_no_3rdparty_la_LIBADD += libcsi.la
-endif
# Convenience library for building the replicated log in order to
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/csi/paths.cpp
----------------------------------------------------------------------
diff --git a/src/csi/paths.cpp b/src/csi/paths.cpp
index 0d42db2..cc5b9ec 100644
--- a/src/csi/paths.cpp
+++ b/src/csi/paths.cpp
@@ -42,18 +42,20 @@ namespace csi {
namespace paths {
// File names.
-const char CONTAINER_INFO_FILE[] = "container.info";
-const char ENDPOINT_SOCKET_FILE[] = "endpoint.sock";
-const char VOLUME_STATE_FILE[] = "volume.state";
+constexpr char CONTAINER_INFO_FILE[] = "container.info";
+constexpr char ENDPOINT_SOCKET_FILE[] = "endpoint.sock";
+constexpr char VOLUME_STATE_FILE[] = "volume.state";
-const char CONTAINERS_DIR[] = "containers";
-const char VOLUMES_DIR[] = "volumes";
-const char MOUNTS_DIR[] = "mounts";
+constexpr char CONTAINERS_DIR[] = "containers";
+constexpr char VOLUMES_DIR[] = "volumes";
+constexpr char MOUNTS_DIR[] = "mounts";
+constexpr char STAGING_DIR[] = "staging";
+constexpr char TARGET_DIR[] = "target";
-const char ENDPOINT_DIR_SYMLINK[] = "endpoint";
-const char ENDPOINT_DIR[] = "mesos-csi-XXXXXX";
+constexpr char ENDPOINT_DIR_SYMLINK[] = "endpoint";
+constexpr char ENDPOINT_DIR[] = "mesos-csi-XXXXXX";
Try<list<string>> getContainerPaths(
@@ -276,15 +278,19 @@ string getMountRootDir(
}
-string getMountPath(
- const string& rootDir,
- const string& type,
- const string& name,
+string getMountStagingPath(
+ const string& mountRootDir,
const string& volumeId)
{
- return path::join(
- getMountRootDir(rootDir, type, name),
- http::encode(volumeId));
+ return path::join(mountRootDir, http::encode(volumeId), STAGING_DIR);
+}
+
+
+string getMountTargetPath(
+ const string& mountRootDir,
+ const string& volumeId)
+{
+ return path::join(mountRootDir, http::encode(volumeId), TARGET_DIR);
}
} // namespace paths {
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/csi/paths.hpp
----------------------------------------------------------------------
diff --git a/src/csi/paths.hpp b/src/csi/paths.hpp
index 7892e5e..7a4e9e0 100644
--- a/src/csi/paths.hpp
+++ b/src/csi/paths.hpp
@@ -42,7 +42,9 @@ namespace paths {
// | |-- <volume_id>
// | |-- volume.state
// |-- mounts
-// |-- <volume_id> (mount point)
+// |-- <volume_id>
+// |- staging (staging mount point)
+// |- target (mount point)
struct ContainerPath
@@ -134,10 +136,13 @@ std::string getMountRootDir(
const std::string& name);
-std::string getMountPath(
- const std::string& rootDir,
- const std::string& type,
- const std::string& name,
+std::string getMountStagingPath(
+ const std::string& mountRootDir,
+ const std::string& volumeId);
+
+
+std::string getMountTargetPath(
+ const std::string& mountRootDir,
const std::string& volumeId);
} // namespace paths {
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ddbaa0f..ce94393 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1785,24 +1785,23 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
endpointVolume->set_host_path(endpointDir);
// Prepare the directory where the mount points will be placed.
- const string mountDir = csi::paths::getMountRootDir(
+ const string mountRootDir = csi::paths::getMountRootDir(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
info.storage().plugin().name());
- Try<Nothing> mkdir = os::mkdir(mountDir);
+ Try<Nothing> mkdir = os::mkdir(mountRootDir);
if (mkdir.isError()) {
return Failure(
- "Failed to create directory '" + mountDir +
- "': " + mkdir.error());
+ "Failed to create directory '" + mountRootDir + "': " + mkdir.error());
}
// Prepare a volume where the mount points will be placed.
Volume* mountVolume = containerInfo.add_volumes();
mountVolume->set_mode(Volume::RW);
- mountVolume->set_container_path(mountDir);
+ mountVolume->set_container_path(mountRootDir);
mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
- mountVolume->mutable_source()->mutable_host_path()->set_path(mountDir);
+ mountVolume->mutable_source()->mutable_host_path()->set_path(mountRootDir);
mountVolume->mutable_source()->mutable_host_path()
->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
@@ -2194,16 +2193,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
csi::v0::Client client) -> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
- const string mountPath = csi::paths::getMountPath(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name(),
+ const string targetPath = csi::paths::getMountTargetPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
volumeId);
- Try<Nothing> mkdir = os::mkdir(mountPath);
+ Try<Nothing> mkdir = os::mkdir(targetPath);
if (mkdir.isError()) {
return Failure(
- "Failed to create mount point '" + mountPath + "': " +
+ "Failed to create mount target path '" + targetPath + "': " +
mkdir.error());
}
@@ -2217,7 +2217,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
csi::v0::NodePublishVolumeRequest request;
request.set_volume_id(volumeId);
*request.mutable_publish_info() = volume.state.publish_info();
- request.set_target_path(mountPath);
+ request.set_target_path(targetPath);
request.mutable_volume_capability()
->CopyFrom(volume.state.volume_capability());
request.set_readonly(false);
@@ -2252,13 +2252,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
.then(defer(self(), [this, volumeId](csi::v0::Client client) {
VolumeData& volume = volumes.at(volumeId);
- const string mountPath = csi::paths::getMountPath(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name(),
+ const string targetPath = csi::paths::getMountTargetPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
volumeId);
- CHECK(os::exists(mountPath));
+ CHECK(os::exists(targetPath));
// A previously failed `NodePublishVolume` call can be recovered through
// the current `NodeUnpublishVolume` call. See:
@@ -2273,20 +2274,20 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
csi::v0::NodeUnpublishVolumeRequest request;
request.set_volume_id(volumeId);
- request.set_target_path(mountPath);
+ request.set_target_path(targetPath);
return client.NodeUnpublishVolume(request)
- .then(defer(self(), [this, volumeId, mountPath]() -> Future<Nothing> {
+ .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::NODE_READY);
volume.state.clear_boot_id();
checkpointVolumeState(volumeId);
- Try<Nothing> rmdir = os::rmdir(mountPath);
+ Try<Nothing> rmdir = os::rmdir(targetPath);
if (rmdir.isError()) {
return Failure(
- "Failed to remove mount point '" + mountPath + "': " +
+ "Failed to remove mount point '" + targetPath + "': " +
rmdir.error());
}
@@ -2885,23 +2886,24 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
->CopyFrom(convertStringMapToLabels(volumeState.volume_attributes()));
}
- const string mountPath = csi::paths::getMountPath(
+ const string mountRootDir = csi::paths::getMountRootDir(
slave::paths::getCsiRootDir("."),
info.storage().plugin().type(),
- info.storage().plugin().name(),
- volumeId);
+ info.storage().plugin().name());
switch (type) {
case Resource::DiskInfo::Source::PATH: {
// Set the root path relative to agent work dir.
converted.mutable_disk()->mutable_source()->mutable_path()
- ->set_root(mountPath);
+ ->set_root(mountRootDir);
+
break;
}
case Resource::DiskInfo::Source::MOUNT: {
// Set the root path relative to agent work dir.
converted.mutable_disk()->mutable_source()->mutable_mount()
- ->set_root(mountPath);
+ ->set_root(mountRootDir);
+
break;
}
case Resource::DiskInfo::Source::BLOCK: {
http://git-wip-us.apache.org/repos/asf/mesos/blob/02f9cb12/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index 8a7e162..690bfe3 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -36,6 +36,8 @@
#include "common/validation.hpp"
+#include "csi/paths.hpp"
+
#include "messages/messages.hpp"
#include "slave/paths.hpp"
@@ -673,10 +675,18 @@ string getPersistentVolumePath(
CHECK(volume.disk().source().has_path());
CHECK(volume.disk().source().path().has_root());
string root = volume.disk().source().path().root();
+
if (!path::absolute(root)) {
// A relative path in `root` is relative to agent work dir.
root = path::join(workDir, root);
}
+
+ if (volume.disk().source().has_id()) {
+ // For a CSI volume the mount point is derived from `root` and `id`.
+ root =
+ csi::paths::getMountTargetPath(root, volume.disk().source().id());
+ }
+
return getPersistentVolumePath(
root,
role,
@@ -687,10 +697,18 @@ string getPersistentVolumePath(
CHECK(volume.disk().source().has_mount());
CHECK(volume.disk().source().mount().has_root());
string root = volume.disk().source().mount().root();
+
if (!path::absolute(root)) {
// A relative path in `root` is relative to agent work dir.
root = path::join(workDir, root);
}
+
+ if (volume.disk().source().has_id()) {
+ // For a CSI volume the mount point is derived from `root` and `id`.
+ root =
+ csi::paths::getMountTargetPath(root, volume.disk().source().id());
+ }
+
return root;
}
case Resource::DiskInfo::Source::BLOCK:
[2/3] mesos git commit: Supported `STAGE_UNSTAGE_VOLUME` CSI node
capability in SLRP.
Posted by ch...@apache.org.
Supported `STAGE_UNSTAGE_VOLUME` CSI node capability in SLRP.
The storage local resource provider now properly calls `NodeStageVolume`
or `NodeUnstageVolume` when publishing or deleting volumes for CSI
plugins that support the `STAGE_UNSTAGE_VOLUME` capability.
Review: https://reviews.apache.org/r/66575/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9aa3ad8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9aa3ad8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9aa3ad8
Branch: refs/heads/master
Commit: a9aa3ad8599cdafbd8d36620887c7f9002ff56d2
Parents: 02f9cb1
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:26 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:26 2018 -0700
----------------------------------------------------------------------
src/csi/state.proto | 33 +--
src/resource_provider/storage/provider.cpp | 350 +++++++++++++++++++-----
2 files changed, 293 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/csi/state.proto
----------------------------------------------------------------------
diff --git a/src/csi/state.proto b/src/csi/state.proto
index a252cb2..8445399 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -26,10 +26,13 @@ message VolumeState {
enum State {
UNKNOWN = 0;
CREATED = 1; // The volume is provisioned but not published.
- NODE_READY = 2; // The volume is attached to the node.
- PUBLISHED = 3; // The volume is mounted on the node.
+ NODE_READY = 2; // The volume is made available on the node.
+ VOL_READY = 8; // The volume is made publishable on the node.
+ PUBLISHED = 3; // The volume is published on the node.
CONTROLLER_PUBLISH = 4; // `ControllerPublishVolume` is being called.
CONTROLLER_UNPUBLISH = 5; // `ControllerUnpublishVolume` is being called.
+ NODE_STAGE = 9; // `NodeStageVolume` is being called.
+ NODE_UNSTAGE = 10; // `NodeUnstageVolume` is being called.
NODE_PUBLISH = 6; // `NodePublishVolume` is being called.
NODE_UNPUBLISH = 7; // `NodeUnpublishVolume` is being called.
}
@@ -37,25 +40,23 @@ message VolumeState {
// The state of the volume. This is a REQUIRED field.
State state = 1;
- // The capability used to publish the volume. This is a
- // REQUIRED field.
+ // The capability used to publish the volume. This is a REQUIRED field.
.csi.v0.VolumeCapability volume_capability = 2;
- // Attributes of the volume to be used on the node. This field MUST
- // match the attributes of the `Volume` returned by `CreateVolume`.
- // This is an OPTIONAL field.
+ // Attributes of the volume to be used on the node. This field MUST match the
+ // attributes of the `Volume` returned by `CreateVolume`. This is an OPTIONAL
+ // field.
map<string, string> volume_attributes = 3;
- // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller
- // capability, this field MUST be set to the value returned by
- // `ControllerPublishVolume`. Otherwise, this field MUST remain unset.
- // This is an OPTIONAL field.
+ // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller capability,
+ // this field MUST be set to the value returned by `ControllerPublishVolume`.
+ // Otherwise, this field MUST remain unset. This is an OPTIONAL field.
map<string, string> publish_info = 4;
- // This field is used to check if the node has been rebooted since the
- // last time the volume is mounted. If yes, `NodePublishVolume` needs
- // to be called to mount the volume again. It MUST be set to the boot
- // ID of the node if the volume is mounted, and SHOULD remain unset
- // otherwise. This is an OPTIONAL field.
+ // This field is used to check if the node has been rebooted since the volume
+ // was transitioned to `VOL_READY` state. If yes, `NodeStageVolume` needs to
+ // be called to make the volume publishable again. It MUST be set to the boot
+ // ID of the node if the volume is in `VOL_READY` state, and SHOULD remain
+ // unset otherwise. This is an OPTIONAL field.
string boot_id = 5;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a9aa3ad8/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ce94393..8ca2d3a 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -326,7 +326,7 @@ public:
private:
struct VolumeData
{
- VolumeData(const VolumeState& _state)
+ VolumeData(VolumeState&& _state)
: state(_state), sequence(new Sequence("volume-sequence")) {}
VolumeState state;
@@ -376,6 +376,8 @@ private:
Future<Nothing> prepareNodeService();
Future<Nothing> controllerPublish(const string& volumeId);
Future<Nothing> controllerUnpublish(const string& volumeId);
+ Future<Nothing> nodeStage(const string& volumeId);
+ Future<Nothing> nodeUnstage(const string& volumeId);
Future<Nothing> nodePublish(const string& volumeId);
Future<Nothing> nodeUnpublish(const string& volumeId);
Future<string> createVolume(
@@ -640,7 +642,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
auto err = [](const string& message) {
LOG(ERROR)
- << "Failed to watch for VolumeprofileAdaptor: " << message;
+ << "Failed to watch for DiskProfileAdaptor: " << message;
};
// Start watching the DiskProfileAdaptor.
@@ -843,59 +845,94 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
Future<Nothing> recovered = Nothing();
- switch (volume.state.state()) {
- case VolumeState::CREATED:
- case VolumeState::NODE_READY: {
- break;
- }
- case VolumeState::PUBLISHED: {
- if (volume.state.boot_id() != bootId) {
- // The node has been restarted since the volume is mounted,
- // so it is no longer in the `PUBLISHED` state.
- volume.state.set_state(VolumeState::NODE_READY);
- volume.state.clear_boot_id();
- checkpointVolumeState(volumeId);
+ if (VolumeState::State_IsValid(volume.state.state())) {
+ switch (volume.state.state()) {
+ case VolumeState::CREATED:
+ case VolumeState::NODE_READY: {
+ break;
}
- break;
- }
- case VolumeState::CONTROLLER_PUBLISH: {
- recovered =
- volume.sequence->add(std::function<Future<Nothing>()>(
+ case VolumeState::VOL_READY:
+ case VolumeState::PUBLISHED: {
+ if (volume.state.boot_id() != bootId) {
+ // The node has been restarted since the volume is made
+ // publishable, so it is reset to `NODE_READY` state.
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+ }
+
+ break;
+ }
+ case VolumeState::CONTROLLER_PUBLISH: {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
defer(self(), &Self::controllerPublish, volumeId)));
- break;
- }
- case VolumeState::CONTROLLER_UNPUBLISH: {
- recovered =
- volume.sequence->add(std::function<Future<Nothing>()>(
+
+ break;
+ }
+ case VolumeState::CONTROLLER_UNPUBLISH: {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
defer(self(), &Self::controllerUnpublish, volumeId)));
- break;
- }
- case VolumeState::NODE_PUBLISH: {
- recovered =
- volume.sequence->add(std::function<Future<Nothing>()>(
- defer(self(), &Self::nodePublish, volumeId)));
- break;
- }
- case VolumeState::NODE_UNPUBLISH: {
- recovered =
- volume.sequence->add(std::function<Future<Nothing>()>(
- defer(self(), &Self::nodeUnpublish, volumeId)));
- break;
- }
- case VolumeState::UNKNOWN: {
- recovered = Failure(
- "Volume '" + volumeId + "' is in " +
- stringify(volume.state.state()) + " state");
- }
- // NOTE: We avoid using a default clause for the following
- // values in proto3's open enum to enable the compiler to detect
- // missing enum cases for us. See:
- // https://github.com/google/protobuf/issues/3917
- case google::protobuf::kint32min:
- case google::protobuf::kint32max: {
- UNREACHABLE();
+ break;
+ }
+ case VolumeState::NODE_STAGE: {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::nodeStage, volumeId)));
+
+ break;
+ }
+ case VolumeState::NODE_UNSTAGE: {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::nodeUnstage, volumeId)));
+
+ break;
+ }
+ case VolumeState::NODE_PUBLISH: {
+ if (volume.state.boot_id() != bootId) {
+ // The node has been restarted since `NodePublishVolume` was
+ // called, so it is reset to `NODE_READY` state.
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+ } else {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::nodePublish, volumeId)));
+ }
+
+ break;
+ }
+ case VolumeState::NODE_UNPUBLISH: {
+ if (volume.state.boot_id() != bootId) {
+ // The node has been restarted since `NodeUnpublishVolume` was
+ // called, so it is reset to `NODE_READY` state.
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+ } else {
+ recovered = volume.sequence->add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::nodeUnpublish, volumeId)));
+ }
+
+ break;
+ }
+ case VolumeState::UNKNOWN: {
+ recovered = Failure(
+ "Volume '" + volumeId + "' is in " +
+ stringify(volume.state.state()) + " state");
+
+ break;
+ }
+
+ // NOTE: We avoid using a default clause for the following values in
+ // proto3's open enum to enable the compiler to detect missing enum
+ // cases for us. See: https://github.com/google/protobuf/issues/3917
+ case google::protobuf::kint32min:
+ case google::protobuf::kint32max: {
+ UNREACHABLE();
+ }
}
+ } else {
+ recovered = Failure("Volume '" + volumeId + "' is in UNDEFINED state");
}
futures.push_back(recovered);
@@ -1511,10 +1548,13 @@ void StorageLocalResourceProviderProcess::publishResources(
std::function<Future<Nothing>()> controllerAndNodePublish =
defer(self(), [=] {
CHECK(volumes.contains(volumeId));
+ const VolumeData& volume = volumes.at(volumeId);
Future<Nothing> published = Nothing();
- switch (volumes.at(volumeId).state.state()) {
+ CHECK(VolumeState::State_IsValid(volume.state.state()));
+
+ switch (volume.state.state()) {
case VolumeState::CONTROLLER_UNPUBLISH: {
published = published
.then(defer(self(), &Self::controllerUnpublish, volumeId));
@@ -1526,18 +1566,34 @@ void StorageLocalResourceProviderProcess::publishResources(
case VolumeState::CONTROLLER_PUBLISH: {
published = published
.then(defer(self(), &Self::controllerPublish, volumeId))
+ .then(defer(self(), &Self::nodeStage, volumeId))
.then(defer(self(), &Self::nodePublish, volumeId));
break;
}
- case VolumeState::NODE_UNPUBLISH: {
+ case VolumeState::NODE_UNSTAGE: {
published = published
- .then(defer(self(), &Self::nodeUnpublish, volumeId));
+ .then(defer(self(), &Self::nodeUnstage, volumeId));
// NOTE: We continue to the next case to publish the volume in
// `NODE_READY` state once the above is done.
}
case VolumeState::NODE_READY:
+ case VolumeState::NODE_STAGE: {
+ published = published
+ .then(defer(self(), &Self::nodeStage, volumeId))
+ .then(defer(self(), &Self::nodePublish, volumeId));
+
+ break;
+ }
+ case VolumeState::NODE_UNPUBLISH: {
+ published = published
+ .then(defer(self(), &Self::nodeUnpublish, volumeId));
+
+ // NOTE: We continue to the next case to publish the volume in
+ // `VOL_READY` state once the above is done.
+ }
+ case VolumeState::VOL_READY:
case VolumeState::NODE_PUBLISH: {
published = published
.then(defer(self(), &Self::nodePublish, volumeId));
@@ -2038,12 +2094,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
-> Future<csi::v0::Client> {
nodeCapabilities = response.capabilities();
- // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
- if (nodeCapabilities.stageUnstageVolume) {
- return Failure(
- "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported");
- }
-
// Get the latest service future before proceeding to the next step.
return getService(nodeContainerId.get());
}))
@@ -2177,14 +2227,144 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
}
-// Transition the state of the specified volume from `NODE_READY` or
+// Transitions the state of the specified volume from `NODE_READY` or
+// `NODE_STAGE` to `VOL_READY`.
+// NOTE: This can only be called after `prepareNodeService`.
+Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
+ const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeData& volume = volumes.at(volumeId);
+
+ if (!nodeCapabilities.stageUnstageVolume) {
+ CHECK_EQ(VolumeState::NODE_READY, volume.state.state());
+
+ volume.state.set_state(VolumeState::VOL_READY);
+ volume.state.set_boot_id(bootId);
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }
+
+ CHECK_SOME(nodeContainerId);
+
+ return getService(nodeContainerId.get())
+ .then(defer(self(), [this, volumeId](
+ csi::v0::Client client) -> Future<Nothing> {
+ VolumeData& volume = volumes.at(volumeId);
+
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
+
+ Try<Nothing> mkdir = os::mkdir(stagingPath);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create mount staging path '" + stagingPath + "': " +
+ mkdir.error());
+ }
+
+ if (volume.state.state() == VolumeState::NODE_READY) {
+ volume.state.set_state(VolumeState::NODE_STAGE);
+ checkpointVolumeState(volumeId);
+ }
+
+ CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+
+ csi::v0::NodeStageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ *request.mutable_publish_info() = volume.state.publish_info();
+ request.set_staging_target_path(stagingPath);
+ request.mutable_volume_capability()
+ ->CopyFrom(volume.state.volume_capability());
+ *request.mutable_volume_attributes() = volume.state.volume_attributes();
+
+ return client.NodeStageVolume(request)
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
+
+ volume.state.set_state(VolumeState::VOL_READY);
+ volume.state.set_boot_id(bootId);
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+ }));
+}
+
+
+// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
+// or `NODE_UNSTAGE` to `NODE_READY`.
+// NOTE: This can only be called after `prepareNodeService`.
+Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
+ const string& volumeId)
+{
+ CHECK(volumes.contains(volumeId));
+ VolumeData& volume = volumes.at(volumeId);
+
+ if (!nodeCapabilities.stageUnstageVolume) {
+ CHECK_EQ(VolumeState::VOL_READY, volume.state.state());
+
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }
+
+ CHECK_SOME(nodeContainerId);
+
+ return getService(nodeContainerId.get())
+ .then(defer(self(), [this, volumeId](csi::v0::Client client) {
+ VolumeData& volume = volumes.at(volumeId);
+
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
+
+ CHECK(os::exists(stagingPath));
+
+ // A previously failed `NodeStageVolume` call can be recovered through the
+ // current `NodeUnstageVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+ if (volume.state.state() == VolumeState::VOL_READY ||
+ volume.state.state() == VolumeState::NODE_STAGE) {
+ volume.state.set_state(VolumeState::NODE_UNSTAGE);
+ checkpointVolumeState(volumeId);
+ }
+
+ CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+
+ csi::v0::NodeUnstageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_staging_target_path(stagingPath);
+
+ return client.NodeUnstageVolume(request)
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
+
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
+ }));
+ }));
+}
+
+
+// Transitions the state of the specified volume from `VOL_READY` or
// `NODE_PUBLISH` to `PUBLISHED`.
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
const string& volumeId)
{
- // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
-
CHECK(volumes.contains(volumeId));
CHECK_SOME(nodeContainerId);
@@ -2207,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
mkdir.error());
}
- if (volume.state.state() == VolumeState::NODE_READY) {
+ if (volume.state.state() == VolumeState::VOL_READY) {
volume.state.set_state(VolumeState::NODE_PUBLISH);
checkpointVolumeState(volumeId);
}
@@ -2223,12 +2403,24 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
request.set_readonly(false);
*request.mutable_volume_attributes() = volume.state.volume_attributes();
+ if (nodeCapabilities.stageUnstageVolume) {
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
+
+ CHECK(os::exists(stagingPath));
+
+ request.set_staging_target_path(stagingPath);
+ }
+
return client.NodePublishVolume(request)
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
volume.state.set_state(VolumeState::PUBLISHED);
- volume.state.set_boot_id(bootId);
checkpointVolumeState(volumeId);
return Nothing();
@@ -2237,14 +2429,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
}
-// Transition the state of the specified volume from `PUBLISHED`,
-// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `NODE_READY`.
+// Transitions the state of the specified volume from `PUBLISHED`,
+// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
const string& volumeId)
{
- // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
-
CHECK(volumes.contains(volumeId));
CHECK_SOME(nodeContainerId);
@@ -2280,8 +2470,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
.then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
- volume.state.set_state(VolumeState::NODE_READY);
- volume.state.clear_boot_id();
+ volume.state.set_state(VolumeState::VOL_READY);
checkpointVolumeState(volumeId);
Try<Nothing> rmdir = os::rmdir(targetPath);
@@ -2362,6 +2551,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
"Controller capability 'CREATE_DELETE_VOLUME' is not supported");
}
+ CHECK_SOME(controllerContainerId);
+
const string volumePath = csi::paths::getVolumePath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
@@ -2376,17 +2567,28 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
return Nothing();
}
- CHECK_SOME(controllerContainerId);
+ const VolumeData& volume = volumes.at(volumeId);
Future<Nothing> deleted = Nothing();
- switch (volumes.at(volumeId).state.state()) {
+ CHECK(VolumeState::State_IsValid(volume.state.state()));
+
+ switch (volume.state.state()) {
case VolumeState::PUBLISHED:
case VolumeState::NODE_PUBLISH:
case VolumeState::NODE_UNPUBLISH: {
deleted = deleted
.then(defer(self(), &Self::nodeUnpublish, volumeId));
+ // NOTE: We continue to the next case to delete the volume in `VOL_READY`
+ // state once the above is done.
+ }
+ case VolumeState::VOL_READY:
+ case VolumeState::NODE_STAGE:
+ case VolumeState::NODE_UNSTAGE: {
+ deleted = deleted
+ .then(defer(self(), &Self::nodeUnstage, volumeId));
+
// NOTE: We continue to the next case to delete the volume in `NODE_READY`
// state once the above is done.
}
@@ -2403,7 +2605,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
if (!preExisting) {
deleted = deleted
.then(defer(self(), &Self::getService, controllerContainerId.get()))
- .then(defer(self(), [=](csi::v0::Client client) {
+ .then(defer(self(), [volumeId](csi::v0::Client client) {
csi::v0::DeleteVolumeRequest request;
request.set_volume_id(volumeId);
@@ -2434,7 +2636,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
// the continuation would have already been run, the returned future will
// become ready, making the future returned by the sequence ready as well.
return deleted
- .then(defer(self(), [=] {
+ .then(defer(self(), [this, volumeId, volumePath] {
volumes.erase(volumeId);
CHECK_SOME(os::rmdir(volumePath));
[3/3] mesos git commit: Added `STAGE_UNSTAGE_VOLUME` capability to
the test CSI plugin.
Posted by ch...@apache.org.
Added `STAGE_UNSTAGE_VOLUME` capability to the test CSI plugin.
The test CSI plugin now requires `NodeStageVolume` to be called before
`NodePublishVolume`. `NodeStageVolume` bind-mounts the volume
directory onto the specified staging path, and `NodePublishVolume`
then bind-mounts the staging path onto the target path.
Review: https://reviews.apache.org/r/66576/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a105893
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a105893
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a105893
Branch: refs/heads/master
Commit: 5a105893acbae841f68fe95c109a6ff3c3fd63a9
Parents: a9aa3ad
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 19:42:29 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 19:42:29 2018 -0700
----------------------------------------------------------------------
src/examples/test_csi_plugin.cpp | 144 +++++++++++++++++++++++++++++++++-
1 file changed, 140 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5a105893/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 357e022..9c4da88 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -216,6 +216,16 @@ public:
const csi::v0::ControllerGetCapabilitiesRequest* request,
csi::v0::ControllerGetCapabilitiesResponse* response) override;
+ virtual Status NodeStageVolume(
+ ServerContext* context,
+ const csi::v0::NodeStageVolumeRequest* request,
+ csi::v0::NodeStageVolumeResponse* response) override;
+
+ virtual Status NodeUnstageVolume(
+ ServerContext* context,
+ const csi::v0::NodeUnstageVolumeRequest* request,
+ csi::v0::NodeUnstageVolumeResponse* response) override;
+
virtual Status NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
@@ -594,6 +604,112 @@ Status TestCSIPlugin::ControllerGetCapabilities(
}
+Status TestCSIPlugin::NodeStageVolume(
+ ServerContext* context,
+ const csi::v0::NodeStageVolumeRequest* request,
+ csi::v0::NodeStageVolumeResponse* response)
+{
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
+
+ // TODO(chhsiao): Validate required fields.
+
+ if (!volumes.contains(request->volume_id())) {
+ return Status(
+ grpc::NOT_FOUND,
+ "Volume '" + request->volume_id() + "' is not found");
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+ const string path = getVolumePath(volumeInfo);
+
+ auto it = request->volume_attributes().find("path");
+ if (it == request->volume_attributes().end() || it->second != path) {
+ return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ }
+
+ if (!os::exists(request->staging_target_path())) {
+ return Status(
+ grpc::INVALID_ARGUMENT,
+ "Target path '" + request->staging_target_path() + "' is not found");
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return Status(
+ grpc::INTERNAL,
+ "Failed to get mount table: " + table.error());
+ }
+
+ foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+ if (entry.target == request->staging_target_path()) {
+ return Status::OK;
+ }
+ }
+
+ Try<Nothing> mount = fs::mount(
+ path,
+ request->staging_target_path(),
+ None(),
+ MS_BIND,
+ None());
+
+ if (mount.isError()) {
+ return Status(
+ grpc::INTERNAL,
+ "Failed to mount from '" + path + "' to '" +
+ request->staging_target_path() + "': " + mount.error());
+ }
+
+ return Status::OK;
+}
+
+
+Status TestCSIPlugin::NodeUnstageVolume(
+ ServerContext* context,
+ const csi::v0::NodeUnstageVolumeRequest* request,
+ csi::v0::NodeUnstageVolumeResponse* response)
+{
+ LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
+
+ // TODO(chhsiao): Validate required fields.
+
+ if (!volumes.contains(request->volume_id())) {
+ return Status(
+ grpc::NOT_FOUND,
+ "Volume '" + request->volume_id() + "' is not found");
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return Status(
+ grpc::INTERNAL,
+ "Failed to get mount table: " + table.error());
+ }
+
+ bool found = false;
+ foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+ if (entry.target == request->staging_target_path()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ return Status::OK;
+ }
+
+ Try<Nothing> unmount = fs::unmount(request->staging_target_path());
+ if (unmount.isError()) {
+ return Status(
+ grpc::INTERNAL,
+ "Failed to unmount '" + request->staging_target_path() +
+ "': " + unmount.error());
+ }
+
+ return Status::OK;
+}
+
+
Status TestCSIPlugin::NodePublishVolume(
ServerContext* context,
const csi::v0::NodePublishVolumeRequest* request,
@@ -623,6 +739,12 @@ Status TestCSIPlugin::NodePublishVolume(
"Target path '" + request->target_path() + "' is not found");
}
+ if (request->staging_target_path().empty()) {
+ return Status(
+ grpc::FAILED_PRECONDITION,
+ "Expecting 'staging_target_path' to be set");
+ }
+
Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
if (table.isError()) {
return Status(
@@ -630,6 +752,20 @@ Status TestCSIPlugin::NodePublishVolume(
"Failed to get mount table: " + table.error());
}
+ bool found = false;
+ foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+ if (entry.target == request->staging_target_path()) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ return Status(
+ grpc::FAILED_PRECONDITION,
+ "Volume '" + request->volume_id() + "' has not been staged yet");
+ }
+
foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
if (entry.target == request->target_path()) {
return Status::OK;
@@ -637,7 +773,7 @@ Status TestCSIPlugin::NodePublishVolume(
}
Try<Nothing> mount = fs::mount(
- path,
+ request->staging_target_path(),
request->target_path(),
None(),
MS_BIND,
@@ -700,9 +836,6 @@ Status TestCSIPlugin::NodeUnpublishVolume(
return Status::OK;
}
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
-
Try<Nothing> unmount = fs::unmount(request->target_path());
if (unmount.isError()) {
return Status(
@@ -735,6 +868,9 @@ Status TestCSIPlugin::NodeGetCapabilities(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
+ response->add_capabilities()->mutable_rpc()->set_type(
+ csi::v0::NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME);
+
return Status::OK;
}