You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:15:12 UTC
[mesos] 02/06: Preliminary SLRP refactoring for RPC retry.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 994f2089da451981f2e5a236b2a69965d0e745fc
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Jan 22 20:18:06 2019 -0800
Preliminary SLRP refactoring for RPC retry.
This patch refactors the `StorageLocalResourceProvider::call` function
so that it obtains the latest service future through `getService` before
making the actual RPC call. A subsequent patch will use this to support
RPC retry across plugin restarts.
Review: https://reviews.apache.org/r/69811
---
src/resource_provider/storage/provider.cpp | 821 ++++++++++++++---------------
1 file changed, 396 insertions(+), 425 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f7a8634..f9f9312 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -390,12 +390,32 @@ private:
void reconcileOperations(
const Event::ReconcileOperations& reconcile);
- template <csi::v0::RPC rpc>
+ // Wrapper functions to make CSI calls and update RPC metrics.
+ //
+ // The call is made asynchronously and thus no guarantee is provided on the
+ // order in which calls are sent. Callers need to either ensure to not have
+ // multiple conflicting calls in flight, or treat results idempotently.
+ //
+ // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
+ // calls on the same volume, and 2) no profile update while there are ongoing
+ // `CREATE_DISK` or `DESTROY_DISK` operations.
+ //
+ // NOTE: Since this function uses `getService` to obtain the latest service
+ // future, which depends on probe results, it is disabled for making probe
+ // calls; `_call` should be used directly instead.
+ template <
+ csi::v0::RPC rpc,
+ typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0>
Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
+ const ContainerID& containerId,
+ const typename csi::v0::RPCTraits<rpc>::request_type& request);
+
+ template <csi::v0::RPC rpc>
+ Future<typename csi::v0::RPCTraits<rpc>::response_type> _call(
csi::v0::Client client,
- typename csi::v0::RPCTraits<rpc>::request_type&& request);
+ const typename csi::v0::RPCTraits<rpc>::request_type& request);
- Future<csi::v0::Client> connect(const string& endpoint);
+ Future<csi::v0::Client> waitService(const string& endpoint);
Future<csi::v0::Client> getService(const ContainerID& containerId);
Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
Future<Nothing> waitContainer(const ContainerID& containerId);
@@ -1847,15 +1867,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
}
-template <csi::v0::RPC rpc>
+template <
+ csi::v0::RPC rpc,
+ typename std::enable_if<rpc != csi::v0::PROBE, int>::type>
Future<typename csi::v0::RPCTraits<rpc>::response_type>
StorageLocalResourceProviderProcess::call(
+ const ContainerID& containerId,
+ const typename csi::v0::RPCTraits<rpc>::request_type& request)
+{
+ // Get the latest service future before making the call.
+ return getService(containerId)
+ .then(defer(self(), &Self::_call<rpc>, lambda::_1, request));
+}
+
+
+template <csi::v0::RPC rpc>
+Future<typename csi::v0::RPCTraits<rpc>::response_type>
+StorageLocalResourceProviderProcess::_call(
csi::v0::Client client,
- typename csi::v0::RPCTraits<rpc>::request_type&& request)
+ const typename csi::v0::RPCTraits<rpc>::request_type& request)
{
++metrics.csi_plugin_rpcs_pending.at(rpc);
- return client.call<rpc>(std::move(request))
+ return client.call<rpc>(request)
.onAny(defer(self(), [=](
const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
--metrics.csi_plugin_rpcs_pending.at(rpc);
@@ -1873,18 +1907,18 @@ StorageLocalResourceProviderProcess::call(
// Returns a future of a CSI client that waits for the endpoint socket
// to appear if necessary, then connects to the socket and check its
// readiness.
-Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService(
const string& endpoint)
{
- Future<csi::v0::Client> future;
+ Future<csi::v0::Client> service;
if (os::exists(endpoint)) {
- future = csi::v0::Client("unix://" + endpoint, runtime);
+ service = csi::v0::Client("unix://" + endpoint, runtime);
} else {
// Wait for the endpoint socket to appear until the timeout expires.
Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
- future = loop(
+ service = loop(
self(),
[=]() -> Future<Nothing> {
if (timeout.expired()) {
@@ -1902,13 +1936,10 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
});
}
- return future
+ return service
.then(defer(self(), [=](csi::v0::Client client) {
- return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
- .then(defer(self(), [=](
- const csi::v0::ProbeResponse& response) -> csi::v0::Client {
- return client;
- }));
+ return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
+ .then([=]() -> csi::v0::Client { return client; });
}));
}
@@ -2018,7 +2049,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
<< " type '" << info.storage().plugin().type() << "' and "
<< " name '" << info.storage().plugin().name() << "'";
- CHECK(services.at(containerId)->associate(connect(endpointPath)));
+ CHECK(services.at(containerId)->associate(waitService(endpointPath)));
return services.at(containerId)->future()
.then([] { return Nothing(); });
})),
@@ -2179,31 +2210,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
{
CHECK_SOME(nodeContainerId);
- return getService(nodeContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- // Get the plugin info.
- return call<csi::v0::GET_PLUGIN_INFO>(
- client, csi::v0::GetPluginInfoRequest())
- .then(defer(self(), [=](
- const csi::v0::GetPluginInfoResponse& response) {
- pluginInfo = response;
+ // Get the plugin info.
+ return call<csi::v0::GET_PLUGIN_INFO>(
+ nodeContainerId.get(), csi::v0::GetPluginInfoRequest())
+ .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+ pluginInfo = response;
- LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
+ LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
- // Get the latest service future before proceeding to the next step.
- return getService(nodeContainerId.get());
- }));
- }))
- .then(defer(self(), [=](csi::v0::Client client) {
// Get the plugin capabilities.
return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
- client, csi::v0::GetPluginCapabilitiesRequest())
- .then(defer(self(), [=](
- const csi::v0::GetPluginCapabilitiesResponse& response) {
- pluginCapabilities = response.capabilities();
+ nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest());
+ }))
+ .then(defer(self(), [=](
+ const csi::v0::GetPluginCapabilitiesResponse& response) {
+ pluginCapabilities = response.capabilities();
- return Nothing();
- }));
+ return Nothing();
}));
}
@@ -2222,36 +2245,29 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
}
- return getService(controllerContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- // Get the controller plugin info and check for consistency.
- return call<csi::v0::GET_PLUGIN_INFO>(
- client, csi::v0::GetPluginInfoRequest())
- .then(defer(self(), [=](
- const csi::v0::GetPluginInfoResponse& response) {
- LOG(INFO) << "Controller plugin loaded: " << stringify(response);
-
- if (pluginInfo->name() != response.name() ||
- pluginInfo->vendor_version() != response.vendor_version()) {
- LOG(WARNING)
- << "Inconsistent controller and node plugin components. Please "
- "check with the plugin vendor to ensure compatibility.";
- }
+ // Get the controller plugin info and check for consistency.
+ return call<csi::v0::GET_PLUGIN_INFO>(
+ controllerContainerId.get(), csi::v0::GetPluginInfoRequest())
+ .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) {
+ LOG(INFO) << "Controller plugin loaded: " << stringify(response);
+
+ if (pluginInfo->name() != response.name() ||
+ pluginInfo->vendor_version() != response.vendor_version()) {
+ LOG(WARNING)
+ << "Inconsistent controller and node plugin components. Please check "
+ "with the plugin vendor to ensure compatibility.";
+ }
- // Get the latest service future before proceeding to the next step.
- return getService(controllerContainerId.get());
- }));
- }))
- .then(defer(self(), [=](csi::v0::Client client) {
// Get the controller capabilities.
return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
- client, csi::v0::ControllerGetCapabilitiesRequest())
- .then(defer(self(), [=](
- const csi::v0::ControllerGetCapabilitiesResponse& response) {
- controllerCapabilities = response.capabilities();
+ controllerContainerId.get(),
+ csi::v0::ControllerGetCapabilitiesRequest());
+ }))
+ .then(defer(self(), [=](
+ const csi::v0::ControllerGetCapabilitiesResponse& response) {
+ controllerCapabilities = response.capabilities();
- return Nothing();
- }));
+ return Nothing();
}));
}
@@ -2262,32 +2278,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
{
CHECK_SOME(nodeContainerId);
- return getService(nodeContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- // Get the node capabilities.
- return call<csi::v0::NODE_GET_CAPABILITIES>(
- client, csi::v0::NodeGetCapabilitiesRequest())
- .then(defer(self(), [=](
- const csi::v0::NodeGetCapabilitiesResponse& response)
- -> Future<csi::v0::Client> {
- nodeCapabilities = response.capabilities();
-
- // Get the latest service future before proceeding to the next step.
- return getService(nodeContainerId.get());
- }))
- .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
- if (!controllerCapabilities.publishUnpublishVolume) {
- return Nothing();
- }
+ // Get the node capabilities.
+ return call<csi::v0::NODE_GET_CAPABILITIES>(
+ nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest())
+ .then(defer(self(), [=](
+ const csi::v0::NodeGetCapabilitiesResponse& response)
+ -> Future<Nothing> {
+ nodeCapabilities = response.capabilities();
- // Get the node ID.
- return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest())
- .then(defer(self(), [=](
- const csi::v0::NodeGetIdResponse& response) {
- nodeId = response.node_id();
+ if (!controllerCapabilities.publishUnpublishVolume) {
+ return Nothing();
+ }
- return Nothing();
- }));
+ // Get the node ID.
+ return call<csi::v0::NODE_GET_ID>(
+ nodeContainerId.get(), csi::v0::NodeGetIdRequest())
+ .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) {
+ nodeId = response.node_id();
+
+ return Nothing();
}));
}));
}
@@ -2295,6 +2304,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
// Transitions the state of the specified volume from `CREATED` or
// `CONTROLLER_PUBLISH` to `NODE_READY`.
+//
// NOTE: This can only be called after `prepareControllerService` and
// `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
@@ -2312,47 +2322,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
return Nothing();
}
- CHECK_SOME(controllerContainerId);
- CHECK_SOME(nodeId);
+ if (volume.state.state() == VolumeState::CREATED) {
+ volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
+ checkpointVolumeState(volumeId);
+ }
- return getService(controllerContainerId.get())
- .then(defer(self(), [this, volumeId](
- csi::v0::Client client) -> Future<Nothing> {
- VolumeData& volume = volumes.at(volumeId);
+ CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
- if (volume.state.state() == VolumeState::CREATED) {
- volume.state.set_state(VolumeState::CONTROLLER_PUBLISH);
- checkpointVolumeState(volumeId);
- }
+ CHECK_SOME(nodeId);
- CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state());
+ csi::v0::ControllerPublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_node_id(nodeId.get());
+ *request.mutable_volume_capability() = volume.state.volume_capability();
+ request.set_readonly(false);
+ *request.mutable_volume_attributes() = volume.state.volume_attributes();
- csi::v0::ControllerPublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_node_id(nodeId.get());
- request.mutable_volume_capability()
- ->CopyFrom(volume.state.volume_capability());
- request.set_readonly(false);
- *request.mutable_volume_attributes() = volume.state.volume_attributes();
+ CHECK_SOME(controllerContainerId);
- return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
- client, std::move(request))
- .then(defer(self(), [this, volumeId](
- const csi::v0::ControllerPublishVolumeResponse& response) {
- VolumeData& volume = volumes.at(volumeId);
+ return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
+ controllerContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId](
+ const csi::v0::ControllerPublishVolumeResponse& response) {
+ VolumeData& volume = volumes.at(volumeId);
- volume.state.set_state(VolumeState::NODE_READY);
- *volume.state.mutable_publish_info() = response.publish_info();
- checkpointVolumeState(volumeId);
+ volume.state.set_state(VolumeState::NODE_READY);
+ *volume.state.mutable_publish_info() = response.publish_info();
+ checkpointVolumeState(volumeId);
- return Nothing();
- }));
+ return Nothing();
}));
}
// Transitions the state of the specified volume from `NODE_READY`,
// `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`.
+//
// NOTE: This can only be called after `prepareControllerService` and
// `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
@@ -2370,45 +2375,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
return Nothing();
}
- CHECK_SOME(controllerContainerId);
- CHECK_SOME(nodeId);
+ // A previously failed `ControllerPublishVolume` call can be recovered through
+ // the current `ControllerUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
+ if (volume.state.state() == VolumeState::NODE_READY ||
+ volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
+ volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
+ checkpointVolumeState(volumeId);
+ }
- return getService(controllerContainerId.get())
- .then(defer(self(), [this, volumeId](csi::v0::Client client) {
- VolumeData& volume = volumes.at(volumeId);
+ CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
- // A previously failed `ControllerPublishVolume` call can be recovered
- // through the current `ControllerUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT
- if (volume.state.state() == VolumeState::NODE_READY ||
- volume.state.state() == VolumeState::CONTROLLER_PUBLISH) {
- volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH);
- checkpointVolumeState(volumeId);
- }
+ CHECK_SOME(nodeId);
- CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state());
+ csi::v0::ControllerUnpublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_node_id(nodeId.get());
- csi::v0::ControllerUnpublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_node_id(nodeId.get());
+ CHECK_SOME(controllerContainerId);
- return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
- client, std::move(request))
- .then(defer(self(), [this, volumeId] {
- VolumeData& volume = volumes.at(volumeId);
+ return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
+ controllerContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
- volume.state.set_state(VolumeState::CREATED);
- volume.state.mutable_publish_info()->clear();
- checkpointVolumeState(volumeId);
+ volume.state.set_state(VolumeState::CREATED);
+ volume.state.mutable_publish_info()->clear();
+ checkpointVolumeState(volumeId);
- return Nothing();
- }));
+ return Nothing();
}));
}
// Transitions the state of the specified volume from `NODE_READY` or
// `NODE_STAGE` to `VOL_READY`.
+//
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
const string& volumeId)
@@ -2426,58 +2428,53 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
return Nothing();
}
- CHECK_SOME(nodeContainerId);
-
- return getService(nodeContainerId.get())
- .then(defer(self(), [this, volumeId](
- csi::v0::Client client) -> Future<Nothing> {
- VolumeData& volume = volumes.at(volumeId);
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
- const string stagingPath = csi::paths::getMountStagingPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
+ Try<Nothing> mkdir = os::mkdir(stagingPath);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create mount staging path '" + stagingPath +
+ "': " + mkdir.error());
+ }
- Try<Nothing> mkdir = os::mkdir(stagingPath);
- if (mkdir.isError()) {
- return Failure(
- "Failed to create mount staging path '" + stagingPath + "': " +
- mkdir.error());
- }
+ if (volume.state.state() == VolumeState::NODE_READY) {
+ volume.state.set_state(VolumeState::NODE_STAGE);
+ checkpointVolumeState(volumeId);
+ }
- if (volume.state.state() == VolumeState::NODE_READY) {
- volume.state.set_state(VolumeState::NODE_STAGE);
- checkpointVolumeState(volumeId);
- }
+ CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
- CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state());
+ csi::v0::NodeStageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ *request.mutable_publish_info() = volume.state.publish_info();
+ request.set_staging_target_path(stagingPath);
+ *request.mutable_volume_capability() = volume.state.volume_capability();
+ *request.mutable_volume_attributes() = volume.state.volume_attributes();
- csi::v0::NodeStageVolumeRequest request;
- request.set_volume_id(volumeId);
- *request.mutable_publish_info() = volume.state.publish_info();
- request.set_staging_target_path(stagingPath);
- request.mutable_volume_capability()
- ->CopyFrom(volume.state.volume_capability());
- *request.mutable_volume_attributes() = volume.state.volume_attributes();
+ CHECK_SOME(nodeContainerId);
- return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request))
- .then(defer(self(), [this, volumeId] {
- VolumeData& volume = volumes.at(volumeId);
+ return call<csi::v0::NODE_STAGE_VOLUME>(
+ nodeContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
- volume.state.set_state(VolumeState::VOL_READY);
- volume.state.set_boot_id(bootId);
- checkpointVolumeState(volumeId);
+ volume.state.set_state(VolumeState::VOL_READY);
+ volume.state.set_boot_id(bootId);
+ checkpointVolumeState(volumeId);
- return Nothing();
- }));
+ return Nothing();
}));
}
// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE`
// or `NODE_UNSTAGE` to `NODE_READY`.
+//
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
const string& volumeId)
@@ -2495,173 +2492,165 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
return Nothing();
}
- CHECK_SOME(nodeContainerId);
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
- return getService(nodeContainerId.get())
- .then(defer(self(), [this, volumeId](csi::v0::Client client) {
- VolumeData& volume = volumes.at(volumeId);
+ CHECK(os::exists(stagingPath));
- const string stagingPath = csi::paths::getMountStagingPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
-
- CHECK(os::exists(stagingPath));
-
- // A previously failed `NodeStageVolume` call can be recovered through the
- // current `NodeUnstageVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
- if (volume.state.state() == VolumeState::VOL_READY ||
- volume.state.state() == VolumeState::NODE_STAGE) {
- volume.state.set_state(VolumeState::NODE_UNSTAGE);
- checkpointVolumeState(volumeId);
- }
+ // A previously failed `NodeStageVolume` call can be recovered through the
+ // current `NodeUnstageVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT
+ if (volume.state.state() == VolumeState::VOL_READY ||
+ volume.state.state() == VolumeState::NODE_STAGE) {
+ volume.state.set_state(VolumeState::NODE_UNSTAGE);
+ checkpointVolumeState(volumeId);
+ }
- CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
+ CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state());
- csi::v0::NodeUnstageVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_staging_target_path(stagingPath);
+ csi::v0::NodeUnstageVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_staging_target_path(stagingPath);
- return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request))
- .then(defer(self(), [this, volumeId] {
- VolumeData& volume = volumes.at(volumeId);
+ CHECK_SOME(nodeContainerId);
- volume.state.set_state(VolumeState::NODE_READY);
- volume.state.clear_boot_id();
- checkpointVolumeState(volumeId);
+ return call<csi::v0::NODE_UNSTAGE_VOLUME>(
+ nodeContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
- return Nothing();
- }));
+ volume.state.set_state(VolumeState::NODE_READY);
+ volume.state.clear_boot_id();
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
}));
}
// Transitions the state of the specified volume from `VOL_READY` or
// `NODE_PUBLISH` to `PUBLISHED`.
+//
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
- CHECK_SOME(nodeContainerId);
+ VolumeData& volume = volumes.at(volumeId);
- return getService(nodeContainerId.get())
- .then(defer(self(), [this, volumeId](
- csi::v0::Client client) -> Future<Nothing> {
- VolumeData& volume = volumes.at(volumeId);
+ const string targetPath = csi::paths::getMountTargetPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
- const string targetPath = csi::paths::getMountTargetPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
+ Try<Nothing> mkdir = os::mkdir(targetPath);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create mount target path '" + targetPath +
+ "': " + mkdir.error());
+ }
- Try<Nothing> mkdir = os::mkdir(targetPath);
- if (mkdir.isError()) {
- return Failure(
- "Failed to create mount target path '" + targetPath + "': " +
- mkdir.error());
- }
+ if (volume.state.state() == VolumeState::VOL_READY) {
+ volume.state.set_state(VolumeState::NODE_PUBLISH);
+ checkpointVolumeState(volumeId);
+ }
- if (volume.state.state() == VolumeState::VOL_READY) {
- volume.state.set_state(VolumeState::NODE_PUBLISH);
- checkpointVolumeState(volumeId);
- }
+ CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
- CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state());
+ csi::v0::NodePublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ *request.mutable_publish_info() = volume.state.publish_info();
+ request.set_target_path(targetPath);
+ *request.mutable_volume_capability() = volume.state.volume_capability();
+ request.set_readonly(false);
+ *request.mutable_volume_attributes() = volume.state.volume_attributes();
- csi::v0::NodePublishVolumeRequest request;
- request.set_volume_id(volumeId);
- *request.mutable_publish_info() = volume.state.publish_info();
- request.set_target_path(targetPath);
- request.mutable_volume_capability()
- ->CopyFrom(volume.state.volume_capability());
- request.set_readonly(false);
- *request.mutable_volume_attributes() = volume.state.volume_attributes();
+ if (nodeCapabilities.stageUnstageVolume) {
+ const string stagingPath = csi::paths::getMountStagingPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
- if (nodeCapabilities.stageUnstageVolume) {
- const string stagingPath = csi::paths::getMountStagingPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
+ CHECK(os::exists(stagingPath));
- CHECK(os::exists(stagingPath));
+ request.set_staging_target_path(stagingPath);
+ }
- request.set_staging_target_path(stagingPath);
- }
+ CHECK_SOME(nodeContainerId);
- return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request))
- .then(defer(self(), [this, volumeId] {
- VolumeData& volume = volumes.at(volumeId);
+ return call<csi::v0::NODE_PUBLISH_VOLUME>(
+ nodeContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId] {
+ VolumeData& volume = volumes.at(volumeId);
- volume.state.set_state(VolumeState::PUBLISHED);
- checkpointVolumeState(volumeId);
+ volume.state.set_state(VolumeState::PUBLISHED);
+ checkpointVolumeState(volumeId);
- return Nothing();
- }));
+ return Nothing();
}));
}
// Transitions the state of the specified volume from `PUBLISHED`,
// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`.
+//
// NOTE: This can only be called after `prepareNodeService`.
Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
const string& volumeId)
{
CHECK(volumes.contains(volumeId));
- CHECK_SOME(nodeContainerId);
+ VolumeData& volume = volumes.at(volumeId);
- return getService(nodeContainerId.get())
- .then(defer(self(), [this, volumeId](csi::v0::Client client) {
- VolumeData& volume = volumes.at(volumeId);
+ const string targetPath = csi::paths::getMountTargetPath(
+ csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name()),
+ volumeId);
- const string targetPath = csi::paths::getMountTargetPath(
- csi::paths::getMountRootDir(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name()),
- volumeId);
-
- CHECK(os::exists(targetPath));
-
- // A previously failed `NodePublishVolume` call can be recovered through
- // the current `NodeUnpublishVolume` call. See:
- // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
- if (volume.state.state() == VolumeState::PUBLISHED ||
- volume.state.state() == VolumeState::NODE_PUBLISH) {
- volume.state.set_state(VolumeState::NODE_UNPUBLISH);
- checkpointVolumeState(volumeId);
- }
+ CHECK(os::exists(targetPath));
- CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
+ // A previously failed `NodePublishVolume` call can be recovered through the
+ // current `NodeUnpublishVolume` call. See:
+ // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT
+ if (volume.state.state() == VolumeState::PUBLISHED ||
+ volume.state.state() == VolumeState::NODE_PUBLISH) {
+ volume.state.set_state(VolumeState::NODE_UNPUBLISH);
+ checkpointVolumeState(volumeId);
+ }
- csi::v0::NodeUnpublishVolumeRequest request;
- request.set_volume_id(volumeId);
- request.set_target_path(targetPath);
+ CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state());
- return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request))
- .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
- VolumeData& volume = volumes.at(volumeId);
+ csi::v0::NodeUnpublishVolumeRequest request;
+ request.set_volume_id(volumeId);
+ request.set_target_path(targetPath);
- volume.state.set_state(VolumeState::VOL_READY);
- checkpointVolumeState(volumeId);
+ CHECK_SOME(nodeContainerId);
- Try<Nothing> rmdir = os::rmdir(targetPath);
- if (rmdir.isError()) {
- return Failure(
- "Failed to remove mount point '" + targetPath + "': " +
- rmdir.error());
- }
+ return call<csi::v0::NODE_UNPUBLISH_VOLUME>(
+ nodeContainerId.get(), std::move(request))
+ .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
+ VolumeData& volume = volumes.at(volumeId);
- return Nothing();
- }));
+ volume.state.set_state(VolumeState::VOL_READY);
+ checkpointVolumeState(volumeId);
+
+ Try<Nothing> rmdir = os::rmdir(targetPath);
+ if (rmdir.isError()) {
+ return Failure(
+ "Failed to remove mount point '" + targetPath + "': " +
+ rmdir.error());
+ }
+
+ return Nothing();
}));
}
@@ -2679,43 +2668,37 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
"Controller capability 'CREATE_DELETE_VOLUME' is not supported");
}
+ csi::v0::CreateVolumeRequest request;
+ request.set_name(name);
+ request.mutable_capacity_range()->set_required_bytes(capacity.bytes());
+ request.mutable_capacity_range()->set_limit_bytes(capacity.bytes());
+ *request.add_volume_capabilities() = profileInfo.capability;
+ *request.mutable_parameters() = profileInfo.parameters;
+
CHECK_SOME(controllerContainerId);
- return getService(controllerContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- csi::v0::CreateVolumeRequest request;
- request.set_name(name);
- request.mutable_capacity_range()
- ->set_required_bytes(capacity.bytes());
- request.mutable_capacity_range()
- ->set_limit_bytes(capacity.bytes());
- request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
- *request.mutable_parameters() = profileInfo.parameters;
-
- return call<csi::v0::CREATE_VOLUME>(client, std::move(request))
- .then(defer(self(), [=](
- const csi::v0::CreateVolumeResponse& response) -> string {
- const csi::v0::Volume& volume = response.volume();
-
- if (volumes.contains(volume.id())) {
- // The resource provider failed over after the last `createVolume`
- // call, but before the operation status was checkpointed.
- CHECK_EQ(VolumeState::CREATED,
- volumes.at(volume.id()).state.state());
- } else {
- VolumeState volumeState;
- volumeState.set_state(VolumeState::CREATED);
- volumeState.mutable_volume_capability()
- ->CopyFrom(profileInfo.capability);
- *volumeState.mutable_parameters() = profileInfo.parameters;
- *volumeState.mutable_volume_attributes() = volume.attributes();
-
- volumes.put(volume.id(), std::move(volumeState));
- checkpointVolumeState(volume.id());
- }
+ return call<csi::v0::CREATE_VOLUME>(
+ controllerContainerId.get(), std::move(request))
+ .then(defer(self(), [=](
+ const csi::v0::CreateVolumeResponse& response) -> string {
+ const csi::v0::Volume& volume = response.volume();
- return volume.id();
- }));
+ if (volumes.contains(volume.id())) {
+ // The resource provider failed over after the last `createVolume` call,
+ // but before the operation status was checkpointed.
+ CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state());
+ } else {
+ VolumeState volumeState;
+ volumeState.set_state(VolumeState::CREATED);
+ *volumeState.mutable_volume_capability() = profileInfo.capability;
+ *volumeState.mutable_parameters() = profileInfo.parameters;
+ *volumeState.mutable_volume_attributes() = volume.attributes();
+
+ volumes.put(volume.id(), std::move(volumeState));
+ checkpointVolumeState(volume.id());
+ }
+
+ return volume.id();
}));
}
@@ -2727,8 +2710,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
const string& volumeId)
{
- CHECK_SOME(controllerContainerId);
-
const string volumePath = csi::paths::getVolumePath(
slave::paths::getCsiRootDir(workDir),
info.storage().plugin().type(),
@@ -2745,10 +2726,10 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
const VolumeData& volume = volumes.at(volumeId);
- Future<Nothing> deleted = Nothing();
-
CHECK(VolumeState::State_IsValid(volume.state.state()));
+ Future<Nothing> deleted = Nothing();
+
switch (volume.state.state()) {
case VolumeState::PUBLISHED:
case VolumeState::NODE_PUBLISH:
@@ -2782,12 +2763,14 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume(
// supported. Otherwise, we simply leave it as a preprovisioned volume.
if (controllerCapabilities.createDeleteVolume) {
deleted = deleted
- .then(defer(self(), &Self::getService, controllerContainerId.get()))
- .then(defer(self(), [this, volumeId](csi::v0::Client client) {
+ .then(defer(self(), [this, volumeId] {
csi::v0::DeleteVolumeRequest request;
request.set_volume_id(volumeId);
- return call<csi::v0::DELETE_VOLUME>(client, std::move(request))
+ CHECK_SOME(controllerContainerId);
+
+ return call<csi::v0::DELETE_VOLUME>(
+ controllerContainerId.get(), std::move(request))
.then([] { return Nothing(); });
}));
}
@@ -2856,45 +2839,40 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume(
"Plugin capability 'CONTROLLER_SERVICE' is not supported");
}
- CHECK_SOME(controllerContainerId);
+ google::protobuf::Map<string, string> volumeAttributes;
- return getService(controllerContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- google::protobuf::Map<string, string> volumeAttributes;
+ if (metadata.isSome()) {
+ volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
+ }
- if (metadata.isSome()) {
- volumeAttributes =
- CHECK_NOTERROR(convertLabelsToStringMap(metadata.get()));
- }
+ csi::v0::ValidateVolumeCapabilitiesRequest request;
+ request.set_volume_id(volumeId);
+ *request.add_volume_capabilities() = profileInfo.capability;
+ *request.mutable_volume_attributes() = volumeAttributes;
- csi::v0::ValidateVolumeCapabilitiesRequest request;
- request.set_volume_id(volumeId);
- request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
- *request.mutable_volume_attributes() = volumeAttributes;
-
- return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
- client, std::move(request))
- .then(defer(self(), [=](
- const csi::v0::ValidateVolumeCapabilitiesResponse& response)
- -> Future<Nothing> {
- if (!response.supported()) {
- return Failure(
- "Unsupported volume capability for volume '" + volumeId +
- "': " + response.message());
- }
+ CHECK_SOME(controllerContainerId);
- VolumeState volumeState;
- volumeState.set_state(VolumeState::CREATED);
- volumeState.mutable_volume_capability()
- ->CopyFrom(profileInfo.capability);
- *volumeState.mutable_parameters() = profileInfo.parameters;
- *volumeState.mutable_volume_attributes() = volumeAttributes;
+ return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
+ controllerContainerId.get(), std::move(request))
+ .then(defer(self(), [=](
+ const csi::v0::ValidateVolumeCapabilitiesResponse& response)
+ -> Future<Nothing> {
+ if (!response.supported()) {
+ return Failure(
+ "Unsupported volume capability for volume '" + volumeId + "': " +
+ response.message());
+ }
- volumes.put(volumeId, std::move(volumeState));
- checkpointVolumeState(volumeId);
+ VolumeState volumeState;
+ volumeState.set_state(VolumeState::CREATED);
+ *volumeState.mutable_volume_capability() = profileInfo.capability;
+ *volumeState.mutable_parameters() = profileInfo.parameters;
+ *volumeState.mutable_volume_attributes() = volumeAttributes;
- return Nothing();
- }));
+ volumes.put(volumeId, std::move(volumeState));
+ checkpointVolumeState(volumeId);
+
+ return Nothing();
}));
}
@@ -2912,41 +2890,39 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
CHECK_SOME(controllerContainerId);
- return getService(controllerContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- // TODO(chhsiao): Set the max entries and use a loop to do
- // multiple `ListVolumes` calls.
- return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest())
- .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
- Resources resources;
-
- // Recover disk profiles from the checkpointed state.
- hashmap<string, string> volumesToProfiles;
- foreach (const Resource& resource, totalResources) {
- if (resource.disk().source().has_id() &&
- resource.disk().source().has_profile()) {
- volumesToProfiles.put(
- resource.disk().source().id(),
- resource.disk().source().profile());
- }
- }
+ // TODO(chhsiao): Set the max entries and use a loop to do
+ // multiple `ListVolumes` calls.
+ return call<csi::v0::LIST_VOLUMES>(
+ controllerContainerId.get(), csi::v0::ListVolumesRequest())
+ .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
+ Resources resources;
- foreach (const auto& entry, response.entries()) {
- resources += createRawDiskResource(
- info,
- Bytes(entry.volume().capacity_bytes()),
- volumesToProfiles.contains(entry.volume().id())
- ? volumesToProfiles.at(entry.volume().id())
- : Option<string>::none(),
- vendor,
- entry.volume().id(),
- entry.volume().attributes().empty()
- ? Option<Labels>::none()
- : convertStringMapToLabels(entry.volume().attributes()));
- }
+ // Recover disk profiles from the checkpointed state.
+ hashmap<string, string> volumesToProfiles;
+ foreach (const Resource& resource, totalResources) {
+ if (resource.disk().source().has_id() &&
+ resource.disk().source().has_profile()) {
+ volumesToProfiles.put(
+ resource.disk().source().id(),
+ resource.disk().source().profile());
+ }
+ }
- return resources;
- }));
+ foreach (const auto& entry, response.entries()) {
+ resources += createRawDiskResource(
+ info,
+ Bytes(entry.volume().capacity_bytes()),
+ volumesToProfiles.contains(entry.volume().id())
+ ? volumesToProfiles.at(entry.volume().id())
+ : Option<string>::none(),
+ vendor,
+ entry.volume().id(),
+ entry.volume().attributes().empty()
+ ? Option<Labels>::none()
+ : convertStringMapToLabels(entry.volume().attributes()));
+ }
+
+ return resources;
}));
}
@@ -2964,19 +2940,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
CHECK_SOME(controllerContainerId);
- return getService(controllerContainerId.get())
- .then(defer(self(), [=](csi::v0::Client client) {
- vector<Future<Resources>> futures;
+ vector<Future<Resources>> futures;
- foreachpair (const string& profile,
- const DiskProfileAdaptor::ProfileInfo& profileInfo,
- profileInfos) {
- csi::v0::GetCapacityRequest request;
- request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
- *request.mutable_parameters() = profileInfo.parameters;
+ foreachpair (const string& profile,
+ const DiskProfileAdaptor::ProfileInfo& profileInfo,
+ profileInfos) {
+ csi::v0::GetCapacityRequest request;
+ *request.add_volume_capabilities() = profileInfo.capability;
+ *request.mutable_parameters() = profileInfo.parameters;
- futures.push_back(call<csi::v0::GET_CAPACITY>(
- client, std::move(request))
+ futures.push_back(
+ call<csi::v0::GET_CAPACITY>(
+ controllerContainerId.get(), std::move(request))
.then(defer(self(), [=](
const csi::v0::GetCapacityResponse& response) -> Resources {
if (response.available_capacity() == 0) {
@@ -2984,18 +2959,14 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
}
return createRawDiskResource(
- info,
- Bytes(response.available_capacity()),
- profile,
- vendor);
+ info, Bytes(response.available_capacity()), profile, vendor);
})));
- }
+ }
- return collect(futures)
- .then([](const vector<Resources>& resources) {
- return accumulate(resources.begin(), resources.end(), Resources());
- });
- }));
+ return collect(futures)
+ .then([](const vector<Resources>& resources) {
+ return accumulate(resources.begin(), resources.end(), Resources());
+ });
}