You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/09 06:13:14 UTC

[mesos] 05/06: Refactored the test CSI plugin to make it easy to support CSI v1.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 8e60bc7a921a2d4b37e5bec32ad3a10ddec96e78
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Apr 4 18:48:24 2019 -0700

    Refactored the test CSI plugin to make it easy to support CSI v1.
    
    Review: https://reviews.apache.org/r/70403
---
 src/examples/test_csi_plugin.cpp | 951 +++++++++++++++++++++++++--------------
 1 file changed, 606 insertions(+), 345 deletions(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 4321f8f..54753d9 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -14,9 +14,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <algorithm>
+#include <limits>
 #include <memory>
 #include <thread>
 #include <utility>
+#include <vector>
+
+#include <google/protobuf/map.h>
+#include <google/protobuf/message.h>
 
 #include <grpcpp/grpcpp.h>
 
@@ -25,16 +31,24 @@
 
 #include <mesos/type_utils.hpp>
 
+#include <mesos/csi/types.hpp>
 #include <mesos/csi/v0.hpp>
 
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/flags.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/path.hpp>
+#include <stout/some.hpp>
 #include <stout/stringify.hpp>
 #include <stout/strings.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
 
 #include <stout/os/exists.hpp>
 #include <stout/os/ls.hpp>
@@ -47,6 +61,7 @@
 
 #include "logging/logging.hpp"
 
+namespace http = process::http;
 namespace fs = mesos::internal::fs;
 
 using std::cerr;
@@ -59,6 +74,10 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
+using google::protobuf::Map;
+using google::protobuf::MapPair;
+using google::protobuf::RepeatedPtrField;
+
 using grpc::AsyncGenericService;
 using grpc::ByteBuffer;
 using grpc::ClientContext;
@@ -73,6 +92,10 @@ using grpc::ServerContext;
 using grpc::Status;
 using grpc::WriteOptions;
 
+using mesos::csi::types::VolumeCapability;
+
+using process::grpc::StatusError;
+
 constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
 constexpr char NODE_ID[] = "localhost";
 constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
@@ -144,7 +167,7 @@ public:
     // Construct the default mount volume capability.
     defaultVolumeCapability.mutable_mount();
     defaultVolumeCapability.mutable_access_mode()
-      ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
+      ->set_mode(VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
 
     // Scan for preprovisioned volumes.
     //
@@ -280,12 +303,69 @@ private:
   string getVolumePath(const VolumeInfo& volumeInfo);
   Try<VolumeInfo> parseVolumePath(const string& path);
 
+  Try<VolumeInfo, StatusError> createVolume(
+      const string& name,
+      const Bytes& requiredBytes,
+      const Bytes& limitBytes,
+      const RepeatedPtrField<VolumeCapability>& capabilities,
+      const Map<string, string> parameters);
+
+  Try<Nothing, StatusError> deleteVolume(const string& volumeId);
+
+  Try<Nothing, StatusError> controllerPublishVolume(
+      const string& volumeId,
+      const string& nodeId,
+      const VolumeCapability& capability,
+      bool readonly,
+      const Map<string, string>& volumeContext);
+
+  Try<Nothing, StatusError> controllerUnpublishVolume(
+      const string& volumeId, const string& nodeId);
+
+  // Returns `StatusError` if the volume does not exist; returns `Option<Error>`
+  // with an error set if the volume is not compatible with the given arguments.
+  Try<Option<Error>, StatusError> validateVolumeCapabilities(
+      const string& volumeId,
+      const Map<string, string>& volumeContext,
+      const RepeatedPtrField<VolumeCapability>& capabilities,
+      const Option<Map<string, string>>& parameters = None());
+
+  Try<vector<VolumeInfo>, StatusError> listVolumes(
+      const Option<int32_t>& maxEntries,
+      const Option<string>& startingToken);
+
+  Try<Bytes, StatusError> getCapacity(
+      const RepeatedPtrField<VolumeCapability>& capabilities,
+      const Map<string, string>& parameters);
+
+  Try<Nothing, StatusError> nodeStageVolume(
+      const string& volumeId,
+      const Map<string, string>& publishContext,
+      const string& stagingPath,
+      const VolumeCapability& capability,
+      const Map<string, string>& volumeContext);
+
+  Try<Nothing, StatusError> nodeUnstageVolume(
+      const string& volumeId, const string& stagingPath);
+
+  Try<Nothing, StatusError> nodePublishVolume(
+      const string& volumeId,
+      const Map<string, string>& publishContext,
+      const string& stagingPath,
+      const string& targetPath,
+      const VolumeCapability& capability,
+      bool readonly,
+      const Map<string, string>& volumeContext);
+
+  Try<Nothing, StatusError> nodeUnpublishVolume(
+      const string& volumeId, const string& targetPath);
+
   const string workDir;
   const string endpoint;
 
   Bytes availableCapacity;
-  csi::v0::VolumeCapability defaultVolumeCapability;
-  google::protobuf::Map<string, string> createParameters;
+  VolumeCapability defaultVolumeCapability;
+  Map<string, string> createParameters;
   hashmap<string, VolumeInfo> volumes;
 };
 
@@ -349,92 +429,26 @@ Status TestCSIPlugin::CreateVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
+  // TODO(chhsiao): Validate the request.
 
-  if (request->name().empty()) {
-    return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
-  }
-
-  if (strings::contains(request->name(), stringify(os::PATH_SEPARATOR))) {
-    return Status(
-        grpc::INVALID_ARGUMENT,
-        "Volume name cannot contain '" + stringify(os::PATH_SEPARATOR) + "'");
-  }
+  Try<VolumeInfo, StatusError> result = createVolume(
+      request->name(),
+      request->capacity_range().required_bytes()
+        ? request->capacity_range().required_bytes() : 1,
+      request->capacity_range().limit_bytes()
+        ? request->capacity_range().limit_bytes()
+        : std::numeric_limits<int64_t>::max(),
+      mesos::csi::v0::devolve(request->volume_capabilities()),
+      request->parameters());
 
-  foreach (const csi::v0::VolumeCapability& capability,
-           request->volume_capabilities()) {
-    if (capability != defaultVolumeCapability) {
-      return Status(grpc::INVALID_ARGUMENT, "Unsupported volume capabilities");
-    }
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  if (request->parameters() != createParameters) {
-    return Status(grpc::INVALID_ARGUMENT, "Unsupported create parameters");
-  }
-
-  // The volume ID is determined by `name`, so we check whether the volume
-  // corresponding to `name` is compatible to the request if it exists.
-  if (volumes.contains(request->name())) {
-    const VolumeInfo volumeInfo = volumes.at(request->name());
-
-    if (request->has_capacity_range()) {
-      const csi::v0::CapacityRange& range = request->capacity_range();
-
-      if (range.limit_bytes() != 0 &&
-          volumeInfo.size > Bytes(range.limit_bytes())) {
-        return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'limit_bytes'");
-      } else if (range.required_bytes() != 0 &&
-                 volumeInfo.size < Bytes(range.required_bytes())) {
-        return Status(grpc::ALREADY_EXISTS, "Cannot satisfy 'required_bytes'");
-      }
-    }
-  } else {
-    if (availableCapacity == Bytes(0)) {
-      return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
-    }
-
-    VolumeInfo volumeInfo;
-    volumeInfo.id = request->name();
-    volumeInfo.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
-
-    if (request->has_capacity_range()) {
-      const csi::v0::CapacityRange& range = request->capacity_range();
-
-      // The highest we can pick.
-      Bytes limit = range.limit_bytes() != 0
-        ? min(availableCapacity, Bytes(range.limit_bytes()))
-        : availableCapacity;
-
-      if (range.required_bytes() != 0 &&
-          limit < Bytes(range.required_bytes())) {
-        return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
-      }
-
-      volumeInfo.size = min(
-          limit,
-          max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes())));
-    }
-
-    const string path = getVolumePath(volumeInfo);
-
-    Try<Nothing> mkdir = os::mkdir(path);
-    if (mkdir.isError()) {
-      return Status(
-          grpc::INTERNAL,
-          "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error());
-    }
-
-    CHECK_GE(availableCapacity, volumeInfo.size);
-    availableCapacity -= volumeInfo.size;
-    volumes.put(volumeInfo.id, std::move(volumeInfo));
-  }
-
-  const VolumeInfo& volumeInfo = volumes.at(request->name());
-
-  response->mutable_volume()->set_id(volumeInfo.id);
-  response->mutable_volume()->set_capacity_bytes(volumeInfo.size.bytes());
+  response->mutable_volume()->set_id(result->id);
+  response->mutable_volume()->set_capacity_bytes(result->size.bytes());
   (*response->mutable_volume()->mutable_attributes())["path"] =
-    getVolumePath(volumeInfo);
+    getVolumePath(result.get());
 
   return Status::OK;
 }
@@ -447,26 +461,14 @@ Status TestCSIPlugin::DeleteVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
+  // TODO(chhsiao): Validate the request.
 
-  if (!volumes.contains(request->volume_id())) {
-    return Status::OK;
-  }
+  Try<Nothing, StatusError> result = deleteVolume(request->volume_id());
 
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
-
-  Try<Nothing> rmdir = os::rmdir(path);
-  if (rmdir.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to delete volume '" + request->volume_id() + "': " +
-        rmdir.error());
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  availableCapacity += volumeInfo.size;
-  volumes.erase(volumeInfo.id);
-
   return Status::OK;
 }
 
@@ -478,29 +480,19 @@ Status TestCSIPlugin::ControllerPublishVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
+  // TODO(chhsiao): Validate the request.
 
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
+  Try<Nothing, StatusError> result = controllerPublishVolume(
+      request->volume_id(),
+      request->node_id(),
+      mesos::csi::v0::devolve(request->volume_capability()),
+      request->readonly(),
+      request->volume_attributes());
 
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
-
-  auto it = request->volume_attributes().find("path");
-  if (it == request->volume_attributes().end() || it->second != path) {
-    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  if (request->node_id() != NODE_ID) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Node '" + request->node_id() + "' is not found");
-  }
-
-  // Do nothing.
   return Status::OK;
 }
 
@@ -512,21 +504,15 @@ Status TestCSIPlugin::ControllerUnpublishVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
+  // TODO(chhsiao): Validate the request.
 
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
+  Try<Nothing, StatusError> result =
+    controllerUnpublishVolume(request->volume_id(), request->node_id());
 
-  if (request->node_id() != NODE_ID) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Node '" + request->node_id() + "' is not found");
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  // Do nothing.
   return Status::OK;
 }
 
@@ -538,36 +524,24 @@ Status TestCSIPlugin::ValidateVolumeCapabilities(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
-
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
+  // TODO(chhsiao): Validate the request.
 
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
+  Try<Option<Error>, StatusError> result = validateVolumeCapabilities(
+      request->volume_id(),
+      request->volume_attributes(),
+      mesos::csi::v0::devolve(request->volume_capabilities()));
 
-  auto it = request->volume_attributes().find("path");
-  if (it == request->volume_attributes().end() || it->second != path) {
-    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  foreach (const csi::v0::VolumeCapability& capability,
-           request->volume_capabilities()) {
-    if (capability != defaultVolumeCapability) {
-      response->set_supported(false);
-      response->set_message("Unsupported volume capabilities");
-
-      return Status::OK;
-    }
+  if (result->isSome()) {
+    response->set_supported(false);
+    response->set_message(result->get().message);
+  } else {
+    response->set_supported(true);
   }
 
-  // TODO(chhsiao): Validate the parameters once we get CSI v1.
-
-  response->set_supported(true);
-
   return Status::OK;
 }
 
@@ -579,17 +553,17 @@ Status TestCSIPlugin::ListVolumes(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Support the `max_entries` field.
-  if (request->max_entries() > 0) {
-    return Status(grpc::ABORTED, "Field 'max_entries' is not supported");
-  }
+  Try<vector<VolumeInfo>, StatusError> result = listVolumes(
+      request->max_entries()
+        ? request->max_entries() : Option<int32_t>::none(),
+      !request->starting_token().empty()
+        ? request->starting_token() : Option<string>::none());
 
-  // TODO(chhsiao): Support the `starting_token` fields.
-  if (!request->starting_token().empty()) {
-    return Status(grpc::ABORTED, "Field 'starting_token' is not supported");
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  foreachvalue (const VolumeInfo& volumeInfo, volumes) {
+  foreach (const VolumeInfo& volumeInfo, result.get()) {
     csi::v0::Volume* volume = response->add_entries()->mutable_volume();
     volume->set_id(volumeInfo.id);
     volume->set_capacity_bytes(volumeInfo.size.bytes());
@@ -607,25 +581,15 @@ Status TestCSIPlugin::GetCapacity(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  foreach (const csi::v0::VolumeCapability& capability,
-           request->volume_capabilities()) {
-    // We report zero capacity for any capability other than the
-    // default-constructed mount volume capability since this plugin
-    // does not support any filesystem types and mount flags.
-    if (capability != defaultVolumeCapability) {
-      response->set_available_capacity(0);
-
-      return Status::OK;
-    }
-  }
-
-  if (request->parameters() != createParameters) {
-      response->set_available_capacity(0);
+  Try<Bytes, StatusError> result = getCapacity(
+      mesos::csi::v0::devolve(request->volume_capabilities()),
+      request->parameters());
 
-      return Status::OK;
+  if (result.isError()) {
+    return result.error().status;
   }
 
-  response->set_available_capacity(availableCapacity.bytes());
+  response->set_available_capacity(result->bytes());
 
   return Status::OK;
 }
@@ -658,53 +622,17 @@ Status TestCSIPlugin::NodeStageVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
-
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
-
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
-
-  auto it = request->volume_attributes().find("path");
-  if (it == request->volume_attributes().end() || it->second != path) {
-    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
-  }
-
-  if (!os::exists(request->staging_target_path())) {
-    return Status(
-        grpc::INVALID_ARGUMENT,
-        "Target path '" + request->staging_target_path() + "' is not found");
-  }
+  // TODO(chhsiao): Validate the request.
 
-  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
-  if (table.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to get mount table: " + table.error());
-  }
-
-  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
-    if (entry.target == request->staging_target_path()) {
-      return Status::OK;
-    }
-  }
-
-  Try<Nothing> mount = fs::mount(
-      path,
+  Try<Nothing, StatusError> result = nodeStageVolume(
+      request->volume_id(),
+      request->publish_info(),
       request->staging_target_path(),
-      None(),
-      MS_BIND,
-      None());
+      mesos::csi::v0::devolve(request->volume_capability()),
+      request->volume_attributes());
 
-  if (mount.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to mount from '" + path + "' to '" +
-        request->staging_target_path() + "': " + mount.error());
+  if (result.isError()) {
+    return result.error().status;
   }
 
   return Status::OK;
@@ -718,39 +646,13 @@ Status TestCSIPlugin::NodeUnstageVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
-
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
-
-  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
-  if (table.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to get mount table: " + table.error());
-  }
-
-  bool found = false;
-  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
-    if (entry.target == request->staging_target_path()) {
-      found = true;
-      break;
-    }
-  }
+  // TODO(chhsiao): Validate the request.
 
-  if (!found) {
-    return Status::OK;
-  }
+  Try<Nothing, StatusError> result =
+    nodeUnstageVolume(request->volume_id(), request->staging_target_path());
 
-  Try<Nothing> unmount = fs::unmount(request->staging_target_path());
-  if (unmount.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to unmount '" + request->staging_target_path() +
-        "': " + unmount.error());
+  if (result.isError()) {
+    return result.error().status;
   }
 
   return Status::OK;
@@ -764,73 +666,19 @@ Status TestCSIPlugin::NodePublishVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  // TODO(chhsiao): Validate required fields.
-
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
-
-  const VolumeInfo& volumeInfo = volumes.at(request->volume_id());
-  const string path = getVolumePath(volumeInfo);
-
-  auto it = request->volume_attributes().find("path");
-  if (it == request->volume_attributes().end() || it->second != path) {
-    return Status(grpc::INVALID_ARGUMENT, "Invalid volume attributes");
-  }
-
-  if (!os::exists(request->target_path())) {
-    return Status(
-        grpc::INVALID_ARGUMENT,
-        "Target path '" + request->target_path() + "' is not found");
-  }
-
-  if (request->staging_target_path().empty()) {
-    return Status(
-        grpc::FAILED_PRECONDITION,
-        "Expecting 'staging_target_path' to be set");
-  }
-
-  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
-  if (table.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to get mount table: " + table.error());
-  }
-
-  bool found = false;
-  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
-    if (entry.target == request->staging_target_path()) {
-      found = true;
-      break;
-    }
-  }
-
-  if (!found) {
-    return Status(
-        grpc::FAILED_PRECONDITION,
-        "Volume '" + request->volume_id() + "' has not been staged yet");
-  }
-
-  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
-    if (entry.target == request->target_path()) {
-      return Status::OK;
-    }
-  }
+  // TODO(chhsiao): Validate the request.
 
-  Try<Nothing> mount = fs::mount(
+  Try<Nothing, StatusError> result = nodePublishVolume(
+      request->volume_id(),
+      request->publish_info(),
       request->staging_target_path(),
       request->target_path(),
-      None(),
-      MS_BIND | (request->readonly() ? MS_RDONLY : 0),
-      None());
+      mesos::csi::v0::devolve(request->volume_capability()),
+      request->readonly(),
+      request->volume_attributes());
 
-  if (mount.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to mount from '" + path + "' to '" +
-        request->target_path() + "': " + mount.error());
+  if (result.isError()) {
+    return result.error().status;
   }
 
   return Status::OK;
@@ -844,37 +692,13 @@ Status TestCSIPlugin::NodeUnpublishVolume(
 {
   LOG(INFO) << request->GetDescriptor()->name() << " '" << *request << "'";
 
-  if (!volumes.contains(request->volume_id())) {
-    return Status(
-        grpc::NOT_FOUND,
-        "Volume '" + request->volume_id() + "' is not found");
-  }
-
-  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
-  if (table.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to get mount table: " + table.error());
-  }
-
-  bool found = false;
-  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
-    if (entry.target == request->target_path()) {
-      found = true;
-      break;
-    }
-  }
+  // TODO(chhsiao): Validate the request.
 
-  if (!found) {
-    return Status::OK;
-  }
+  Try<Nothing, StatusError> result =
+    nodeUnpublishVolume(request->volume_id(), request->target_path());
 
-  Try<Nothing> unmount = fs::unmount(request->target_path());
-  if (unmount.isError()) {
-    return Status(
-        grpc::INTERNAL,
-        "Failed to unmount '" + request->target_path() +
-        "': " + unmount.error());
+  if (result.isError()) {
+    return result.error().status;
   }
 
   return Status::OK;
@@ -940,6 +764,443 @@ Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
 }
 
 
+Try<TestCSIPlugin::VolumeInfo, StatusError> TestCSIPlugin::createVolume(
+    const string& name,
+    const Bytes& requiredBytes,
+    const Bytes& limitBytes,
+    const RepeatedPtrField<VolumeCapability>& capabilities,
+    const Map<string, string> parameters)
+{
+  // The volume ID is determined by `name`, with reserved characters escaped.
+  const string volumeId = http::encode(name);
+
+  foreach (const VolumeCapability& capability, capabilities) {
+    if (capability != defaultVolumeCapability) {
+      return StatusError(Status(
+          grpc::INVALID_ARGUMENT, "Unsupported volume capabilities"));
+    }
+  }
+
+  if (parameters != createParameters) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Unsupported create parameters"));
+  }
+
+  if (volumes.contains(volumeId)) {
+    const VolumeInfo& volumeInfo = volumes.at(volumeId);
+
+    if (volumeInfo.size > limitBytes) {
+      return StatusError(Status(
+          grpc::ALREADY_EXISTS, "Cannot satisfy limit bytes"));
+    }
+
+    if (volumeInfo.size < requiredBytes) {
+      return StatusError(Status(
+          grpc::ALREADY_EXISTS, "Cannot satisfy required bytes"));
+    }
+
+    return volumeInfo;
+  } else {
+    if (availableCapacity < requiredBytes) {
+      return StatusError(Status(grpc::OUT_OF_RANGE, "Insufficient capacity"));
+    }
+
+    VolumeInfo volumeInfo;
+    volumeInfo.id = volumeId;
+
+    // We assume that `requiredBytes <= limitBytes` has been verified.
+    const Bytes defaultSize = min(availableCapacity, DEFAULT_VOLUME_CAPACITY);
+    volumeInfo.size = min(max(defaultSize, requiredBytes), limitBytes);
+
+    const string path = getVolumePath(volumeInfo);
+
+    Try<Nothing> mkdir = os::mkdir(path);
+    if (mkdir.isError()) {
+      return StatusError(Status(
+          grpc::INTERNAL,
+          "Failed to create volume '" + volumeInfo.id + "': " + mkdir.error()));
+    }
+
+    CHECK_GE(availableCapacity, volumeInfo.size);
+    availableCapacity -= volumeInfo.size;
+    volumes.put(volumeInfo.id, volumeInfo);
+
+    return volumeInfo;
+  }
+
+  UNREACHABLE();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::deleteVolume(const string& volumeId)
+{
+  if (!volumes.contains(volumeId)) {
+    // Return a success for idempotency.
+    return Nothing();
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(volumeId);
+  const string path = getVolumePath(volumeInfo);
+
+  Try<Nothing> rmdir = os::rmdir(path);
+  if (rmdir.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL,
+        "Failed to delete volume '" + volumeId + "': " + rmdir.error()));
+  }
+
+  availableCapacity += volumeInfo.size;
+  volumes.erase(volumeInfo.id);
+
+  return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::controllerPublishVolume(
+    const string& volumeId,
+    const string& nodeId,
+    const VolumeCapability& capability,
+    bool readonly,
+    const Map<string, string>& volumeContext)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  if (nodeId != NODE_ID) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
+  }
+
+  if (capability != defaultVolumeCapability) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+  }
+
+  if (readonly) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Unsupported read-only mode"));
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(volumeId);
+  const string path = getVolumePath(volumeInfo);
+
+  if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid volume context"));
+  }
+
+  // Do nothing.
+  return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::controllerUnpublishVolume(
+    const string& volumeId, const string& nodeId)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  if (nodeId != NODE_ID) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Node '" + nodeId + "' does not exist"));
+  }
+
+  // Do nothing.
+  return Nothing();
+}
+
+
+Try<Option<Error>, StatusError> TestCSIPlugin::validateVolumeCapabilities(
+    const string& volumeId,
+    const Map<string, string>& volumeContext,
+    const RepeatedPtrField<VolumeCapability>& capabilities,
+    const Option<Map<string, string>>& parameters)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(volumeId);
+  const string path = getVolumePath(volumeInfo);
+
+  if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid volume context"));
+  }
+
+  foreach (const VolumeCapability& capability, capabilities) {
+    if (capability != defaultVolumeCapability) {
+      return Some(Error("Unsupported volume capabilities"));
+    }
+  }
+
+  if (parameters.isSome() && parameters.get() != createParameters) {
+    return Some(Error("Mismatched parameters"));
+  }
+
+  return None();
+}
+
+
+Try<vector<TestCSIPlugin::VolumeInfo>, StatusError> TestCSIPlugin::listVolumes(
+    const Option<int32_t>& maxEntries,
+    const Option<string>& startingToken)
+{
+  // TODO(chhsiao): Support max entries.
+  if (maxEntries.isSome()) {
+    return StatusError(Status(
+        grpc::ABORTED, "Specifying max entries is not supported"));
+  }
+
+  // TODO(chhsiao): Support starting token.
+  if (startingToken.isSome()) {
+    return StatusError(Status(
+        grpc::ABORTED, "Specifying starting token is not supported"));
+  }
+
+  return volumes.values();
+}
+
+
+Try<Bytes, StatusError> TestCSIPlugin::getCapacity(
+    const RepeatedPtrField<VolumeCapability>& capabilities,
+    const Map<string, string>& parameters)
+{
+  // We report zero capacity if any capability other than the default mount
+  // volume capability is given. If no capacity is given, the total available
+  // capacity  will be returned.
+  foreach (const VolumeCapability& capability, capabilities) {
+    if (capability != defaultVolumeCapability) {
+      return Bytes(0);
+    }
+  }
+
+  if (parameters != createParameters) {
+    return Bytes(0);
+  }
+
+  return availableCapacity;
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeStageVolume(
+    const string& volumeId,
+    const Map<string, string>& publishContext,
+    const string& stagingPath,
+    const VolumeCapability& capability,
+    const Map<string, string>& volumeContext)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  if (!publishContext.empty()) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid publish context"));
+  }
+
+  if (!os::exists(stagingPath)) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT,
+        "Staging path '" + stagingPath + "' does not exist"));
+  }
+
+  if (capability != defaultVolumeCapability) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(volumeId);
+  const string path = getVolumePath(volumeInfo);
+
+  if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid volume context"));
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+  }
+
+  if (std::any_of(
+          table->entries.begin(),
+          table->entries.end(),
+          [&](const fs::MountInfoTable::Entry& entry) {
+            return entry.target == stagingPath;
+          })) {
+    return Nothing();
+  }
+
+  Try<Nothing> mount = fs::mount(path, stagingPath, None(), MS_BIND, None());
+  if (mount.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL,
+        "Failed to mount from '" + path + "' to '" + stagingPath +
+          "': " + mount.error()));
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeUnstageVolume(
+    const string& volumeId, const string& stagingPath)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+  }
+
+  if (std::none_of(
+          table->entries.begin(),
+          table->entries.end(),
+          [&](const fs::MountInfoTable::Entry& entry) {
+            return entry.target == stagingPath;
+          })) {
+    return Nothing();
+  }
+
+  Try<Nothing> unmount = fs::unmount(stagingPath);
+  if (unmount.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL,
+        "Failed to unmount '" + stagingPath + "': " + unmount.error()));
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodePublishVolume(
+    const string& volumeId,
+    const Map<string, string>& publishContext,
+    const string& stagingPath,
+    const string& targetPath,
+    const VolumeCapability& capability,
+    bool readonly,
+    const Map<string, string>& volumeContext)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  if (!publishContext.empty()) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid publish context"));
+  }
+
+  if (!os::exists(targetPath)) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT,
+        "Target path '" + targetPath + "' does not exist"));
+  }
+
+  if (capability != defaultVolumeCapability) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Unsupported volume capability"));
+  }
+
+  const VolumeInfo& volumeInfo = volumes.at(volumeId);
+  const string path = getVolumePath(volumeInfo);
+
+  if (!volumeContext.count("path") || volumeContext.at("path") != path) {
+    return StatusError(Status(
+        grpc::INVALID_ARGUMENT, "Invalid volume context"));
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+  }
+
+  if (std::none_of(
+          table->entries.begin(),
+          table->entries.end(),
+          [&](const fs::MountInfoTable::Entry& entry) {
+            return entry.target == stagingPath;
+          })) {
+    return StatusError(Status(
+        grpc::FAILED_PRECONDITION,
+        "Volume '" + volumeId + "' has not been staged yet"));
+  }
+
+  if (std::any_of(
+          table->entries.begin(),
+          table->entries.end(),
+          [&](const fs::MountInfoTable::Entry& entry) {
+            return entry.target == targetPath;
+          })) {
+    return Nothing();
+  }
+
+  Try<Nothing> mount = fs::mount(
+      stagingPath,
+      targetPath,
+      None(),
+      MS_BIND | (readonly ? MS_RDONLY : 0),
+      None());
+
+  if (mount.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL,
+        "Failed to mount from '" + stagingPath + "' to '" + targetPath +
+          "': " + mount.error()));
+  }
+
+  return Nothing();
+}
+
+
+Try<Nothing, StatusError> TestCSIPlugin::nodeUnpublishVolume(
+    const string& volumeId, const string& targetPath)
+{
+  if (!volumes.contains(volumeId)) {
+    return StatusError(Status(
+        grpc::NOT_FOUND, "Volume '" + volumeId + "' does not exist"));
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL, "Failed to get mount table: " + table.error()));
+  }
+
+  if (std::none_of(
+          table->entries.begin(),
+          table->entries.end(),
+          [&](const fs::MountInfoTable::Entry& entry) {
+            return entry.target == targetPath;
+          })) {
+    return Nothing();
+  }
+
+  Try<Nothing> unmount = fs::unmount(targetPath);
+  if (unmount.isError()) {
+    return StatusError(Status(
+        grpc::INTERNAL,
+        "Failed to unmount '" + targetPath + "': " + unmount.error()));
+  }
+
+  return Nothing();
+}
+
+
 // Serves CSI calls from the given endpoint through forwarding the calls to
 // another CSI endpoint and returning back the results.
 class CSIProxy