You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2020/08/21 22:03:23 UTC

[mesos] 03/03: Initialized plugins lazily in the CSI server.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 4ff51041df860dbcc2247ef47a0596e5132da190
Author: Greg Mann <gr...@mesosphere.io>
AuthorDate: Thu Aug 20 19:27:23 2020 -0700

    Initialized plugins lazily in the CSI server.
    
    Review: https://reviews.apache.org/r/72779/
---
 src/slave/csi_server.cpp | 403 +++++++++++++++++++++++++++++------------------
 src/slave/csi_server.hpp |   8 +-
 2 files changed, 253 insertions(+), 158 deletions(-)

diff --git a/src/slave/csi_server.cpp b/src/slave/csi_server.cpp
index 2ba4f22..0ffe020 100644
--- a/src/slave/csi_server.cpp
+++ b/src/slave/csi_server.cpp
@@ -19,6 +19,7 @@
 #include <vector>
 
 #include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <mesos/secret/resolver.hpp>
 
@@ -58,6 +59,7 @@ using mesos::csi::state::VolumeState;
 using process::Failure;
 using process::Future;
 using process::Owned;
+using process::Promise;
 
 using process::grpc::client::Runtime;
 
@@ -85,17 +87,17 @@ public:
   CSIServerProcess(
       const process::http::URL& _agentUrl,
       const string& _rootDir,
+      const string& _pluginConfigDir,
       SecretGenerator* _secretGenerator,
-      SecretResolver* _secretResolver,
-      hashmap<string, CSIPluginInfo> _pluginConfigs)
+      SecretResolver* _secretResolver)
     : process::ProcessBase(process::ID::generate("csi-server")),
       agentUrl(_agentUrl),
       rootDir(_rootDir),
+      pluginConfigDir(_pluginConfigDir),
       secretGenerator(_secretGenerator),
-      secretResolver(_secretResolver),
-      pluginConfigs(_pluginConfigs) {}
+      secretResolver(_secretResolver) {}
 
-  Future<Nothing> start();
+  Future<Nothing> start(const SlaveID& _agentId);
 
   Future<string> publishVolume(const Volume::Source::CSIVolume& volume);
 
@@ -106,73 +108,125 @@ public:
 private:
   struct CSIPlugin
   {
-    CSIPlugin(const string& metricsPrefix) : metrics(metricsPrefix) {}
+    CSIPlugin(
+        const CSIPluginInfo& _info,
+        const string& metricsPrefix)
+      : info(_info),
+        metrics(metricsPrefix) {}
 
     CSIPluginInfo info;
     Owned<ServiceManager> serviceManager;
     Owned<VolumeManager> volumeManager;
     Runtime runtime;
     csi::Metrics metrics;
+
+    // CSI plugins are initialized lazily. When a publish/unpublish call is
+    // received for a plugin which is not yet initialized, this promise is used
+    // to perform the call after initialization is complete.
+    Promise<Nothing> initialized;
   };
 
+  // Attempts to load the configuration whose `type` field matches the
+  // specified name and then initializes that plugin. If no name is specified,
+  // all configurations found in the plugin config directory are loaded.
+  Try<Nothing> initializePlugin(const Option<string>& name = None());
+
   // Contains the plugins loaded by the server. The key of this map is the
   // plugin name.
   hashmap<string, CSIPlugin> plugins;
 
   const process::http::URL agentUrl;
+  Option<SlaveID> agentId;
   const string rootDir;
+  const string pluginConfigDir;
   SecretGenerator* secretGenerator;
   SecretResolver* secretResolver;
   Option<string> authToken;
-  hashmap<string, CSIPluginInfo> pluginConfigs;
-  Option<SlaveID> agentId;
 };
 
 
-Future<Nothing> CSIServerProcess::start()
+Try<Nothing> CSIServerProcess::initializePlugin(const Option<string>& name)
 {
-  Future<Nothing> result = Nothing();
+  if (name.isSome()) {
+    CHECK(!plugins.contains(name.get()));
+  }
 
-  // The contents of this principal are arbitrary. We choose to avoid a
-  // principal with a 'value' string so that we do not unintentionally collide
-  // with another real principal with restricted permissions.
-  Principal principal(Option<string>::none(), {{"key", "csi-server"}});
+  Try<list<string>> entries = os::ls(pluginConfigDir);
+  if (entries.isError()) {
+    return Error(
+        "Unable to list the CSI plugin configuration directory '" +
+        pluginConfigDir + "': " + entries.error());
+  }
+
+  // We are either looking for one specific plugin (if `name` is SOME), or we
+  // are loading all configs we find (if `name` is NONE). First, we populate
+  // `pluginConfigs` with one or more valid configurations. Then, we will
+  // initialize the plugin(s) based on the configuration(s) found.
+  hashmap<string, CSIPluginInfo> pluginConfigs;
+
+  foreach (const string& entry, entries.get()) {
+    const string path = path::join(pluginConfigDir, entry);
+
+    // Ignore directory entries.
+    if (os::stat::isdir(path)) {
+      continue;
+    }
+
+    Try<string> read = os::read(path);
+    if (read.isError()) {
+      // In case of an error we log and skip to the next entry.
+      LOG(ERROR) << "Failed to read CSI plugin configuration file '"
+                 << path << "': " << read.error();
+
+      continue;
+    }
+
+    Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
+    if (json.isError()) {
+      return Error("JSON parse of '" + path + "' failed: " + json.error());
+    }
+
+    Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
+    if (parse.isError()) {
+      return Error("Protobuf parse of '" + path + "' failed: " + parse.error());
+    }
+
+    const CSIPluginInfo& csiPluginConfig = parse.get();
+    const string& type = csiPluginConfig.type();
+
+    if (pluginConfigs.contains(type)) {
+      LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. "
+                 << "Skipping configuration file '" << path << "' since CSI "
+                 << "plugin '" << type << "' already exists";
+      continue;
+    }
+
+    if (name.isNone() || name.get() == type) {
+      pluginConfigs[type] = csiPluginConfig;
 
-  if (secretGenerator) {
-    result = secretGenerator->generate(principal)
-      .then([=](const Secret& secret) -> Future<Nothing> {
-        Option<Error> error = common::validation::validateSecret(secret);
-        if (error.isSome()) {
-          return Failure(
-              "CSI server failed to validate generated secret: " +
-              error->message);
-        }
-
-        if (secret.type() != Secret::VALUE) {
-          return Failure(
-              "CSI server expecting generated secret to be of VALUE type "
-              "instead of " + stringify(secret.type()) + " type; " +
-              "only VALUE type secrets are supported at this time");
-        }
-
-        CHECK(secret.has_value());
-
-        authToken = secret.value().data();
-
-        return Nothing();
-    });
+      if (name.isSome()) {
+        break;
+      }
+    }
   }
 
-  // Initialize CSI plugins.
-  vector<Future<Nothing>> initializations;
+  if (pluginConfigs.empty()) {
+    return Error(
+        "No valid CSI plugin configurations found in '" +
+        pluginConfigDir + "'");
+  }
 
-  foreachpair (const string& name, const CSIPluginInfo& info, pluginConfigs) {
+  foreachpair (const string& _name, const CSIPluginInfo& info, pluginConfigs) {
     // Default-construct the plugin struct so that we have a valid runtime
     // to pass into the service manager.
-    plugins.put(name, CSIPlugin("csi_plugins/" + name + "/"));
+    plugins.put(_name, CSIPlugin(info, "csi_plugins/" + _name + "/"));
+
+    CSIPlugin& plugin = plugins.at(_name);
 
     if (info.containers_size() > 0) {
-      plugins.at(name).serviceManager.reset(new ServiceManager(
+      CHECK_SOME(agentId);
+
+      plugin.serviceManager.reset(new ServiceManager(
           agentId.get(),
           agentUrl,
           rootDir,
@@ -180,56 +234,121 @@ Future<Nothing> CSIServerProcess::start()
           extractServices(info),
           "org-apache-mesos-internal-",
           authToken,
-          plugins.at(name).runtime,
-          &plugins.at(name).metrics));
+          plugin.runtime,
+          &plugin.metrics));
     } else {
       CHECK(info.endpoints_size() > 0);
 
-      plugins.at(name).serviceManager.reset(new ServiceManager(
+      plugin.serviceManager.reset(new ServiceManager(
           info,
           extractServices(info),
-          plugins.at(name).runtime,
-          &plugins.at(name).metrics));
+          plugin.runtime,
+          &plugin.metrics));
     }
 
-    initializations.push_back(plugins.at(name).serviceManager->recover()
-      .then(defer(self(), [=]() {
-        CHECK(plugins.contains(name));
+    plugin.initialized.associate(
+        plugin.serviceManager->recover()
+          .then(defer(self(), [=]() {
+            CHECK(plugins.contains(_name));
+
+            return plugins.at(_name).serviceManager->getApiVersion();
+          }))
+          .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
+            CHECK(plugins.contains(_name));
+
+            Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
+                rootDir,
+                info,
+                extractServices(info),
+                apiVersion,
+                plugins.at(_name).runtime,
+                plugins.at(_name).serviceManager.get(),
+                &plugins.at(_name).metrics,
+                secretResolver);
+
+            if (volumeManager.isError()) {
+              return Failure(
+                  "CSI server failed to create volume manager for plugin"
+                  " '" + info.name() + "': " + volumeManager.error());
+            }
+
+            plugins.at(_name).volumeManager = std::move(volumeManager.get());
+
+            return plugins.at(_name).volumeManager->recover();
+          }))
+          .onAny(defer(self(), [=](const Future<Nothing>& future) {
+            if (!future.isReady()) {
+              plugins.erase(_name);
+
+              LOG(ERROR)
+                << "CSI server failed to initialize plugin '" << _name << "': "
+                << (future.isFailed() ? future.failure() : "discarded");
+            } else {
+              LOG(INFO)
+                << "CSI server successfully initialized plugin '"
+                << _name << "'";
+            }
+          })));
+  }
 
-        return plugins.at(name).serviceManager->getApiVersion();
-      }))
-      .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
-        CHECK(plugins.contains(name));
-
-        Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
-            rootDir,
-            info,
-            extractServices(info),
-            apiVersion,
-            plugins.at(name).runtime,
-            plugins.at(name).serviceManager.get(),
-            &plugins.at(name).metrics,
-            secretResolver);
-
-        if (volumeManager.isError()) {
-          return Failure(
-              "CSI server failed to create volume manager for plugin"
-              " '" + info.name() + "': " + volumeManager.error());
-        }
-
-        plugins.at(name).volumeManager = std::move(volumeManager.get());
-
-        return plugins.at(name).volumeManager->recover();
-      })));
+  return Nothing();
+}
+
+
+Future<Nothing> CSIServerProcess::start(const SlaveID& _agentId)
+{
+  // NOTE: It's possible that the agent receives multiple
+  // `SlaveRegisteredMessage`s and detects a disconnection in between.
+  // In that case, `start` will be called multiple times from
+  // `Slave::registered`.
+  if (agentId.isSome()) {
+    CHECK_EQ(agentId.get(), _agentId)
+      << "Cannot start CSI server with agent ID " << _agentId
+      << " (expected: " << agentId.get() << ")";
+
+    return Nothing();
+  }
+
+  agentId = _agentId;
+
+  // Load all CSI plugin configurations found.
+  Try<Nothing> init = initializePlugin();
+  if (init.isError()) {
+    return Failure(
+        "CSI server failed to initialize CSI plugins: " + init.error());
   }
 
-  return result
-    .then([=]() {
-      return process::collect(initializations);
-    })
-    .then([=]() {
+  if (!secretGenerator) {
+    return Nothing();
+  }
+
+  // The contents of this principal are arbitrary. We choose to avoid a
+  // principal with a 'value' string so that we do not unintentionally collide
+  // with another real principal with restricted permissions.
+  Principal principal(Option<string>::none(), {{"key", "csi-server"}});
+
+  return secretGenerator->generate(principal)
+    .then([=](const Secret& secret) -> Future<Nothing> {
+      Option<Error> error = common::validation::validateSecret(secret);
+      if (error.isSome()) {
+        return Failure(
+            "CSI server failed to validate generated secret: " +
+            error->message);
+      }
+
+      if (secret.type() != Secret::VALUE) {
+        return Failure(
+            "CSI server expecting generated secret to be of VALUE type "
+            "instead of " + stringify(secret.type()) + " type; " +
+            "only VALUE type secrets are supported at this time");
+      }
+
+      CHECK(secret.has_value());
+
+      authToken = secret.value().data();
+
       return Nothing();
-    });
+  });
 }
 
 
@@ -238,15 +357,31 @@ Future<string> CSIServerProcess::publishVolume(
 {
   CHECK(volume.has_static_provisioning());
 
-  if (!plugins.contains(volume.plugin_name())) {
-    return Failure("Invalid CSI plugin '" + volume.plugin_name() + "'");
+  const string& name = volume.plugin_name();
+
+  if (!plugins.contains(name)) {
+    // This will attempt to load the plugin's configuration, initialize the
+    // plugin, and insert it into the `plugins` map.
+    Try<Nothing> pluginInit = initializePlugin(name);
+    if (pluginInit.isError()) {
+      return Failure(
+          "Failed to initialize CSI plugin '" +
+          name + "': " + pluginInit.error());
+    }
   }
 
-  return plugins.at(volume.plugin_name()).volumeManager->publishVolume(
-      volume.static_provisioning().volume_id(),
-      createVolumeState(volume.static_provisioning()))
-    .then([=]() {
-      CHECK(plugins.contains(volume.plugin_name()));
+  CHECK(plugins.contains(name));
+
+  return plugins.at(name).initialized.future()
+    .then(defer(self(), [=]() {
+      CHECK(plugins.contains(name));
+
+      return plugins.at(name).volumeManager->publishVolume(
+          volume.static_provisioning().volume_id(),
+          createVolumeState(volume.static_provisioning()));
+      }))
+    .then(defer(self(), [=]() {
+      CHECK(plugins.contains(name));
 
       const CSIPluginInfo& info = plugins.at(volume.plugin_name()).info;
 
@@ -257,7 +392,7 @@ Future<string> CSIServerProcess::publishVolume(
       return csi::paths::getMountTargetPath(
           mountRootDir,
           volume.static_provisioning().volume_id());
-    });
+    }));
 }
 
 
@@ -266,10 +401,22 @@ Future<Nothing> CSIServerProcess::unpublishVolume(
     const string& volumeId)
 {
   if (!plugins.contains(pluginName)) {
-    return Failure("Invalid CSI plugin '" + pluginName + "'");
+    // This will attempt to load the plugin's configuration, initialize the
+    // plugin, and insert it into the `plugins` map.
+    Try<Nothing> pluginInit = initializePlugin(pluginName);
+    if (pluginInit.isError()) {
+      return Failure(
+          "Failed to initialize CSI plugin '" +
+          pluginName + "': " + pluginInit.error());
+    }
   }
 
-  return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
+  CHECK(plugins.contains(pluginName));
+
+  return plugins.at(pluginName).initialized.future()
+    .then(defer(self(), [=]() {
+      return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
+    }));
 }
 
 
@@ -313,15 +460,15 @@ hashset<CSIPluginContainerInfo::Service> extractServices(
 CSIServer::CSIServer(
     const process::http::URL& agentUrl,
     const string& rootDir,
+    const string& pluginConfigDir,
     SecretGenerator* secretGenerator,
-    SecretResolver* secretResolver,
-    const hashmap<string, CSIPluginInfo>& pluginConfigs)
+    SecretResolver* secretResolver)
   : process(new CSIServerProcess(
         agentUrl,
         rootDir,
+        pluginConfigDir,
         secretGenerator,
-        secretResolver,
-        pluginConfigs))
+        secretResolver))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
 }
@@ -355,73 +502,21 @@ Try<Owned<CSIServer>> CSIServer::create(
         flags.csi_plugin_config_dir.get() + "' does not exist");
   }
 
-  Try<list<string>> entries = os::ls(flags.csi_plugin_config_dir.get());
-  if (entries.isError()) {
-    return Error(
-        "Unable to list the CSI plugin configuration directory '" +
-        flags.csi_plugin_config_dir.get()+ "': " + entries.error());
-  }
-
-  hashmap<std::string, CSIPluginInfo> pluginConfigs;
-
-  foreach (const string& entry, entries.get()) {
-    const string path = path::join(flags.csi_plugin_config_dir.get(), entry);
-
-    // Ignore directory entries.
-    if (os::stat::isdir(path)) {
-      continue;
-    }
-
-    Try<string> read = os::read(path);
-    if (read.isError()) {
-      // In case of an error we log and skip to the next entry.
-      LOG(ERROR) << "Failed to read CSI plugin configuration file '"
-                 << path << "': " << read.error();
-
-      continue;
-    }
-
-    Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
-    if (json.isError()) {
-      return Error("JSON parse failed: " + json.error());
-    }
-
-    Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
-    if (parse.isError()) {
-      return Error("Protobuf parse failed: " + parse.error());
-    }
-
-    const CSIPluginInfo& csiPluginConfig = parse.get();
-    const string& type = csiPluginConfig.type();
-
-    if (pluginConfigs.contains(type)) {
-      LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. "
-                 << "Skipping configuration file '" << path << "' since CSI "
-                 << "plugin '" << type << "' already exists";
-      continue;
-    }
-
-    pluginConfigs[type] = csiPluginConfig;
-  }
-
-  if (pluginConfigs.empty()) {
-    return Error(
-        "No valid CSI plugin configurations found in '" +
-        flags.csi_plugin_config_dir.get() + "'");
-  }
-
   return new CSIServer(
       agentUrl,
       slave::paths::getCsiRootDir(flags.work_dir),
+      flags.csi_plugin_config_dir.get(),
       secretGenerator,
-      secretResolver,
-      pluginConfigs);
+      secretResolver);
 }
 
 
-Future<Nothing> CSIServer::start()
+Future<Nothing> CSIServer::start(const SlaveID& agentId)
 {
-  started.associate(process::dispatch(process.get(), &CSIServerProcess::start));
+  started.associate(process::dispatch(
+      process.get(),
+      &CSIServerProcess::start,
+      agentId));
 
   return started.future();
 }
diff --git a/src/slave/csi_server.hpp b/src/slave/csi_server.hpp
index f5ec766..de5c6b6 100644
--- a/src/slave/csi_server.hpp
+++ b/src/slave/csi_server.hpp
@@ -60,7 +60,7 @@ public:
   // Starts the CSI server. Any `publishVolume()` or `unpublishVolume()` calls
   // which were made previously will be executed after this method is called.
   // Returns a future which is satisfied once initialization is complete.
-  process::Future<Nothing> start();
+  process::Future<Nothing> start(const SlaveID& agentId);
 
   // Publish a CSI volume to this agent. If the `start()` method has not yet
   // been called, then the publishing of this volume will not be completed until
@@ -79,10 +79,10 @@ public:
 private:
   CSIServer(
       const process::http::URL& agentUrl,
-      const std::string& csiRootDir,
+      const std::string& rootDir,
+      const std::string& pluginConfigDir,
       SecretGenerator* secretGenerator,
-      SecretResolver* secretResolver,
-      const hashmap<std::string, CSIPluginInfo>& csiPluginConfigs);
+      SecretResolver* secretResolver);
 
   process::Owned<CSIServerProcess> process;