You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/09 06:13:14 UTC
[mesos] 05/06: Refactored the test CSI plugin to make it easy to
support CSI v1.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8e60bc7a921a2d4b37e5bec32ad3a10ddec96e78
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Apr 4 18:48:24 2019 -0700
Refactored the test CSI plugin to make it easy to support CSI v1.
Review: https://reviews.apache.org/r/70403
---
src/examples/test_csi_plugin.cpp | 951 +++++++++++++++++++++++++--------------
1 file changed, 606 insertions(+), 345 deletions(-)
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 4321f8f..54753d9 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -14,9 +14,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <algorithm>
+#include <limits>
#include <memory>
#include <thread>
#include <utility>
+#include <vector>
+
+#include <google/protobuf/map.h>
+#include <google/protobuf/message.h>
#include <grpcpp/grpcpp.h>
@@ -25,16 +31,24 @@
#include <mesos/type_utils.hpp>
+#include <mesos/csi/types.hpp>
#include <mesos/csi/v0.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+
#include <stout/bytes.hpp>
#include <stout/flags.hpp>
#include <stout/foreach.hpp>
#include <stout/hashmap.hpp>
+#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/path.hpp>
+#include <stout/some.hpp>
#include <stout/stringify.hpp>
#include <stout/strings.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
#include <stout/os/exists.hpp>
#include <stout/os/ls.hpp>
@@ -47,6 +61,7 @@
#include "logging/logging.hpp"
+namespace http = process::http;
namespace fs = mesos::internal::fs;
using std::cerr;
@@ -59,6 +74,10 @@ using std::string;
using std::unique_ptr;
using std::vector;
+using google::protobuf::Map;
+using google::protobuf::MapPair;
+using google::protobuf::RepeatedPtrField;
+
using grpc::AsyncGenericService;
using grpc::ByteBuffer;
using grpc::ClientContext;
@@ -73,6 +92,10 @@ using grpc::ServerContext;
using grpc::Status;
using grpc::WriteOptions;
+using mesos::csi::types::VolumeCapability;
+
+using process::grpc::StatusError;
+
constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
constexpr char NODE_ID[] = "localhost";
constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
@@ -144,7 +167,7 @@ public:
// Construct the default mount volume capability.
defaultVolumeCapability.mutable_mount();
defaultVolumeCapability.mutable_access_mode()
- ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+ ->set_mode(VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
// Scan for preprovisioned volumes.
//
@@ -280,12 +303,69 @@ private:
string getVolumePath(const VolumeInfo& volumeInfo);
Try<VolumeInfo> parseVolumePath(const string& path);
+ Try<VolumeInfo, StatusError> createVolume(
+ const string& name,
+ const Bytes& requiredBytes,
+ const Bytes& limitBytes,
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Map<string, string> parameters);
+
+ Try<Nothing, StatusError> deleteVolume(const string& volumeId);
+
+ Try<Nothing, StatusError> controllerPublishVolume(
+ const string& volumeId,
+ const string& nodeId,
+ const VolumeCapability& capability,
+ bool readonly,
+ const Map<string, string>& volumeContext);
+
+ Try<Nothing, StatusError> controllerUnpublishVolume(
+ const string& volumeId, const string& nodeId);
+
+ // Returns `StatusError` if the volume is missing or its context is invalid;
+ // returns `Some(Error)` if the volume is incompatible with the arguments.
+ Try<Option<Error>, StatusError> validateVolumeCapabilities(
+ const string& volumeId,
+ const Map<string, string>& volumeContext,
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Option<Map<string, string>>& parameters = None());
+
+ Try<vector<VolumeInfo>, StatusError> listVolumes(
+ const Option<int32_t>& maxEntries,
+ const Option<string>& startingToken);
+
+ Try<Bytes, StatusError> getCapacity(
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Map<string, string>& parameters);
+
+ Try<Nothing, StatusError> nodeStageVolume(
+ const string& volumeId,
+ const Map<string, string>& publishContext,
+ const string& stagingPath,
+ const VolumeCapability& capability,
+ const Map<string, string>& volumeContext);
+
+ Try<Nothing, StatusError> nodeUnstageVolume(
+ const string& volumeId, const string& stagingPath);
+
+ Try<Nothing, StatusError> nodePublishVolume(
+ const string& volumeId,
+ const Map<string, string>& publishContext,
+ const string& stagingPath,
+ const string& targetPath,
+ const VolumeCapability& capability,
+ bool readonly,
+ const Map<string, string>& volumeContext);
+
+ Try<Nothing, StatusError> nodeUnpublishVolume(
+ const string& volumeId, const string& targetPath);
+
const string workDir;
const string endpoint;
Bytes availableCapacity;
- csi::v0::VolumeCapability defaultVolumeCapability;
- google::protobuf::Map<string, string> createParameters;
+ VolumeCapability defaultVolumeCapability;
+ Map<string, string> createParameters;
hashmap<string, VolumeInfo> volumes;
};
@@ -349,92 +429,26 @@ Status TestCSIPlugin::CreateVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
+ // TODO(chhsiao): Validate the request.
- if (request->name().empty()) {
- return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
- }
-
- if (strings::contains(request->name(), stringify(os::PATH_SEPARATOR))) {
- return Status(
- grpc::INVALID_ARGUMENT,
- "Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'");
- }
+ Try<VolumeInfo, StatusError> result = createVolume(
+ request->name(),
+ request->capacity_range().required_bytes()
+ ? request->capacity_range().required_bytes() : 1,
+ request->capacity_range().limit_bytes()
+ ? request->capacity_range().limit_bytes()
+ : std::numeric_limits<int64_t>::max(),
+ mesos::csi::v0::devolve(request->volume_capabilities()),
+ request->parameters());
- foreach (const csi::v0::VolumeCapability& capability,
- request->volume_capabilities()) {
- if (capability != defaultVolumeCapability) {
- return Status(grpc::INVALID_ARGUMENT, "Unsupported volume capabilities");
- }
+ if (result.isError()) {
+ return result.error().status;
}
- if (request->parameters() != createParameters) {
- return Status(grpc::INVALID_ARGUMENT, "Unsupported create parameters");
- }
-
- // The volume ID is determined by `name`, so we check whether the volume
- // corresponding to `name` is compatible to the request if it exists.
- if (volumes.contains(request->name())) {
- const VolumeInfo volumeInfo = volumes.at(request->name());
-
- if (request->has_capacity_range()) {
- const csi::v0::CapacityRange& range = request->capacity_range();
-
- if (range.limit_bytes() != 0 &&
- volumeInfo.size > Bytes(range.limit_bytes())) {
- return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'limit_bytes'");
- } else if (range.required_bytes() != 0 &&
- volumeInfo.size < Bytes(range.required_bytes())) {
- return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'required_bytes'");
- }
- }
- } else {
- if (availableCapacity == Bytes(0)) {
- return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
- }
-
- VolumeInfo volumeInfo;
- volumeInfo.id = request->name();
- volumeInfo.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
-
- if (request->has_capacity_range()) {
- const csi::v0::CapacityRange& range = request->capacity_range();
-
- // The highest we can pick.
- Bytes limit = range.limit_bytes() != 0
- ? min(availableCapacity, Bytes(range.limit_bytes()))
- : availableCapacity;
-
- if (range.required_bytes() != 0 &&
- limit < Bytes(range.required_bytes())) {
- return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
- }
-
- volumeInfo.size = min(
- limit,
- max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes())));
- }
-
- const string path = getVolumePath(volumeInfo);
-
- Try<Nothing> mkdir = os::mkdir(path);
- if (mkdir.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error());
- }
-
- CHECK_GE(availableCapacity, volumeInfo.size);
- availableCapacity -= volumeInfo.size;
- volumes.put(volumeInfo.id, std::move(volumeInfo));
- }
-
- const VolumeInfo& volumeInfo = volumes.at(request->name());
-
- response->mutable_volume()->set_id(volumeInfo.id);
- response->mutable_volume()->set_capacity_bytes(volumeInfo.size.bytes());
+ response->mutable_volume()->set_id(result->id);
+ response->mutable_volume()->set_capacity_bytes(result->size.bytes());
(*response->mutable_volume()->mutable_attributes())["path"] =
- getVolumePath(volumeInfo);
+ getVolumePath(result.get());
return Status::OK;
}
@@ -447,26 +461,14 @@ Status TestCSIPlugin::DeleteVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
+ // TODO(chhsiao): Validate the request.
- if (!volumes.contains(request->volume_id())) {
- return Status::OK;
- }
+ Try<Nothing, StatusError> result = deleteVolume(request->volume_id());
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
-
- Try<Nothing> rmdir = os::rmdir(path);
- if (rmdir.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to delete volume '" + request->volume_id() + "': " +
- rmdir.error());
+ if (result.isError()) {
+ return result.error().status;
}
- availableCapacity += volumeInfo.size;
- volumes.erase(volumeInfo.id);
-
return Status::OK;
}
@@ -478,29 +480,19 @@ Status TestCSIPlugin::ControllerPublishVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
+ // TODO(chhsiao): Validate the request.
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
+ Try<Nothing, StatusError> result = controllerPublishVolume(
+ request->volume_id(),
+ request->node_id(),
+ mesos::csi::v0::devolve(request->volume_capability()),
+ request->readonly(),
+ request->volume_attributes());
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
-
- auto it = request->volume_attributes().find("path");
- if (it == request->volume_attributes().end() || it->second != path) {
- return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ if (result.isError()) {
+ return result.error().status;
}
- if (request->node_id() != NODE_ID) {
- return Status(
- grpc::NOT_FOUND,
- "Node '" + request->node_id() + "' is not found");
- }
-
- // Do nothing.
return Status::OK;
}
@@ -512,21 +504,15 @@ Status TestCSIPlugin::ControllerUnpublishVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
+ // TODO(chhsiao): Validate the request.
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
+ Try<Nothing, StatusError> result =
+ controllerUnpublishVolume(request->volume_id(), request->node_id());
- if (request->node_id() != NODE_ID) {
- return Status(
- grpc::NOT_FOUND,
- "Node '" + request->node_id() + "' is not found");
+ if (result.isError()) {
+ return result.error().status;
}
- // Do nothing.
return Status::OK;
}
@@ -538,36 +524,24 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
-
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
+ // TODO(chhsiao): Validate the request.
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
+ Try<Option<Error>, StatusError> result = validateVolumeCapabilities(
+ request->volume_id(),
+ request->volume_attributes(),
+ mesos::csi::v0::devolve(request->volume_capabilities()));
- auto it = request->volume_attributes().find("path");
- if (it == request->volume_attributes().end() || it->second != path) {
- return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+ if (result.isError()) {
+ return result.error().status;
}
- foreach (const csi::v0::VolumeCapability& capability,
- request->volume_capabilities()) {
- if (capability != defaultVolumeCapability) {
- response->set_supported(false);
- response->set_message("Unsupported volume capabilities");
-
- return Status::OK;
- }
+ if (result->isSome()) {
+ response->set_supported(false);
+ response->set_message(result->get().message);
+ } else {
+ response->set_supported(true);
}
- // TODO(chhsiao): Validate the parameters once we get CSI v1.
-
- response->set_supported(true);
-
return Status::OK;
}
@@ -579,17 +553,17 @@ Status TestCSIPlugin::ListVolumes(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Support the `max_entries` field.
- if (request->max_entries() > 0) {
- return Status(grpc::ABORTED, "Field 'max_entries' is not supported");
- }
+ Try<vector<VolumeInfo>, StatusError> result = listVolumes(
+ request->max_entries()
+ ? request->max_entries() : Option<int32_t>::none(),
+ !request->starting_token().empty()
+ ? request->starting_token() : Option<string>::none());
- // TODO(chhsiao): Support the `starting_token` fields.
- if (!request->starting_token().empty()) {
- return Status(grpc::ABORTED, "Field 'starting_token' is not supported");
+ if (result.isError()) {
+ return result.error().status;
}
- foreachvalue (const VolumeInfo& volumeInfo, volumes) {
+ foreach (const VolumeInfo& volumeInfo, result.get()) {
csi::v0::Volume* volume = response->add_entries()->mutable_volume();
volume->set_id(volumeInfo.id);
volume->set_capacity_bytes(volumeInfo.size.bytes());
@@ -607,25 +581,15 @@ Status TestCSIPlugin::GetCapacity(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- foreach (const csi::v0::VolumeCapability& capability,
- request->volume_capabilities()) {
- // We report zero capacity for any capability other than the
- // default-constructed mount volume capability since this plugin
- // does not support any filesystem types and mount flags.
- if (capability != defaultVolumeCapability) {
- response->set_available_capacity(0);
-
- return Status::OK;
- }
- }
-
- if (request->parameters() != createParameters) {
- response->set_available_capacity(0);
+ Try<Bytes, StatusError> result = getCapacity(
+ mesos::csi::v0::devolve(request->volume_capabilities()),
+ request->parameters());
- return Status::OK;
+ if (result.isError()) {
+ return result.error().status;
}
- response->set_available_capacity(availableCapacity.bytes());
+ response->set_available_capacity(result->bytes());
return Status::OK;
}
@@ -658,53 +622,17 @@ Status TestCSIPlugin::NodeStageVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
-
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
-
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
-
- auto it = request->volume_attributes().find("path");
- if (it == request->volume_attributes().end() || it->second != path) {
- return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
- }
-
- if (!os::exists(request->staging_target_path())) {
- return Status(
- grpc::INVALID_ARGUMENT,
- "Target path '" + request->staging_target_path() + "' is not found");
- }
+ // TODO(chhsiao): Validate the request.
- Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
- if (table.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to get mount table: " + table.error());
- }
-
- foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
- if (entry.target == request->staging_target_path()) {
- return Status::OK;
- }
- }
-
- Try<Nothing> mount = fs::mount(
- path,
+ Try<Nothing, StatusError> result = nodeStageVolume(
+ request->volume_id(),
+ request->publish_info(),
request->staging_target_path(),
- None(),
- MS_BIND,
- None());
+ mesos::csi::v0::devolve(request->volume_capability()),
+ request->volume_attributes());
- if (mount.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to mount from '" + path + "' to '" +
- request->staging_target_path() + "': " + mount.error());
+ if (result.isError()) {
+ return result.error().status;
}
return Status::OK;
@@ -718,39 +646,13 @@ Status TestCSIPlugin::NodeUnstageVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
-
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
-
- Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
- if (table.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to get mount table: " + table.error());
- }
-
- bool found = false;
- foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
- if (entry.target == request->staging_target_path()) {
- found = true;
- break;
- }
- }
+ // TODO(chhsiao): Validate the request.
- if (!found) {
- return Status::OK;
- }
+ Try<Nothing, StatusError> result =
+ nodeUnstageVolume(request->volume_id(), request->staging_target_path());
- Try<Nothing> unmount = fs::unmount(request->staging_target_path());
- if (unmount.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to unmount '" + request->staging_target_path() +
- "': " + unmount.error());
+ if (result.isError()) {
+ return result.error().status;
}
return Status::OK;
@@ -764,73 +666,19 @@ Status TestCSIPlugin::NodePublishVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- // TODO(chhsiao): Validate required fields.
-
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
-
- const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
- const string path = getVolumePath(volumeInfo);
-
- auto it = request->volume_attributes().find("path");
- if (it == request->volume_attributes().end() || it->second != path) {
- return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
- }
-
- if (!os::exists(request->target_path())) {
- return Status(
- grpc::INVALID_ARGUMENT,
- "Target path '" + request->target_path() + "' is not found");
- }
-
- if (request->staging_target_path().empty()) {
- return Status(
- grpc::FAILED_PRECONDITION,
- "Expecting 'staging_target_path' to be set");
- }
-
- Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
- if (table.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to get mount table: " + table.error());
- }
-
- bool found = false;
- foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
- if (entry.target == request->staging_target_path()) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- return Status(
- grpc::FAILED_PRECONDITION,
- "Volume '" + request->volume_id() + "' has not been staged yet");
- }
-
- foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
- if (entry.target == request->target_path()) {
- return Status::OK;
- }
- }
+ // TODO(chhsiao): Validate the request.
- Try<Nothing> mount = fs::mount(
+ Try<Nothing, StatusError> result = nodePublishVolume(
+ request->volume_id(),
+ request->publish_info(),
request->staging_target_path(),
request->target_path(),
- None(),
- MS_BIND | (request->readonly() ? MS_RDONLY : 0),
- None());
+ mesos::csi::v0::devolve(request->volume_capability()),
+ request->readonly(),
+ request->volume_attributes());
- if (mount.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to mount from '" + path + "' to '" +
- request->target_path() + "': " + mount.error());
+ if (result.isError()) {
+ return result.error().status;
}
return Status::OK;
@@ -844,37 +692,13 @@ Status TestCSIPlugin::NodeUnpublishVolume(
{
LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
- if (!volumes.contains(request->volume_id())) {
- return Status(
- grpc::NOT_FOUND,
- "Volume '" + request->volume_id() + "' is not found");
- }
-
- Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
- if (table.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to get mount table: " + table.error());
- }
-
- bool found = false;
- foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
- if (entry.target == request->target_path()) {
- found = true;
- break;
- }
- }
+ // TODO(chhsiao): Validate the request.
- if (!found) {
- return Status::OK;
- }
+ Try<Nothing, StatusError> result =
+ nodeUnpublishVolume(request->volume_id(), request->target_path());
- Try<Nothing> unmount = fs::unmount(request->target_path());
- if (unmount.isError()) {
- return Status(
- grpc::INTERNAL,
- "Failed to unmount '" + request->target_path() +
- "': " + unmount.error());
+ if (result.isError()) {
+ return result.error().status;
}
return Status::OK;
@@ -940,6 +764,443 @@ Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
}
+Try<TestCSIPlugin::VolumeInfo, StatusError> TestCSIPlugin::createVolume(
+ const string& name,
+ const Bytes& requiredBytes,
+ const Bytes& limitBytes,
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Map<string, string> parameters)
+{
+ // The volume ID is determined by `name`, with reserved characters escaped.
+ const string volumeId = http::encode(name);
+
+ foreach (const VolumeCapability& capability, capabilities) {
+ if (capability != defaultVolumeCapability) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported volume capabilities"));
+ }
+ }
+
+ if (parameters != createParameters) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported create parameters"));
+ }
+
+ if (volumes.contains(volumeId)) {
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+
+ if (volumeInfo.size > limitBytes) {
+ return StatusError(Status(
+ grpc::ALREADY_EXISTS, "Cannot satisfy limit bytes"));
+ }
+
+ if (volumeInfo.size < requiredBytes) {
+ return StatusError(Status(
+ grpc::ALREADY_EXISTS, "Cannot satisfy required bytes"));
+ }
+
+ return volumeInfo;
+ } else {
+ if (availableCapacity < requiredBytes) {
+ return StatusError(Status(grpc::OUT_OF_RANGE, "Insufficient capacity"));
+ }
+
+ VolumeInfo volumeInfo;
+ volumeInfo.id = volumeId;
+
+ // We assume that `requiredBytes <= limitBytes` has been verified.
+ const Bytes defaultSize = min(availableCapacity, DEFAULT_VOLUME_CAPACITY);
+ volumeInfo.size = min(max(defaultSize, requiredBytes), limitBytes);
+
+ const string path = getVolumePath(volumeInfo);
+
+ Try<Nothing> mkdir = os::mkdir(path);
+ if (mkdir.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error()));
+ }
+
+ CHECK_GE(availableCapacity, volumeInfo.size);
+ availableCapacity -= volumeInfo.size;
+ volumes.put(volumeInfo.id, volumeInfo);
+
+ return volumeInfo;
+ }
+
+ UNREACHABLE();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::deleteVolume(const string& volumeId)
+{
+ if (!volumes.contains(volumeId)) {
+ // Return a success for idempotency.
+ return Nothing();
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+ const string path = getVolumePath(volumeInfo);
+
+ Try<Nothing> rmdir = os::rmdir(path);
+ if (rmdir.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to delete volume '" + volumeId + "': " + rmdir.error()));
+ }
+
+ availableCapacity += volumeInfo.size;
+ volumes.erase(volumeInfo.id);
+
+ return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::controllerPublishVolume(
+ const string& volumeId,
+ const string& nodeId,
+ const VolumeCapability& capability,
+ bool readonly,
+ const Map<string, string>& volumeContext)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ if (nodeId != NODE_ID) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
+ }
+
+ if (capability != defaultVolumeCapability) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+ }
+
+ if (readonly) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported read-only mode"));
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+ const string path = getVolumePath(volumeInfo);
+
+ if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid volume context"));
+ }
+
+ // Do nothing.
+ return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::controllerUnpublishVolume(
+ const string& volumeId, const string& nodeId)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ if (nodeId != NODE_ID) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
+ }
+
+ // Do nothing.
+ return Nothing();
+}
+
+
+Try<Option<Error>, StatusError> TestCSIPlugin::validateVolumeCapabilities(
+ const string& volumeId,
+ const Map<string, string>& volumeContext,
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Option<Map<string, string>>& parameters)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+ const string path = getVolumePath(volumeInfo);
+
+ if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid volume context"));
+ }
+
+ foreach (const VolumeCapability& capability, capabilities) {
+ if (capability != defaultVolumeCapability) {
+ return Some(Error("Unsupported volume capabilities"));
+ }
+ }
+
+ if (parameters.isSome() && parameters.get() != createParameters) {
+ return Some(Error("Mismatched parameters"));
+ }
+
+ return None();
+}
+
+
+Try<vector<TestCSIPlugin::VolumeInfo>, StatusError> TestCSIPlugin::listVolumes(
+ const Option<int32_t>& maxEntries,
+ const Option<string>& startingToken)
+{
+ // TODO(chhsiao): Support max entries.
+ if (maxEntries.isSome()) {
+ return StatusError(Status(
+ grpc::ABORTED, "Specifying max entries is not supported"));
+ }
+
+ // TODO(chhsiao): Support starting token.
+ if (startingToken.isSome()) {
+ return StatusError(Status(
+ grpc::ABORTED, "Specifying starting token is not supported"));
+ }
+
+ return volumes.values();
+}
+
+
+Try<Bytes, StatusError> TestCSIPlugin::getCapacity(
+ const RepeatedPtrField<VolumeCapability>& capabilities,
+ const Map<string, string>& parameters)
+{
+ // We report zero capacity if any capability other than the default mount
+ // volume capability is given. If no capability is given, the total available
+ // capacity will be returned.
+ foreach (const VolumeCapability& capability, capabilities) {
+ if (capability != defaultVolumeCapability) {
+ return Bytes(0);
+ }
+ }
+
+ if (parameters != createParameters) {
+ return Bytes(0);
+ }
+
+ return availableCapacity;
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeStageVolume(
+ const string& volumeId,
+ const Map<string, string>& publishContext,
+ const string& stagingPath,
+ const VolumeCapability& capability,
+ const Map<string, string>& volumeContext)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ if (!publishContext.empty()) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid publish context"));
+ }
+
+ if (!os::exists(stagingPath)) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT,
+ "Staging path '" + stagingPath + "' does not exist"));
+ }
+
+ if (capability != defaultVolumeCapability) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+ const string path = getVolumePath(volumeInfo);
+
+ if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid volume context"));
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+ }
+
+ if (std::any_of(
+ table->entries.begin(),
+ table->entries.end(),
+ [&](const fs::MountInfoTable::Entry& entry) {
+ return entry.target == stagingPath;
+ })) {
+ return Nothing();
+ }
+
+ Try<Nothing> mount = fs::mount(path, stagingPath, None(), MS_BIND, None());
+ if (mount.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to mount from '" + path + "' to '" + stagingPath +
+ "': " + mount.error()));
+ }
+
+ return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeUnstageVolume(
+ const string& volumeId, const string& stagingPath)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+ }
+
+ if (std::none_of(
+ table->entries.begin(),
+ table->entries.end(),
+ [&](const fs::MountInfoTable::Entry& entry) {
+ return entry.target == stagingPath;
+ })) {
+ return Nothing();
+ }
+
+ Try<Nothing> unmount = fs::unmount(stagingPath);
+ if (unmount.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to unmount '" + stagingPath + "': " + unmount.error()));
+ }
+
+ return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodePublishVolume(
+ const string& volumeId,
+ const Map<string, string>& publishContext,
+ const string& stagingPath,
+ const string& targetPath,
+ const VolumeCapability& capability,
+ bool readonly,
+ const Map<string, string>& volumeContext)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ if (!publishContext.empty()) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid publish context"));
+ }
+
+ if (!os::exists(targetPath)) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT,
+ "Target path '" + targetPath + "' does not exist"));
+ }
+
+ if (capability != defaultVolumeCapability) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+ }
+
+ const VolumeInfo& volumeInfo = volumes.at(volumeId);
+ const string path = getVolumePath(volumeInfo);
+
+ if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+ return StatusError(Status(
+ grpc::INVALID_ARGUMENT, "Invalid volume context"));
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+ }
+
+ if (std::none_of(
+ table->entries.begin(),
+ table->entries.end(),
+ [&](const fs::MountInfoTable::Entry& entry) {
+ return entry.target == stagingPath;
+ })) {
+ return StatusError(Status(
+ grpc::FAILED_PRECONDITION,
+ "Volume '" + volumeId + "' has not been staged yet"));
+ }
+
+ if (std::any_of(
+ table->entries.begin(),
+ table->entries.end(),
+ [&](const fs::MountInfoTable::Entry& entry) {
+ return entry.target == targetPath;
+ })) {
+ return Nothing();
+ }
+
+ Try<Nothing> mount = fs::mount(
+ stagingPath,
+ targetPath,
+ None(),
+ MS_BIND | (readonly ? MS_RDONLY : 0),
+ None());
+
+ if (mount.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to mount from '" + stagingPath + "' to '" + targetPath +
+ "': " + mount.error()));
+ }
+
+ return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeUnpublishVolume(
+ const string& volumeId, const string& targetPath)
+{
+ if (!volumes.contains(volumeId)) {
+ return StatusError(Status(
+ grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+ }
+
+ Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+ if (table.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+ }
+
+ if (std::none_of(
+ table->entries.begin(),
+ table->entries.end(),
+ [&](const fs::MountInfoTable::Entry& entry) {
+ return entry.target == targetPath;
+ })) {
+ return Nothing();
+ }
+
+ Try<Nothing> unmount = fs::unmount(targetPath);
+ if (unmount.isError()) {
+ return StatusError(Status(
+ grpc::INTERNAL,
+ "Failed to unmount '" + targetPath + "': " + unmount.error()));
+ }
+
+ return Nothing();
+}
+
+
// Serves CSI calls from the given endpoint through forwarding the calls to
// another CSI endpoint and returning back the results.
class CSIProxy