You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/05 02:14:31 UTC

mesos git commit: Added a test CSI plugin.

Repository: mesos
Updated Branches:
  refs/heads/master ed30b030c -> b388d813d


Added a test CSI plugin.

The test CSI plugin simply creates subdirectories under its working
directory to mimic the behavior of creating volumes, then bind-mounts
those subdirectories to mimic publishing the volumes.

Review: https://reviews.apache.org/r/63023/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b388d813
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b388d813
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b388d813

Branch: refs/heads/master
Commit: b388d813d005cd69ee607d234ea793923131ce15
Parents: ed30b03
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Mon Dec 4 11:38:40 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Dec 4 18:12:32 2017 -0800

----------------------------------------------------------------------
 src/Makefile.am                  |   9 +
 src/csi/utils.cpp                |  31 ++
 src/csi/utils.hpp                |  13 +
 src/examples/test_csi_plugin.cpp | 862 ++++++++++++++++++++++++++++++++++
 4 files changed, 915 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b388d813/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 08d29ab..d5ca797 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2255,6 +2255,15 @@ disk_full_framework_SOURCES = examples/disk_full_framework.cpp
 disk_full_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
 disk_full_framework_LDADD = libmesos.la $(LDADD)
 
+if ENABLE_GRPC
+if OS_LINUX
+check_PROGRAMS += test-csi-plugin
+test_csi_plugin_SOURCES = examples/test_csi_plugin.cpp
+test_csi_plugin_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_csi_plugin_LDADD = libmesos.la $(LIB_PROTOBUF) $(LIB_GRPC) $(LDADD)
+endif
+endif
+
 check_PROGRAMS += test-helper
 test_helper_SOURCES =						\
   tests/active_user_test_helper.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/b388d813/src/csi/utils.cpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.cpp b/src/csi/utils.cpp
index 4dd21fb..590e5f4 100644
--- a/src/csi/utils.cpp
+++ b/src/csi/utils.cpp
@@ -27,6 +27,23 @@ using google::protobuf::util::MessageToJsonString;
 
 namespace csi {
 
+bool operator==(
+    const ControllerServiceCapability::RPC& left,
+    const ControllerServiceCapability::RPC& right)
+{
+  return left.type() == right.type(); // Only `type` is compared.
+}
+
+
+bool operator==(
+    const ControllerServiceCapability& left,
+    const ControllerServiceCapability& right)
+{ // Equal if neither side has `rpc`, or both do and the `rpc`s are equal.
+  return left.has_rpc() == right.has_rpc() &&
+    (!left.has_rpc() || left.rpc() == right.rpc());
+}
+
+
 bool operator==(const Version& left, const Version& right)
 {
   return left.major() == right.major() &&
@@ -35,6 +52,20 @@ bool operator==(const Version& left, const Version& right)
 }
 
 
+bool operator!=(const Version& left, const Version& right)
+{
+  return !(left == right); // Negation of the `operator==` defined above.
+}
+
+
+ostream& operator<<(
+    ostream& stream,
+    const ControllerServiceCapability::RPC::Type& type)
+{ // Streams the enum value's name as returned by `Type_Name()`.
+  return stream << ControllerServiceCapability::RPC::Type_Name(type);
+}
+
+
 ostream& operator<<(ostream& stream, const Version& version)
 {
   return stream << strings::join(

http://git-wip-us.apache.org/repos/asf/mesos/blob/b388d813/src/csi/utils.hpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index 8ee97b2..54cf34b 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -34,9 +34,22 @@
 
 namespace csi {
 
+bool operator==(
+    const ControllerServiceCapability& left,
+    const ControllerServiceCapability& right);
+
+
 bool operator==(const Version& left, const Version& right);
 
 
+bool operator!=(const Version& left, const Version& right);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
+    const ControllerServiceCapability::RPC::Type& type);
+
+
 std::ostream& operator<<(std::ostream& stream, const Version& version);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b388d813/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
new file mode 100644
index 0000000..8ab936b
--- /dev/null
+++ b/src/examples/test_csi_plugin.cpp
@@ -0,0 +1,862 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/security/server_credentials.h>
+
+#include <stout/bytes.hpp>
+#include <stout/flags.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/stringify.hpp>
+#include <stout/strings.hpp>
+
+#include <stout/os/exists.hpp>
+#include <stout/os/ls.hpp>
+#include <stout/os/mkdir.hpp>
+#include <stout/os/rmdir.hpp>
+
+#include "csi/spec.hpp"
+#include "csi/utils.hpp"
+
+#include "linux/fs.hpp"
+
+#include "logging/logging.hpp"
+
+namespace fs = mesos::internal::fs;
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::list;
+using std::max;
+using std::min;
+using std::string;
+using std::unique_ptr;
+
+using grpc::InsecureServerCredentials;
+using grpc::Server;
+using grpc::ServerBuilder;
+using grpc::ServerContext;
+using grpc::Status;
+
+
+constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
+constexpr char NODE_ID[] = "localhost";
+constexpr Bytes DEFAULT_VOLUME_CAPACITY = Megabytes(64);
+
+
+class Flags : public virtual mesos::internal::logging::Flags
+{ // Command-line flags of the test plugin, on top of the logging flags.
+public:
+  Flags()
+  {
+    add(&Flags::endpoint,
+        "endpoint",
+        "Path to the Unix domain socket the plugin should bind to.");
+
+    add(&Flags::work_dir,
+        "work_dir",
+        "Path to the work directory of the plugin.");
+
+    add(&Flags::total_capacity,
+        "total_capacity",
+        "The total disk capacity managed by the plugin.");
+  }
+
+  string endpoint; // Passed to the plugin as its gRPC listening address.
+  string work_dir; // Created by `main()` if missing; holds the volumes.
+  Bytes total_capacity; // Upper bound on the sum of all volume sizes.
+};
+
+
+// A CSI plugin for tests that implements the Identity, Controller and Node
+// services in one gRPC server. Each volume is a `<size>-<id>` subdirectory
+// of the work directory; sizes are bookkeeping only and are not enforced.
+class TestCSIPlugin
+  : public csi::Identity::Service,
+    public csi::Controller::Service,
+    public csi::Node::Service
+{
+public:
+  TestCSIPlugin(
+      const string& _workDir,
+      const string& _endpoint,
+      const csi::Version& _version,
+      const Bytes& totalCapacity)
+    : workDir(_workDir),
+      endpoint(_endpoint),
+      version(_version),
+      availableCapacity(totalCapacity)
+  {
+    // Recover the volumes that already exist in the work directory.
+    // TODO(jieyu): Consider not using CHECKs here.
+    Try<list<string>> paths = os::ls(workDir);
+    CHECK_SOME(paths);
+
+    foreach (const string& path, paths.get()) {
+      Try<Volume> volume = parseVolumePath(path);
+      CHECK_SOME(volume);
+      CHECK(!volumes.contains(volume->id));
+      CHECK_GE(availableCapacity, volume->size);
+
+      availableCapacity -= volume->size;
+      volumes.put(volume->id, volume.get());
+    }
+
+    // All three services share the same listening endpoint.
+    ServerBuilder builder;
+    builder.AddListeningPort(endpoint, InsecureServerCredentials());
+    builder.RegisterService(static_cast<csi::Identity::Service*>(this));
+    builder.RegisterService(static_cast<csi::Controller::Service*>(this));
+    builder.RegisterService(static_cast<csi::Node::Service*>(this));
+    server = builder.BuildAndStart();
+  }
+
+  void wait()
+  {
+    if (server) {
+      server->Wait();
+    }
+  }
+
+  virtual Status GetSupportedVersions(
+      ServerContext* context,
+      const csi::GetSupportedVersionsRequest* request,
+      csi::GetSupportedVersionsResponse* response) override;
+
+  virtual Status GetPluginInfo(
+      ServerContext* context,
+      const csi::GetPluginInfoRequest* request,
+      csi::GetPluginInfoResponse* response) override;
+
+  virtual Status CreateVolume(
+      ServerContext* context,
+      const csi::CreateVolumeRequest* request,
+      csi::CreateVolumeResponse* response) override;
+
+  virtual Status DeleteVolume(
+      ServerContext* context,
+      const csi::DeleteVolumeRequest* request,
+      csi::DeleteVolumeResponse* response) override;
+
+  virtual Status ControllerPublishVolume(
+      ServerContext* context,
+      const csi::ControllerPublishVolumeRequest* request,
+      csi::ControllerPublishVolumeResponse* response) override;
+
+  virtual Status ControllerUnpublishVolume(
+      ServerContext* context,
+      const csi::ControllerUnpublishVolumeRequest* request,
+      csi::ControllerUnpublishVolumeResponse* response) override;
+
+  virtual Status ValidateVolumeCapabilities(
+      ServerContext* context,
+      const csi::ValidateVolumeCapabilitiesRequest* request,
+      csi::ValidateVolumeCapabilitiesResponse* response) override;
+
+  virtual Status ListVolumes(
+      ServerContext* context,
+      const csi::ListVolumesRequest* request,
+      csi::ListVolumesResponse* response) override;
+
+  virtual Status GetCapacity(
+      ServerContext* context,
+      const csi::GetCapacityRequest* request,
+      csi::GetCapacityResponse* response) override;
+
+  virtual Status ControllerProbe(
+      ServerContext* context,
+      const csi::ControllerProbeRequest* request,
+      csi::ControllerProbeResponse* response) override;
+
+  virtual Status ControllerGetCapabilities(
+      ServerContext* context,
+      const csi::ControllerGetCapabilitiesRequest* request,
+      csi::ControllerGetCapabilitiesResponse* response) override;
+
+  virtual Status NodePublishVolume(
+      ServerContext* context,
+      const csi::NodePublishVolumeRequest* request,
+      csi::NodePublishVolumeResponse* response) override;
+
+  virtual Status NodeUnpublishVolume(
+      ServerContext* context,
+      const csi::NodeUnpublishVolumeRequest* request,
+      csi::NodeUnpublishVolumeResponse* response) override;
+
+  virtual Status GetNodeID(
+      ServerContext* context,
+      const csi::GetNodeIDRequest* request,
+      csi::GetNodeIDResponse* response) override;
+
+  virtual Status NodeProbe(
+      ServerContext* context,
+      const csi::NodeProbeRequest* request,
+      csi::NodeProbeResponse* response) override;
+
+  virtual Status NodeGetCapabilities(
+      ServerContext* context,
+      const csi::NodeGetCapabilitiesRequest* request,
+      csi::NodeGetCapabilitiesResponse* response) override;
+
+private:
+  struct Volume
+  {
+    string id;
+    Bytes size;
+  };
+
+  Option<Error> validateVersion(const csi::Version& _version);
+  string getVolumePath(const Volume& volume);
+  Try<Volume> parseVolumePath(const string& path);
+
+  const string workDir;
+  const string endpoint;
+  const csi::Version version;
+  Bytes availableCapacity; // Capacity not yet allocated to any volume.
+  hashmap<string, Volume> volumes; // Keyed by volume ID.
+  unique_ptr<Server> server;
+};
+
+
+// Reports the single CSI version this plugin was constructed with.
+Status TestCSIPlugin::GetSupportedVersions(
+    ServerContext* context,
+    const csi::GetSupportedVersionsRequest* request,
+    csi::GetSupportedVersionsResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  response->add_supported_versions()->CopyFrom(version);
+  return Status::OK;
+}
+
+
+// Returns the plugin name and, as vendor version, the Mesos version.
+Status TestCSIPlugin::GetPluginInfo(
+    ServerContext* context,
+    const csi::GetPluginInfoRequest* request,
+    csi::GetPluginInfoResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  response->set_name(PLUGIN_NAME);
+  response->set_vendor_version(MESOS_VERSION);
+  return Status::OK;
+}
+
+
+// Creates a volume as a `<size>-<name>` directory under the work directory.
+// The size starts from `DEFAULT_VOLUME_CAPACITY` and is bounded by the
+// request's capacity range and by the capacity that is still available.
+Status TestCSIPlugin::CreateVolume(
+    ServerContext* context,
+    const csi::CreateVolumeRequest* request,
+    csi::CreateVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (request->name().empty()) {
+    return Status(grpc::INVALID_ARGUMENT, "Volume name cannot be empty");
+  }
+
+  if (request->name().find_first_of(os::PATH_SEPARATOR) != string::npos) {
+    return Status(grpc::INVALID_ARGUMENT, "Volume name cannot contain '/'");
+  }
+
+  bool alreadyExists = volumes.contains(request->name());
+  if (!alreadyExists) {
+    if (availableCapacity == Bytes(0)) {
+      return Status(grpc::OUT_OF_RANGE, "Insufficient capacity");
+    }
+
+    Volume volume;
+    volume.id = request->name();
+    volume.size = min(DEFAULT_VOLUME_CAPACITY, availableCapacity);
+
+    if (request->has_capacity_range()) {
+      const csi::CapacityRange& range = request->capacity_range();
+
+      // The highest we can pick.
+      Bytes limit = availableCapacity;
+      if (range.limit_bytes() != 0) {
+        limit = min(availableCapacity, Bytes(range.limit_bytes()));
+      }
+
+      if (range.required_bytes() != 0 &&
+          range.required_bytes() > limit.bytes()) {
+        return Status(grpc::OUT_OF_RANGE, "Cannot satisfy 'required_bytes'");
+      }
+
+      volume.size = min(
+          limit,
+          max(DEFAULT_VOLUME_CAPACITY, Bytes(range.required_bytes())));
+    }
+
+    Try<Nothing> mkdir = os::mkdir(getVolumePath(volume));
+    if (mkdir.isError()) {
+      return Status(
+          grpc::INTERNAL,
+          "Failed to create volume '" + volume.id + "': " + mkdir.error());
+    }
+
+    availableCapacity -= volume.size;
+    volumes.put(volume.id, volume);
+  }
+
+  const Volume& volume = volumes.at(request->name());
+
+  response->mutable_volume_info()->set_id(volume.id);
+  response->mutable_volume_info()->set_capacity_bytes(volume.size.bytes());
+
+  if (alreadyExists) {
+    return Status(
+        grpc::ALREADY_EXISTS,
+        "Volume with name '" + request->name() + "' already exists");
+  }
+
+  return Status::OK;
+}
+
+
+// Removes the volume's directory and returns its size to the available
+// capacity. Fails with NOT_FOUND if the volume is unknown.
+Status TestCSIPlugin::DeleteVolume(
+    ServerContext* context,
+    const csi::DeleteVolumeRequest* request,
+    csi::DeleteVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  const Volume& volume = volumes.at(request->volume_id());
+  Try<Nothing> rmdir = os::rmdir(getVolumePath(volume));
+  if (rmdir.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to delete volume '" + request->volume_id() + "': " +
+        rmdir.error());
+  }
+
+  availableCapacity += volume.size; // Deleting frees capacity.
+  volumes.erase(volume.id);
+
+  return Status::OK;
+}
+
+
+// Only validates the version, volume ID and node ID; nothing else is done.
+Status TestCSIPlugin::ControllerPublishVolume(
+    ServerContext* context,
+    const csi::ControllerPublishVolumeRequest* request,
+    csi::ControllerPublishVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  if (request->node_id() != NODE_ID) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Node '" + request->node_id() + "' is not found");
+  }
+
+  return Status::OK; // Do nothing.
+}
+
+
+// Only validates the version, volume ID and node ID; nothing else is done.
+Status TestCSIPlugin::ControllerUnpublishVolume(
+    ServerContext* context,
+    const csi::ControllerUnpublishVolumeRequest* request,
+    csi::ControllerUnpublishVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  if (request->node_id() != NODE_ID) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Node '" + request->node_id() + "' is not found");
+  }
+
+  return Status::OK; // Do nothing.
+}
+
+
+// Reports whether every requested capability is a mount volume with the
+// SINGLE_NODE_WRITER access mode; anything else is flagged as unsupported.
+Status TestCSIPlugin::ValidateVolumeCapabilities(
+    ServerContext* context,
+    const csi::ValidateVolumeCapabilitiesRequest* request,
+    csi::ValidateVolumeCapabilitiesResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  foreach (const csi::VolumeCapability& capability,
+           request->volume_capabilities()) {
+    if (!capability.has_mount()) {
+      response->set_supported(false);
+      response->set_message("Only MountVolume is supported");
+      return Status::OK;
+    }
+
+    if (capability.access_mode().mode() !=
+        csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+      response->set_supported(false);
+      response->set_message("Access mode is not supported");
+      return Status::OK;
+    }
+  }
+
+  response->set_supported(true);
+
+  return Status::OK;
+}
+
+
+// Lists every known volume; pagination fields are rejected with ABORTED.
+Status TestCSIPlugin::ListVolumes(
+    ServerContext* context,
+    const csi::ListVolumesRequest* request,
+    csi::ListVolumesResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  // TODO(chhsiao): Support the `max_entries` field.
+  if (request->max_entries() > 0) {
+    return Status(grpc::ABORTED, "Field 'max_entries' is not supported");
+  }
+  // TODO(chhsiao): Support the `starting_token` fields.
+  if (!request->starting_token().empty()) {
+    return Status(grpc::ABORTED, "Field 'starting_token' is not supported");
+  }
+
+  foreachvalue (const Volume& volume, volumes) {
+    csi::VolumeInfo* info = response->add_entries()->mutable_volume_info();
+    info->set_id(volume.id);
+    info->set_capacity_bytes(volume.size.bytes());
+  }
+
+  return Status::OK;
+}
+
+
+// Returns the available capacity, or 0 if any requested capability is not
+// a mount volume with the SINGLE_NODE_WRITER access mode.
+Status TestCSIPlugin::GetCapacity(
+    ServerContext* context,
+    const csi::GetCapacityRequest* request,
+    csi::GetCapacityResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  foreach (const csi::VolumeCapability& capability,
+           request->volume_capabilities()) {
+    if (!capability.has_mount()) {
+      response->set_available_capacity(0);
+      return Status::OK;
+    }
+
+    if (capability.access_mode().mode() !=
+        csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER) {
+      response->set_available_capacity(0);
+      return Status::OK;
+    }
+  }
+
+  response->set_available_capacity(availableCapacity.bytes());
+
+  return Status::OK;
+}
+
+
+// Succeeds whenever the request carries the supported CSI version.
+Status TestCSIPlugin::ControllerProbe(
+    ServerContext* context,
+    const csi::ControllerProbeRequest* request,
+    csi::ControllerProbeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+  return Status::OK;
+}
+
+
+// Advertises the four controller RPC capabilities this plugin implements.
+Status TestCSIPlugin::ControllerGetCapabilities(
+    ServerContext* context,
+    const csi::ControllerGetCapabilitiesRequest* request,
+    csi::ControllerGetCapabilitiesResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  response->add_capabilities()->mutable_rpc()->set_type(
+      csi::ControllerServiceCapability::RPC::CREATE_DELETE_VOLUME);
+  response->add_capabilities()->mutable_rpc()->set_type(
+      csi::ControllerServiceCapability::RPC::PUBLISH_UNPUBLISH_VOLUME);
+  response->add_capabilities()->mutable_rpc()->set_type(
+      csi::ControllerServiceCapability::RPC::GET_CAPACITY);
+  response->add_capabilities()->mutable_rpc()->set_type(
+      csi::ControllerServiceCapability::RPC::LIST_VOLUMES);
+  return Status::OK;
+}
+
+
+// Bind-mounts the volume's directory onto `target_path`, which must already
+// exist. Succeeds right away if something is already mounted there; a
+// read-only publish is done by remounting the bind mount as read-only.
+Status TestCSIPlugin::NodePublishVolume(
+    ServerContext* context,
+    const csi::NodePublishVolumeRequest* request,
+    csi::NodePublishVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  // TODO(chhsiao): Validate required fields.
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  if (!os::exists(request->target_path())) {
+    return Status(
+        grpc::INVALID_ARGUMENT,
+        "Target path '" + request->target_path() + "' is not found");
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to get mount table: " + table.error());
+  }
+
+  // The target is already a mount point: treat the volume as published.
+  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+    if (entry.target == request->target_path()) {
+      return Status::OK;
+    }
+  }
+
+  const Volume& volume = volumes.at(request->volume_id());
+  const string path = getVolumePath(volume);
+  Try<Nothing> mount =
+      fs::mount(path, request->target_path(), None(), MS_BIND, None());
+  if (mount.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to mount from '" + path + "' to '" +
+        request->target_path() + "': " + mount.error());
+  }
+
+  if (request->readonly()) {
+    // The initial bind mount ignores MS_RDONLY, hence the extra remount.
+    mount = fs::mount(
+        None(),
+        request->target_path(),
+        None(),
+        MS_BIND | MS_RDONLY | MS_REMOUNT,
+        None());
+    if (mount.isError()) {
+      return Status(
+          grpc::INTERNAL,
+          "Failed to mount '" + request->target_path() +
+          "' as read only: " + mount.error());
+    }
+  }
+
+  return Status::OK;
+}
+
+
+// Unmounts `target_path` if the mount table lists it as a mount target;
+// succeeds without doing anything when it is not mounted.
+Status TestCSIPlugin::NodeUnpublishVolume(
+    ServerContext* context,
+    const csi::NodeUnpublishVolumeRequest* request,
+    csi::NodeUnpublishVolumeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  if (!volumes.contains(request->volume_id())) {
+    return Status(
+        grpc::NOT_FOUND,
+        "Volume '" + request->volume_id() + "' is not found");
+  }
+
+  Try<fs::MountInfoTable> table = fs::MountInfoTable::read();
+  if (table.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to get mount table: " + table.error());
+  }
+
+  bool found = false;
+  foreach (const fs::MountInfoTable::Entry& entry, table->entries) {
+    if (entry.target == request->target_path()) {
+      found = true;
+      break;
+    }
+  }
+
+  if (!found) {
+    return Status::OK;
+  }
+
+  Try<Nothing> unmount = fs::unmount(request->target_path());
+  if (unmount.isError()) {
+    return Status(
+        grpc::INTERNAL,
+        "Failed to unmount '" + request->target_path() +
+        "': " + unmount.error());
+  }
+
+  return Status::OK;
+}
+
+
+// Returns the fixed `NODE_ID` of this plugin.
+Status TestCSIPlugin::GetNodeID(
+    ServerContext* context,
+    const csi::GetNodeIDRequest* request,
+    csi::GetNodeIDResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+
+  response->set_node_id(NODE_ID);
+  return Status::OK;
+}
+
+
+// Succeeds whenever the request carries the supported CSI version.
+Status TestCSIPlugin::NodeProbe(
+    ServerContext* context,
+    const csi::NodeProbeRequest* request,
+    csi::NodeProbeResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+  return Status::OK;
+}
+
+
+// Reports no node capabilities: the response is left empty on success.
+Status TestCSIPlugin::NodeGetCapabilities(
+    ServerContext* context,
+    const csi::NodeGetCapabilitiesRequest* request,
+    csi::NodeGetCapabilitiesResponse* response)
+{
+  LOG(INFO) << request->GetDescriptor()->name() << " '"
+            << request->ShortDebugString() << "'";
+  Option<Error> error = validateVersion(request->version());
+  if (error.isSome()) {
+    return Status(grpc::INVALID_ARGUMENT, error->message);
+  }
+  return Status::OK;
+}
+
+
+// Returns an Error unless `_version` equals the plugin's own `version`.
+Option<Error> TestCSIPlugin::validateVersion(const csi::Version& _version)
+{
+  if (version == _version) {
+    return None();
+  }
+  return Error("Version " + stringify(_version) + " is not supported");
+}
+
+
+// Returns the volume's directory: `<size>-<id>` joined onto `workDir`.
+string TestCSIPlugin::getVolumePath(const Volume& volume)
+{
+  const string name = strings::join("-", stringify(volume.size), volume.id);
+  return path::join(workDir, name);
+}
+
+
+// Parses a name of the form `<size>-<id>` (the last component produced by
+// `getVolumePath()`) into a Volume. The split happens at the first '-', so
+// the ID itself may contain further '-' characters.
+Try<TestCSIPlugin::Volume> TestCSIPlugin::parseVolumePath(const string& path)
+{
+  const size_t pos = path.find('-');
+  if (pos == string::npos) {
+    return Error("Cannot find the delimiter");
+  }
+
+  Try<Bytes> bytes = Bytes::parse(path.substr(0, pos));
+  if (bytes.isError()) {
+    return Error("Failed to parse bytes: " + bytes.error());
+  }
+
+  Volume volume;
+  volume.id = path.substr(pos + 1);
+  volume.size = bytes.get();
+
+  return volume;
+}
+
+
+// Entry point: loads the flags, makes sure the work directory exists and
+// runs a `TestCSIPlugin` for CSI version 0.1.0 until `wait()` returns.
+int main(int argc, char** argv)
+{
+  Flags flags;
+  Try<flags::Warnings> warnings = flags.load("CSI_", argc, argv);
+
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (warnings.isError()) {
+    cerr << flags.usage(warnings.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  mesos::internal::logging::initialize(argv[0], true, flags);
+
+  // Log any flag warnings.
+  foreach (const flags::Warning& warning, warnings->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  Try<Nothing> created = os::mkdir(flags.work_dir);
+  if (created.isError()) {
+    cerr << "Failed to create working directory '" << flags.work_dir
+         << "': " << created.error() << endl;
+    return EXIT_FAILURE;
+  }
+
+  csi::Version version;
+  version.set_major(0);
+  version.set_minor(1);
+  version.set_patch(0);
+
+  // The gRPC server is built and started by the constructor.
+  TestCSIPlugin plugin(
+      flags.work_dir, flags.endpoint, version, flags.total_capacity);
+
+  plugin.wait();
+
+  return EXIT_SUCCESS;
+}