You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/01 01:32:35 UTC
[09/12] mesos git commit: Added per-CSI-call RPC metrics for SLRP.
Added per-CSI-call RPC metrics for SLRP.
For each CSI call, e.g., `csi.v0.Identity.Probe`, we add the following
metrics for SLRP:
`csi_plugin/rpcs/csi.v0.Identity.Probe/pending`
`csi_plugin/rpcs/csi.v0.Identity.Probe/successes`
`csi_plugin/rpcs/csi.v0.Identity.Probe/errors`
`csi_plugin/rpcs/csi.v0.Identity.Probe/cancelled`
To add these per-CSI-call metrics, each method in `csi::v0::Client`,
e.g., `csi::v0::Client::Probe`, is changed to
`csi::v0::Client::call<PROBE>`, to make RPC calls based on the RPC enum
value. A `call` helper function in SLRP is also added to intercept CSI
calls and update the corresponding metrics.
Review: https://reviews.apache.org/r/67255
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/15fc86d2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/15fc86d2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/15fc86d2
Branch: refs/heads/master
Commit: 15fc86d22f1fbe922ef878bb2e6f6462d2248b14
Parents: cae7d7a
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 22 17:34:06 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 31 18:29:56 2018 -0700
----------------------------------------------------------------------
src/csi/client.cpp | 68 +++++++---
src/csi/client.hpp | 139 ++++++++++++++-------
src/resource_provider/storage/provider.cpp | 158 ++++++++++++++++++++----
src/tests/csi_client_tests.cpp | 80 ++++++------
4 files changed, 326 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index a4ba1f1..923ee6f 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -25,7 +25,9 @@ namespace mesos {
namespace csi {
namespace v0 {
-Future<GetPluginInfoResponse> Client::GetPluginInfo(
+template <>
+Future<GetPluginInfoResponse>
+Client::call<GET_PLUGIN_INFO>(
const GetPluginInfoRequest& request)
{
return runtime
@@ -37,7 +39,9 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo(
}
-Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
+template <>
+Future<GetPluginCapabilitiesResponse>
+Client::call<GET_PLUGIN_CAPABILITIES>(
const GetPluginCapabilitiesRequest& request)
{
return runtime
@@ -52,7 +56,9 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
}
-Future<ProbeResponse> Client::Probe(
+template <>
+Future<ProbeResponse>
+Client::call<PROBE>(
const ProbeRequest& request)
{
return runtime
@@ -64,7 +70,9 @@ Future<ProbeResponse> Client::Probe(
}
-Future<CreateVolumeResponse> Client::CreateVolume(
+template <>
+Future<CreateVolumeResponse>
+Client::call<CREATE_VOLUME>(
const CreateVolumeRequest& request)
{
return runtime
@@ -76,7 +84,9 @@ Future<CreateVolumeResponse> Client::CreateVolume(
}
-Future<DeleteVolumeResponse> Client::DeleteVolume(
+template <>
+Future<DeleteVolumeResponse>
+Client::call<DELETE_VOLUME>(
const DeleteVolumeRequest& request)
{
return runtime
@@ -88,7 +98,9 @@ Future<DeleteVolumeResponse> Client::DeleteVolume(
}
-Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
+template <>
+Future<ControllerPublishVolumeResponse>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(
const ControllerPublishVolumeRequest& request)
{
return runtime
@@ -103,7 +115,9 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
}
-Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
+template <>
+Future<ControllerUnpublishVolumeResponse>
+Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
const ControllerUnpublishVolumeRequest& request)
{
return runtime
@@ -118,7 +132,9 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
}
-Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
+template <>
+Future<ValidateVolumeCapabilitiesResponse>
+Client::call<VALIDATE_VOLUME_CAPABILITIES>(
const ValidateVolumeCapabilitiesRequest& request)
{
return runtime
@@ -133,7 +149,9 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
}
-Future<ListVolumesResponse> Client::ListVolumes(
+template <>
+Future<ListVolumesResponse>
+Client::call<LIST_VOLUMES>(
const ListVolumesRequest& request)
{
return runtime
@@ -145,7 +163,9 @@ Future<ListVolumesResponse> Client::ListVolumes(
}
-Future<GetCapacityResponse> Client::GetCapacity(
+template <>
+Future<GetCapacityResponse>
+Client::call<GET_CAPACITY>(
const GetCapacityRequest& request)
{
return runtime
@@ -157,7 +177,9 @@ Future<GetCapacityResponse> Client::GetCapacity(
}
-Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
+template <>
+Future<ControllerGetCapabilitiesResponse>
+Client::call<CONTROLLER_GET_CAPABILITIES>(
const ControllerGetCapabilitiesRequest& request)
{
return runtime
@@ -172,7 +194,9 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
}
-Future<NodeStageVolumeResponse> Client::NodeStageVolume(
+template <>
+Future<NodeStageVolumeResponse>
+Client::call<NODE_STAGE_VOLUME>(
const NodeStageVolumeRequest& request)
{
return runtime
@@ -184,7 +208,9 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume(
}
-Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
+template <>
+Future<NodeUnstageVolumeResponse>
+Client::call<NODE_UNSTAGE_VOLUME>(
const NodeUnstageVolumeRequest& request)
{
return runtime
@@ -196,7 +222,9 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
}
-Future<NodePublishVolumeResponse> Client::NodePublishVolume(
+template <>
+Future<NodePublishVolumeResponse>
+Client::call<NODE_PUBLISH_VOLUME>(
const NodePublishVolumeRequest& request)
{
return runtime
@@ -208,7 +236,9 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume(
}
-Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
+template <>
+Future<NodeUnpublishVolumeResponse>
+Client::call<NODE_UNPUBLISH_VOLUME>(
const NodeUnpublishVolumeRequest& request)
{
return runtime
@@ -220,7 +250,9 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
}
-Future<NodeGetIdResponse> Client::NodeGetId(
+template <>
+Future<NodeGetIdResponse>
+Client::call<NODE_GET_ID>(
const NodeGetIdRequest& request)
{
return runtime
@@ -232,7 +264,9 @@ Future<NodeGetIdResponse> Client::NodeGetId(
}
-Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
+template <>
+Future<NodeGetCapabilitiesResponse>
+Client::call<NODE_GET_CAPABILITIES>(
const NodeGetCapabilitiesRequest& request)
{
return runtime
http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 9d7019a..1c57ac5 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -17,12 +17,12 @@
#ifndef __CSI_CLIENT_HPP__
#define __CSI_CLIENT_HPP__
-#include <string>
-
#include <csi/spec.hpp>
#include <process/grpc.hpp>
+#include "csi/rpc.hpp"
+
namespace mesos {
namespace csi {
namespace v0 {
@@ -34,65 +34,116 @@ public:
const process::grpc::client::Runtime& _runtime)
: connection(_connection), runtime(_runtime) {}
- // RPCs for the Identity service.
- process::Future<GetPluginInfoResponse>
- GetPluginInfo(const GetPluginInfoRequest& request);
+ template <RPC rpc>
+ process::Future<typename RPCTraits<rpc>::response_type> call(
+ const typename RPCTraits<rpc>::request_type& request);
- process::Future<GetPluginCapabilitiesResponse>
- GetPluginCapabilities(const GetPluginCapabilitiesRequest& request);
+private:
+ process::grpc::client::Connection connection;
+ process::grpc::client::Runtime runtime;
+};
- process::Future<ProbeResponse>
- Probe(const ProbeRequest& request);
- // RPCs for the Controller service.
- process::Future<CreateVolumeResponse>
- CreateVolume(const CreateVolumeRequest& request);
+template <>
+process::Future<GetPluginInfoResponse>
+Client::call<GET_PLUGIN_INFO>(
+ const GetPluginInfoRequest& request);
- process::Future<DeleteVolumeResponse>
- DeleteVolume(const DeleteVolumeRequest& request);
- process::Future<ControllerPublishVolumeResponse>
- ControllerPublishVolume(const ControllerPublishVolumeRequest& request);
+template <>
+process::Future<GetPluginCapabilitiesResponse>
+Client::call<GET_PLUGIN_CAPABILITIES>(
+ const GetPluginCapabilitiesRequest& request);
- process::Future<ControllerUnpublishVolumeResponse>
- ControllerUnpublishVolume(const ControllerUnpublishVolumeRequest& request);
- process::Future<ValidateVolumeCapabilitiesResponse>
- ValidateVolumeCapabilities(
- const ValidateVolumeCapabilitiesRequest& request);
+template <>
+process::Future<ProbeResponse>
+Client::call<PROBE>(
+ const ProbeRequest& request);
- process::Future<ListVolumesResponse>
- ListVolumes(const ListVolumesRequest& request);
- process::Future<GetCapacityResponse>
- GetCapacity(const GetCapacityRequest& request);
+template <>
+process::Future<CreateVolumeResponse>
+Client::call<CREATE_VOLUME>(
+ const CreateVolumeRequest& request);
- process::Future<ControllerGetCapabilitiesResponse>
- ControllerGetCapabilities(const ControllerGetCapabilitiesRequest& request);
- // RPCs for the Node service.
- process::Future<NodeStageVolumeResponse>
- NodeStageVolume(const NodeStageVolumeRequest& request);
+template <>
+process::Future<DeleteVolumeResponse>
+Client::call<DELETE_VOLUME>(
+ const DeleteVolumeRequest& request);
- process::Future<NodeUnstageVolumeResponse>
- NodeUnstageVolume(const NodeUnstageVolumeRequest& request);
- process::Future<NodePublishVolumeResponse>
- NodePublishVolume(const NodePublishVolumeRequest& request);
+template <>
+process::Future<ControllerPublishVolumeResponse>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(
+ const ControllerPublishVolumeRequest& request);
- process::Future<NodeUnpublishVolumeResponse>
- NodeUnpublishVolume(const NodeUnpublishVolumeRequest& request);
- process::Future<NodeGetIdResponse>
- NodeGetId(const NodeGetIdRequest& request);
+template <>
+process::Future<ControllerUnpublishVolumeResponse>
+Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
+ const ControllerUnpublishVolumeRequest& request);
- process::Future<NodeGetCapabilitiesResponse>
- NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
-private:
- process::grpc::client::Connection connection;
- process::grpc::client::Runtime runtime;
-};
+template <>
+process::Future<ValidateVolumeCapabilitiesResponse>
+Client::call<VALIDATE_VOLUME_CAPABILITIES>(
+ const ValidateVolumeCapabilitiesRequest& request);
+
+
+template <>
+process::Future<ListVolumesResponse>
+Client::call<LIST_VOLUMES>(
+ const ListVolumesRequest& request);
+
+
+template <>
+process::Future<GetCapacityResponse>
+Client::call<GET_CAPACITY>(
+ const GetCapacityRequest& request);
+
+
+template <>
+process::Future<ControllerGetCapabilitiesResponse>
+Client::call<CONTROLLER_GET_CAPABILITIES>(
+ const ControllerGetCapabilitiesRequest& request);
+
+
+template <>
+process::Future<NodeStageVolumeResponse>
+Client::call<NODE_STAGE_VOLUME>(
+ const NodeStageVolumeRequest& request);
+
+
+template <>
+process::Future<NodeUnstageVolumeResponse>
+Client::call<NODE_UNSTAGE_VOLUME>(
+ const NodeUnstageVolumeRequest& request);
+
+
+template <>
+process::Future<NodePublishVolumeResponse>
+Client::call<NODE_PUBLISH_VOLUME>(
+ const NodePublishVolumeRequest& request);
+
+
+template <>
+process::Future<NodeUnpublishVolumeResponse>
+Client::call<NODE_UNPUBLISH_VOLUME>(
+ const NodeUnpublishVolumeRequest& request);
+
+
+template <>
+process::Future<NodeGetIdResponse>
+Client::call<NODE_GET_ID>(
+ const NodeGetIdRequest& request);
+
+
+template <>
+process::Future<NodeGetCapabilitiesResponse>
+Client::call<NODE_GET_CAPABILITIES>(
+ const NodeGetCapabilitiesRequest& request);
} // namespace v0 {
} // namespace csi {
http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 8a4b037..333336e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -59,6 +59,7 @@
#include "csi/client.hpp"
#include "csi/paths.hpp"
+#include "csi/rpc.hpp"
#include "csi/state.hpp"
#include "csi/utils.hpp"
@@ -375,6 +376,11 @@ private:
void reconcileOperations(
const Event::ReconcileOperations& reconcile);
+ template <csi::v0::RPC rpc>
+ Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
+ csi::v0::Client client,
+ const typename csi::v0::RPCTraits<rpc>::request_type& request);
+
Future<csi::v0::Client> connect(const string& endpoint);
Future<csi::v0::Client> getService(const ContainerID& containerId);
Future<Nothing> killService(const ContainerID& containerId);
@@ -498,6 +504,10 @@ private:
// CSI plugin metrics.
Counter csi_plugin_container_terminations;
+ hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending;
+ hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes;
+ hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors;
+ hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled;
// Operation state metrics.
hashmap<Offer::Operation::Type, PushGauge> operations_pending;
@@ -1752,6 +1762,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
}
+template <csi::v0::RPC rpc>
+Future<typename csi::v0::RPCTraits<rpc>::response_type>
+StorageLocalResourceProviderProcess::call(
+ csi::v0::Client client,
+ const typename csi::v0::RPCTraits<rpc>::request_type& request)
+{
+ ++metrics.csi_plugin_rpcs_pending.at(rpc);
+
+ return client.call<rpc>(request)
+ .onAny(defer(self(), [=](
+ const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
+ --metrics.csi_plugin_rpcs_pending.at(rpc);
+ if (future.isReady()) {
+ ++metrics.csi_plugin_rpcs_successes.at(rpc);
+ } else if (future.isFailed()) {
+ ++metrics.csi_plugin_rpcs_errors.at(rpc);
+ } else {
+ ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+ }
+ }));
+}
+
+
// Returns a future of a CSI client that waits for the endpoint socket
// to appear if necessary, then connects to the socket and check its
// readiness.
@@ -1786,7 +1819,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
return future
.then(defer(self(), [=](csi::v0::Client client) {
- return client.Probe(csi::v0::ProbeRequest())
+ return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
.then(defer(self(), [=](const csi::v0::ProbeResponse& response) {
return client;
}));
@@ -2011,7 +2044,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
return getService(nodeContainerId.get())
.then(defer(self(), [=](csi::v0::Client client) {
// Get the plugin info.
- return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+ return call<csi::v0::GET_PLUGIN_INFO>(
+ client, csi::v0::GetPluginInfoRequest())
.then(defer(self(), [=](
const csi::v0::GetPluginInfoResponse& response) {
pluginInfo = response;
@@ -2024,8 +2058,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
}))
.then(defer(self(), [=](csi::v0::Client client) {
// Get the plugin capabilities.
- return client.GetPluginCapabilities(
- csi::v0::GetPluginCapabilitiesRequest())
+ return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
+ client, csi::v0::GetPluginCapabilitiesRequest())
.then(defer(self(), [=](
const csi::v0::GetPluginCapabilitiesResponse& response) {
pluginCapabilities = response.capabilities();
@@ -2053,7 +2087,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
return getService(controllerContainerId.get())
.then(defer(self(), [=](csi::v0::Client client) {
// Get the controller plugin info and check for consistency.
- return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+ return call<csi::v0::GET_PLUGIN_INFO>(
+ client, csi::v0::GetPluginInfoRequest())
.then(defer(self(), [=](
const csi::v0::GetPluginInfoResponse& response) {
LOG(INFO) << "Controller plugin loaded: " << stringify(response);
@@ -2071,8 +2106,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
}))
.then(defer(self(), [=](csi::v0::Client client) {
// Get the controller capabilities.
- return client.ControllerGetCapabilities(
- csi::v0::ControllerGetCapabilitiesRequest())
+ return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
+ client, csi::v0::ControllerGetCapabilitiesRequest())
.then(defer(self(), [=](
const csi::v0::ControllerGetCapabilitiesResponse& response) {
controllerCapabilities = response.capabilities();
@@ -2092,7 +2127,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
return getService(nodeContainerId.get())
.then(defer(self(), [=](csi::v0::Client client) {
// Get the node capabilities.
- return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest())
+ return call<csi::v0::NODE_GET_CAPABILITIES>(
+ client, csi::v0::NodeGetCapabilitiesRequest())
.then(defer(self(), [=](
const csi::v0::NodeGetCapabilitiesResponse& response)
-> Future<csi::v0::Client> {
@@ -2107,7 +2143,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
}
// Get the node ID.
- return client.NodeGetId(csi::v0::NodeGetIdRequest())
+ return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest())
.then(defer(self(), [=](
const csi::v0::NodeGetIdResponse& response) {
nodeId = response.node_id();
@@ -2161,7 +2197,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
request.set_readonly(false);
*request.mutable_volume_attributes() = volume.state.volume_attributes();
- return client.ControllerPublishVolume(request)
+ return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
+ client, std::move(request))
.then(defer(self(), [this, volumeId](
const csi::v0::ControllerPublishVolumeResponse& response) {
VolumeData& volume = volumes.at(volumeId);
@@ -2217,7 +2254,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
request.set_volume_id(volumeId);
request.set_node_id(nodeId.get());
- return client.ControllerUnpublishVolume(request)
+ return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
+ client, std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
@@ -2286,7 +2324,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
->CopyFrom(volume.state.volume_capability());
*request.mutable_volume_attributes() = volume.state.volume_attributes();
- return client.NodeStageVolume(request)
+ return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
@@ -2349,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
request.set_volume_id(volumeId);
request.set_staging_target_path(stagingPath);
- return client.NodeUnstageVolume(request)
+ return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
@@ -2420,7 +2458,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
request.set_staging_target_path(stagingPath);
}
- return client.NodePublishVolume(request)
+ return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request))
.then(defer(self(), [this, volumeId] {
VolumeData& volume = volumes.at(volumeId);
@@ -2470,7 +2508,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
request.set_volume_id(volumeId);
request.set_target_path(targetPath);
- return client.NodeUnpublishVolume(request)
+ return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request))
.then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
VolumeData& volume = volumes.at(volumeId);
@@ -2515,7 +2553,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
*request.mutable_parameters() = profileInfo.parameters;
- return client.CreateVolume(request)
+ return call<csi::v0::CREATE_VOLUME>(client, std::move(request))
.then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) {
const csi::v0::Volume& volume = response.volume();
@@ -2609,11 +2647,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
if (!preExisting) {
deleted = deleted
.then(defer(self(), &Self::getService, controllerContainerId.get()))
- .then(defer(self(), [volumeId](csi::v0::Client client) {
+ .then(defer(self(), [this, volumeId](csi::v0::Client client) {
csi::v0::DeleteVolumeRequest request;
request.set_volume_id(volumeId);
- return client.DeleteVolume(request)
+ return call<csi::v0::DELETE_VOLUME>(client, std::move(request))
.then([] { return Nothing(); });
}));
}
@@ -2681,7 +2719,8 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
request.add_volume_capabilities()->CopyFrom(capability);
*request.mutable_volume_attributes() = volumeAttributes;
- return client.ValidateVolumeCapabilities(request)
+ return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
+ client, std::move(request))
.then(defer(self(), [=](
const csi::v0::ValidateVolumeCapabilitiesResponse& response)
-> Future<string> {
@@ -2722,7 +2761,7 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
.then(defer(self(), [=](csi::v0::Client client) {
// TODO(chhsiao): Set the max entries and use a loop to do
// multiple `ListVolumes` calls.
- return client.ListVolumes(csi::v0::ListVolumesRequest())
+ return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest())
.then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
Resources resources;
@@ -2785,7 +2824,8 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
*request.mutable_parameters() = profileInfo.parameters;
- futures.push_back(client.GetCapacity(request)
+ futures.push_back(call<csi::v0::GET_CAPACITY>(
+ client, std::move(request))
.then(defer(self(), [=](
const csi::v0::GetCapacityResponse& response) -> Resources {
if (response.available_capacity() == 0) {
@@ -3433,6 +3473,66 @@ StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
{
process::metrics::add(csi_plugin_container_terminations);
+ vector<csi::v0::RPC> rpcs;
+
+ // NOTE: We use a switch statement here as a compile-time sanity check so we
+ // won't forget to add metrics for new RPCs in the future.
+ csi::v0::RPC firstRpc = csi::v0::GET_PLUGIN_INFO;
+ switch (firstRpc) {
+ case csi::v0::GET_PLUGIN_INFO:
+ rpcs.push_back(csi::v0::GET_PLUGIN_INFO);
+ case csi::v0::GET_PLUGIN_CAPABILITIES:
+ rpcs.push_back(csi::v0::GET_PLUGIN_CAPABILITIES);
+ case csi::v0::PROBE:
+ rpcs.push_back(csi::v0::PROBE);
+ case csi::v0::CREATE_VOLUME:
+ rpcs.push_back(csi::v0::CREATE_VOLUME);
+ case csi::v0::DELETE_VOLUME:
+ rpcs.push_back(csi::v0::DELETE_VOLUME);
+ case csi::v0::CONTROLLER_PUBLISH_VOLUME:
+ rpcs.push_back(csi::v0::CONTROLLER_PUBLISH_VOLUME);
+ case csi::v0::CONTROLLER_UNPUBLISH_VOLUME:
+ rpcs.push_back(csi::v0::CONTROLLER_UNPUBLISH_VOLUME);
+ case csi::v0::VALIDATE_VOLUME_CAPABILITIES:
+ rpcs.push_back(csi::v0::VALIDATE_VOLUME_CAPABILITIES);
+ case csi::v0::LIST_VOLUMES:
+ rpcs.push_back(csi::v0::LIST_VOLUMES);
+ case csi::v0::GET_CAPACITY:
+ rpcs.push_back(csi::v0::GET_CAPACITY);
+ case csi::v0::CONTROLLER_GET_CAPABILITIES:
+ rpcs.push_back(csi::v0::CONTROLLER_GET_CAPABILITIES);
+ case csi::v0::NODE_STAGE_VOLUME:
+ rpcs.push_back(csi::v0::NODE_STAGE_VOLUME);
+ case csi::v0::NODE_UNSTAGE_VOLUME:
+ rpcs.push_back(csi::v0::NODE_UNSTAGE_VOLUME);
+ case csi::v0::NODE_PUBLISH_VOLUME:
+ rpcs.push_back(csi::v0::NODE_PUBLISH_VOLUME);
+ case csi::v0::NODE_UNPUBLISH_VOLUME:
+ rpcs.push_back(csi::v0::NODE_UNPUBLISH_VOLUME);
+ case csi::v0::NODE_GET_ID:
+ rpcs.push_back(csi::v0::NODE_GET_ID);
+ case csi::v0::NODE_GET_CAPABILITIES:
+ rpcs.push_back(csi::v0::NODE_GET_CAPABILITIES);
+ }
+
+ foreach (const csi::v0::RPC& rpc, rpcs) {
+ const string name = stringify(rpc);
+
+ csi_plugin_rpcs_pending.put(
+ rpc, PushGauge(prefix + "csi_plugin/rpcs/" + name + "/pending"));
+ csi_plugin_rpcs_successes.put(
+ rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/successes"));
+ csi_plugin_rpcs_errors.put(
+ rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/errors"));
+ csi_plugin_rpcs_cancelled.put(
+ rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/cancelled"));
+
+ process::metrics::add(csi_plugin_rpcs_pending.at(rpc));
+ process::metrics::add(csi_plugin_rpcs_successes.at(rpc));
+ process::metrics::add(csi_plugin_rpcs_errors.at(rpc));
+ process::metrics::add(csi_plugin_rpcs_cancelled.at(rpc));
+ }
+
vector<Offer::Operation::Type> operationTypes;
// NOTE: We use a switch statement here as a compile-time sanity check so we
@@ -3499,6 +3599,22 @@ StorageLocalResourceProviderProcess::Metrics::~Metrics()
{
process::metrics::remove(csi_plugin_container_terminations);
+ foreachvalue (const PushGauge& gauge, csi_plugin_rpcs_pending) {
+ process::metrics::remove(gauge);
+ }
+
+ foreachvalue (const Counter& counter, csi_plugin_rpcs_successes) {
+ process::metrics::remove(counter);
+ }
+
+ foreachvalue (const Counter& counter, csi_plugin_rpcs_errors) {
+ process::metrics::remove(counter);
+ }
+
+ foreachvalue (const Counter& counter, csi_plugin_rpcs_cancelled) {
+ process::metrics::remove(counter);
+ }
+
foreachvalue (const PushGauge& gauge, operations_pending) {
process::metrics::remove(gauge);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index d5993d6..39dea56 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -14,22 +14,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <functional>
+#include <ostream>
+
#include <process/gtest.hpp>
-#include <stout/lambda.hpp>
-#include <stout/path.hpp>
+#include <stout/nothing.hpp>
#include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
#include <stout/tests/utils.hpp>
#include "csi/client.hpp"
+#include "csi/rpc.hpp"
#include "tests/mock_csi_plugin.hpp"
+using std::ostream;
using std::string;
-using mesos::csi::v0::Client;
-
using process::Future;
using process::grpc::client::Connection;
@@ -47,22 +50,27 @@ struct RPCParam
{
struct Printer
{
- const string& operator()(const TestParamInfo<RPCParam>& info) const
+ string operator()(const TestParamInfo<RPCParam>& info) const
{
- return info.param.name;
+ return strings::replace(stringify(info.param.value), ".", "_");
}
};
- template <typename Request, typename Response>
- RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&))
- : name(_name),
- call([=](const Connection& connection, const Runtime runtime) {
- return (Client(connection, runtime).*rpc)(Request())
+ template <csi::v0::RPC rpc>
+ static RPCParam create()
+ {
+ return RPCParam{
+ rpc,
+ [](csi::v0::Client client) {
+ return client
+ .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type())
.then([] { return Nothing(); });
- }) {}
+ }
+ };
+ }
- string name;
- lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call;
+ const csi::v0::RPC value;
+ const std::function<Future<Nothing>(csi::v0::Client)> call;
};
@@ -95,51 +103,49 @@ protected:
};
-#define RPC_PARAM(method) \
- RPCParam(strings::replace(#method, "::", "_"), &method)
-
-
INSTANTIATE_TEST_CASE_P(
Identity,
CSIClientTest,
Values(
- RPC_PARAM(Client::GetPluginInfo),
- RPC_PARAM(Client::GetPluginCapabilities),
- RPC_PARAM(Client::Probe)),
+ RPCParam::create<csi::v0::GET_PLUGIN_INFO>(),
+ RPCParam::create<csi::v0::GET_PLUGIN_CAPABILITIES>(),
+ RPCParam::create<csi::v0::PROBE>()),
RPCParam::Printer());
+
INSTANTIATE_TEST_CASE_P(
Controller,
CSIClientTest,
Values(
- RPC_PARAM(Client::CreateVolume),
- RPC_PARAM(Client::DeleteVolume),
- RPC_PARAM(Client::ControllerPublishVolume),
- RPC_PARAM(Client::ControllerUnpublishVolume),
- RPC_PARAM(Client::ValidateVolumeCapabilities),
- RPC_PARAM(Client::ListVolumes),
- RPC_PARAM(Client::GetCapacity),
- RPC_PARAM(Client::ControllerGetCapabilities)),
+ RPCParam::create<csi::v0::CREATE_VOLUME>(),
+ RPCParam::create<csi::v0::DELETE_VOLUME>(),
+ RPCParam::create<csi::v0::CONTROLLER_PUBLISH_VOLUME>(),
+ RPCParam::create<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(),
+ RPCParam::create<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(),
+ RPCParam::create<csi::v0::LIST_VOLUMES>(),
+ RPCParam::create<csi::v0::GET_CAPACITY>(),
+ RPCParam::create<csi::v0::CONTROLLER_GET_CAPABILITIES>()),
RPCParam::Printer());
+
INSTANTIATE_TEST_CASE_P(
Node,
CSIClientTest,
Values(
- RPC_PARAM(Client::NodeStageVolume),
- RPC_PARAM(Client::NodeUnstageVolume),
- RPC_PARAM(Client::NodePublishVolume),
- RPC_PARAM(Client::NodeUnpublishVolume),
- RPC_PARAM(Client::NodeGetId),
- RPC_PARAM(Client::NodeGetCapabilities)),
+ RPCParam::create<csi::v0::NODE_STAGE_VOLUME>(),
+ RPCParam::create<csi::v0::NODE_UNSTAGE_VOLUME>(),
+ RPCParam::create<csi::v0::NODE_PUBLISH_VOLUME>(),
+ RPCParam::create<csi::v0::NODE_UNPUBLISH_VOLUME>(),
+ RPCParam::create<csi::v0::NODE_GET_ID>(),
+ RPCParam::create<csi::v0::NODE_GET_CAPABILITIES>()),
RPCParam::Printer());
// This test verifies that the all methods of CSI clients work.
TEST_P(CSIClientTest, Call)
{
- Future<Nothing> call = GetParam().call(connection.get(), runtime);
- AWAIT_EXPECT_READY(call);
+ AWAIT_EXPECT_READY(
+ GetParam().call(csi::v0::Client(connection.get(), runtime)));
}
} // namespace tests {