You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2020/08/21 22:03:23 UTC
[mesos] 03/03: Initialized plugins lazily in the CSI server.
This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4ff51041df860dbcc2247ef47a0596e5132da190
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Thu Aug 20 19:27:23 2020 -0700
Initialized plugins lazily in the CSI server.
Review: https://reviews.apache.org/r/72779/
---
src/slave/csi_server.cpp | 403 +++++++++++++++++++++++++++++------------------
src/slave/csi_server.hpp | 8 +-
2 files changed, 253 insertions(+), 158 deletions(-)
diff --git a/src/slave/csi_server.cpp b/src/slave/csi_server.cpp
index 2ba4f22..0ffe020 100644
--- a/src/slave/csi_server.cpp
+++ b/src/slave/csi_server.cpp
@@ -19,6 +19,7 @@
#include <vector>
#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
#include <mesos/secret/resolver.hpp>
@@ -58,6 +59,7 @@ using mesos::csi::state::VolumeState;
using process::Failure;
using process::Future;
using process::Owned;
+using process::Promise;
using process::grpc::client::Runtime;
@@ -85,17 +87,17 @@ public:
CSIServerProcess(
const process::http::URL& _agentUrl,
const string& _rootDir,
+ const string& _pluginConfigDir,
SecretGenerator* _secretGenerator,
- SecretResolver* _secretResolver,
- hashmap<string, CSIPluginInfo> _pluginConfigs)
+ SecretResolver* _secretResolver)
: process::ProcessBase(process::ID::generate("csi-server")),
agentUrl(_agentUrl),
rootDir(_rootDir),
+ pluginConfigDir(_pluginConfigDir),
secretGenerator(_secretGenerator),
- secretResolver(_secretResolver),
- pluginConfigs(_pluginConfigs) {}
+ secretResolver(_secretResolver) {}
- Future<Nothing> start();
+ Future<Nothing> start(const SlaveID& _agentId);
Future<string> publishVolume(const Volume::Source::CSIVolume& volume);
@@ -106,73 +108,125 @@ public:
private:
struct CSIPlugin
{
- CSIPlugin(const string& metricsPrefix) : metrics(metricsPrefix) {}
+ CSIPlugin(
+ const CSIPluginInfo& _info,
+ const string& metricsPrefix)
+ : info(_info),
+ metrics(metricsPrefix) {}
CSIPluginInfo info;
Owned<ServiceManager> serviceManager;
Owned<VolumeManager> volumeManager;
Runtime runtime;
csi::Metrics metrics;
+
+ // CSI plugins are initialized lazily. When a publish/unpublish call is
+ // received for a plugin which is not yet initialized, this promise is used
+ // to perform the call after initialization is complete.
+ Promise<Nothing> initialized;
};
+ // Attempts to load configuration for a plugin with the specified name and
+ // then initializes the plugin. If no name is specified, then all
+ // configurations found in the plugin config directory are loaded.
+ Try<Nothing> initializePlugin(const Option<string>& name = None());
+
// Contains the plugins loaded by the server. The key of this map is the
// plugin name.
hashmap<string, CSIPlugin> plugins;
const process::http::URL agentUrl;
+ Option<SlaveID> agentId;
const string rootDir;
+ const string pluginConfigDir;
SecretGenerator* secretGenerator;
SecretResolver* secretResolver;
Option<string> authToken;
- hashmap<string, CSIPluginInfo> pluginConfigs;
- Option<SlaveID> agentId;
};
-Future<Nothing> CSIServerProcess::start()
+Try<Nothing> CSIServerProcess::initializePlugin(const Option<string>& name)
{
- Future<Nothing> result = Nothing();
+ if (name.isSome()) {
+ CHECK(!plugins.contains(name.get()));
+ }
- // The contents of this principal are arbitrary. We choose to avoid a
- // principal with a 'value' string so that we do not unintentionally collide
- // with another real principal with restricted permissions.
- Principal principal(Option<string>::none(), {{"key", "csi-server"}});
+ Try<list<string>> entries = os::ls(pluginConfigDir);
+ if (entries.isError()) {
+ return Error(
+ "Unable to list the CSI plugin configuration directory '" +
+ pluginConfigDir + "': " + entries.error());
+ }
+
+ // We are either looking for one specific plugin (if `name` is SOME), or we
+ // are loading all configs we find (if `name` is NONE). First, we populate
+ // `pluginConfigs` with one or more valid configurations. Then, we will
+ // initialize the plugin(s) based on the configuration(s) found.
+ hashmap<string, CSIPluginInfo> pluginConfigs;
+
+ foreach (const string& entry, entries.get()) {
+ const string path = path::join(pluginConfigDir, entry);
+
+ // Ignore directory entries.
+ if (os::stat::isdir(path)) {
+ continue;
+ }
+
+ Try<string> read = os::read(path);
+ if (read.isError()) {
+ // In case of an error we log and skip to the next entry.
+ LOG(ERROR) << "Failed to read CSI plugin configuration file '"
+ << path << "': " << read.error();
+
+ continue;
+ }
+
+ Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
+ if (json.isError()) {
+ return Error("JSON parse of '" + path + "' failed: " + json.error());
+ }
+
+ Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
+ if (parse.isError()) {
+ return Error("Protobuf parse of '" + path + "' failed: " + parse.error());
+ }
+
+ const CSIPluginInfo& csiPluginConfig = parse.get();
+ const string& type = csiPluginConfig.type();
+
+ if (pluginConfigs.contains(type)) {
+ LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. "
+ << "Skipping configuration file '" << path << "' since CSI "
+ << "plugin '" << type << "' already exists";
+ continue;
+ }
+
+ if (name.isNone() || name.get() == type) {
+ pluginConfigs[type] = csiPluginConfig;
- if (secretGenerator) {
- result = secretGenerator->generate(principal)
- .then([=](const Secret& secret) -> Future<Nothing> {
- Option<Error> error = common::validation::validateSecret(secret);
- if (error.isSome()) {
- return Failure(
- "CSI server failed to validate generated secret: " +
- error->message);
- }
-
- if (secret.type() != Secret::VALUE) {
- return Failure(
- "CSI server expecting generated secret to be of VALUE type "
- "instead of " + stringify(secret.type()) + " type; " +
- "only VALUE type secrets are supported at this time");
- }
-
- CHECK(secret.has_value());
-
- authToken = secret.value().data();
-
- return Nothing();
- });
+ if (name.isSome()) {
+ break;
+ }
+ }
}
- // Initialize CSI plugins.
- vector<Future<Nothing>> initializations;
+ if (pluginConfigs.empty()) {
+ return Error(
+ "No valid CSI plugin configurations found in '" +
+ pluginConfigDir + "'");
+ }
- foreachpair (const string& name, const CSIPluginInfo& info, pluginConfigs) {
+ foreachpair (const string& _name, const CSIPluginInfo& info, pluginConfigs) {
// Construct the plugin struct up front so that we have a valid runtime
// to pass into the service manager.
- plugins.put(name, CSIPlugin("csi_plugins/" + name + "/"));
+ plugins.put(_name, CSIPlugin(info, "csi_plugins/" + _name + "/"));
+
+ CSIPlugin& plugin = plugins.at(_name);
if (info.containers_size() > 0) {
- plugins.at(name).serviceManager.reset(new ServiceManager(
+ CHECK_SOME(agentId);
+
+ plugin.serviceManager.reset(new ServiceManager(
agentId.get(),
agentUrl,
rootDir,
@@ -180,56 +234,121 @@ Future<Nothing> CSIServerProcess::start()
extractServices(info),
"org-apache-mesos-internal-",
authToken,
- plugins.at(name).runtime,
- &plugins.at(name).metrics));
+ plugin.runtime,
+ &plugin.metrics));
} else {
CHECK(info.endpoints_size() > 0);
- plugins.at(name).serviceManager.reset(new ServiceManager(
+ plugin.serviceManager.reset(new ServiceManager(
info,
extractServices(info),
- plugins.at(name).runtime,
- &plugins.at(name).metrics));
+ plugin.runtime,
+ &plugin.metrics));
}
- initializations.push_back(plugins.at(name).serviceManager->recover()
- .then(defer(self(), [=]() {
- CHECK(plugins.contains(name));
+ plugin.initialized.associate(
+ plugin.serviceManager->recover()
+ .then(defer(self(), [=]() {
+ CHECK(plugins.contains(_name));
+
+ return plugins.at(_name).serviceManager->getApiVersion();
+ }))
+ .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
+ CHECK(plugins.contains(_name));
+
+ Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
+ rootDir,
+ info,
+ extractServices(info),
+ apiVersion,
+ plugins.at(_name).runtime,
+ plugins.at(_name).serviceManager.get(),
+ &plugins.at(_name).metrics,
+ secretResolver);
+
+ if (volumeManager.isError()) {
+ return Failure(
+ "CSI server failed to create volume manager for plugin"
+ " '" + info.name() + "': " + volumeManager.error());
+ }
+
+ plugins.at(_name).volumeManager = std::move(volumeManager.get());
+
+ return plugins.at(_name).volumeManager->recover();
+ }))
+ .onAny(defer(self(), [=](const Future<Nothing>& future) {
+ if (!future.isReady()) {
+ plugins.erase(_name);
+
+ LOG(ERROR)
+ << "CSI server failed to initialize plugin '" << _name << "': "
+ << (future.isFailed() ? future.failure() : "discarded");
+ } else {
+ LOG(INFO)
+ << "CSI server successfully initialized plugin '"
+ << _name << "'";
+ }
+ })));
+ }
- return plugins.at(name).serviceManager->getApiVersion();
- }))
- .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
- CHECK(plugins.contains(name));
-
- Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
- rootDir,
- info,
- extractServices(info),
- apiVersion,
- plugins.at(name).runtime,
- plugins.at(name).serviceManager.get(),
- &plugins.at(name).metrics,
- secretResolver);
-
- if (volumeManager.isError()) {
- return Failure(
- "CSI server failed to create volume manager for plugin"
- " '" + info.name() + "': " + volumeManager.error());
- }
-
- plugins.at(name).volumeManager = std::move(volumeManager.get());
-
- return plugins.at(name).volumeManager->recover();
- })));
+ return Nothing();
+}
+
+
+Future<Nothing> CSIServerProcess::start(const SlaveID& _agentId)
+{
+ // NOTE: It's possible that the agent receives multiple
+ // `SlaveRegisteredMessage`s and detects a disconnection in between.
+ // In that case, `start` will be called multiple times from
+ // `Slave::registered`.
+ if (agentId.isSome()) {
+ CHECK_EQ(agentId.get(), _agentId)
+ << "Cannot start CSI server with agent ID " << _agentId
+ << " (expected: " << agentId.get() << ")";
+
+ return Nothing();
+ }
+
+ agentId = _agentId;
+
+ // Load all CSI plugin configurations found.
+ Try<Nothing> init = initializePlugin();
+ if (init.isError()) {
+ return Failure(
+ "CSI server failed to initialize CSI plugins: " + init.error());
}
- return result
- .then([=]() {
- return process::collect(initializations);
- })
- .then([=]() {
+ if (!secretGenerator) {
+ return Nothing();
+ }
+
+ // The contents of this principal are arbitrary. We choose to avoid a
+ // principal with a 'value' string so that we do not unintentionally collide
+ // with another real principal with restricted permissions.
+ Principal principal(Option<string>::none(), {{"key", "csi-server"}});
+
+ return secretGenerator->generate(principal)
+ .then([=](const Secret& secret) -> Future<Nothing> {
+ Option<Error> error = common::validation::validateSecret(secret);
+ if (error.isSome()) {
+ return Failure(
+ "CSI server failed to validate generated secret: " +
+ error->message);
+ }
+
+ if (secret.type() != Secret::VALUE) {
+ return Failure(
+ "CSI server expecting generated secret to be of VALUE type "
+ "instead of " + stringify(secret.type()) + " type; " +
+ "only VALUE type secrets are supported at this time");
+ }
+
+ CHECK(secret.has_value());
+
+ authToken = secret.value().data();
+
return Nothing();
- });
+ });
}
@@ -238,15 +357,31 @@ Future<string> CSIServerProcess::publishVolume(
{
CHECK(volume.has_static_provisioning());
- if (!plugins.contains(volume.plugin_name())) {
- return Failure("Invalid CSI plugin '" + volume.plugin_name() + "'");
+ const string& name = volume.plugin_name();
+
+ if (!plugins.contains(name)) {
+ // This will attempt to load the plugin's configuration, initialize the
+ // plugin, and insert it into the `plugins` map.
+ Try<Nothing> pluginInit = initializePlugin(name);
+ if (pluginInit.isError()) {
+ return Failure(
+ "Failed to initialize CSI plugin '" +
+ name + "': " + pluginInit.error());
+ }
}
- return plugins.at(volume.plugin_name()).volumeManager->publishVolume(
- volume.static_provisioning().volume_id(),
- createVolumeState(volume.static_provisioning()))
- .then([=]() {
- CHECK(plugins.contains(volume.plugin_name()));
+ CHECK(plugins.contains(name));
+
+ return plugins.at(name).initialized.future()
+ .then(defer(self(), [=]() {
+ CHECK(plugins.contains(name));
+
+ return plugins.at(name).volumeManager->publishVolume(
+ volume.static_provisioning().volume_id(),
+ createVolumeState(volume.static_provisioning()));
+ }))
+ .then(defer(self(), [=]() {
+ CHECK(plugins.contains(name));
const CSIPluginInfo& info = plugins.at(volume.plugin_name()).info;
@@ -257,7 +392,7 @@ Future<string> CSIServerProcess::publishVolume(
return csi::paths::getMountTargetPath(
mountRootDir,
volume.static_provisioning().volume_id());
- });
+ }));
}
@@ -266,10 +401,22 @@ Future<Nothing> CSIServerProcess::unpublishVolume(
const string& volumeId)
{
if (!plugins.contains(pluginName)) {
- return Failure("Invalid CSI plugin '" + pluginName + "'");
+ // This will attempt to load the plugin's configuration, initialize the
+ // plugin, and insert it into the `plugins` map.
+ Try<Nothing> pluginInit = initializePlugin(pluginName);
+ if (pluginInit.isError()) {
+ return Failure(
+ "Failed to initialize CSI plugin '" +
+ pluginName + "': " + pluginInit.error());
+ }
}
- return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
+ CHECK(plugins.contains(pluginName));
+
+ return plugins.at(pluginName).initialized.future()
+ .then(defer(self(), [=]() {
+ return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
+ }));
}
@@ -313,15 +460,15 @@ hashset<CSIPluginContainerInfo::Service> extractServices(
CSIServer::CSIServer(
const process::http::URL& agentUrl,
const string& rootDir,
+ const string& pluginConfigDir,
SecretGenerator* secretGenerator,
- SecretResolver* secretResolver,
- const hashmap<string, CSIPluginInfo>& pluginConfigs)
+ SecretResolver* secretResolver)
: process(new CSIServerProcess(
agentUrl,
rootDir,
+ pluginConfigDir,
secretGenerator,
- secretResolver,
- pluginConfigs))
+ secretResolver))
{
process::spawn(CHECK_NOTNULL(process.get()));
}
@@ -355,73 +502,21 @@ Try<Owned<CSIServer>> CSIServer::create(
flags.csi_plugin_config_dir.get() + "' does not exist");
}
- Try<list<string>> entries = os::ls(flags.csi_plugin_config_dir.get());
- if (entries.isError()) {
- return Error(
- "Unable to list the CSI plugin configuration directory '" +
- flags.csi_plugin_config_dir.get()+ "': " + entries.error());
- }
-
- hashmap<std::string, CSIPluginInfo> pluginConfigs;
-
- foreach (const string& entry, entries.get()) {
- const string path = path::join(flags.csi_plugin_config_dir.get(), entry);
-
- // Ignore directory entries.
- if (os::stat::isdir(path)) {
- continue;
- }
-
- Try<string> read = os::read(path);
- if (read.isError()) {
- // In case of an error we log and skip to the next entry.
- LOG(ERROR) << "Failed to read CSI plugin configuration file '"
- << path << "': " << read.error();
-
- continue;
- }
-
- Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
- if (json.isError()) {
- return Error("JSON parse failed: " + json.error());
- }
-
- Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
- if (parse.isError()) {
- return Error("Protobuf parse failed: " + parse.error());
- }
-
- const CSIPluginInfo& csiPluginConfig = parse.get();
- const string& type = csiPluginConfig.type();
-
- if (pluginConfigs.contains(type)) {
- LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. "
- << "Skipping configuration file '" << path << "' since CSI "
- << "plugin '" << type << "' already exists";
- continue;
- }
-
- pluginConfigs[type] = csiPluginConfig;
- }
-
- if (pluginConfigs.empty()) {
- return Error(
- "No valid CSI plugin configurations found in '" +
- flags.csi_plugin_config_dir.get() + "'");
- }
-
return new CSIServer(
agentUrl,
slave::paths::getCsiRootDir(flags.work_dir),
+ flags.csi_plugin_config_dir.get(),
secretGenerator,
- secretResolver,
- pluginConfigs);
+ secretResolver);
}
-Future<Nothing> CSIServer::start()
+Future<Nothing> CSIServer::start(const SlaveID& agentId)
{
- started.associate(process::dispatch(process.get(), &CSIServerProcess::start));
+ started.associate(process::dispatch(
+ process.get(),
+ &CSIServerProcess::start,
+ agentId));
return started.future();
}
diff --git a/src/slave/csi_server.hpp b/src/slave/csi_server.hpp
index f5ec766..de5c6b6 100644
--- a/src/slave/csi_server.hpp
+++ b/src/slave/csi_server.hpp
@@ -60,7 +60,7 @@ public:
// Starts the CSI server. Any `publishVolume()` or `unpublishVolume()` calls
// which were made previously will be executed after this method is called.
// Returns a future which is satisfied once initialization is complete.
- process::Future<Nothing> start();
+ process::Future<Nothing> start(const SlaveID& agentId);
// Publish a CSI volume to this agent. If the `start()` method has not yet
// been called, then the publishing of this volume will not be completed until
@@ -79,10 +79,10 @@ public:
private:
CSIServer(
const process::http::URL& agentUrl,
- const std::string& csiRootDir,
+ const std::string& rootDir,
+ const std::string& pluginConfigDir,
SecretGenerator* secretGenerator,
- SecretResolver* secretResolver,
- const hashmap<std::string, CSIPluginInfo>& csiPluginConfigs);
+ SecretResolver* secretResolver);
process::Owned<CSIServerProcess> process;