You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/04/12 21:02:43 UTC

[5/6] mesos git commit: Adapted storage local resource provider to use CSI v0.2.

Adapted storage local resource provider to use CSI v0.2.

This patch contains necessary changes for the storage local resource
provider to use CSI v0.2. Support for the `STAGE_UNSTAGE_VOLUME` CSI
node service capability is not implemented in this patch yet.

Review: https://reviews.apache.org/r/66410/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aeffcd7d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aeffcd7d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aeffcd7d

Branch: refs/heads/master
Commit: aeffcd7d9a1f9c97e5e347063ceab71c43c00e2d
Parents: 6dfd259
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:19 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 512 ++++++++++++------------
 1 file changed, 257 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aeffcd7d/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a07620d..40544e0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -366,10 +366,11 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
-  Future<csi::Client> connect(const string& endpoint);
-  Future<csi::Client> getService(const ContainerID& containerId);
+  Future<csi::v0::Client> connect(const string& endpoint);
+  Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<Nothing> killService(const ContainerID& containerId);
 
+  Future<Nothing> prepareIdentityService();
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
   Future<Nothing> controllerPublish(const string& volumeId);
@@ -384,7 +385,7 @@ private:
   Future<string> validateCapability(
       const string& volumeId,
       const Option<Labels>& metadata,
-      const csi::VolumeCapability& capability);
+      const csi::v0::VolumeCapability& capability);
   Future<Resources> listVolumes();
   Future<Resources> getCapacities();
 
@@ -436,9 +437,8 @@ private:
 
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  csi::Version csiVersion;
-  csi::VolumeCapability defaultMountCapability;
-  csi::VolumeCapability defaultBlockCapability;
+  csi::v0::VolumeCapability defaultMountCapability;
+  csi::v0::VolumeCapability defaultBlockCapability;
   string bootId;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
@@ -453,14 +453,15 @@ private:
   // True if a reconciliation of storage pools is happening.
   bool reconciling;
 
-  ContainerID controllerContainerId;
-  ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
-  hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
-
-  Option<csi::GetPluginInfoResponse> controllerInfo;
-  Option<csi::GetPluginInfoResponse> nodeInfo;
-  Option<csi::ControllerCapabilities> controllerCapabilities;
+  hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
+
+  Option<ContainerID> nodeContainerId;
+  Option<ContainerID> controllerContainerId;
+  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+  csi::v0::PluginCapabilities pluginCapabilities;
+  csi::v0::ControllerCapabilities controllerCapabilities;
+  csi::v0::NodeCapabilities nodeCapabilities;
   Option<string> nodeId;
 
   // We maintain the following invariant: if one operation depends on
@@ -554,18 +555,13 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  // Set CSI version to 0.1.0.
-  csiVersion.set_major(0);
-  csiVersion.set_minor(1);
-  csiVersion.set_patch(0);
-
   // Default mount and block capabilities for pre-existing volumes.
   defaultMountCapability.mutable_mount();
   defaultMountCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
   defaultBlockCapability.mutable_block();
   defaultBlockCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
 
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
@@ -577,24 +573,24 @@ void StorageLocalResourceProviderProcess::initialize()
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::CONTROLLER_SERVICE);
-    if (it != container.services().end()) {
-      controllerContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      nodeContainerId = getContainerId(info, container);
       break;
     }
   }
 
+  CHECK_SOME(nodeContainerId);
+
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::NODE_SERVICE);
-    if (it != container.services().end()) {
-      nodeContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::CONTROLLER_SERVICE)) {
+      controllerContainerId = getContainerId(info, container);
       break;
     }
   }
@@ -720,10 +716,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
 
     const ContainerID& containerId = containerPath->containerId;
 
+    CHECK_SOME(nodeContainerId);
+
     // Do not kill the up-to-date controller or node container.
     // Otherwise, kill them and perform cleanups.
-    if (containerId == controllerContainerId ||
-        containerId == nodeContainerId) {
+    if (nodeContainerId == containerId ||
+        controllerContainerId == containerId) {
       const string configPath = csi::paths::getContainerInfoPath(
           slave::paths::getCsiRootDir(workDir),
           info.storage().plugin().type(),
@@ -776,11 +774,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
       })));
   }
 
-  // NOTE: The `GetNodeID` CSI call is only supported if the plugin has
-  // the `PUBLISH_UNPUBLISH_VOLUME` controller capability. So to decide
-  // if `GetNodeID` should be called in `prepareNodeService`, we need to
-  // run `prepareControllerService` first.
+  // NOTE: The `Controller` service is supported if the plugin has the
+  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is
+  // supported if the `Controller` service has the
+  // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch
+  // the node plugin to get the plugin capabilities, then decide if we
+  // need to launch the controller plugin and get the node ID.
   return collect(futures)
+    .then(defer(self(), &Self::prepareIdentityService))
     .then(defer(self(), &Self::prepareControllerService))
     .then(defer(self(), &Self::prepareNodeService));
 }
@@ -1672,19 +1673,19 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
-// supported version.
-Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+// readiness.
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
     const string& endpoint)
 {
-  Future<csi::Client> client;
+  Future<csi::v0::Client> future;
 
   if (os::exists(endpoint)) {
-    client = csi::Client("unix://" + endpoint, runtime);
+    future = csi::v0::Client("unix://" + endpoint, runtime);
   } else {
     // Wait for the endpoint socket to appear until the timeout expires.
     Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-    client = loop(
+    future = loop(
         self(),
         [=]() -> Future<Nothing> {
           if (timeout.expired()) {
@@ -1693,30 +1694,19 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect(
 
           return after(Milliseconds(10));
         },
-        [=](const Nothing&) -> ControlFlow<csi::Client> {
+        [=](const Nothing&) -> ControlFlow<csi::v0::Client> {
           if (os::exists(endpoint)) {
-            return Break(csi::Client("unix://" + endpoint, runtime));
+            return Break(csi::v0::Client("unix://" + endpoint, runtime));
           }
 
           return Continue();
         });
   }
 
-  return client
-    .then(defer(self(), [=](csi::Client client) {
-      return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
-        .then(defer(self(), [=](
-            const csi::GetSupportedVersionsResponse& response)
-            -> Future<csi::Client> {
-          auto it = find(
-              response.supported_versions().begin(),
-              response.supported_versions().end(),
-              csiVersion);
-          if (it == response.supported_versions().end()) {
-            return Failure(
-                "CSI version " + stringify(csiVersion) + " is not supported");
-          }
-
+  return future
+    .then(defer(self(), [=](csi::v0::Client client) {
+      return client.Probe(csi::v0::ProbeRequest())
+        .then(defer(self(), [=](const csi::v0::ProbeResponse& response) {
           return client;
         }));
     }));
@@ -1726,7 +1716,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect(
 // Returns a future of the latest CSI client for the specified plugin
 // container. If the container is not already running, this method will
 // start a new a new container daemon.
-Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
     const ContainerID& containerId)
 {
   if (daemons.contains(containerId)) {
@@ -1802,7 +1792,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
     ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
 
   CHECK(!services.contains(containerId));
-  services[containerId].reset(new Promise<csi::Client>());
+  services[containerId].reset(new Promise<csi::v0::Client>());
 
   Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
       extractParentEndpoint(url),
@@ -1815,7 +1805,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
         CHECK(services.at(containerId)->future().isPending());
 
         return connect(endpointPath)
-          .then(defer(self(), [=](const csi::Client& client) {
+          .then(defer(self(), [=](const csi::v0::Client& client) {
             services.at(containerId)->set(client);
             return Nothing();
           }))
@@ -1827,16 +1817,16 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
           }));
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        if (containerId == controllerContainerId) {
+        if (containerId == controllerContainerId.get()) {
           metrics.csi_controller_plugin_terminations++;
         }
 
-        if (containerId == nodeContainerId) {
+        if (containerId == nodeContainerId.get()) {
           metrics.csi_node_plugin_terminations++;
         }
 
         services.at(containerId)->discard();
-        services.at(containerId).reset(new Promise<csi::Client>());
+        services.at(containerId).reset(new Promise<csi::v0::Client>());
 
         if (os::exists(endpointPath)) {
           Try<Nothing> rm = os::rm(endpointPath);
@@ -1940,54 +1930,31 @@ Future<Nothing> StorageLocalResourceProviderProcess::killService(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          controllerInfo = response;
+  CHECK_SOME(nodeContainerId);
 
-          LOG(INFO)
-            << "Controller plugin loaded: " << stringify(controllerInfo.get());
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin info.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          pluginInfo = response;
 
-          if (nodeInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
-            LOG(WARNING)
-              << "Inconsistent controller and node plugin components. Please "
-                 "check with the plugin vendor to ensure compatibility.";
-          }
+          LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(controllerContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::ControllerProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerProbe(request)
-        .then(defer(self(), [=](const csi::ControllerProbeResponse& response) {
-          return getService(controllerContainerId);
-        }));
-    }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the controller capabilities.
-      csi::ControllerGetCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerGetCapabilities(request)
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin capabilities.
+      return client.GetPluginCapabilities(
+          csi::v0::GetPluginCapabilitiesRequest())
         .then(defer(self(), [=](
-            const csi::ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
+            const csi::v0::GetPluginCapabilitiesResponse& response) {
+          pluginCapabilities = response.capabilities();
 
           return Nothing();
         }));
@@ -1995,73 +1962,101 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+// NOTE: This can only be called after `prepareIdentityService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
+  CHECK_SOME(pluginInfo);
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  if (!pluginCapabilities.controllerService) {
+    return Nothing();
+  }
 
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          nodeInfo = response;
+  if (controllerContainerId.isNone()) {
+    return Failure(
+        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
+  }
 
-          LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get());
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller plugin info and check for consistency.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          LOG(INFO) << "Controller plugin loaded: " << stringify(response);
 
-          if (controllerInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
+          if (pluginInfo->name() != response.name() ||
+              pluginInfo->vendor_version() != response.vendor_version()) {
             LOG(WARNING)
               << "Inconsistent controller and node plugin components. Please "
                  "check with the plugin vendor to ensure compatibility.";
           }
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(nodeContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(controllerContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::NodeProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.NodeProbe(request)
-        .then(defer(self(), [=](const csi::NodeProbeResponse& response) {
-          return getService(nodeContainerId);
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller capabilities.
+      return client.ControllerGetCapabilities(
+          csi::v0::ControllerGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+
+          return Nothing();
         }));
-    }))
-    .then(defer(self(), [=](csi::Client client) -> Future<Nothing> {
-      if (!controllerCapabilities->publishUnpublishVolume) {
-        return Nothing();
-      }
+    }));
+}
 
-      // Get the node ID.
-      csi::GetNodeIDRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
 
-      return client.GetNodeID(request)
-        .then(defer(self(), [=](const csi::GetNodeIDResponse& response) {
-          nodeId = response.node_id();
+// NOTE: This can only be called after `prepareIdentityService` and
+// `prepareControllerService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+{
+  CHECK_SOME(nodeContainerId);
 
-          return Nothing();
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the node capabilities.
+      return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::NodeGetCapabilitiesResponse& response)
+            -> Future<csi::v0::Client> {
+          nodeCapabilities = response.capabilities();
+
+          // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+          if (nodeCapabilities.stageUnstageVolume) {
+            return Failure(
+                "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported");
+          }
+
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
+        }))
+        .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
+          if (!controllerCapabilities.publishUnpublishVolume) {
+            return Nothing();
+          }
+
+          // Get the node ID.
+          return client.NodeGetId(csi::v0::NodeGetIdRequest())
+            .then(defer(self(), [=](
+                const csi::v0::NodeGetIdResponse& response) {
+              nodeId = response.node_id();
+
+              return Nothing();
+            }));
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2080,11 +2075,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 
   Future<Nothing> controllerPublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerPublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerPublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    CHECK_SOME(controllerContainerId);
+
+    controllerPublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerPublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
         request.mutable_volume_capability()
@@ -2095,9 +2091,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 
         return client.ControllerPublishVolume(request)
           .then(defer(self(), [=](
-              const csi::ControllerPublishVolumeResponse& response) {
-            *volumes.at(volumeId).state.mutable_publish_volume_info() =
-              response.publish_volume_info();
+              const csi::v0::ControllerPublishVolumeResponse& response) {
+            *volumes.at(volumeId).state.mutable_publish_info() =
+              response.publish_info();
 
             return Nothing();
           }));
@@ -2122,13 +2118,13 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK_SOME(controllerContainerId);
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2147,11 +2143,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 
   Future<Nothing> controllerUnpublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerUnpublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    controllerUnpublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
 
@@ -2165,7 +2160,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
   return controllerUnpublished
     .then(defer(self(), [=] {
       volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED);
-      volumes.at(volumeId).state.mutable_publish_volume_info()->clear();
+      volumes.at(volumeId).state.mutable_publish_info()->clear();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -2182,6 +2177,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_PUBLISH) {
@@ -2208,13 +2207,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
         "Failed to create mount point '" + mountPath + "': " + mkdir.error());
   }
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::NodePublishVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::NodePublishVolumeRequest request;
       request.set_volume_id(volumeId);
-      *request.mutable_publish_volume_info() =
-        volumes.at(volumeId).state.publish_volume_info();
+      *request.mutable_publish_info() =
+        volumes.at(volumeId).state.publish_info();
       request.set_target_path(mountPath);
       request.mutable_volume_capability()
         ->CopyFrom(volumes.at(volumeId).state.volume_capability());
@@ -2243,6 +2241,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_UNPUBLISH) {
@@ -2267,10 +2269,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
   Future<Nothing> nodeUnpublished;
 
   if (os::exists(mountPath)) {
-    nodeUnpublished = getService(nodeContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::NodeUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+    nodeUnpublished = getService(nodeContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::NodeUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_target_path(mountPath);
 
@@ -2310,22 +2311,22 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 
 
 // Returns a CSI volume ID.
+// NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
     const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
-
-  if (!controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::CreateVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::CreateVolumeRequest request;
       request.set_name(name);
       request.mutable_capacity_range()
         ->set_required_bytes(capacity.bytes());
@@ -2335,47 +2336,49 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
       *request.mutable_parameters() = profileInfo.parameters;
 
       return client.CreateVolume(request)
-        .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
-          const csi::VolumeInfo& volumeInfo = response.volume_info();
+        .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) {
+          const csi::v0::Volume& volume = response.volume();
 
-          if (volumes.contains(volumeInfo.id())) {
+          if (volumes.contains(volume.id())) {
             // The resource provider failed over after the last
             // `CreateVolume` call, but before the operation status was
             // checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
-                     volumes.at(volumeInfo.id()).state.state());
+                     volumes.at(volume.id()).state.state());
           } else {
             csi::state::VolumeState volumeState;
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
               ->CopyFrom(profileInfo.capability);
-            *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
+            *volumeState.mutable_volume_attributes() = volume.attributes();
 
-            volumes.put(volumeInfo.id(), std::move(volumeState));
-            checkpointVolumeState(volumeInfo.id());
+            volumes.put(volume.id(), std::move(volumeState));
+            checkpointVolumeState(volume.id());
           }
 
-          return volumeInfo.id();
+          return volume.id();
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
 Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId,
     bool preExisting)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   // We do not need the capability for pre-existing volumes since no
   // actual `DeleteVolume` call will be made.
-  if (!preExisting && !controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!preExisting && !controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  CHECK_SOME(controllerContainerId);
+
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2399,10 +2402,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       case csi::state::VolumeState::CREATED: {
         if (!preExisting) {
           deleted = deleted
-            .then(defer(self(), &Self::getService, controllerContainerId))
-            .then(defer(self(), [=](csi::Client client) {
-              csi::DeleteVolumeRequest request;
-              request.mutable_version()->CopyFrom(csiVersion);
+            .then(defer(self(), &Self::getService, controllerContainerId.get()))
+            .then(defer(self(), [=](csi::v0::Client client) {
+              csi::v0::DeleteVolumeRequest request;
               request.set_volume_id(volumeId);
 
               return client.DeleteVolume(request)
@@ -2443,34 +2445,41 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume has the specified capability. This is called
-// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing
-// volume, so we make it returns a volume ID, similar to `createVolume`.
+// Validates if a volume has the specified capability. This is called when
+// applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing volume, so we
+// make it returns a volume ID, similar to `createVolume`.
+// NOTE: This can only be called after `prepareIdentityService` and only for
+// newly discovered volumes.
 Future<string> StorageLocalResourceProviderProcess::validateCapability(
     const string& volumeId,
     const Option<Labels>& metadata,
-    const csi::VolumeCapability& capability)
+    const csi::v0::VolumeCapability& capability)
 {
-  // NOTE: This can only be called for newly discovered volumes.
   CHECK(!volumes.contains(volumeId));
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  if (!pluginCapabilities.controllerService) {
+    return Failure(
+        "Plugin capability 'CONTROLLER_SERVICE' is not supported");
+  }
+
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       google::protobuf::Map<string, string> volumeAttributes;
 
       if (metadata.isSome()) {
         volumeAttributes = convertLabelsToStringMap(metadata.get()).get();
       }
 
-      csi::ValidateVolumeCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+      csi::v0::ValidateVolumeCapabilitiesRequest request;
       request.set_volume_id(volumeId);
       request.add_volume_capabilities()->CopyFrom(capability);
       *request.mutable_volume_attributes() = volumeAttributes;
 
       return client.ValidateVolumeCapabilities(request)
         .then(defer(self(), [=](
-            const csi::ValidateVolumeCapabilitiesResponse& response)
+            const csi::v0::ValidateVolumeCapabilitiesResponse& response)
             -> Future<string> {
           if (!response.supported()) {
             return Failure(
@@ -2492,27 +2501,25 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->listVolumes) {
+  if (!controllerCapabilities.listVolumes) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       // TODO(chhsiao): Set the max entries and use a loop to do
       // mutliple `ListVolumes` calls.
-      csi::ListVolumesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ListVolumes(request)
-        .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+      return client.ListVolumes(csi::v0::ListVolumesRequest())
+        .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
           Resources resources;
 
           // Recover disk profiles from the checkpointed state.
@@ -2529,15 +2536,14 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
           foreach (const auto& entry, response.entries()) {
             resources += createRawDiskResource(
                 info,
-                Bytes(entry.volume_info().capacity_bytes()),
-                volumesToProfiles.contains(entry.volume_info().id())
-                  ? volumesToProfiles.at(entry.volume_info().id())
+                Bytes(entry.volume().capacity_bytes()),
+                volumesToProfiles.contains(entry.volume().id())
+                  ? volumesToProfiles.at(entry.volume().id())
                   : Option<string>::none(),
-                entry.volume_info().id(),
-                entry.volume_info().attributes().empty()
+                entry.volume().id(),
+                entry.volume().attributes().empty()
                   ? Option<Labels>::none()
-                  : convertStringMapToLabels(
-                        entry.volume_info().attributes()));
+                  : convertStringMapToLabels(entry.volume().attributes()));
           }
 
           return resources;
@@ -2546,20 +2552,21 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->getCapacity) {
+  if (!controllerCapabilities.getCapacity) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       list<Future<Resources>> futures;
 
       foreach (const string& profile, knownProfiles) {
@@ -2570,14 +2577,13 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
         const DiskProfileAdaptor::ProfileInfo& profileInfo =
           profileInfos.at(profile);
 
-        csi::GetCapacityRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+        csi::v0::GetCapacityRequest request;
         request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
         *request.mutable_parameters() = profileInfo.parameters;
 
         futures.push_back(client.GetCapacity(request)
           .then(defer(self(), [=](
-              const csi::GetCapacityResponse& response) -> Resources {
+              const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
               return Resources();
             }
@@ -3218,26 +3224,22 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
         "' does not follow Java package naming convention");
   }
 
-  bool hasControllerService = false;
+  // Verify that the plugin provides the CSI node service.
+  // TODO(chhsiao): We should move this check to a validation function
+  // for `CSIPluginInfo`.
   bool hasNodeService = false;
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    for (int i = 0; i < container.services_size(); i++) {
-      const CSIPluginContainerInfo::Service service = container.services(i);
-      if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) {
-        hasControllerService = true;
-      } else if (service == CSIPluginContainerInfo::NODE_SERVICE) {
-        hasNodeService = true;
-      }
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      hasNodeService = true;
+      break;
     }
   }
 
-  if (!hasControllerService) {
-    return Error(
-        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
-  }
-
   if (!hasNodeService) {
     return Error(
         stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");