You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:01 UTC

[mesos] 04/15: Added skeleton code for v0 `VolumeManager`.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7e5a0e2d04722ad3a261dcd43163ebfaae73c2a3
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:45 2019 -0700

    Added skeleton code for v0 `VolumeManager`.
    
    Review: https://reviews.apache.org/r/70214/
---
 src/CMakeLists.txt                    |   1 +
 src/Makefile.am                       |   3 +
 src/csi/v0_volume_manager.cpp         | 276 ++++++++++++++++++++++++++++++++++
 src/csi/v0_volume_manager.hpp         | 116 ++++++++++++++
 src/csi/v0_volume_manager_process.hpp | 107 +++++++++++++
 src/csi/volume_manager.cpp            |  17 ++-
 6 files changed, 518 insertions(+), 2 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4a7518f..20e50a4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -248,6 +248,7 @@ set(CSI_SRC
   csi/rpc.cpp
   csi/service_manager.cpp
   csi/utils.cpp
+  csi/v0_volume_manager.cpp
   csi/volume_manager.cpp)
 
 set(DOCKER_SRC
diff --git a/src/Makefile.am b/src/Makefile.am
index d91d1a1..89a4090 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1579,6 +1579,9 @@ libcsi_la_SOURCES =							\
   csi/state.proto							\
   csi/utils.cpp								\
   csi/utils.hpp								\
+  csi/v0_volume_manager.cpp						\
+  csi/v0_volume_manager.hpp						\
+  csi/v0_volume_manager_process.hpp					\
   csi/volume_manager.cpp						\
   csi/volume_manager.hpp						\
   ../include/csi/spec.hpp
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
new file mode 100644
index 0000000..2a4d3eb
--- /dev/null
+++ b/src/csi/v0_volume_manager.cpp
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "csi/v0_volume_manager.hpp"
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+
+#include "csi/v0_volume_manager_process.hpp"
+
+namespace http = process::http;
+
+using std::string;
+using std::vector;
+
+using google::protobuf::Map;
+
+using process::Failure;
+using process::Future;
+using process::ProcessBase;
+
+using process::grpc::client::Runtime;
+
+namespace mesos {
+namespace csi {
+namespace v0 {
+
+VolumeManagerProcess::VolumeManagerProcess(
+    const http::URL& agentUrl,
+    const string& _rootDir,
+    const CSIPluginInfo& _info,
+    const hashset<Service> _services, // NOTE(review): passed by value.
+    const string& containerPrefix,
+    const Option<string>& authToken,
+    const Runtime& _runtime,
+    Metrics* _metrics)
+  : ProcessBase(process::ID::generate("csi-v0-volume-manager")),
+    rootDir(_rootDir),
+    info(_info),
+    services(_services),
+    runtime(_runtime),
+    metrics(_metrics),
+    serviceManager(new ServiceManager( // Held in an `Owned` member.
+        agentUrl,
+        rootDir, // Member, as are info/services/runtime/metrics.
+        info,
+        services,
+        containerPrefix,
+        authToken,
+        runtime,
+        metrics))
+{
+  // This should have been validated in `VolumeManager::create`.
+  CHECK(!services.empty())
+    << "Must specify at least one service for CSI plugin type '" << info.type()
+    << "' and name '" << info.name() << "'";
+}
+
+
+Future<Nothing> VolumeManagerProcess::recover()
+{
+  return Failure("Unimplemented"); // Stub: recovery is not implemented yet.
+}
+
+
+Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes()
+{
+  return Failure("Unimplemented"); // Stub: no volumes are listed yet.
+}
+
+
+Future<Bytes> VolumeManagerProcess::getCapacity(
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return Failure("Unimplemented"); // Stub: both arguments are ignored.
+}
+
+
+Future<VolumeInfo> VolumeManagerProcess::createVolume(
+    const string& name,
+    const Bytes& capacity,
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return Failure("Unimplemented"); // Stub: nothing is created yet.
+}
+
+
+Future<Option<Error>> VolumeManagerProcess::validateVolume(
+    const VolumeInfo& volumeInfo,
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return Failure("Unimplemented"); // Stub: no validation happens yet.
+}
+
+
+Future<bool> VolumeManagerProcess::deleteVolume(const string& volumeId)
+{
+  return Failure("Unimplemented"); // Stub: `volumeId` is ignored.
+}
+
+
+Future<Nothing> VolumeManagerProcess::attachVolume(const string& volumeId)
+{
+  return Failure("Unimplemented"); // Stub: `volumeId` is ignored.
+}
+
+
+Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
+{
+  return Failure("Unimplemented"); // Stub: `volumeId` is ignored.
+}
+
+
+Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
+{
+  return Failure("Unimplemented"); // Stub: `volumeId` is ignored.
+}
+
+
+Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId)
+{
+  return Failure("Unimplemented"); // Stub: `volumeId` is ignored.
+}
+
+
+VolumeManager::VolumeManager(
+    const http::URL& agentUrl,
+    const string& rootDir,
+    const CSIPluginInfo& info,
+    const hashset<Service>& services,
+    const string& containerPrefix,
+    const Option<string>& authToken,
+    const Runtime& runtime,
+    Metrics* metrics)
+  : process(new VolumeManagerProcess( // All arguments are passed through.
+        agentUrl,
+        rootDir,
+        info,
+        services,
+        containerPrefix,
+        authToken,
+        runtime,
+        metrics))
+{
+  process::spawn(CHECK_NOTNULL(process.get())); // Then kick off recovery.
+  recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover);
+}
+
+
+VolumeManager::~VolumeManager()
+{
+  process::terminate(process.get()); // Stop the process...
+  process::wait(process.get()); // ...and wait for it before destruction.
+}
+
+
+Future<Nothing> VolumeManager::recover()
+{
+  return recovered; // Result of the `recover` dispatched by the ctor.
+}
+
+
+Future<vector<VolumeInfo>> VolumeManager::listVolumes()
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(process.get(), &VolumeManagerProcess::listVolumes));
+}
+
+
+Future<Bytes> VolumeManager::getCapacity(
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(),
+        &VolumeManagerProcess::getCapacity,
+        capability,
+        parameters));
+}
+
+
+Future<VolumeInfo> VolumeManager::createVolume(
+    const string& name,
+    const Bytes& capacity,
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(),
+        &VolumeManagerProcess::createVolume,
+        name,
+        capacity,
+        capability,
+        parameters));
+}
+
+
+Future<Option<Error>> VolumeManager::validateVolume(
+    const VolumeInfo& volumeInfo,
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(),
+        &VolumeManagerProcess::validateVolume,
+        volumeInfo,
+        capability,
+        parameters));
+}
+
+
+Future<bool> VolumeManager::deleteVolume(const string& volumeId)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(), &VolumeManagerProcess::deleteVolume, volumeId));
+}
+
+
+Future<Nothing> VolumeManager::attachVolume(const string& volumeId)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(), &VolumeManagerProcess::attachVolume, volumeId));
+}
+
+
+Future<Nothing> VolumeManager::detachVolume(const string& volumeId)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(), &VolumeManagerProcess::detachVolume, volumeId));
+}
+
+
+Future<Nothing> VolumeManager::publishVolume(const string& volumeId)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(), &VolumeManagerProcess::publishVolume, volumeId));
+}
+
+
+Future<Nothing> VolumeManager::unpublishVolume(const string& volumeId)
+{
+  return recovered // Chain on recovery, then defer to the process.
+    .then(process::defer(
+        process.get(), &VolumeManagerProcess::unpublishVolume, volumeId));
+}
+
+} // namespace v0 {
+} // namespace csi {
+} // namespace mesos {
diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp
new file mode 100644
index 0000000..6c15f29
--- /dev/null
+++ b/src/csi/v0_volume_manager.hpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CSI_V0_VOLUME_MANAGER_HPP__
+#define __CSI_V0_VOLUME_MANAGER_HPP__
+
+#include <string>
+#include <vector>
+
+#include <google/protobuf/map.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/csi/types.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/hashset.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "csi/metrics.hpp"
+#include "csi/service_manager.hpp"
+#include "csi/volume_manager.hpp"
+
+namespace mesos {
+namespace csi {
+namespace v0 {
+
+// Forward declarations.
+class VolumeManagerProcess;
+
+
+class VolumeManager : public csi::VolumeManager // CSI v0 implementation.
+{
+public:
+  VolumeManager(
+      const process::http::URL& agentUrl,
+      const std::string& rootDir,
+      const CSIPluginInfo& info,
+      const hashset<Service>& services,
+      const std::string& containerPrefix,
+      const Option<std::string>& authToken,
+      const process::grpc::client::Runtime& runtime,
+      Metrics* metrics);
+
+  // Since this class contains `Owned` members which should not but can be
+  // copied, explicitly make this class non-copyable.
+  //
+  // TODO(chhsiao): Remove this once MESOS-5122 is fixed.
+  VolumeManager(const VolumeManager&) = delete;
+  VolumeManager& operator=(const VolumeManager&) = delete;
+
+  ~VolumeManager() override;
+
+  process::Future<Nothing> recover() override;
+
+  process::Future<std::vector<VolumeInfo>> listVolumes() override;
+
+  process::Future<Bytes> getCapacity(
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters)
+    override;
+
+  process::Future<VolumeInfo> createVolume(
+      const std::string& name,
+      const Bytes& capacity,
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters)
+    override;
+
+  process::Future<Option<Error>> validateVolume(
+      const VolumeInfo& volumeInfo,
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters)
+    override;
+
+  process::Future<bool> deleteVolume(const std::string& volumeId) override;
+
+  process::Future<Nothing> attachVolume(const std::string& volumeId) override;
+
+  process::Future<Nothing> detachVolume(const std::string& volumeId) override;
+
+  process::Future<Nothing> publishVolume(const std::string& volumeId) override;
+
+  process::Future<Nothing> unpublishVolume(
+      const std::string& volumeId) override;
+
+private:
+  process::Owned<VolumeManagerProcess> process; // Handles all the calls.
+  process::Future<Nothing> recovered; // Result of the recover dispatch.
+};
+
+} // namespace v0 {
+} // namespace csi {
+} // namespace mesos {
+
+#endif // __CSI_V0_VOLUME_MANAGER_HPP__
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
new file mode 100644
index 0000000..9db99de
--- /dev/null
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CSI_V0_VOLUME_MANAGER_PROCESS_HPP__
+#define __CSI_V0_VOLUME_MANAGER_PROCESS_HPP__
+
+#include <string>
+#include <vector>
+
+#include <google/protobuf/map.h>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/csi/types.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/bytes.hpp>
+#include <stout/error.hpp>
+#include <stout/hashset.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "csi/metrics.hpp"
+#include "csi/service_manager.hpp"
+#include "csi/v0_volume_manager.hpp"
+#include "csi/volume_manager.hpp"
+
+namespace mesos {
+namespace csi {
+namespace v0 {
+
+
+class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
+{
+public:
+  explicit VolumeManagerProcess(
+      const process::http::URL& agentUrl,
+      const std::string& _rootDir,
+      const CSIPluginInfo& _info,
+      const hashset<Service> _services, // NOTE(review): passed by value.
+      const std::string& containerPrefix,
+      const Option<std::string>& authToken,
+      const process::grpc::client::Runtime& _runtime,
+      Metrics* _metrics);
+
+  process::Future<Nothing> recover(); // Dispatched once by VolumeManager.
+
+  process::Future<std::vector<VolumeInfo>> listVolumes();
+
+  process::Future<Bytes> getCapacity(
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
+
+  process::Future<VolumeInfo> createVolume(
+      const std::string& name,
+      const Bytes& capacity,
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
+
+  process::Future<Option<Error>> validateVolume(
+      const VolumeInfo& volumeInfo,
+      const types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
+
+  process::Future<bool> deleteVolume(const std::string& volumeId);
+
+  process::Future<Nothing> attachVolume(const std::string& volumeId);
+
+  process::Future<Nothing> detachVolume(const std::string& volumeId);
+
+  process::Future<Nothing> publishVolume(const std::string& volumeId);
+
+  process::Future<Nothing> unpublishVolume(const std::string& volumeId);
+
+private:
+  const std::string rootDir;
+  const CSIPluginInfo info;
+  const hashset<Service> services; // Checked non-empty by the ctor.
+
+  process::grpc::client::Runtime runtime;
+  Metrics* metrics; // Raw pointer from the caller; not freed here.
+  process::Owned<ServiceManager> serviceManager; // Built by the ctor.
+};
+
+} // namespace v0 {
+} // namespace csi {
+} // namespace mesos {
+
+#endif // __CSI_V0_VOLUME_MANAGER_PROCESS_HPP__
diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp
index e73f42e..cbe45cb 100644
--- a/src/csi/volume_manager.cpp
+++ b/src/csi/volume_manager.cpp
@@ -16,12 +16,18 @@
 
 #include "csi/volume_manager.hpp"
 
+#include <process/grpc.hpp>
+
+#include "csi/v0_volume_manager.hpp"
+
 namespace http = process::http;
 
 using std::string;
 
 using process::Owned;
 
+using process::grpc::client::Runtime;
+
 namespace mesos {
 namespace csi {
 
@@ -40,8 +46,15 @@ Try<Owned<VolumeManager>> VolumeManager::create(
         info.type() + "' and name '" + info.name() + "'");
   }
 
-  // TODO(chhsiao): Add a v0 VolumeManager.
-  return Error("Unimplemented");
+  return new v0::VolumeManager(
+      agentUrl,
+      rootDir,
+      info,
+      services,
+      containerPrefix,
+      authToken,
+      Runtime(),
+      metrics);
 }
 
 } // namespace csi {