You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/01 01:32:35 UTC

[09/12] mesos git commit: Added per-CSI-call RPC metrics for SLRP.

Added per-CSI-call RPC metrics for SLRP.

For each CSI call, e.g., `csi.v0.Identity.Probe`, we the following
metrics for SLRP:
`csi_plugin/rpcs/csi.v0.Identity.Probe/pending`
`csi_plugin/rpcs/csi.v0.Identity.Probe/successes`
`csi_plugin/rpcs/csi.v0.Identity.Probe/errors`
`csi_plugin/rpcs/csi.v0.Identity.Probe/cancelled`

To add these per-CSI-call metrics, each method in `csi::v0::Client`,
e.g., `csi::v0::Client::Probe`, is changed to
`csi::v0::Client::call<PROBE>`, to make RPC calls based on the RPC enum
value. A `call` helper function in SLRP is also added to intercept CSI
calls and update the corresponding metrics.

Review: https://reviews.apache.org/r/67255


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/15fc86d2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/15fc86d2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/15fc86d2

Branch: refs/heads/master
Commit: 15fc86d22f1fbe922ef878bb2e6f6462d2248b14
Parents: cae7d7a
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 22 17:34:06 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 31 18:29:56 2018 -0700

----------------------------------------------------------------------
 src/csi/client.cpp                         |  68 +++++++---
 src/csi/client.hpp                         | 139 ++++++++++++++-------
 src/resource_provider/storage/provider.cpp | 158 ++++++++++++++++++++----
 src/tests/csi_client_tests.cpp             |  80 ++++++------
 4 files changed, 326 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index a4ba1f1..923ee6f 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -25,7 +25,9 @@ namespace mesos {
 namespace csi {
 namespace v0 {
 
-Future<GetPluginInfoResponse> Client::GetPluginInfo(
+template <>
+Future<GetPluginInfoResponse>
+Client::call<GET_PLUGIN_INFO>(
     const GetPluginInfoRequest& request)
 {
   return runtime
@@ -37,7 +39,9 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo(
 }
 
 
-Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
+template <>
+Future<GetPluginCapabilitiesResponse>
+Client::call<GET_PLUGIN_CAPABILITIES>(
     const GetPluginCapabilitiesRequest& request)
 {
   return runtime
@@ -52,7 +56,9 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
 }
 
 
-Future<ProbeResponse> Client::Probe(
+template <>
+Future<ProbeResponse>
+Client::call<PROBE>(
     const ProbeRequest& request)
 {
   return runtime
@@ -64,7 +70,9 @@ Future<ProbeResponse> Client::Probe(
 }
 
 
-Future<CreateVolumeResponse> Client::CreateVolume(
+template <>
+Future<CreateVolumeResponse>
+Client::call<CREATE_VOLUME>(
     const CreateVolumeRequest& request)
 {
   return runtime
@@ -76,7 +84,9 @@ Future<CreateVolumeResponse> Client::CreateVolume(
 }
 
 
-Future<DeleteVolumeResponse> Client::DeleteVolume(
+template <>
+Future<DeleteVolumeResponse>
+Client::call<DELETE_VOLUME>(
     const DeleteVolumeRequest& request)
 {
   return runtime
@@ -88,7 +98,9 @@ Future<DeleteVolumeResponse> Client::DeleteVolume(
 }
 
 
-Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
+template <>
+Future<ControllerPublishVolumeResponse>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(
     const ControllerPublishVolumeRequest& request)
 {
   return runtime
@@ -103,7 +115,9 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
 }
 
 
-Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
+template <>
+Future<ControllerUnpublishVolumeResponse>
+Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
     const ControllerUnpublishVolumeRequest& request)
 {
   return runtime
@@ -118,7 +132,9 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
 }
 
 
-Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
+template <>
+Future<ValidateVolumeCapabilitiesResponse>
+Client::call<VALIDATE_VOLUME_CAPABILITIES>(
     const ValidateVolumeCapabilitiesRequest& request)
 {
   return runtime
@@ -133,7 +149,9 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
 }
 
 
-Future<ListVolumesResponse> Client::ListVolumes(
+template <>
+Future<ListVolumesResponse>
+Client::call<LIST_VOLUMES>(
     const ListVolumesRequest& request)
 {
   return runtime
@@ -145,7 +163,9 @@ Future<ListVolumesResponse> Client::ListVolumes(
 }
 
 
-Future<GetCapacityResponse> Client::GetCapacity(
+template <>
+Future<GetCapacityResponse>
+Client::call<GET_CAPACITY>(
     const GetCapacityRequest& request)
 {
   return runtime
@@ -157,7 +177,9 @@ Future<GetCapacityResponse> Client::GetCapacity(
 }
 
 
-Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
+template <>
+Future<ControllerGetCapabilitiesResponse>
+Client::call<CONTROLLER_GET_CAPABILITIES>(
     const ControllerGetCapabilitiesRequest& request)
 {
   return runtime
@@ -172,7 +194,9 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
 }
 
 
-Future<NodeStageVolumeResponse> Client::NodeStageVolume(
+template <>
+Future<NodeStageVolumeResponse>
+Client::call<NODE_STAGE_VOLUME>(
     const NodeStageVolumeRequest& request)
 {
   return runtime
@@ -184,7 +208,9 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume(
 }
 
 
-Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
+template <>
+Future<NodeUnstageVolumeResponse>
+Client::call<NODE_UNSTAGE_VOLUME>(
     const NodeUnstageVolumeRequest& request)
 {
   return runtime
@@ -196,7 +222,9 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
 }
 
 
-Future<NodePublishVolumeResponse> Client::NodePublishVolume(
+template <>
+Future<NodePublishVolumeResponse>
+Client::call<NODE_PUBLISH_VOLUME>(
     const NodePublishVolumeRequest& request)
 {
   return runtime
@@ -208,7 +236,9 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume(
 }
 
 
-Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
+template <>
+Future<NodeUnpublishVolumeResponse>
+Client::call<NODE_UNPUBLISH_VOLUME>(
     const NodeUnpublishVolumeRequest& request)
 {
   return runtime
@@ -220,7 +250,9 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
 }
 
 
-Future<NodeGetIdResponse> Client::NodeGetId(
+template <>
+Future<NodeGetIdResponse>
+Client::call<NODE_GET_ID>(
     const NodeGetIdRequest& request)
 {
   return runtime
@@ -232,7 +264,9 @@ Future<NodeGetIdResponse> Client::NodeGetId(
 }
 
 
-Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
+template <>
+Future<NodeGetCapabilitiesResponse>
+Client::call<NODE_GET_CAPABILITIES>(
     const NodeGetCapabilitiesRequest& request)
 {
   return runtime

http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 9d7019a..1c57ac5 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -17,12 +17,12 @@
 #ifndef __CSI_CLIENT_HPP__
 #define __CSI_CLIENT_HPP__
 
-#include <string>
-
 #include <csi/spec.hpp>
 
 #include <process/grpc.hpp>
 
+#include "csi/rpc.hpp"
+
 namespace mesos {
 namespace csi {
 namespace v0 {
@@ -34,65 +34,116 @@ public:
          const process::grpc::client::Runtime& _runtime)
     : connection(_connection), runtime(_runtime) {}
 
-  // RPCs for the Identity service.
-  process::Future<GetPluginInfoResponse>
-    GetPluginInfo(const GetPluginInfoRequest& request);
+  template <RPC rpc>
+  process::Future<typename RPCTraits<rpc>::response_type> call(
+      const typename RPCTraits<rpc>::request_type& request);
 
-  process::Future<GetPluginCapabilitiesResponse>
-    GetPluginCapabilities(const GetPluginCapabilitiesRequest& request);
+private:
+  process::grpc::client::Connection connection;
+  process::grpc::client::Runtime runtime;
+};
 
-  process::Future<ProbeResponse>
-    Probe(const ProbeRequest& request);
 
-  // RPCs for the Controller service.
-  process::Future<CreateVolumeResponse>
-    CreateVolume(const CreateVolumeRequest& request);
+template <>
+process::Future<GetPluginInfoResponse>
+Client::call<GET_PLUGIN_INFO>(
+    const GetPluginInfoRequest& request);
 
-  process::Future<DeleteVolumeResponse>
-    DeleteVolume(const DeleteVolumeRequest& request);
 
-  process::Future<ControllerPublishVolumeResponse>
-    ControllerPublishVolume(const ControllerPublishVolumeRequest& request);
+template <>
+process::Future<GetPluginCapabilitiesResponse>
+Client::call<GET_PLUGIN_CAPABILITIES>(
+    const GetPluginCapabilitiesRequest& request);
 
-  process::Future<ControllerUnpublishVolumeResponse>
-    ControllerUnpublishVolume(const ControllerUnpublishVolumeRequest& request);
 
-  process::Future<ValidateVolumeCapabilitiesResponse>
-    ValidateVolumeCapabilities(
-        const ValidateVolumeCapabilitiesRequest& request);
+template <>
+process::Future<ProbeResponse>
+Client::call<PROBE>(
+    const ProbeRequest& request);
 
-  process::Future<ListVolumesResponse>
-    ListVolumes(const ListVolumesRequest& request);
 
-  process::Future<GetCapacityResponse>
-    GetCapacity(const GetCapacityRequest& request);
+template <>
+process::Future<CreateVolumeResponse>
+Client::call<CREATE_VOLUME>(
+    const CreateVolumeRequest& request);
 
-  process::Future<ControllerGetCapabilitiesResponse>
-    ControllerGetCapabilities(const ControllerGetCapabilitiesRequest& request);
 
-  // RPCs for the Node service.
-  process::Future<NodeStageVolumeResponse>
-    NodeStageVolume(const NodeStageVolumeRequest& request);
+template <>
+process::Future<DeleteVolumeResponse>
+Client::call<DELETE_VOLUME>(
+    const DeleteVolumeRequest& request);
 
-  process::Future<NodeUnstageVolumeResponse>
-    NodeUnstageVolume(const NodeUnstageVolumeRequest& request);
 
-  process::Future<NodePublishVolumeResponse>
-    NodePublishVolume(const NodePublishVolumeRequest& request);
+template <>
+process::Future<ControllerPublishVolumeResponse>
+Client::call<CONTROLLER_PUBLISH_VOLUME>(
+    const ControllerPublishVolumeRequest& request);
 
-  process::Future<NodeUnpublishVolumeResponse>
-    NodeUnpublishVolume(const NodeUnpublishVolumeRequest& request);
 
-  process::Future<NodeGetIdResponse>
-    NodeGetId(const NodeGetIdRequest& request);
+template <>
+process::Future<ControllerUnpublishVolumeResponse>
+Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
+    const ControllerUnpublishVolumeRequest& request);
 
-  process::Future<NodeGetCapabilitiesResponse>
-    NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
 
-private:
-  process::grpc::client::Connection connection;
-  process::grpc::client::Runtime runtime;
-};
+template <>
+process::Future<ValidateVolumeCapabilitiesResponse>
+Client::call<VALIDATE_VOLUME_CAPABILITIES>(
+    const ValidateVolumeCapabilitiesRequest& request);
+
+
+template <>
+process::Future<ListVolumesResponse>
+Client::call<LIST_VOLUMES>(
+    const ListVolumesRequest& request);
+
+
+template <>
+process::Future<GetCapacityResponse>
+Client::call<GET_CAPACITY>(
+    const GetCapacityRequest& request);
+
+
+template <>
+process::Future<ControllerGetCapabilitiesResponse>
+Client::call<CONTROLLER_GET_CAPABILITIES>(
+    const ControllerGetCapabilitiesRequest& request);
+
+
+template <>
+process::Future<NodeStageVolumeResponse>
+Client::call<NODE_STAGE_VOLUME>(
+    const NodeStageVolumeRequest& request);
+
+
+template <>
+process::Future<NodeUnstageVolumeResponse>
+Client::call<NODE_UNSTAGE_VOLUME>(
+    const NodeUnstageVolumeRequest& request);
+
+
+template <>
+process::Future<NodePublishVolumeResponse>
+Client::call<NODE_PUBLISH_VOLUME>(
+    const NodePublishVolumeRequest& request);
+
+
+template <>
+process::Future<NodeUnpublishVolumeResponse>
+Client::call<NODE_UNPUBLISH_VOLUME>(
+    const NodeUnpublishVolumeRequest& request);
+
+
+template <>
+process::Future<NodeGetIdResponse>
+Client::call<NODE_GET_ID>(
+    const NodeGetIdRequest& request);
+
+
+template <>
+process::Future<NodeGetCapabilitiesResponse>
+Client::call<NODE_GET_CAPABILITIES>(
+    const NodeGetCapabilitiesRequest& request);
 
 } // namespace v0 {
 } // namespace csi {

http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 8a4b037..333336e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -59,6 +59,7 @@
 
 #include "csi/client.hpp"
 #include "csi/paths.hpp"
+#include "csi/rpc.hpp"
 #include "csi/state.hpp"
 #include "csi/utils.hpp"
 
@@ -375,6 +376,11 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
+  template <csi::v0::RPC rpc>
+  Future<typename csi::v0::RPCTraits<rpc>::response_type> call(
+      csi::v0::Client client,
+      const typename csi::v0::RPCTraits<rpc>::request_type& request);
+
   Future<csi::v0::Client> connect(const string& endpoint);
   Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<Nothing> killService(const ContainerID& containerId);
@@ -498,6 +504,10 @@ private:
 
     // CSI plugin metrics.
     Counter csi_plugin_container_terminations;
+    hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending;
+    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes;
+    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors;
+    hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled;
 
     // Operation state metrics.
     hashmap<Offer::Operation::Type, PushGauge> operations_pending;
@@ -1752,6 +1762,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 }
 
 
+template <csi::v0::RPC rpc>
+Future<typename csi::v0::RPCTraits<rpc>::response_type>
+StorageLocalResourceProviderProcess::call(
+    csi::v0::Client client,
+    const typename csi::v0::RPCTraits<rpc>::request_type& request)
+{
+  ++metrics.csi_plugin_rpcs_pending.at(rpc);
+
+  return client.call<rpc>(request)
+    .onAny(defer(self(), [=](
+        const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) {
+      --metrics.csi_plugin_rpcs_pending.at(rpc);
+      if (future.isReady()) {
+        ++metrics.csi_plugin_rpcs_successes.at(rpc);
+      } else if (future.isFailed()) {
+        ++metrics.csi_plugin_rpcs_errors.at(rpc);
+      } else {
+        ++metrics.csi_plugin_rpcs_cancelled.at(rpc);
+      }
+    }));
+}
+
+
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
 // readiness.
@@ -1786,7 +1819,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
 
   return future
     .then(defer(self(), [=](csi::v0::Client client) {
-      return client.Probe(csi::v0::ProbeRequest())
+      return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest())
         .then(defer(self(), [=](const csi::v0::ProbeResponse& response) {
           return client;
         }));
@@ -2011,7 +2044,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
   return getService(nodeContainerId.get())
     .then(defer(self(), [=](csi::v0::Client client) {
       // Get the plugin info.
-      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+      return call<csi::v0::GET_PLUGIN_INFO>(
+          client, csi::v0::GetPluginInfoRequest())
         .then(defer(self(), [=](
             const csi::v0::GetPluginInfoResponse& response) {
           pluginInfo = response;
@@ -2024,8 +2058,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
     }))
     .then(defer(self(), [=](csi::v0::Client client) {
       // Get the plugin capabilities.
-      return client.GetPluginCapabilities(
-          csi::v0::GetPluginCapabilitiesRequest())
+      return call<csi::v0::GET_PLUGIN_CAPABILITIES>(
+          client, csi::v0::GetPluginCapabilitiesRequest())
         .then(defer(self(), [=](
             const csi::v0::GetPluginCapabilitiesResponse& response) {
           pluginCapabilities = response.capabilities();
@@ -2053,7 +2087,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
   return getService(controllerContainerId.get())
     .then(defer(self(), [=](csi::v0::Client client) {
       // Get the controller plugin info and check for consistency.
-      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+      return call<csi::v0::GET_PLUGIN_INFO>(
+          client, csi::v0::GetPluginInfoRequest())
         .then(defer(self(), [=](
             const csi::v0::GetPluginInfoResponse& response) {
           LOG(INFO) << "Controller plugin loaded: " << stringify(response);
@@ -2071,8 +2106,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
     }))
     .then(defer(self(), [=](csi::v0::Client client) {
       // Get the controller capabilities.
-      return client.ControllerGetCapabilities(
-          csi::v0::ControllerGetCapabilitiesRequest())
+      return call<csi::v0::CONTROLLER_GET_CAPABILITIES>(
+          client, csi::v0::ControllerGetCapabilitiesRequest())
         .then(defer(self(), [=](
             const csi::v0::ControllerGetCapabilitiesResponse& response) {
           controllerCapabilities = response.capabilities();
@@ -2092,7 +2127,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
   return getService(nodeContainerId.get())
     .then(defer(self(), [=](csi::v0::Client client) {
       // Get the node capabilities.
-      return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest())
+      return call<csi::v0::NODE_GET_CAPABILITIES>(
+          client, csi::v0::NodeGetCapabilitiesRequest())
         .then(defer(self(), [=](
             const csi::v0::NodeGetCapabilitiesResponse& response)
             -> Future<csi::v0::Client> {
@@ -2107,7 +2143,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
           }
 
           // Get the node ID.
-          return client.NodeGetId(csi::v0::NodeGetIdRequest())
+          return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest())
             .then(defer(self(), [=](
                 const csi::v0::NodeGetIdResponse& response) {
               nodeId = response.node_id();
@@ -2161,7 +2197,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
       request.set_readonly(false);
       *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      return client.ControllerPublishVolume(request)
+      return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>(
+          client, std::move(request))
         .then(defer(self(), [this, volumeId](
             const csi::v0::ControllerPublishVolumeResponse& response) {
           VolumeData& volume = volumes.at(volumeId);
@@ -2217,7 +2254,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
       request.set_volume_id(volumeId);
       request.set_node_id(nodeId.get());
 
-      return client.ControllerUnpublishVolume(request)
+      return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(
+          client, std::move(request))
         .then(defer(self(), [this, volumeId] {
           VolumeData& volume = volumes.at(volumeId);
 
@@ -2286,7 +2324,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage(
         ->CopyFrom(volume.state.volume_capability());
       *request.mutable_volume_attributes() = volume.state.volume_attributes();
 
-      return client.NodeStageVolume(request)
+      return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request))
         .then(defer(self(), [this, volumeId] {
           VolumeData& volume = volumes.at(volumeId);
 
@@ -2349,7 +2387,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage(
       request.set_volume_id(volumeId);
       request.set_staging_target_path(stagingPath);
 
-      return client.NodeUnstageVolume(request)
+      return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request))
         .then(defer(self(), [this, volumeId] {
           VolumeData& volume = volumes.at(volumeId);
 
@@ -2420,7 +2458,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
         request.set_staging_target_path(stagingPath);
       }
 
-      return client.NodePublishVolume(request)
+      return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request))
         .then(defer(self(), [this, volumeId] {
           VolumeData& volume = volumes.at(volumeId);
 
@@ -2470,7 +2508,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
       request.set_volume_id(volumeId);
       request.set_target_path(targetPath);
 
-      return client.NodeUnpublishVolume(request)
+      return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request))
         .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> {
           VolumeData& volume = volumes.at(volumeId);
 
@@ -2515,7 +2553,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
       request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
       *request.mutable_parameters() = profileInfo.parameters;
 
-      return client.CreateVolume(request)
+      return call<csi::v0::CREATE_VOLUME>(client, std::move(request))
         .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) {
           const csi::v0::Volume& volume = response.volume();
 
@@ -2609,11 +2647,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       if (!preExisting) {
         deleted = deleted
           .then(defer(self(), &Self::getService, controllerContainerId.get()))
-          .then(defer(self(), [volumeId](csi::v0::Client client) {
+          .then(defer(self(), [this, volumeId](csi::v0::Client client) {
             csi::v0::DeleteVolumeRequest request;
             request.set_volume_id(volumeId);
 
-            return client.DeleteVolume(request)
+            return call<csi::v0::DELETE_VOLUME>(client, std::move(request))
               .then([] { return Nothing(); });
           }));
       }
@@ -2681,7 +2719,8 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
       request.add_volume_capabilities()->CopyFrom(capability);
       *request.mutable_volume_attributes() = volumeAttributes;
 
-      return client.ValidateVolumeCapabilities(request)
+      return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(
+          client, std::move(request))
         .then(defer(self(), [=](
             const csi::v0::ValidateVolumeCapabilitiesResponse& response)
             -> Future<string> {
@@ -2722,7 +2761,7 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
     .then(defer(self(), [=](csi::v0::Client client) {
       // TODO(chhsiao): Set the max entries and use a loop to do
       // multiple `ListVolumes` calls.
-      return client.ListVolumes(csi::v0::ListVolumesRequest())
+      return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest())
         .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
           Resources resources;
 
@@ -2785,7 +2824,8 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
         request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
         *request.mutable_parameters() = profileInfo.parameters;
 
-        futures.push_back(client.GetCapacity(request)
+        futures.push_back(call<csi::v0::GET_CAPACITY>(
+            client, std::move(request))
           .then(defer(self(), [=](
               const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
@@ -3433,6 +3473,66 @@ StorageLocalResourceProviderProcess::Metrics::Metrics(const string& prefix)
 {
   process::metrics::add(csi_plugin_container_terminations);
 
+  vector<csi::v0::RPC> rpcs;
+
+  // NOTE: We use a switch statement here as a compile-time sanity check so we
+  // won't forget to add metrics for new RPCs in the future.
+  csi::v0::RPC firstRpc = csi::v0::GET_PLUGIN_INFO;
+  switch (firstRpc) {
+    case csi::v0::GET_PLUGIN_INFO:
+      rpcs.push_back(csi::v0::GET_PLUGIN_INFO);
+    case csi::v0::GET_PLUGIN_CAPABILITIES:
+      rpcs.push_back(csi::v0::GET_PLUGIN_CAPABILITIES);
+    case csi::v0::PROBE:
+      rpcs.push_back(csi::v0::PROBE);
+    case csi::v0::CREATE_VOLUME:
+      rpcs.push_back(csi::v0::CREATE_VOLUME);
+    case csi::v0::DELETE_VOLUME:
+      rpcs.push_back(csi::v0::DELETE_VOLUME);
+    case csi::v0::CONTROLLER_PUBLISH_VOLUME:
+      rpcs.push_back(csi::v0::CONTROLLER_PUBLISH_VOLUME);
+    case csi::v0::CONTROLLER_UNPUBLISH_VOLUME:
+      rpcs.push_back(csi::v0::CONTROLLER_UNPUBLISH_VOLUME);
+    case csi::v0::VALIDATE_VOLUME_CAPABILITIES:
+      rpcs.push_back(csi::v0::VALIDATE_VOLUME_CAPABILITIES);
+    case csi::v0::LIST_VOLUMES:
+      rpcs.push_back(csi::v0::LIST_VOLUMES);
+    case csi::v0::GET_CAPACITY:
+      rpcs.push_back(csi::v0::GET_CAPACITY);
+    case csi::v0::CONTROLLER_GET_CAPABILITIES:
+      rpcs.push_back(csi::v0::CONTROLLER_GET_CAPABILITIES);
+    case csi::v0::NODE_STAGE_VOLUME:
+      rpcs.push_back(csi::v0::NODE_STAGE_VOLUME);
+    case csi::v0::NODE_UNSTAGE_VOLUME:
+      rpcs.push_back(csi::v0::NODE_UNSTAGE_VOLUME);
+    case csi::v0::NODE_PUBLISH_VOLUME:
+      rpcs.push_back(csi::v0::NODE_PUBLISH_VOLUME);
+    case csi::v0::NODE_UNPUBLISH_VOLUME:
+      rpcs.push_back(csi::v0::NODE_UNPUBLISH_VOLUME);
+    case csi::v0::NODE_GET_ID:
+      rpcs.push_back(csi::v0::NODE_GET_ID);
+    case csi::v0::NODE_GET_CAPABILITIES:
+      rpcs.push_back(csi::v0::NODE_GET_CAPABILITIES);
+  }
+
+  foreach (const csi::v0::RPC& rpc, rpcs) {
+    const string name = stringify(rpc);
+
+    csi_plugin_rpcs_pending.put(
+        rpc, PushGauge(prefix + "csi_plugin/rpcs/" + name + "/pending"));
+    csi_plugin_rpcs_successes.put(
+        rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/successes"));
+    csi_plugin_rpcs_errors.put(
+        rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/errors"));
+    csi_plugin_rpcs_cancelled.put(
+        rpc, Counter(prefix + "csi_plugin/rpcs/" + name + "/cancelled"));
+
+    process::metrics::add(csi_plugin_rpcs_pending.at(rpc));
+    process::metrics::add(csi_plugin_rpcs_successes.at(rpc));
+    process::metrics::add(csi_plugin_rpcs_errors.at(rpc));
+    process::metrics::add(csi_plugin_rpcs_cancelled.at(rpc));
+  }
+
   vector<Offer::Operation::Type> operationTypes;
 
   // NOTE: We use a switch statement here as a compile-time sanity check so we
@@ -3499,6 +3599,22 @@ StorageLocalResourceProviderProcess::Metrics::~Metrics()
 {
   process::metrics::remove(csi_plugin_container_terminations);
 
+  foreachvalue (const PushGauge& gauge, csi_plugin_rpcs_pending) {
+    process::metrics::remove(gauge);
+  }
+
+  foreachvalue (const Counter& counter, csi_plugin_rpcs_successes) {
+    process::metrics::remove(counter);
+  }
+
+  foreachvalue (const Counter& counter, csi_plugin_rpcs_errors) {
+    process::metrics::remove(counter);
+  }
+
+  foreachvalue (const Counter& counter, csi_plugin_rpcs_cancelled) {
+    process::metrics::remove(counter);
+  }
+
   foreachvalue (const PushGauge& gauge, operations_pending) {
     process::metrics::remove(gauge);
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/15fc86d2/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index d5993d6..39dea56 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -14,22 +14,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <functional>
+#include <ostream>
+
 #include <process/gtest.hpp>
 
-#include <stout/lambda.hpp>
-#include <stout/path.hpp>
+#include <stout/nothing.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 
 #include <stout/tests/utils.hpp>
 
 #include "csi/client.hpp"
+#include "csi/rpc.hpp"
 
 #include "tests/mock_csi_plugin.hpp"
 
+using std::ostream;
 using std::string;
 
-using mesos::csi::v0::Client;
-
 using process::Future;
 
 using process::grpc::client::Connection;
@@ -47,22 +50,27 @@ struct RPCParam
 {
   struct Printer
   {
-    const string& operator()(const TestParamInfo<RPCParam>& info) const
+    string operator()(const TestParamInfo<RPCParam>& info) const
     {
-      return info.param.name;
+      return strings::replace(stringify(info.param.value), ".", "_");
     }
   };
 
-  template <typename Request, typename Response>
-  RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&))
-    : name(_name),
-      call([=](const Connection& connection, const Runtime runtime) {
-        return (Client(connection, runtime).*rpc)(Request())
+  template <csi::v0::RPC rpc>
+  static RPCParam create()
+  {
+    return RPCParam{
+      rpc,
+      [](csi::v0::Client client) {
+        return client
+          .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type())
           .then([] { return Nothing(); });
-      }) {}
+      }
+    };
+  }
 
-  string name;
-  lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call;
+  const csi::v0::RPC value;
+  const std::function<Future<Nothing>(csi::v0::Client)> call;
 };
 
 
@@ -95,51 +103,49 @@ protected:
 };
 
 
-#define RPC_PARAM(method) \
-  RPCParam(strings::replace(#method, "::", "_"), &method)
-
-
 INSTANTIATE_TEST_CASE_P(
     Identity,
     CSIClientTest,
     Values(
-        RPC_PARAM(Client::GetPluginInfo),
-        RPC_PARAM(Client::GetPluginCapabilities),
-        RPC_PARAM(Client::Probe)),
+        RPCParam::create<csi::v0::GET_PLUGIN_INFO>(),
+        RPCParam::create<csi::v0::GET_PLUGIN_CAPABILITIES>(),
+        RPCParam::create<csi::v0::PROBE>()),
     RPCParam::Printer());
 
+
 INSTANTIATE_TEST_CASE_P(
     Controller,
     CSIClientTest,
     Values(
-        RPC_PARAM(Client::CreateVolume),
-        RPC_PARAM(Client::DeleteVolume),
-        RPC_PARAM(Client::ControllerPublishVolume),
-        RPC_PARAM(Client::ControllerUnpublishVolume),
-        RPC_PARAM(Client::ValidateVolumeCapabilities),
-        RPC_PARAM(Client::ListVolumes),
-        RPC_PARAM(Client::GetCapacity),
-        RPC_PARAM(Client::ControllerGetCapabilities)),
+        RPCParam::create<csi::v0::CREATE_VOLUME>(),
+        RPCParam::create<csi::v0::DELETE_VOLUME>(),
+        RPCParam::create<csi::v0::CONTROLLER_PUBLISH_VOLUME>(),
+        RPCParam::create<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(),
+        RPCParam::create<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(),
+        RPCParam::create<csi::v0::LIST_VOLUMES>(),
+        RPCParam::create<csi::v0::GET_CAPACITY>(),
+        RPCParam::create<csi::v0::CONTROLLER_GET_CAPABILITIES>()),
     RPCParam::Printer());
 
+
 INSTANTIATE_TEST_CASE_P(
     Node,
     CSIClientTest,
     Values(
-        RPC_PARAM(Client::NodeStageVolume),
-        RPC_PARAM(Client::NodeUnstageVolume),
-        RPC_PARAM(Client::NodePublishVolume),
-        RPC_PARAM(Client::NodeUnpublishVolume),
-        RPC_PARAM(Client::NodeGetId),
-        RPC_PARAM(Client::NodeGetCapabilities)),
+        RPCParam::create<csi::v0::NODE_STAGE_VOLUME>(),
+        RPCParam::create<csi::v0::NODE_UNSTAGE_VOLUME>(),
+        RPCParam::create<csi::v0::NODE_PUBLISH_VOLUME>(),
+        RPCParam::create<csi::v0::NODE_UNPUBLISH_VOLUME>(),
+        RPCParam::create<csi::v0::NODE_GET_ID>(),
+        RPCParam::create<csi::v0::NODE_GET_CAPABILITIES>()),
     RPCParam::Printer());
 
 
 // This test verifies that the all methods of CSI clients work.
 TEST_P(CSIClientTest, Call)
 {
-  Future<Nothing> call = GetParam().call(connection.get(), runtime);
-  AWAIT_EXPECT_READY(call);
+  AWAIT_EXPECT_READY(
+      GetParam().call(csi::v0::Client(connection.get(), runtime)));
 }
 
 } // namespace tests {