You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by qi...@apache.org on 2020/08/03 02:51:36 UTC
[mesos] 04/04: Improved CSI service manager to support unmanaged
CSI plugins.
This is an automated email from the ASF dual-hosted git repository.
qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d2c84d14b63f137546c00d69b0309c4543811732
Author: Qian Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 15 16:02:48 2020 +0800
Improved CSI service manager to support unmanaged CSI plugins.
Review: https://reviews.apache.org/r/72683
---
src/csi/service_manager.cpp | 92 ++++++++++++++++++++++++++++++++++++++++++++-
src/csi/service_manager.hpp | 12 +++++-
2 files changed, 101 insertions(+), 3 deletions(-)
diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index a87df96..7a8d8e5 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -137,6 +137,12 @@ public:
const Runtime& _runtime,
Metrics* _metrics);
+ ServiceManagerProcess(
+ const CSIPluginInfo& _info,
+ const hashset<Service>& services,
+ const Runtime& _runtime,
+ Metrics* _metrics);
+
Future<Nothing> recover();
Future<string> getServiceEndpoint(const Service& service);
@@ -180,8 +186,15 @@ private:
http::Headers headers;
Option<string> apiVersion;
+
+ // This is for the managed CSI plugin which will be launched as
+ // standalone containers.
hashmap<Service, ContainerID> serviceContainers;
+ // This is for the unmanaged CSI plugin which is already deployed
+ // outside of Mesos.
+ hashmap<Service, string> serviceEndpoints;
+
hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
hashmap<ContainerID, Owned<Promise<string>>> endpoints;
};
@@ -233,8 +246,45 @@ ServiceManagerProcess::ServiceManagerProcess(
}
+ServiceManagerProcess::ServiceManagerProcess(
+ const CSIPluginInfo& _info,
+ const hashset<Service>& services,
+ const Runtime& _runtime,
+ Metrics* _metrics)
+ : ProcessBase(process::ID::generate("csi-service-manager")),
+ agentUrl(),
+ rootDir(),
+ info(_info),
+ containerPrefix(),
+ authToken(),
+ contentType(ContentType::PROTOBUF),
+ runtime(_runtime),
+ metrics(_metrics)
+{
+ foreach (const Service& service, services) {
+ foreach (const CSIPluginEndpoint& serviceEndpoint, info.endpoints()) {
+ if (serviceEndpoint.csi_service() == service) {
+ serviceEndpoints[service] = serviceEndpoint.endpoint();
+ break;
+ }
+ }
+
+ CHECK(serviceEndpoints.contains(service))
+ << service << " not found for CSI plugin type '" << info.type()
+ << "' and name '" << info.name() << "'";
+ }
+}
+
+
Future<Nothing> ServiceManagerProcess::recover()
{
+ // For the unmanaged CSI plugin, we do not need to recover anything.
+ if (!serviceEndpoints.empty()) {
+ return Nothing();
+ }
+
+ CHECK(!serviceContainers.empty());
+
return getContainers()
.then(process::defer(self(), [=](
const hashmap<ContainerID, Option<ContainerStatus>>& containers)
@@ -346,6 +396,21 @@ Future<Nothing> ServiceManagerProcess::recover()
Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service)
{
+ // For the unmanaged CSI plugin, get its endpoint from
+ // `serviceEndpoints` directly.
+ if (!serviceEndpoints.empty()) {
+ if (serviceEndpoints.contains(service)) {
+ return serviceEndpoints.at(service);
+ } else {
+ return Failure(
+ stringify(service) + " not found for CSI plugin type '" +
+ info.type() + "' and name '" + info.name() + "'");
+ }
+ }
+
+ // For the managed CSI plugin, get its endpoint via its corresponding
+ // standalone container ID.
+ CHECK(!serviceContainers.empty());
if (!serviceContainers.contains(service)) {
return Failure(
stringify(service) + " not found for CSI plugin type '" + info.type() +
@@ -362,8 +427,15 @@ Future<string> ServiceManagerProcess::getApiVersion()
return apiVersion.get();
}
- // Ensure that the plugin has been probed (which does the API version
- // detection) through `getEndpoint` before returning the API version.
+ // Ensure that the unmanaged CSI plugin has been probed (which does the API
+ // version detection) before returning the API version.
+ if (!serviceEndpoints.empty()) {
+ return probeEndpoint(serviceEndpoints.begin()->second)
+ .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
+ }
+
+ // For the managed CSI plugin, `probeEndpoint` will be internally called by
+ // `getEndpoint` to do the API version detection.
CHECK(!serviceContainers.empty());
return getEndpoint(serviceContainers.begin()->second)
.then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
@@ -790,6 +862,22 @@ ServiceManager::ServiceManager(
}
+ServiceManager::ServiceManager(
+ const CSIPluginInfo& info,
+ const hashset<Service>& services,
+ const process::grpc::client::Runtime& runtime,
+ Metrics* metrics)
+ : process(new ServiceManagerProcess(
+ info,
+ services,
+ runtime,
+ metrics))
+{
+ process::spawn(CHECK_NOTNULL(process.get()));
+ recovered = process::dispatch(process.get(), &ServiceManagerProcess::recover);
+}
+
+
ServiceManager::~ServiceManager()
{
recovered.discard();
diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp
index 60a0805..76a80fb 100644
--- a/src/csi/service_manager.hpp
+++ b/src/csi/service_manager.hpp
@@ -47,10 +47,12 @@ constexpr Service NODE_SERVICE = CSIPluginContainerInfo::NODE_SERVICE;
class ServiceManagerProcess;
-// Manages the service containers of a CSI plugin instance.
+// Manages the services of a CSI plugin instance.
class ServiceManager
{
public:
+ // This is for the managed CSI plugins which will be
+ // launched as standalone containers.
ServiceManager(
const process::http::URL& agentUrl,
const std::string& rootDir,
@@ -61,6 +63,14 @@ public:
const process::grpc::client::Runtime& runtime,
Metrics* metrics);
+ // This is for the unmanaged CSI plugins which we assume
+ // are already launched outside of Mesos.
+ ServiceManager(
+ const CSIPluginInfo& info,
+ const hashset<Service>& services,
+ const process::grpc::client::Runtime& runtime,
+ Metrics* metrics);
+
// Since this class contains `Owned` members which should not but can be
// copied, explicitly make this class non-copyable.
//