You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/04/12 21:02:39 UTC

[1/6] mesos git commit: Made the test CSI plugin compatible with CSI v0.2.

Repository: mesos
Updated Branches:
  refs/heads/master 3fb36f158 -> f852ffacc


Made the test CSI plugin compatible with CSI v0.2.

This patch contains necessary changes for the test CSI plugin to support
CSI v0.2. The `STAGE_UNSTAGE_VOLUME` node service capability is not
implemented in this patch yet.

Review: https://reviews.apache.org/r/66411/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f852ffac
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f852ffac
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f852ffac

Branch: refs/heads/master
Commit: f852ffaccece40ddd2ba1ac1b29a117687bf6917
Parents: aeffcd7
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:34:02 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 src/examples/test_csi_plugin.cpp | 416 +++++++++++++---------------------
 1 file changed, 152 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f852ffac/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index aed2262..357e022 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -98,20 +98,18 @@ public:
 
 
 class TestCSIPlugin
-  : public csi::Identity::Service,
-    public csi::Controller::Service,
-    public csi::Node::Service
+  : public csi::v0::Identity::Service,
+    public csi::v0::Controller::Service,
+    public csi::v0::Node::Service
 {
 public:
   TestCSIPlugin(
       const string& _workDir,
       const string& _endpoint,
-      const csi::Version& _version,
       const Bytes& _availableCapacity,
       const hashmap<string, Bytes>& _volumes)
     : workDir(_workDir),
       endpoint(_endpoint),
-      version(_version),
       availableCapacity(_availableCapacity)
   {
     // TODO(jieyu): Consider not using CHECKs here.
@@ -119,15 +117,15 @@ public:
     CHECK_SOME(paths);
 
     foreach (const string& path, paths.get()) {
-      Try<Volume> volume = parseVolumePath(path);
-      CHECK_SOME(volume);
+      Try<VolumeInfo> volumeInfo = parseVolumePath(path);
+      CHECK_SOME(volumeInfo);
 
-      CHECK(!volumes.contains(volume->id));
-      volumes.put(volume->id, volume.get());
+      CHECK(!volumes.contains(volumeInfo->id));
+      volumes.put(volumeInfo->id, volumeInfo.get());
 
-      if (!_volumes.contains(volume->id)) {
-        CHECK_GE(availableCapacity, volume->size);
-        availableCapacity -= volume->size;
+      if (!_volumes.contains(volumeInfo->id)) {
+        CHECK_GE(availableCapacity, volumeInfo->size);
+        availableCapacity -= volumeInfo->size;
       }
     }
 
@@ -136,23 +134,23 @@ public:
         continue;
       }
 
-      Volume volume;
-      volume.id = name;
-      volume.size = capacity;
+      VolumeInfo volumeInfo;
+      volumeInfo.id = name;
+      volumeInfo.size = capacity;
 
-      const string path = getVolumePath(volume);
+      const string path = getVolumePath(volumeInfo);
 
       Try<Nothing> mkdir = os::mkdir(path);
       CHECK_SOME(mkdir);
 
-      volumes.put(volume.id, volume);
+      volumes.put(volumeInfo.id, volumeInfo);
     }
 
     ServerBuilder builder;
     builder.AddListeningPort(endpoint, InsecureServerCredentials());
-    builder.RegisterService(static_cast<csi::Identity::Service*>(this));
-    builder.RegisterService(static_cast<csi::Controller::Service*>(this));
-    builder.RegisterService(static_cast<csi::Node::Service*>(this));
+    builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
+    builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
+    builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
     server = builder.BuildAndStart();
   }
 
@@ -163,136 +161,135 @@ public:
     }
   }
 
-  virtual Status GetSupportedVersions(
+  virtual Status GetPluginInfo(
       ServerContext* context,
-      const csi::GetSupportedVersionsRequest* request,
-      csi::GetSupportedVersionsResponse* response) override;
+      const csi::v0::GetPluginInfoRequest* request,
+      csi::v0::GetPluginInfoResponse* response) override;
 
-  virtual Status GetPluginInfo(
+  virtual Status GetPluginCapabilities(
       ServerContext* context,
-      const csi::GetPluginInfoRequest* request,
-      csi::GetPluginInfoResponse* response) override;
+      const csi::v0::GetPluginCapabilitiesRequest* request,
+      csi::v0::GetPluginCapabilitiesResponse* response) override;
+
+  virtual Status Probe(
+      ServerContext* context,
+      const csi::v0::ProbeRequest* request,
+      csi::v0::ProbeResponse* response) override;
 
   virtual Status CreateVolume(
       ServerContext* context,
-      const csi::CreateVolumeRequest* request,
-      csi::CreateVolumeResponse* response) override;
+      const csi::v0::CreateVolumeRequest* request,
+      csi::v0::CreateVolumeResponse* response) override;
 
   virtual Status DeleteVolume(
       ServerContext* context,
-      const csi::DeleteVolumeRequest* request,
-      csi::DeleteVolumeResponse* response) override;
+      const csi::v0::DeleteVolumeRequest* request,
+      csi::v0::DeleteVolumeResponse* response) override;
 
   virtual Status ControllerPublishVolume(
       ServerContext* context,
-      const csi::ControllerPublishVolumeRequest* request,
-      csi::ControllerPublishVolumeResponse* response) override;
+      const csi::v0::ControllerPublishVolumeRequest* request,
+      csi::v0::ControllerPublishVolumeResponse* response) override;
 
   virtual Status ControllerUnpublishVolume(
       ServerContext* context,
-      const csi::ControllerUnpublishVolumeRequest* request,
-      csi::ControllerUnpublishVolumeResponse* response) override;
+      const csi::v0::ControllerUnpublishVolumeRequest* request,
+      csi::v0::ControllerUnpublishVolumeResponse* response) override;
 
   virtual Status ValidateVolumeCapabilities(
       ServerContext* context,
-      const csi::ValidateVolumeCapabilitiesRequest* request,
-      csi::ValidateVolumeCapabilitiesResponse* response) override;
+      const csi::v0::ValidateVolumeCapabilitiesRequest* request,
+      csi::v0::ValidateVolumeCapabilitiesResponse* response) override;
 
   virtual Status ListVolumes(
       ServerContext* context,
-      const csi::ListVolumesRequest* request,
-      csi::ListVolumesResponse* response) override;
+      const csi::v0::ListVolumesRequest* request,
+      csi::v0::ListVolumesResponse* response) override;
 
   virtual Status GetCapacity(
       ServerContext* context,
-      const csi::GetCapacityRequest* request,
-      csi::GetCapacityResponse* response) override;
-
-  virtual Status ControllerProbe(
-      ServerContext* context,
-      const csi::ControllerProbeRequest* request,
-      csi::ControllerProbeResponse* response) override;
+      const csi::v0::GetCapacityRequest* request,
+      csi::v0::GetCapacityResponse* response) override;
 
   virtual Status ControllerGetCapabilities(
       ServerContext* context,
-      const csi::ControllerGetCapabilitiesRequest* request,
-      csi::ControllerGetCapabilitiesResponse* response) override;
+      const csi::v0::ControllerGetCapabilitiesRequest* request,
+      csi::v0::ControllerGetCapabilitiesResponse* response) override;
 
   virtual Status NodePublishVolume(
       ServerContext* context,
-      const csi::NodePublishVolumeRequest* request,
-      csi::NodePublishVolumeResponse* response) override;
+      const csi::v0::NodePublishVolumeRequest* request,
+      csi::v0::NodePublishVolumeResponse* response) override;
 
   virtual Status NodeUnpublishVolume(
       ServerContext* context,
-      const csi::NodeUnpublishVolumeRequest* request,
-      csi::NodeUnpublishVolumeResponse* response) override;
-
-  virtual Status GetNodeID(
-      ServerContext* context,
-      const csi::GetNodeIDRequest* request,
-      csi::GetNodeIDResponse* response) override;
+      const csi::v0::NodeUnpublishVolumeRequest* request,
+      csi::v0::NodeUnpublishVolumeResponse* response) override;
 
-  virtual Status NodeProbe(
+  virtual Status NodeGetId(
       ServerContext* context,
-      const csi::NodeProbeRequest* request,
-      csi::NodeProbeResponse* response) override;
+      const csi::v0::NodeGetIdRequest* request,
+      csi::v0::NodeGetIdResponse* response) override;
 
   virtual Status NodeGetCapabilities(
       ServerContext* context,
-      const csi::NodeGetCapabilitiesRequest* request,
-      csi::NodeGetCapabilitiesResponse* response) override;
+      const csi::v0::NodeGetCapabilitiesRequest* request,
+      csi::v0::NodeGetCapabilitiesResponse* response) override;
 
 private:
-  struct Volume
+  struct VolumeInfo
   {
     string id;
     Bytes size;
   };
 
-  Option<Error> validateVersion(const csi::Version& _version);
-
-  string getVolumePath(const Volume& volume);
-  Try<Volume> parseVolumePath(const string& path);
+  string getVolumePath(const VolumeInfo& volumeInfo);
+  Try<VolumeInfo> parseVolumePath(const string& path);
 
   const string workDir;
   const string endpoint;
-  const csi::Version version;
 
   Bytes availableCapacity;
-  hashmap<string, Volume> volumes;
+  hashmap<string, VolumeInfo> volumes;
 
   unique_ptr<Server> server;
 };
 
 
-Status TestCSIPlugin::GetSupportedVersions(
+Status TestCSIPlugin::GetPluginInfo(
     ServerContext* context,
-    const csi::GetSupportedVersionsRequest* request,
-    csi::GetSupportedVersionsResponse* response)
+    const csi::v0::GetPluginInfoRequest* request,
+    csi::v0::GetPluginInfoResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  response->add_supported_versions()->CopyFrom(version);
+  response->set_name(PLUGIN_NAME);
+  response->set_vendor_version(MESOS_VERSION);
 
   return Status::OK;
 }
 
 
-Status TestCSIPlugin::GetPluginInfo(
+Status TestCSIPlugin::GetPluginCapabilities(
     ServerContext* context,
-    const csi::GetPluginInfoRequest* request,
-    csi::GetPluginInfoResponse* response)
+    const csi::v0::GetPluginCapabilitiesRequest* request,
+    csi::v0::GetPluginCapabilitiesResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
+  response->add_capabilities()->mutable_service()->set_type(
+      csi::v0::PluginCapability::Service::CONTROLLER_SERVICE);
 
-  response->set_name(PLUGIN_NAME);
-  response->set_vendor_version(MESOS_VERSION);
+  return Status::OK;
+}
+
+
+Status TestCSIPlugin::Probe(
+    ServerContext* context,
+    const csi::v0::ProbeRequest* request,
+    csi::v0::ProbeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   return Status::OK;
 }
@@ -300,18 +297,13 @@ Status TestCSIPlugin::GetPluginInfo(
 
 Status TestCSIPlugin::CreateVolume(
     ServerContext* context,
-    const csi::CreateVolumeRequest* request,
-    csi::CreateVolumeResponse* response)
+    const csi::v0::CreateVolumeRequest* request,
+    csi::v0::CreateVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (request->name().empty()) {
     return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
   }
@@ -327,12 +319,12 @@ Status TestCSIPlugin::CreateVolume(
       return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
     }
 
-    Volume volume;
-    volume.id = request->name();
-    volume.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
+    VolumeInfo volumeInfo;
+    volumeInfo.id = request->name();
+    volumeInfo.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
 
     if (request->has_capacity_range()) {
-      const csi::CapacityRange& range = request->capacity_range();
+      const csi::v0::CapacityRange& range = request->capacity_range();
 
       // The highest we can pick.
       Bytes limit = availableCapacity;
@@ -341,34 +333,34 @@ Status TestCSIPlugin::CreateVolume(
       }
 
       if (range.required_bytes() != 0 &&
-          range.required_bytes() > limit.bytes()) {
+          static_cast<size_t>(range.required_bytes()) > limit.bytes()) {
         return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
       }
 
-      volume.size = min(
+      volumeInfo.size = min(
           limit,
           max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes())));
     }
 
-    const string path = getVolumePath(volume);
+    const string path = getVolumePath(volumeInfo);
 
     Try<Nothing> mkdir = os::mkdir(path);
     if (mkdir.isError()) {
       return Status(
           grpc::INTERNAL,
-          "Failed to create volume '" + volume.id + "': " + mkdir.error());
+          "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error());
     }
 
-    availableCapacity -= volume.size;
-    volumes.put(volume.id, volume);
+    availableCapacity -= volumeInfo.size;
+    volumes.put(volumeInfo.id, volumeInfo);
   }
 
-  const Volume& volume = volumes.at(request->name());
+  const VolumeInfo& volumeInfo = volumes.at(request->name());
 
-  response->mutable_volume_info()->set_id(volume.id);
-  response->mutable_volume_info()->set_capacity_bytes(volume.size.bytes());
-  (*response->mutable_volume_info()->mutable_attributes())["path"] =
-    getVolumePath(volume);
+  response->mutable_volume()->set_id(volumeInfo.id);
+  response->mutable_volume()->set_capacity_bytes(volumeInfo.size.bytes());
+  (*response->mutable_volume()->mutable_attributes())["path"] =
+    getVolumePath(volumeInfo);
 
   if (alreadyExists) {
     return Status(
@@ -382,26 +374,19 @@ Status TestCSIPlugin::CreateVolume(
 
 Status TestCSIPlugin::DeleteVolume(
     ServerContext* context,
-    const csi::DeleteVolumeRequest* request,
-    csi::DeleteVolumeResponse* response)
+    const csi::v0::DeleteVolumeRequest* request,
+    csi::v0::DeleteVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
+    return Status::OK;
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
 
   Try<Nothing> rmdir = os::rmdir(path);
   if (rmdir.isError()) {
@@ -411,8 +396,8 @@ Status TestCSIPlugin::DeleteVolume(
         rmdir.error());
   }
 
-  availableCapacity -= volume.size;
-  volumes.erase(volume.id);
+  availableCapacity -= volumeInfo.size;
+  volumes.erase(volumeInfo.id);
 
   return Status::OK;
 }
@@ -420,26 +405,21 @@ Status TestCSIPlugin::DeleteVolume(
 
 Status TestCSIPlugin::ControllerPublishVolume(
     ServerContext* context,
-    const csi::ControllerPublishVolumeRequest* request,
-    csi::ControllerPublishVolumeResponse* response)
+    const csi::v0::ControllerPublishVolumeRequest* request,
+    csi::v0::ControllerPublishVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
     return Status(
         grpc::NOT_FOUND,
         "Volume '" + request->volume_id() + "' is not found");
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
 
   auto it = request->volume_attributes().find("path");
   if (it == request->volume_attributes().end() || it->second != path) {
@@ -459,18 +439,13 @@ Status TestCSIPlugin::ControllerPublishVolume(
 
 Status TestCSIPlugin::ControllerUnpublishVolume(
     ServerContext* context,
-    const csi::ControllerUnpublishVolumeRequest* request,
-    csi::ControllerUnpublishVolumeResponse* response)
+    const csi::v0::ControllerUnpublishVolumeRequest* request,
+    csi::v0::ControllerUnpublishVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
     return Status(
         grpc::NOT_FOUND,
@@ -490,33 +465,28 @@ Status TestCSIPlugin::ControllerUnpublishVolume(
 
 Status TestCSIPlugin::ValidateVolumeCapabilities(
     ServerContext* context,
-    const csi::ValidateVolumeCapabilitiesRequest* request,
-    csi::ValidateVolumeCapabilitiesResponse* response)
+    const csi::v0::ValidateVolumeCapabilitiesRequest* request,
+    csi::v0::ValidateVolumeCapabilitiesResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
     return Status(
         grpc::NOT_FOUND,
         "Volume '" + request->volume_id() + "' is not found");
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
 
   auto it = request->volume_attributes().find("path");
   if (it == request->volume_attributes().end() || it->second != path) {
     return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
   }
 
-  foreach (const csi::VolumeCapability& capability,
+  foreach (const csi::v0::VolumeCapability& capability,
            request->volume_capabilities()) {
     if (capability.has_mount() &&
         (!capability.mount().fs_type().empty() ||
@@ -528,7 +498,7 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
     }
 
     if (capability.access_mode().mode() !=
-        csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+        csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
       response->set_supported(false);
       response->set_message("Access mode is not supported");
 
@@ -544,16 +514,11 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
 
 Status TestCSIPlugin::ListVolumes(
     ServerContext* context,
-    const csi::ListVolumesRequest* request,
-    csi::ListVolumesResponse* response)
+    const csi::v0::ListVolumesRequest* request,
+    csi::v0::ListVolumesResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   // TODO(chhsiao): Support the `max_entries` field.
   if (request->max_entries() > 0) {
     return Status(grpc::ABORTED, "Field 'max_entries' is not supported");
@@ -564,11 +529,11 @@ Status TestCSIPlugin::ListVolumes(
     return Status(grpc::ABORTED, "Field 'starting_token' is not supported");
   }
 
-  foreachvalue (const Volume& volume, volumes) {
-    csi::VolumeInfo* info = response->add_entries()->mutable_volume_info();
-    info->set_id(volume.id);
-    info->set_capacity_bytes(volume.size.bytes());
-    (*info->mutable_attributes())["path"] = getVolumePath(volume);
+  foreachvalue (const VolumeInfo& volumeInfo, volumes) {
+    csi::v0::Volume* volume = response->add_entries()->mutable_volume();
+    volume->set_id(volumeInfo.id);
+    volume->set_capacity_bytes(volumeInfo.size.bytes());
+    (*volume->mutable_attributes())["path"] = getVolumePath(volumeInfo);
   }
 
   return Status::OK;
@@ -577,17 +542,12 @@ Status TestCSIPlugin::ListVolumes(
 
 Status TestCSIPlugin::GetCapacity(
     ServerContext* context,
-    const csi::GetCapacityRequest* request,
-    csi::GetCapacityResponse* response)
+    const csi::v0::GetCapacityRequest* request,
+    csi::v0::GetCapacityResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
-  foreach (const csi::VolumeCapability& capability,
+  foreach (const csi::v0::VolumeCapability& capability,
            request->volume_capabilities()) {
     // We report zero capacity for any capability other than the
     // default-constructed `MountVolume` capability since this plugin
@@ -601,7 +561,7 @@ Status TestCSIPlugin::GetCapacity(
     }
 
     if (capability.access_mode().mode() !=
-        csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+        csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
       response->set_available_capacity(0);
 
       return Status::OK;
@@ -614,42 +574,21 @@ Status TestCSIPlugin::GetCapacity(
 }
 
 
-Status TestCSIPlugin::ControllerProbe(
-    ServerContext* context,
-    const csi::ControllerProbeRequest* request,
-    csi::ControllerProbeResponse* response)
-{
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
-
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
-  return Status::OK;
-}
-
-
 Status TestCSIPlugin::ControllerGetCapabilities(
     ServerContext* context,
-    const csi::ControllerGetCapabilitiesRequest* request,
-    csi::ControllerGetCapabilitiesResponse* response)
+    const csi::v0::ControllerGetCapabilitiesRequest* request,
+    csi::v0::ControllerGetCapabilitiesResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   response->add_capabilities()->mutable_rpc()->set_type(
-      csi::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME);
+      csi::v0::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME);
   response->add_capabilities()->mutable_rpc()->set_type(
-      csi::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME);
+      csi::v0::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME);
   response->add_capabilities()->mutable_rpc()->set_type(
-      csi::ControllerServiceCapability::RPC::GET_CAPACITY);
+      csi::v0::ControllerServiceCapability::RPC::GET_CAPACITY);
   response->add_capabilities()->mutable_rpc()->set_type(
-      csi::ControllerServiceCapability::RPC::LIST_VOLUMES);
+      csi::v0::ControllerServiceCapability::RPC::LIST_VOLUMES);
 
   return Status::OK;
 }
@@ -657,26 +596,21 @@ Status TestCSIPlugin::ControllerGetCapabilities(
 
 Status TestCSIPlugin::NodePublishVolume(
     ServerContext* context,
-    const csi::NodePublishVolumeRequest* request,
-    csi::NodePublishVolumeResponse* response)
+    const csi::v0::NodePublishVolumeRequest* request,
+    csi::v0::NodePublishVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
   // TODO(chhsiao): Validate required fields.
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
     return Status(
         grpc::NOT_FOUND,
         "Volume '" + request->volume_id() + "' is not found");
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
 
   auto it = request->volume_attributes().find("path");
   if (it == request->volume_attributes().end() || it->second != path) {
@@ -736,16 +670,11 @@ Status TestCSIPlugin::NodePublishVolume(
 
 Status TestCSIPlugin::NodeUnpublishVolume(
     ServerContext* context,
-    const csi::NodeUnpublishVolumeRequest* request,
-    csi::NodeUnpublishVolumeResponse* response)
+    const csi::v0::NodeUnpublishVolumeRequest* request,
+    csi::v0::NodeUnpublishVolumeResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   if (!volumes.contains(request->volume_id())) {
     return Status(
         grpc::NOT_FOUND,
@@ -771,8 +700,8 @@ Status TestCSIPlugin::NodeUnpublishVolume(
     return Status::OK;
   }
 
-  const Volume& volume = volumes.at(request->volume_id());
-  const string path = getVolumePath(volume);
+  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
+  const string path = getVolumePath(volumeInfo);
 
   Try<Nothing> unmount = fs::unmount(request->target_path());
   if (unmount.isError()) {
@@ -786,75 +715,40 @@ Status TestCSIPlugin::NodeUnpublishVolume(
 }
 
 
-Status TestCSIPlugin::GetNodeID(
+Status TestCSIPlugin::NodeGetId(
     ServerContext* context,
-    const csi::GetNodeIDRequest* request,
-    csi::GetNodeIDResponse* response)
+    const csi::v0::NodeGetIdRequest* request,
+    csi::v0::NodeGetIdResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   response->set_node_id(NODE_ID);
 
   return Status::OK;
 }
 
 
-Status TestCSIPlugin::NodeProbe(
-    ServerContext* context,
-    const csi::NodeProbeRequest* request,
-    csi::NodeProbeResponse* response)
-{
-  LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
-
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
-  return Status::OK;
-}
-
-
 Status TestCSIPlugin::NodeGetCapabilities(
     ServerContext* context,
-    const csi::NodeGetCapabilitiesRequest* request,
-    csi::NodeGetCapabilitiesResponse* response)
+    const csi::v0::NodeGetCapabilitiesRequest* request,
+    csi::v0::NodeGetCapabilitiesResponse* response)
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  Option<Error> error = validateVersion(request->version());
-  if (error.isSome()) {
-    return Status(grpc::INVALID_ARGUMENT, error->message);
-  }
-
   return Status::OK;
 }
 
 
-Option<Error> TestCSIPlugin::validateVersion(const csi::Version& _version)
-{
-  if (version != _version) {
-    return Error("Version " + stringify(_version) + " is not supported");
-  }
-
-  return None();
-}
-
-
-string TestCSIPlugin::getVolumePath(const Volume& volume)
+string TestCSIPlugin::getVolumePath(const VolumeInfo& volumeInfo)
 {
   return path::join(
       workDir,
-      strings::join("-", stringify(volume.size), volume.id));
+      strings::join("-", stringify(volumeInfo.size), volumeInfo.id));
 }
 
 
-Try<TestCSIPlugin::Volume> TestCSIPlugin::parseVolumePath(const string& path)
+Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
+    const string& path)
 {
   size_t pos = path.find_first_of("-");
   if (pos == string::npos) {
@@ -869,11 +763,11 @@ Try<TestCSIPlugin::Volume> TestCSIPlugin::parseVolumePath(const string& path)
     return Error("Failed to parse bytes: " + bytes.error());
   }
 
-  Volume volume;
-  volume.id = id;
-  volume.size = bytes.get();
+  VolumeInfo volumeInfo;
+  volumeInfo.id = id;
+  volumeInfo.size = bytes.get();
 
-  return volume;
+  return volumeInfo;
 }
 
 
@@ -906,11 +800,6 @@ int main(int argc, char** argv)
     return EXIT_FAILURE;
   }
 
-  csi::Version version;
-  version.set_major(0);
-  version.set_minor(1);
-  version.set_patch(0);
-
   hashmap<string, Bytes> volumes;
 
   if (flags.volumes.isSome()) {
@@ -947,7 +836,6 @@ int main(int argc, char** argv)
   unique_ptr<TestCSIPlugin> plugin(new TestCSIPlugin(
       flags.work_dir,
       flags.endpoint,
-      version,
       flags.available_capacity,
       volumes));
 


[6/6] mesos git commit: Updated CSI helpers for v0.2.

Posted by ch...@apache.org.
Updated CSI helpers for v0.2.

This patch adds new helper classes for CSI plugin and node capabilities,
removes the helpers for the removed csi.Version proto message, and makes
the VolumeState proto message use csi::v0::VolumeCapability.

NOTE: This is not future-proof. If there is a breaking change in
VolumeCapability, we might need to change VolumeState to support
multiple CSI versions in the future.

Review: https://reviews.apache.org/r/66408/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea5d1087
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea5d1087
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea5d1087

Branch: refs/heads/master
Commit: ea5d10873667815a026ec76af5ae94968f206084
Parents: 33eb3eb
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:14 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 src/csi/state.hpp   | 15 +++++++++++
 src/csi/state.proto |  8 +++---
 src/csi/utils.cpp   | 40 ++-------------------------
 src/csi/utils.hpp   | 70 +++++++++++++++++++++++++++++++++++++++---------
 4 files changed, 78 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5d1087/src/csi/state.hpp
----------------------------------------------------------------------
diff --git a/src/csi/state.hpp b/src/csi/state.hpp
index dcbc7ab..87ff13d 100644
--- a/src/csi/state.hpp
+++ b/src/csi/state.hpp
@@ -21,4 +21,19 @@
 // ONLY USEFUL AFTER RUNNING PROTOC.
 #include "csi/state.pb.h"
 
+namespace mesos {
+namespace csi {
+namespace state {
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const VolumeState::State& state)
+{
+  return stream << VolumeState::State_Name(state);
+}
+
+} // namespace state {
+} // namespace csi {
+} // namespace mesos {
+
 #endif // __CSI_STATE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5d1087/src/csi/state.proto
----------------------------------------------------------------------
diff --git a/src/csi/state.proto b/src/csi/state.proto
index 0373c8a..a252cb2 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -39,18 +39,18 @@ message VolumeState {
 
   // The capability used to publish the volume. This is a
   // REQUIRED field.
-  .csi.VolumeCapability volume_capability = 2;
+  .csi.v0.VolumeCapability volume_capability = 2;
 
   // Attributes of the volume to be used on the node. This field MUST
-  // match the attributes of the `VolumeInfo` returned by
-  // `CreateVolume`. This is an OPTIONAL field.
+  // match the attributes of the `Volume` returned by `CreateVolume`.
+  // This is an OPTIONAL field.
   map<string, string> volume_attributes = 3;
 
   // If the plugin has the `PUBLISH_UNPUBLISH_VOLUME` controller
   // capability, this field MUST be set to the value returned by
   // `ControllerPublishVolume`. Otherwise, this field MUST remain unset.
   // This is an OPTIONAL field.
-  map<string, string> publish_volume_info = 4;
+  map<string, string> publish_info = 4;
 
   // This field is used to check if the node has been rebooted since the
   // last time the volume is mounted. If yes, `NodePublishVolume` needs

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5d1087/src/csi/utils.cpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.cpp b/src/csi/utils.cpp
index 9e04357..fd6f95d 100644
--- a/src/csi/utils.cpp
+++ b/src/csi/utils.cpp
@@ -26,6 +26,7 @@ using std::string;
 using google::protobuf::util::MessageToJsonString;
 
 namespace csi {
+namespace v0 {
 
 bool operator==(
     const ControllerServiceCapability::RPC& left,
@@ -44,14 +45,6 @@ bool operator==(
 }
 
 
-bool operator==(const Version& left, const Version& right)
-{
-  return left.major() == right.major() &&
-    left.minor() == right.minor() &&
-    left.patch() == right.patch();
-}
-
-
 bool operator==(const VolumeCapability& left, const VolumeCapability& right) {
   // NOTE: This enumeration is set when `block` or `mount` are set and
   // covers the case where neither are set.
@@ -93,12 +86,6 @@ bool operator==(const VolumeCapability& left, const VolumeCapability& right) {
 }
 
 
-bool operator!=(const Version& left, const Version& right)
-{
-  return !(left == right);
-}
-
-
 ostream& operator<<(
     ostream& stream,
     const ControllerServiceCapability::RPC::Type& type)
@@ -106,28 +93,5 @@ ostream& operator<<(
   return stream << ControllerServiceCapability::RPC::Type_Name(type);
 }
 
-
-ostream& operator<<(ostream& stream, const Version& version)
-{
-  return stream << strings::join(
-      ".",
-      version.major(),
-      version.minor(),
-      version.patch());
-}
-
-} // namespace csi {
-
-
-namespace mesos {
-namespace csi {
-namespace state {
-
-ostream& operator<<(ostream& stream, const VolumeState::State& state)
-{
-  return stream << VolumeState::State_Name(state);
-}
-
-} // namespace state {
+} // namespace v0 {
 } // namespace csi {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5d1087/src/csi/utils.hpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index 58b071d..5ce318e 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -35,29 +35,21 @@
 #include "csi/state.hpp"
 
 namespace csi {
+namespace v0 {
 
 bool operator==(
     const ControllerServiceCapability& left,
     const ControllerServiceCapability& right);
 
 
-bool operator==(const Version& left, const Version& right);
-
-
 bool operator==(const VolumeCapability& left, const VolumeCapability& right);
 
 
-bool operator!=(const Version& left, const Version& right);
-
-
 std::ostream& operator<<(
     std::ostream& stream,
     const ControllerServiceCapability::RPC::Type& type);
 
 
-std::ostream& operator<<(std::ostream& stream, const Version& version);
-
-
 // Default imprementation for output protobuf messages in namespace
 // `csi`. Note that any non-template overloading of the output operator
 // would take precedence over this function template.
@@ -73,11 +65,41 @@ std::ostream& operator<<(std::ostream& stream, const Message& message)
   return stream << output;
 }
 
+} // namespace v0 {
 } // namespace csi {
 
 
 namespace mesos {
 namespace csi {
+namespace v0 {
+
+struct PluginCapabilities
+{
+  PluginCapabilities() = default;
+
+  template <typename Iterable> PluginCapabilities(const Iterable& capabilities)
+  {
+    foreach (const auto& capability, capabilities) {
+      if (capability.has_service() &&
+          PluginCapability::Service::Type_IsValid(
+              capability.service().type())) {
+        switch (capability.service().type()) {
+          case PluginCapability::Service::UNKNOWN:
+            break;
+          case PluginCapability::Service::CONTROLLER_SERVICE:
+            controllerService = true;
+            break;
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max:
+            UNREACHABLE();
+        }
+      }
+    }
+  }
+
+  bool controllerService = false;
+};
+
 
 struct ControllerCapabilities
 {
@@ -90,7 +112,7 @@ struct ControllerCapabilities
       if (capability.has_rpc() &&
           ControllerServiceCapability::RPC::Type_IsValid(
               capability.rpc().type())) {
-        switch(capability.rpc().type()) {
+        switch (capability.rpc().type()) {
           case ControllerServiceCapability::RPC::UNKNOWN:
             break;
           case ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME:
@@ -120,11 +142,33 @@ struct ControllerCapabilities
 };
 
 
-namespace state {
+struct NodeCapabilities
+{
+  NodeCapabilities() = default;
 
-std::ostream& operator<<(std::ostream& stream, const VolumeState::State& state);
+  template <typename Iterable> NodeCapabilities(const Iterable& capabilities)
+  {
+    foreach (const auto& capability, capabilities) {
+      if (capability.has_rpc() &&
+          NodeServiceCapability::RPC::Type_IsValid(capability.rpc().type())) {
+        switch (capability.rpc().type()) {
+          case NodeServiceCapability::RPC::UNKNOWN:
+            break;
+          case NodeServiceCapability::RPC::STAGE_UNSTAGE_VOLUME:
+            stageUnstageVolume = true;
+            break;
+          case google::protobuf::kint32min:
+          case google::protobuf::kint32max:
+            UNREACHABLE();
+        }
+      }
+    }
+  }
+
+  bool stageUnstageVolume = false;
+};
 
-} // namespace state {
+} // namespace v0 {
 } // namespace csi {
 } // namespace mesos {
 


[4/6] mesos git commit: Updated CSI client to support v0.2.

Posted by ch...@apache.org.
Updated CSI client to support v0.2.

To adapt to the change in CSI package names (from `csi` to `csi.v0`), we
introduce a new `csi::v0` namespace in Mesos for v0 helpers. The CSI
client class is now defined in this namespace, and its public methods
are updated to reflect the changes in CSI v0.2.

Review: https://reviews.apache.org/r/66407/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/33eb3eb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/33eb3eb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/33eb3eb1

Branch: refs/heads/master
Commit: 33eb3eb10e8f991e380285d8a0ea8a83344e3eb2
Parents: ea7a94b
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:11 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 include/csi/spec.hpp           |  4 +-
 src/csi/client.cpp             | 98 ++++++++++++++++++++++---------------
 src/csi/client.hpp             | 27 +++++-----
 src/tests/csi_client_tests.cpp | 13 ++---
 src/tests/mock_csi_plugin.cpp  |  8 +--
 src/tests/mock_csi_plugin.hpp  | 49 ++++++++++---------
 6 files changed, 113 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/include/csi/spec.hpp
----------------------------------------------------------------------
diff --git a/include/csi/spec.hpp b/include/csi/spec.hpp
index da0caae..2e9b870 100644
--- a/include/csi/spec.hpp
+++ b/include/csi/spec.hpp
@@ -29,9 +29,11 @@
 
 namespace mesos {
 namespace csi {
+namespace v0 {
 
-using namespace ::csi;
+using namespace ::csi::v0;
 
+} // namespace v0 {
 } // namespace csi {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 5df788f..559e805 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -23,14 +23,15 @@ using process::grpc::RpcResult;
 
 namespace mesos {
 namespace csi {
+namespace v0 {
 
-Future<GetSupportedVersionsResponse> Client::GetSupportedVersions(
-    const GetSupportedVersionsRequest& request)
+Future<GetPluginInfoResponse> Client::GetPluginInfo(
+    const GetPluginInfoRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetSupportedVersions), request)
-    .then([](const RpcResult<GetSupportedVersionsResponse>& result)
-        -> Future<GetSupportedVersionsResponse> {
+    .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
+    .then([](const RpcResult<GetPluginInfoResponse>& result)
+        -> Future<GetPluginInfoResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -40,13 +41,29 @@ Future<GetSupportedVersionsResponse> Client::GetSupportedVersions(
 }
 
 
-Future<GetPluginInfoResponse> Client::GetPluginInfo(
-    const GetPluginInfoRequest& request)
+Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities(
+    const GetPluginCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
-    .then([](const RpcResult<GetPluginInfoResponse>& result)
-        -> Future<GetPluginInfoResponse> {
+    .call(channel, GRPC_RPC(Identity, GetPluginCapabilities), request)
+    .then([](const RpcResult<GetPluginCapabilitiesResponse>& result)
+        -> Future<GetPluginCapabilitiesResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
+}
+
+
+Future<ProbeResponse> Client::Probe(
+    const ProbeRequest& request)
+{
+  return runtime
+    .call(channel, GRPC_RPC(Identity, Probe), request)
+    .then([](const RpcResult<ProbeResponse>& result)
+        -> Future<ProbeResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -168,13 +185,13 @@ Future<GetCapacityResponse> Client::GetCapacity(
 }
 
 
-Future<ControllerProbeResponse> Client::ControllerProbe(
-    const ControllerProbeRequest& request)
+Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
+    const ControllerGetCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerProbe), request)
-    .then([](const RpcResult<ControllerProbeResponse>& result)
-        -> Future<ControllerProbeResponse> {
+    .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
+    .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
+        -> Future<ControllerGetCapabilitiesResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -184,13 +201,13 @@ Future<ControllerProbeResponse> Client::ControllerProbe(
 }
 
 
-Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
-    const ControllerGetCapabilitiesRequest& request)
+Future<NodeStageVolumeResponse> Client::NodeStageVolume(
+    const NodeStageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
-    .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
-        -> Future<ControllerGetCapabilitiesResponse> {
+    .call(channel, GRPC_RPC(Node, NodeStageVolume), request)
+    .then([](const RpcResult<NodeStageVolumeResponse>& result)
+        -> Future<NodeStageVolumeResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -200,13 +217,13 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
 }
 
 
-Future<NodePublishVolumeResponse> Client::NodePublishVolume(
-    const NodePublishVolumeRequest& request)
+Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume(
+    const NodeUnstageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
-    .then([](const RpcResult<NodePublishVolumeResponse>& result)
-        -> Future<NodePublishVolumeResponse> {
+    .call(channel, GRPC_RPC(Node, NodeUnstageVolume), request)
+    .then([](const RpcResult<NodeUnstageVolumeResponse>& result)
+        -> Future<NodeUnstageVolumeResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -216,13 +233,13 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume(
 }
 
 
-Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
-    const NodeUnpublishVolumeRequest& request)
+Future<NodePublishVolumeResponse> Client::NodePublishVolume(
+    const NodePublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
-    .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
-        -> Future<NodeUnpublishVolumeResponse> {
+    .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
+    .then([](const RpcResult<NodePublishVolumeResponse>& result)
+        -> Future<NodePublishVolumeResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -232,13 +249,13 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
 }
 
 
-Future<GetNodeIDResponse> Client::GetNodeID(
-    const GetNodeIDRequest& request)
+Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
+    const NodeUnpublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, GetNodeID), request)
-    .then([](const RpcResult<GetNodeIDResponse>& result)
-        -> Future<GetNodeIDResponse> {
+    .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
+    .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
+        -> Future<NodeUnpublishVolumeResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -248,13 +265,13 @@ Future<GetNodeIDResponse> Client::GetNodeID(
 }
 
 
-Future<NodeProbeResponse> Client::NodeProbe(
-    const NodeProbeRequest& request)
+Future<NodeGetIdResponse> Client::NodeGetId(
+    const NodeGetIdRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeProbe), request)
-    .then([](const RpcResult<NodeProbeResponse>& result)
-        -> Future<NodeProbeResponse> {
+    .call(channel, GRPC_RPC(Node, NodeGetId), request)
+    .then([](const RpcResult<NodeGetIdResponse>& result)
+        -> Future<NodeGetIdResponse> {
       if (result.status.ok()) {
         return result.response;
       } else {
@@ -279,5 +296,6 @@ Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
     });
 }
 
+} // namespace v0 {
 } // namespace csi {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 1d55994..5d84674 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -25,6 +25,7 @@
 
 namespace mesos {
 namespace csi {
+namespace v0 {
 
 class Client
 {
@@ -34,12 +35,15 @@ public:
     : channel(_channel), runtime(_runtime) {}
 
   // RPCs for the Identity service.
-  process::Future<GetSupportedVersionsResponse>
-    GetSupportedVersions(const GetSupportedVersionsRequest& request);
-
   process::Future<GetPluginInfoResponse>
     GetPluginInfo(const GetPluginInfoRequest& request);
 
+  process::Future<GetPluginCapabilitiesResponse>
+    GetPluginCapabilities(const GetPluginCapabilitiesRequest& request);
+
+  process::Future<ProbeResponse>
+    Probe(const ProbeRequest& request);
+
   // RPCs for the Controller service.
   process::Future<CreateVolumeResponse>
     CreateVolume(const CreateVolumeRequest& request);
@@ -63,24 +67,24 @@ public:
   process::Future<GetCapacityResponse>
     GetCapacity(const GetCapacityRequest& request);
 
-  process::Future<ControllerProbeResponse>
-    ControllerProbe(const ControllerProbeRequest& request);
-
   process::Future<ControllerGetCapabilitiesResponse>
     ControllerGetCapabilities(const ControllerGetCapabilitiesRequest& request);
 
   // RPCs for the Node service.
+  process::Future<NodeStageVolumeResponse>
+    NodeStageVolume(const NodeStageVolumeRequest& request);
+
+  process::Future<NodeUnstageVolumeResponse>
+    NodeUnstageVolume(const NodeUnstageVolumeRequest& request);
+
   process::Future<NodePublishVolumeResponse>
     NodePublishVolume(const NodePublishVolumeRequest& request);
 
   process::Future<NodeUnpublishVolumeResponse>
     NodeUnpublishVolume(const NodeUnpublishVolumeRequest& request);
 
-  process::Future<GetNodeIDResponse>
-    GetNodeID(const GetNodeIDRequest& request);
-
-  process::Future<NodeProbeResponse>
-    NodeProbe(const NodeProbeRequest& request);
+  process::Future<NodeGetIdResponse>
+    NodeGetId(const NodeGetIdRequest& request);
 
   process::Future<NodeGetCapabilitiesResponse>
     NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
@@ -90,6 +94,7 @@ private:
   process::grpc::client::Runtime runtime;
 };
 
+} // namespace v0 {
 } // namespace csi {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index 566a85e..f5b9eac 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -28,7 +28,7 @@
 
 using std::string;
 
-using mesos::csi::Client;
+using mesos::csi::v0::Client;
 
 using process::Future;
 
@@ -104,8 +104,9 @@ INSTANTIATE_TEST_CASE_P(
     Identity,
     CSIClientTest,
     Values(
-        RPC_PARAM(Client::GetSupportedVersions),
-        RPC_PARAM(Client::GetPluginInfo)),
+        RPC_PARAM(Client::GetPluginInfo),
+        RPC_PARAM(Client::GetPluginCapabilities),
+        RPC_PARAM(Client::Probe)),
     RPCParam::Printer());
 
 INSTANTIATE_TEST_CASE_P(
@@ -119,7 +120,6 @@ INSTANTIATE_TEST_CASE_P(
         RPC_PARAM(Client::ValidateVolumeCapabilities),
         RPC_PARAM(Client::ListVolumes),
         RPC_PARAM(Client::GetCapacity),
-        RPC_PARAM(Client::ControllerProbe),
         RPC_PARAM(Client::ControllerGetCapabilities)),
     RPCParam::Printer());
 
@@ -127,10 +127,11 @@ INSTANTIATE_TEST_CASE_P(
     Node,
     CSIClientTest,
     Values(
+        RPC_PARAM(Client::NodeStageVolume),
+        RPC_PARAM(Client::NodeUnstageVolume),
         RPC_PARAM(Client::NodePublishVolume),
         RPC_PARAM(Client::NodeUnpublishVolume),
-        RPC_PARAM(Client::GetNodeID),
-        RPC_PARAM(Client::NodeProbe),
+        RPC_PARAM(Client::NodeGetId),
         RPC_PARAM(Client::NodeGetCapabilities)),
     RPCParam::Printer());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/src/tests/mock_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp
index 1dc8761..6983b84 100644
--- a/src/tests/mock_csi_plugin.cpp
+++ b/src/tests/mock_csi_plugin.cpp
@@ -19,10 +19,6 @@
 using std::string;
 using std::unique_ptr;
 
-using csi::Controller;
-using csi::Identity;
-using csi::Node;
-
 using grpc::ChannelArguments;
 using grpc::InsecureServerCredentials;
 using grpc::Server;
@@ -30,6 +26,10 @@ using grpc::ServerBuilder;
 using grpc::ServerContext;
 using grpc::Status;
 
+using mesos::csi::v0::Controller;
+using mesos::csi::v0::Identity;
+using mesos::csi::v0::Node;
+
 using process::grpc::Channel;
 
 using testing::_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/33eb3eb1/src/tests/mock_csi_plugin.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp
index b19870d..6f7a5ab 100644
--- a/src/tests/mock_csi_plugin.hpp
+++ b/src/tests/mock_csi_plugin.hpp
@@ -37,34 +37,35 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
-#define CSI_METHOD_FOREACH(macro)        \
-  macro(GetSupportedVersions)            \
-  macro(GetPluginInfo)                   \
-  macro(CreateVolume)                    \
-  macro(DeleteVolume)                    \
-  macro(ControllerPublishVolume)         \
-  macro(ControllerUnpublishVolume)       \
-  macro(ValidateVolumeCapabilities)      \
-  macro(ListVolumes)                     \
-  macro(GetCapacity)                     \
-  macro(ControllerProbe)                 \
-  macro(ControllerGetCapabilities)       \
-  macro(NodePublishVolume)               \
-  macro(NodeUnpublishVolume)             \
-  macro(GetNodeID)                       \
-  macro(NodeProbe)                       \
+#define CSI_METHOD_FOREACH(macro)            \
+  macro(GetPluginInfo)                       \
+  macro(GetPluginCapabilities)               \
+  macro(Probe)                               \
+  macro(CreateVolume)                        \
+  macro(DeleteVolume)                        \
+  macro(ControllerPublishVolume)             \
+  macro(ControllerUnpublishVolume)           \
+  macro(ValidateVolumeCapabilities)          \
+  macro(ListVolumes)                         \
+  macro(GetCapacity)                         \
+  macro(ControllerGetCapabilities)           \
+  macro(NodeStageVolume)                     \
+  macro(NodeUnstageVolume)                   \
+  macro(NodePublishVolume)                   \
+  macro(NodeUnpublishVolume)                 \
+  macro(NodeGetId)                           \
   macro(NodeGetCapabilities)
 
-#define DECLARE_MOCK_CSI_METHOD(name)    \
-  MOCK_METHOD3(name, grpc::Status(       \
-      grpc::ServerContext* context,      \
-      const csi::name##Request* request, \
-      csi::name##Response* response));
+#define DECLARE_MOCK_CSI_METHOD(name)        \
+  MOCK_METHOD3(name, grpc::Status(           \
+      grpc::ServerContext* context,          \
+      const csi::v0::name##Request* request, \
+      csi::v0::name##Response* response));
 
 // Definition of a mock CSI plugin to be used in tests with gmock.
-class MockCSIPlugin : public csi::Identity::Service,
-                      public csi::Controller::Service,
-                      public csi::Node::Service
+class MockCSIPlugin : public csi::v0::Identity::Service,
+                      public csi::v0::Controller::Service,
+                      public csi::v0::Node::Service
 {
 public:
   MockCSIPlugin();


[3/6] mesos git commit: Bumped the CSI spec bundle to 0.2.0.

Posted by ch...@apache.org.
Bumped the CSI spec bundle to 0.2.0.

The bundle is generated as follows:

1. Download the tarball from the following link:
   https://github.com/container-storage-interface/spec/archive/v0.2.0.tar.gz
2. Run the following commands:
   tar zxf v0.2.0.tar.gz
   mv spec-0.2.0 csi-0.2.0
   tar zcf csi-0.2.0.tar.gz csi-0.2.0

Review: https://reviews.apache.org/r/66398/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea7a94bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea7a94bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea7a94bd

Branch: refs/heads/master
Commit: ea7a94bd2a9605f91b1baad5c3b7b5e3d9505384
Parents: 3fb36f1
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:07 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 3rdparty/cmake/Versions.cmake |   4 ++--
 3rdparty/csi-0.1.0.tar.gz     | Bin 67886 -> 0 bytes
 3rdparty/csi-0.2.0.tar.gz     | Bin 0 -> 70677 bytes
 3rdparty/versions.am          |   2 +-
 4 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea7a94bd/3rdparty/cmake/Versions.cmake
----------------------------------------------------------------------
diff --git a/3rdparty/cmake/Versions.cmake b/3rdparty/cmake/Versions.cmake
index 93f0322..605cbde 100644
--- a/3rdparty/cmake/Versions.cmake
+++ b/3rdparty/cmake/Versions.cmake
@@ -2,8 +2,8 @@ set(BOOST_VERSION           "1.65.0")
 set(BOOST_HASH              "SHA256=085A1696AE2E735AACD0A497d46AACD7EEC0476E0D39937162F642B92F406476")
 set(CONCURRENTQUEUE_VERSION "7b69a8f")
 set(CONCURRENTQUEUE_HASH    "SHA256=B2741A1FB2172C2A829503A85D5EE7548BE7ED04236A3FD1EFD2B6088E065CB7")
-set(CSI_VERSION             "0.1.0")
-set(CSI_HASH                "SHA256=A19778BF1D3658C97855544D3C208BD8F2AD23A25D1B9487D5B9EF7874C50382")
+set(CSI_VERSION             "0.2.0")
+set(CSI_HASH                "SHA256=CB75FC99B03F3C7C30E407AE86BA63EB069AE4A167A26C94FE97F09CB7FF8771")
 set(CURL_VERSION            "7.57.0")
 set(CURL_HASH               "SHA256=7CE35F207562674E71DBADA6891B37E3F043C1E7A82915CB9C2A17AD3A9D659B")
 set(ELFIO_VERSION           "3.2")

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea7a94bd/3rdparty/csi-0.1.0.tar.gz
----------------------------------------------------------------------
diff --git a/3rdparty/csi-0.1.0.tar.gz b/3rdparty/csi-0.1.0.tar.gz
deleted file mode 100644
index 37b1b4c..0000000
Binary files a/3rdparty/csi-0.1.0.tar.gz and /dev/null differ

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea7a94bd/3rdparty/csi-0.2.0.tar.gz
----------------------------------------------------------------------
diff --git a/3rdparty/csi-0.2.0.tar.gz b/3rdparty/csi-0.2.0.tar.gz
new file mode 100644
index 0000000..eea1cbd
Binary files /dev/null and b/3rdparty/csi-0.2.0.tar.gz differ

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea7a94bd/3rdparty/versions.am
----------------------------------------------------------------------
diff --git a/3rdparty/versions.am b/3rdparty/versions.am
index 1bc87bb..63d879b 100644
--- a/3rdparty/versions.am
+++ b/3rdparty/versions.am
@@ -21,7 +21,7 @@
 
 BOOST_VERSION = 1.65.0
 CONCURRENTQUEUE_VERSION = 7b69a8f
-CSI_VERSION = 0.1.0
+CSI_VERSION = 0.2.0
 ELFIO_VERSION = 3.2
 GLOG_VERSION = 0.3.3
 GOOGLETEST_VERSION = 1.8.0


[5/6] mesos git commit: Adapted storage local resource provider to use CSI v0.2.

Posted by ch...@apache.org.
Adapted storage local resource provider to use CSI v0.2.

This patch contains necessary changes for the storage local resource
provider to use CSI v0.2. Support for the `STAGE_UNSTAGE_VOLUME` CSI
node service capability is not implemented in this patch yet.

Review: https://reviews.apache.org/r/66410/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aeffcd7d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aeffcd7d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aeffcd7d

Branch: refs/heads/master
Commit: aeffcd7d9a1f9c97e5e347063ceab71c43c00e2d
Parents: 6dfd259
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:19 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 512 ++++++++++++------------
 1 file changed, 257 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/aeffcd7d/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a07620d..40544e0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -366,10 +366,11 @@ private:
   void reconcileOperations(
       const Event::ReconcileOperations& reconcile);
 
-  Future<csi::Client> connect(const string& endpoint);
-  Future<csi::Client> getService(const ContainerID& containerId);
+  Future<csi::v0::Client> connect(const string& endpoint);
+  Future<csi::v0::Client> getService(const ContainerID& containerId);
   Future<Nothing> killService(const ContainerID& containerId);
 
+  Future<Nothing> prepareIdentityService();
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
   Future<Nothing> controllerPublish(const string& volumeId);
@@ -384,7 +385,7 @@ private:
   Future<string> validateCapability(
       const string& volumeId,
       const Option<Labels>& metadata,
-      const csi::VolumeCapability& capability);
+      const csi::v0::VolumeCapability& capability);
   Future<Resources> listVolumes();
   Future<Resources> getCapacities();
 
@@ -436,9 +437,8 @@ private:
 
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
-  csi::Version csiVersion;
-  csi::VolumeCapability defaultMountCapability;
-  csi::VolumeCapability defaultBlockCapability;
+  csi::v0::VolumeCapability defaultMountCapability;
+  csi::v0::VolumeCapability defaultBlockCapability;
   string bootId;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
@@ -453,14 +453,15 @@ private:
   // True if a reconciliation of storage pools is happening.
   bool reconciling;
 
-  ContainerID controllerContainerId;
-  ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
-  hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
-
-  Option<csi::GetPluginInfoResponse> controllerInfo;
-  Option<csi::GetPluginInfoResponse> nodeInfo;
-  Option<csi::ControllerCapabilities> controllerCapabilities;
+  hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
+
+  Option<ContainerID> nodeContainerId;
+  Option<ContainerID> controllerContainerId;
+  Option<csi::v0::GetPluginInfoResponse> pluginInfo;
+  csi::v0::PluginCapabilities pluginCapabilities;
+  csi::v0::ControllerCapabilities controllerCapabilities;
+  csi::v0::NodeCapabilities nodeCapabilities;
   Option<string> nodeId;
 
   // We maintain the following invariant: if one operation depends on
@@ -554,18 +555,13 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  // Set CSI version to 0.1.0.
-  csiVersion.set_major(0);
-  csiVersion.set_minor(1);
-  csiVersion.set_patch(0);
-
   // Default mount and block capabilities for pre-existing volumes.
   defaultMountCapability.mutable_mount();
   defaultMountCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
   defaultBlockCapability.mutable_block();
   defaultBlockCapability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+    ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
 
   Try<string> _bootId = os::bootId();
   if (_bootId.isError()) {
@@ -577,24 +573,24 @@ void StorageLocalResourceProviderProcess::initialize()
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::CONTROLLER_SERVICE);
-    if (it != container.services().end()) {
-      controllerContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      nodeContainerId = getContainerId(info, container);
       break;
     }
   }
 
+  CHECK_SOME(nodeContainerId);
+
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    auto it = find(
-        container.services().begin(),
-        container.services().end(),
-        CSIPluginContainerInfo::NODE_SERVICE);
-    if (it != container.services().end()) {
-      nodeContainerId = getContainerId(info, container);
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::CONTROLLER_SERVICE)) {
+      controllerContainerId = getContainerId(info, container);
       break;
     }
   }
@@ -720,10 +716,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
 
     const ContainerID& containerId = containerPath->containerId;
 
+    CHECK_SOME(nodeContainerId);
+
     // Do not kill the up-to-date controller or node container.
     // Otherwise, kill them and perform cleanups.
-    if (containerId == controllerContainerId ||
-        containerId == nodeContainerId) {
+    if (nodeContainerId == containerId ||
+        controllerContainerId == containerId) {
       const string configPath = csi::paths::getContainerInfoPath(
           slave::paths::getCsiRootDir(workDir),
           info.storage().plugin().type(),
@@ -776,11 +774,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
       })));
   }
 
-  // NOTE: The `GetNodeID` CSI call is only supported if the plugin has
-  // the `PUBLISH_UNPUBLISH_VOLUME` controller capability. So to decide
-  // if `GetNodeID` should be called in `prepareNodeService`, we need to
-  // run `prepareControllerService` first.
+  // NOTE: The `Controller` service is supported if the plugin has the
+  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is
+  // supported if the `Controller` service has the
+  // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch
+  // the node plugin to get the plugin capabilities, then decide if we
+  // need to launch the controller plugin and get the node ID.
   return collect(futures)
+    .then(defer(self(), &Self::prepareIdentityService))
     .then(defer(self(), &Self::prepareControllerService))
     .then(defer(self(), &Self::prepareNodeService));
 }
@@ -1672,19 +1673,19 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
 
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
-// supported version.
-Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+// readiness.
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect(
     const string& endpoint)
 {
-  Future<csi::Client> client;
+  Future<csi::v0::Client> future;
 
   if (os::exists(endpoint)) {
-    client = csi::Client("unix://" + endpoint, runtime);
+    future = csi::v0::Client("unix://" + endpoint, runtime);
   } else {
     // Wait for the endpoint socket to appear until the timeout expires.
     Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-    client = loop(
+    future = loop(
         self(),
         [=]() -> Future<Nothing> {
           if (timeout.expired()) {
@@ -1693,30 +1694,19 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect(
 
           return after(Milliseconds(10));
         },
-        [=](const Nothing&) -> ControlFlow<csi::Client> {
+        [=](const Nothing&) -> ControlFlow<csi::v0::Client> {
           if (os::exists(endpoint)) {
-            return Break(csi::Client("unix://" + endpoint, runtime));
+            return Break(csi::v0::Client("unix://" + endpoint, runtime));
           }
 
           return Continue();
         });
   }
 
-  return client
-    .then(defer(self(), [=](csi::Client client) {
-      return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
-        .then(defer(self(), [=](
-            const csi::GetSupportedVersionsResponse& response)
-            -> Future<csi::Client> {
-          auto it = find(
-              response.supported_versions().begin(),
-              response.supported_versions().end(),
-              csiVersion);
-          if (it == response.supported_versions().end()) {
-            return Failure(
-                "CSI version " + stringify(csiVersion) + " is not supported");
-          }
-
+  return future
+    .then(defer(self(), [=](csi::v0::Client client) {
+      return client.Probe(csi::v0::ProbeRequest())
+        .then(defer(self(), [=](const csi::v0::ProbeResponse& response) {
           return client;
         }));
     }));
@@ -1726,7 +1716,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect(
 // Returns a future of the latest CSI client for the specified plugin
 // container. If the container is not already running, this method will
 // start a new a new container daemon.
-Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
     const ContainerID& containerId)
 {
   if (daemons.contains(containerId)) {
@@ -1802,7 +1792,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
     ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
 
   CHECK(!services.contains(containerId));
-  services[containerId].reset(new Promise<csi::Client>());
+  services[containerId].reset(new Promise<csi::v0::Client>());
 
   Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
       extractParentEndpoint(url),
@@ -1815,7 +1805,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
         CHECK(services.at(containerId)->future().isPending());
 
         return connect(endpointPath)
-          .then(defer(self(), [=](const csi::Client& client) {
+          .then(defer(self(), [=](const csi::v0::Client& client) {
             services.at(containerId)->set(client);
             return Nothing();
           }))
@@ -1827,16 +1817,16 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService(
           }));
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        if (containerId == controllerContainerId) {
+        if (controllerContainerId == containerId) {
           metrics.csi_controller_plugin_terminations++;
         }
 
-        if (containerId == nodeContainerId) {
+        if (nodeContainerId == containerId) {
           metrics.csi_node_plugin_terminations++;
         }
 
         services.at(containerId)->discard();
-        services.at(containerId).reset(new Promise<csi::Client>());
+        services.at(containerId).reset(new Promise<csi::v0::Client>());
 
         if (os::exists(endpointPath)) {
           Try<Nothing> rm = os::rm(endpointPath);
@@ -1940,54 +1930,31 @@ Future<Nothing> StorageLocalResourceProviderProcess::killService(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
+Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService()
 {
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          controllerInfo = response;
+  CHECK_SOME(nodeContainerId);
 
-          LOG(INFO)
-            << "Controller plugin loaded: " << stringify(controllerInfo.get());
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin info.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          pluginInfo = response;
 
-          if (nodeInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
-            LOG(WARNING)
-              << "Inconsistent controller and node plugin components. Please "
-                 "check with the plugin vendor to ensure compatibility.";
-          }
+          LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get());
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(controllerContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::ControllerProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerProbe(request)
-        .then(defer(self(), [=](const csi::ControllerProbeResponse& response) {
-          return getService(controllerContainerId);
-        }));
-    }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the controller capabilities.
-      csi::ControllerGetCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ControllerGetCapabilities(request)
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the plugin capabilities.
+      return client.GetPluginCapabilities(
+          csi::v0::GetPluginCapabilitiesRequest())
         .then(defer(self(), [=](
-            const csi::ControllerGetCapabilitiesResponse& response) {
-          controllerCapabilities = response.capabilities();
+            const csi::v0::GetPluginCapabilitiesResponse& response) {
+          pluginCapabilities = response.capabilities();
 
           return Nothing();
         }));
@@ -1995,73 +1962,101 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+// NOTE: This can only be called after `prepareIdentityService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService()
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
+  CHECK_SOME(pluginInfo);
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      // Get the plugin info and check for consistency.
-      csi::GetPluginInfoRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  if (!pluginCapabilities.controllerService) {
+    return Nothing();
+  }
 
-      return client.GetPluginInfo(request)
-        .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) {
-          nodeInfo = response;
+  if (controllerContainerId.isNone()) {
+    return Failure(
+        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
+  }
 
-          LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get());
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller plugin info and check for consistency.
+      return client.GetPluginInfo(csi::v0::GetPluginInfoRequest())
+        .then(defer(self(), [=](
+            const csi::v0::GetPluginInfoResponse& response) {
+          LOG(INFO) << "Controller plugin loaded: " << stringify(response);
 
-          if (controllerInfo.isSome() &&
-              (controllerInfo->name() != nodeInfo->name() ||
-               controllerInfo->vendor_version() !=
-                 nodeInfo->vendor_version())) {
+          if (pluginInfo->name() != response.name() ||
+              pluginInfo->vendor_version() != response.vendor_version()) {
             LOG(WARNING)
               << "Inconsistent controller and node plugin components. Please "
                  "check with the plugin vendor to ensure compatibility.";
           }
 
-          // NOTE: We always get the latest service future before
-          // proceeding to the next step.
-          return getService(nodeContainerId);
+          // Get the latest service future before proceeding to the next step.
+          return getService(controllerContainerId.get());
         }));
     }))
-    .then(defer(self(), [=](csi::Client client) {
-      // Probe the plugin to validate the runtime environment.
-      csi::NodeProbeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.NodeProbe(request)
-        .then(defer(self(), [=](const csi::NodeProbeResponse& response) {
-          return getService(nodeContainerId);
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the controller capabilities.
+      return client.ControllerGetCapabilities(
+          csi::v0::ControllerGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::ControllerGetCapabilitiesResponse& response) {
+          controllerCapabilities = response.capabilities();
+
+          return Nothing();
         }));
-    }))
-    .then(defer(self(), [=](csi::Client client) -> Future<Nothing> {
-      if (!controllerCapabilities->publishUnpublishVolume) {
-        return Nothing();
-      }
+    }));
+}
 
-      // Get the node ID.
-      csi::GetNodeIDRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
 
-      return client.GetNodeID(request)
-        .then(defer(self(), [=](const csi::GetNodeIDResponse& response) {
-          nodeId = response.node_id();
+// NOTE: This can only be called after `prepareIdentityService` and
+// `prepareControllerService`.
+Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
+{
+  CHECK_SOME(nodeContainerId);
 
-          return Nothing();
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      // Get the node capabilities.
+      return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest())
+        .then(defer(self(), [=](
+            const csi::v0::NodeGetCapabilitiesResponse& response)
+            -> Future<csi::v0::Client> {
+          nodeCapabilities = response.capabilities();
+
+          // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+          if (nodeCapabilities.stageUnstageVolume) {
+            return Failure(
+                "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported");
+          }
+
+          // Get the latest service future before proceeding to the next step.
+          return getService(nodeContainerId.get());
+        }))
+        .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> {
+          if (!controllerCapabilities.publishUnpublishVolume) {
+            return Nothing();
+          }
+
+          // Get the node ID.
+          return client.NodeGetId(csi::v0::NodeGetIdRequest())
+            .then(defer(self(), [=](
+                const csi::v0::NodeGetIdResponse& response) {
+              nodeId = response.node_id();
+
+              return Nothing();
+            }));
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2080,11 +2075,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 
   Future<Nothing> controllerPublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerPublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerPublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    CHECK_SOME(controllerContainerId);
+
+    controllerPublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerPublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
         request.mutable_volume_capability()
@@ -2095,9 +2091,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 
         return client.ControllerPublishVolume(request)
           .then(defer(self(), [=](
-              const csi::ControllerPublishVolumeResponse& response) {
-            *volumes.at(volumeId).state.mutable_publish_volume_info() =
-              response.publish_volume_info();
+              const csi::v0::ControllerPublishVolumeResponse& response) {
+            *volumes.at(volumeId).state.mutable_publish_info() =
+              response.publish_info();
 
             return Nothing();
           }));
@@ -2122,13 +2118,13 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService`.
 Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
     const string& volumeId)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService`.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume ||
+        (controllerContainerId.isSome() && nodeId.isSome()));
 
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
@@ -2147,11 +2143,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 
   Future<Nothing> controllerUnpublished;
 
-  if (controllerCapabilities->publishUnpublishVolume) {
-    controllerUnpublished = getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::ControllerUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+  if (controllerCapabilities.publishUnpublishVolume) {
+    controllerUnpublished = getService(controllerContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::ControllerUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_node_id(nodeId.get());
 
@@ -2165,7 +2160,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
   return controllerUnpublished
     .then(defer(self(), [=] {
       volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED);
-      volumes.at(volumeId).state.mutable_publish_volume_info()->clear();
+      volumes.at(volumeId).state.mutable_publish_info()->clear();
       checkpointVolumeState(volumeId);
 
       return Nothing();
@@ -2182,6 +2177,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_PUBLISH) {
@@ -2208,13 +2207,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
         "Failed to create mount point '" + mountPath + "': " + mkdir.error());
   }
 
-  return getService(nodeContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::NodePublishVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  return getService(nodeContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::NodePublishVolumeRequest request;
       request.set_volume_id(volumeId);
-      *request.mutable_publish_volume_info() =
-        volumes.at(volumeId).state.publish_volume_info();
+      *request.mutable_publish_info() =
+        volumes.at(volumeId).state.publish_info();
       request.set_target_path(mountPath);
       request.mutable_volume_capability()
         ->CopyFrom(volumes.at(volumeId).state.volume_capability());
@@ -2243,6 +2241,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish(
 Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
     const string& volumeId)
 {
+  // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support.
+
+  CHECK_SOME(nodeContainerId);
+
   CHECK(volumes.contains(volumeId));
   if (volumes.at(volumeId).state.state() ==
         csi::state::VolumeState::NODE_UNPUBLISH) {
@@ -2267,10 +2269,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
   Future<Nothing> nodeUnpublished;
 
   if (os::exists(mountPath)) {
-    nodeUnpublished = getService(nodeContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        csi::NodeUnpublishVolumeRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+    nodeUnpublished = getService(nodeContainerId.get())
+      .then(defer(self(), [=](csi::v0::Client client) {
+        csi::v0::NodeUnpublishVolumeRequest request;
         request.set_volume_id(volumeId);
         request.set_target_path(mountPath);
 
@@ -2310,22 +2311,22 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 
 
 // Returns a CSI volume ID.
+// NOTE: This can only be called after `prepareControllerService`.
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
     const DiskProfileAdaptor::ProfileInfo& profileInfo)
 {
-  // NOTE: This can only be called after `prepareControllerService`.
-  CHECK_SOME(controllerCapabilities);
-
-  if (!controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
-      csi::CreateVolumeRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
+      csi::v0::CreateVolumeRequest request;
       request.set_name(name);
       request.mutable_capacity_range()
         ->set_required_bytes(capacity.bytes());
@@ -2335,47 +2336,49 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
       *request.mutable_parameters() = profileInfo.parameters;
 
       return client.CreateVolume(request)
-        .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
-          const csi::VolumeInfo& volumeInfo = response.volume_info();
+        .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) {
+          const csi::v0::Volume& volume = response.volume();
 
-          if (volumes.contains(volumeInfo.id())) {
+          if (volumes.contains(volume.id())) {
             // The resource provider failed over after the last
             // `CreateVolume` call, but before the operation status was
             // checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
-                     volumes.at(volumeInfo.id()).state.state());
+                     volumes.at(volume.id()).state.state());
           } else {
             csi::state::VolumeState volumeState;
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
               ->CopyFrom(profileInfo.capability);
-            *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
+            *volumeState.mutable_volume_attributes() = volume.attributes();
 
-            volumes.put(volumeInfo.id(), std::move(volumeState));
-            checkpointVolumeState(volumeInfo.id());
+            volumes.put(volume.id(), std::move(volumeState));
+            checkpointVolumeState(volume.id());
           }
 
-          return volumeInfo.id();
+          return volume.id();
         }));
     }));
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// `prepareNodeService` (since it may require `NodeUnpublishVolume`).
 Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     const string& volumeId,
     bool preExisting)
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // `prepareNodeService` (since it may require `NodeUnpublishVolume`).
-  CHECK_SOME(controllerCapabilities);
-  CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome());
+  CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome());
 
   // We do not need the capability for pre-existing volumes since no
   // actual `DeleteVolume` call will be made.
-  if (!preExisting && !controllerCapabilities->createDeleteVolume) {
-    return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported");
+  if (!preExisting && !controllerCapabilities.createDeleteVolume) {
+    return Failure(
+        "Controller capability 'CREATE_DELETE_VOLUME' is not supported");
   }
 
+  CHECK_SOME(controllerContainerId);
+
   const string volumePath = csi::paths::getVolumePath(
       slave::paths::getCsiRootDir(workDir),
       info.storage().plugin().type(),
@@ -2399,10 +2402,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
       case csi::state::VolumeState::CREATED: {
         if (!preExisting) {
           deleted = deleted
-            .then(defer(self(), &Self::getService, controllerContainerId))
-            .then(defer(self(), [=](csi::Client client) {
-              csi::DeleteVolumeRequest request;
-              request.mutable_version()->CopyFrom(csiVersion);
+            .then(defer(self(), &Self::getService, controllerContainerId.get()))
+            .then(defer(self(), [=](csi::v0::Client client) {
+              csi::v0::DeleteVolumeRequest request;
               request.set_volume_id(volumeId);
 
               return client.DeleteVolume(request)
@@ -2443,34 +2445,41 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-// Validates if a volume has the specified capability. This is called
-// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing
-// volume, so we make it returns a volume ID, similar to `createVolume`.
+// Validates if a volume has the specified capability. This is called when
+// applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing volume, so we
+// make it return a volume ID, similar to `createVolume`.
+// NOTE: This can only be called after `prepareIdentityService` and only for
+// newly discovered volumes.
 Future<string> StorageLocalResourceProviderProcess::validateCapability(
     const string& volumeId,
     const Option<Labels>& metadata,
-    const csi::VolumeCapability& capability)
+    const csi::v0::VolumeCapability& capability)
 {
-  // NOTE: This can only be called for newly discovered volumes.
   CHECK(!volumes.contains(volumeId));
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  if (!pluginCapabilities.controllerService) {
+    return Failure(
+        "Plugin capability 'CONTROLLER_SERVICE' is not supported");
+  }
+
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       google::protobuf::Map<string, string> volumeAttributes;
 
       if (metadata.isSome()) {
         volumeAttributes = convertLabelsToStringMap(metadata.get()).get();
       }
 
-      csi::ValidateVolumeCapabilitiesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
+      csi::v0::ValidateVolumeCapabilitiesRequest request;
       request.set_volume_id(volumeId);
       request.add_volume_capabilities()->CopyFrom(capability);
       *request.mutable_volume_attributes() = volumeAttributes;
 
       return client.ValidateVolumeCapabilities(request)
         .then(defer(self(), [=](
-            const csi::ValidateVolumeCapabilitiesResponse& response)
+            const csi::v0::ValidateVolumeCapabilitiesResponse& response)
             -> Future<string> {
           if (!response.supported()) {
             return Failure(
@@ -2492,27 +2501,25 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->listVolumes) {
+  if (!controllerCapabilities.listVolumes) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       // TODO(chhsiao): Set the max entries and use a loop to do
       // mutliple `ListVolumes` calls.
-      csi::ListVolumesRequest request;
-      request.mutable_version()->CopyFrom(csiVersion);
-
-      return client.ListVolumes(request)
-        .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+      return client.ListVolumes(csi::v0::ListVolumesRequest())
+        .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
           Resources resources;
 
           // Recover disk profiles from the checkpointed state.
@@ -2529,15 +2536,14 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
           foreach (const auto& entry, response.entries()) {
             resources += createRawDiskResource(
                 info,
-                Bytes(entry.volume_info().capacity_bytes()),
-                volumesToProfiles.contains(entry.volume_info().id())
-                  ? volumesToProfiles.at(entry.volume_info().id())
+                Bytes(entry.volume().capacity_bytes()),
+                volumesToProfiles.contains(entry.volume().id())
+                  ? volumesToProfiles.at(entry.volume().id())
                   : Option<string>::none(),
-                entry.volume_info().id(),
-                entry.volume_info().attributes().empty()
+                entry.volume().id(),
+                entry.volume().attributes().empty()
                   ? Option<Labels>::none()
-                  : convertStringMapToLabels(
-                        entry.volume_info().attributes()));
+                  : convertStringMapToLabels(entry.volume().attributes()));
           }
 
           return resources;
@@ -2546,20 +2552,21 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
 }
 
 
+// NOTE: This can only be called after `prepareControllerService` and
+// the resource provider ID has been obtained.
 Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
   // This is only used for reconciliation so no failure is returned.
-  if (!controllerCapabilities->getCapacity) {
+  if (!controllerCapabilities.getCapacity) {
     return Resources();
   }
 
-  return getService(controllerContainerId)
-    .then(defer(self(), [=](csi::Client client) {
+  CHECK_SOME(controllerContainerId);
+
+  return getService(controllerContainerId.get())
+    .then(defer(self(), [=](csi::v0::Client client) {
       list<Future<Resources>> futures;
 
       foreach (const string& profile, knownProfiles) {
@@ -2570,14 +2577,13 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
         const DiskProfileAdaptor::ProfileInfo& profileInfo =
           profileInfos.at(profile);
 
-        csi::GetCapacityRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
+        csi::v0::GetCapacityRequest request;
         request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
         *request.mutable_parameters() = profileInfo.parameters;
 
         futures.push_back(client.GetCapacity(request)
           .then(defer(self(), [=](
-              const csi::GetCapacityResponse& response) -> Resources {
+              const csi::v0::GetCapacityResponse& response) -> Resources {
             if (response.available_capacity() == 0) {
               return Resources();
             }
@@ -3218,26 +3224,22 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
         "' does not follow Java package naming convention");
   }
 
-  bool hasControllerService = false;
+  // Verify that the plugin provides the CSI node service.
+  // TODO(chhsiao): We should move this check to a validation function
+  // for `CSIPluginInfo`.
   bool hasNodeService = false;
 
   foreach (const CSIPluginContainerInfo& container,
            info.storage().plugin().containers()) {
-    for (int i = 0; i < container.services_size(); i++) {
-      const CSIPluginContainerInfo::Service service = container.services(i);
-      if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) {
-        hasControllerService = true;
-      } else if (service == CSIPluginContainerInfo::NODE_SERVICE) {
-        hasNodeService = true;
-      }
+    if (container.services().end() != find(
+            container.services().begin(),
+            container.services().end(),
+            CSIPluginContainerInfo::NODE_SERVICE)) {
+      hasNodeService = true;
+      break;
     }
   }
 
-  if (!hasControllerService) {
-    return Error(
-        stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found");
-  }
-
   if (!hasNodeService) {
     return Error(
         stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");


[2/6] mesos git commit: Used `csi::v0::VolumeCapability` in disk profile adaptors.

Posted by ch...@apache.org.
Used `csi::v0::VolumeCapability` in disk profile adaptors.

To adapt to the change in CSI package names (from `csi` to `csi.v0`), we
now use `csi::v0::VolumeCapability` in the disk profile adaptor module
interface.

NOTE: This is not future-proof: if there is a breaking change in
`VolumeCapability`, we might need to change the module interface to
support multiple CSI versions in the future.

Review: https://reviews.apache.org/r/66409/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6dfd2594
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6dfd2594
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6dfd2594

Branch: refs/heads/master
Commit: 6dfd2594123b6218254081bf230c3fba11c65027
Parents: ea5d108
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu Apr 12 12:07:17 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu Apr 12 14:01:52 2018 -0700

----------------------------------------------------------------------
 .../mesos/resource_provider/storage/disk_profile_adaptor.hpp   | 2 +-
 src/resource_provider/storage/disk_profile.proto               | 2 +-
 src/resource_provider/storage/disk_profile_utils.cpp           | 4 ++--
 src/resource_provider/storage/disk_profile_utils.hpp           | 4 +++-
 src/tests/disk_profile_adaptor_tests.cpp                       | 6 +++---
 5 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6dfd2594/include/mesos/resource_provider/storage/disk_profile_adaptor.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/storage/disk_profile_adaptor.hpp b/include/mesos/resource_provider/storage/disk_profile_adaptor.hpp
index 3a2d86a..739585b 100644
--- a/include/mesos/resource_provider/storage/disk_profile_adaptor.hpp
+++ b/include/mesos/resource_provider/storage/disk_profile_adaptor.hpp
@@ -63,7 +63,7 @@ public:
      * of `VolumeCapability` objects, the module will only allow a single
      * `VolumeCapability`.
      */
-    csi::VolumeCapability capability;
+    csi::v0::VolumeCapability capability;
 
     /**
      * Free-form key-value pairs which should be passed into the body

http://git-wip-us.apache.org/repos/asf/mesos/blob/6dfd2594/src/resource_provider/storage/disk_profile.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/disk_profile.proto b/src/resource_provider/storage/disk_profile.proto
index b9448d7..1c97e9c 100644
--- a/src/resource_provider/storage/disk_profile.proto
+++ b/src/resource_provider/storage/disk_profile.proto
@@ -52,7 +52,7 @@ message DiskProfileMapping {
     // NOTE: The name of this field is plural because some CSI requests
     // support multiple capabilities. However, Mesos currently does not
     // support this.
-    .csi.VolumeCapability volume_capabilities = 1;
+    .csi.v0.VolumeCapability volume_capabilities = 1;
 
     // Parameters passed to the CSI CreateVolume RPC.
     // This field is OPTIONAL.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6dfd2594/src/resource_provider/storage/disk_profile_utils.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/disk_profile_utils.cpp b/src/resource_provider/storage/disk_profile_utils.cpp
index d07c11f..82c69d8 100644
--- a/src/resource_provider/storage/disk_profile_utils.cpp
+++ b/src/resource_provider/storage/disk_profile_utils.cpp
@@ -178,7 +178,7 @@ Option<Error> validate(const DiskProfileMapping& mapping)
 
 
 // TODO(chhsiao): Move this to CSI validation implementation file.
-Option<Error> validate(const csi::VolumeCapability& capability)
+Option<Error> validate(const csi::v0::VolumeCapability& capability)
 {
   if (capability.has_mount()) {
     // The total size of this repeated field may not exceed 4 KB.
@@ -201,7 +201,7 @@ Option<Error> validate(const csi::VolumeCapability& capability)
   }
 
   if (capability.access_mode().mode() ==
-      csi::VolumeCapability::AccessMode::UNKNOWN) {
+      csi::v0::VolumeCapability::AccessMode::UNKNOWN) {
     return Error("'access_mode.mode' is unknown or not set");
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6dfd2594/src/resource_provider/storage/disk_profile_utils.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/disk_profile_utils.hpp b/src/resource_provider/storage/disk_profile_utils.hpp
index 9a0a31b..8a83a15 100644
--- a/src/resource_provider/storage/disk_profile_utils.hpp
+++ b/src/resource_provider/storage/disk_profile_utils.hpp
@@ -17,6 +17,8 @@
 #ifndef __RESOURCE_PROVIDER_URI_DISK_PROFILE_UTILS_HPP__
 #define __RESOURCE_PROVIDER_URI_DISK_PROFILE_UTILS_HPP__
 
+#include <csi/spec.hpp>
+
 #include <mesos/mesos.hpp>
 
 #include <stout/option.hpp>
@@ -47,7 +49,7 @@ Option<Error> validate(const resource_provider::DiskProfileMapping& mapping);
 
 // Checks the fields inside a `VolumeCapability` according to the
 // comments above the protobuf.
-Option<Error> validate(const csi::VolumeCapability& capability);
+Option<Error> validate(const csi::v0::VolumeCapability& capability);
 
 } // namespace storage {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6dfd2594/src/tests/disk_profile_adaptor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/disk_profile_adaptor_tests.cpp b/src/tests/disk_profile_adaptor_tests.cpp
index 948f6ef..3337741 100644
--- a/src/tests/disk_profile_adaptor_tests.cpp
+++ b/src/tests/disk_profile_adaptor_tests.cpp
@@ -139,13 +139,13 @@ TEST_F(UriDiskProfileAdaptorTest, ParseExample)
   const string key = "my-profile";
   ASSERT_EQ(1u, parsed->profile_matrix().count(key));
 
-  csi::VolumeCapability capability =
+  csi::v0::VolumeCapability capability =
     parsed->profile_matrix().at(key).volume_capabilities();
 
   ASSERT_TRUE(capability.has_block());
   ASSERT_TRUE(capability.has_access_mode());
   ASSERT_EQ(
-      csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER,
+      csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER,
       capability.access_mode().mode());
 
   Map<string, string> parameters =
@@ -480,7 +480,7 @@ TEST_F(UriDiskProfileAdaptorTest, DISABLED_FetchFromFile)
   AWAIT_ASSERT_READY(mapping);
   ASSERT_TRUE(mapping.get().capability.has_block());
   ASSERT_EQ(
-      csi::VolumeCapability::AccessMode::MULTI_NODE_SINGLE_WRITER,
+      csi::v0::VolumeCapability::AccessMode::MULTI_NODE_SINGLE_WRITER,
       mapping.get().capability.access_mode().mode());
 
   Clock::resume();