You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by qi...@apache.org on 2020/08/03 02:51:36 UTC

[mesos] 04/04: Improved CSI service manager to support unmanaged CSI plugins.

This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d2c84d14b63f137546c00d69b0309c4543811732
Author: Qian Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 15 16:02:48 2020 +0800

    Improved CSI service manager to support unmanaged CSI plugins.
    
    Review: https://reviews.apache.org/r/72683
---
 src/csi/service_manager.cpp | 92 ++++++++++++++++++++++++++++++++++++++++++++-
 src/csi/service_manager.hpp | 12 +++++-
 2 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index a87df96..7a8d8e5 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -137,6 +137,12 @@ public:
       const Runtime& _runtime,
       Metrics* _metrics);
 
+  ServiceManagerProcess(
+      const CSIPluginInfo& _info,
+      const hashset<Service>& services,
+      const Runtime& _runtime,
+      Metrics* _metrics); // Overload that fills `serviceEndpoints`.
+
   Future<Nothing> recover();
 
   Future<string> getServiceEndpoint(const Service& service);
@@ -180,8 +186,15 @@ private:
 
   http::Headers headers;
   Option<string> apiVersion;
+
+  // This is for a managed CSI plugin, whose services are launched as
+  // standalone containers.
   hashmap<Service, ContainerID> serviceContainers;
 
+  // This is for an unmanaged CSI plugin, which is already deployed
+  // outside of Mesos.
+  hashmap<Service, string> serviceEndpoints;
+
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
   hashmap<ContainerID, Owned<Promise<string>>> endpoints;
 };
@@ -233,8 +246,45 @@ ServiceManagerProcess::ServiceManagerProcess(
 }
 
 
+ServiceManagerProcess::ServiceManagerProcess(
+    const CSIPluginInfo& _info,
+    const hashset<Service>& services,
+    const Runtime& _runtime,
+    Metrics* _metrics)
+  : ProcessBase(process::ID::generate("csi-service-manager")),
+    agentUrl(), // Not supplied to this overload; value-initialized.
+    rootDir(), // Likewise value-initialized.
+    info(_info),
+    containerPrefix(), // Likewise value-initialized.
+    authToken(), // Likewise value-initialized.
+    contentType(ContentType::PROTOBUF),
+    runtime(_runtime),
+    metrics(_metrics)
+{ // Records the endpoint listed in `info` for each requested service.
+  foreach (const Service& service, services) {
+    foreach (const CSIPluginEndpoint& serviceEndpoint, info.endpoints()) {
+      if (serviceEndpoint.csi_service() == service) {
+        serviceEndpoints[service] = serviceEndpoint.endpoint();
+        break; // Keep only the first endpoint matching this service.
+      }
+    }
+    // CHECK-fails if `info` lists no endpoint for a requested service.
+    CHECK(serviceEndpoints.contains(service))
+      << service << " not found for CSI plugin type '" << info.type()
+      << "' and name '" << info.name() << "'";
+  }
+}
+
+
 Future<Nothing> ServiceManagerProcess::recover()
 {
+  // For the unmanaged CSI plugin, we do not need to recover anything.
+  if (!serviceEndpoints.empty()) {
+    return Nothing();
+  }
+
+  CHECK(!serviceContainers.empty());
+
   return getContainers()
     .then(process::defer(self(), [=](
         const hashmap<ContainerID, Option<ContainerStatus>>& containers)
@@ -346,6 +396,21 @@ Future<Nothing> ServiceManagerProcess::recover()
 
 Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service)
 {
+  // For the unmanaged CSI plugin, get its endpoint from
+  // `serviceEndpoints` directly.
+  if (!serviceEndpoints.empty()) {
+    if (serviceEndpoints.contains(service)) {
+      return serviceEndpoints.at(service);
+    } else {
+      return Failure(
+          stringify(service) + " not found for CSI plugin type '" +
+          info.type() + "' and name '" + info.name() + "'");
+    }
+  }
+
+  // For the managed CSI plugin, get its endpoint via its corresponding
+  // standalone container ID.
+  CHECK(!serviceContainers.empty());
   if (!serviceContainers.contains(service)) {
     return Failure(
         stringify(service) + " not found for CSI plugin type '" + info.type() +
@@ -362,8 +427,15 @@ Future<string> ServiceManagerProcess::getApiVersion()
     return apiVersion.get();
   }
 
-  // Ensure that the plugin has been probed (which does the API version
-  // detection) through `getEndpoint` before returning the API version.
+  // Ensure that the unmanaged CSI plugin has been probed (which does the API
+  // version detection) before returning the API version.
+  if (!serviceEndpoints.empty()) {
+    return probeEndpoint(serviceEndpoints.begin()->second)
+      .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
+  }
+
+  // For the managed CSI plugin, `probeEndpoint` will be internally called by
+  // `getEndpoint` to do the API version detection.
   CHECK(!serviceContainers.empty());
   return getEndpoint(serviceContainers.begin()->second)
     .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
@@ -790,6 +862,22 @@ ServiceManager::ServiceManager(
 }
 
 
+ServiceManager::ServiceManager(
+    const CSIPluginInfo& info,
+    const hashset<Service>& services,
+    const process::grpc::client::Runtime& runtime,
+    Metrics* metrics)
+  : process(new ServiceManagerProcess( // Four-argument process overload.
+        info,
+        services,
+        runtime,
+        metrics))
+{ // Spawn the process, then dispatch `recover()` and keep its future.
+  process::spawn(CHECK_NOTNULL(process.get()));
+  recovered = process::dispatch(process.get(), &ServiceManagerProcess::recover);
+}
+
+
 ServiceManager::~ServiceManager()
 {
   recovered.discard();
diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp
index 60a0805..76a80fb 100644
--- a/src/csi/service_manager.hpp
+++ b/src/csi/service_manager.hpp
@@ -47,10 +47,12 @@ constexpr Service NODE_SERVICE = CSIPluginContainerInfo::NODE_SERVICE;
 class ServiceManagerProcess;
 
 
-// Manages the service containers of a CSI plugin instance.
+// Manages the services of a CSI plugin instance.
 class ServiceManager
 {
 public:
+  // This is for the managed CSI plugins which will be
+  // launched as standalone containers.
   ServiceManager(
       const process::http::URL& agentUrl,
       const std::string& rootDir,
@@ -61,6 +63,14 @@ public:
       const process::grpc::client::Runtime& runtime,
       Metrics* metrics);
 
+  // For unmanaged CSI plugins, which we assume are already running outside
+  // of Mesos. `info` must list an endpoint for each service in `services`.
+  ServiceManager(
+      const CSIPluginInfo& info,
+      const hashset<Service>& services,
+      const process::grpc::client::Runtime& runtime,
+      Metrics* metrics);
+
   // Since this class contains `Owned` members which should not but can be
   // copied, explicitly make this class non-copyable.
   //